From 4d8310ae0ffe1f78a2b8623d55f63ae504ff1aa8 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sat, 4 Nov 2017 08:36:03 +0100 Subject: Make DPD Feedback server SDRDevice-agnostic --- src/output/Feedback.cpp | 356 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 356 insertions(+) create mode 100644 src/output/Feedback.cpp (limited to 'src/output/Feedback.cpp') diff --git a/src/output/Feedback.cpp b/src/output/Feedback.cpp new file mode 100644 index 0000000..f0bbd98 --- /dev/null +++ b/src/output/Feedback.cpp @@ -0,0 +1,356 @@ +/* + Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the + Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2017 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org + +DESCRIPTION: + This presents a TCP socket to an external tool which calculates + a Digital Predistortion model from a short sequence of transmit + samples and corresponding receive samples. +*/ + +/* + This file is part of ODR-DabMod. + + ODR-DabMod is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMod is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMod. If not, see . + */ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include +#include +#include +#include +#include +#include +#include +#include "output/Feedback.h" +#include "Utils.h" +#include "Socket.h" + +using namespace std; + +namespace Output { + +DPDFeedbackServer::DPDFeedbackServer( + std::shared_ptr device, + uint16_t port, + uint32_t sampleRate) : + m_port(port), + m_sampleRate(sampleRate), + m_device(device) +{ + if (m_port) { + m_running.store(true); + + rx_burst_thread = boost::thread( + &DPDFeedbackServer::ReceiveBurstThread, this); + + burst_tcp_thread = boost::thread( + &DPDFeedbackServer::ServeFeedbackThread, this); + } +} + +DPDFeedbackServer::~DPDFeedbackServer() +{ + m_running.store(false); + + rx_burst_thread.interrupt(); + if (rx_burst_thread.joinable()) { + rx_burst_thread.join(); + } + + burst_tcp_thread.interrupt(); + if (burst_tcp_thread.joinable()) { + burst_tcp_thread.join(); + } +} + +void DPDFeedbackServer::set_tx_frame( + const std::vector &buf, + const struct frame_timestamp &buf_ts) +{ + if (not m_running) { + throw runtime_error("DPDFeedbackServer not running"); + } + + boost::mutex::scoped_lock lock(burstRequest.mutex); + + if (buf.size() % sizeof(complexf) != 0) { + throw std::logic_error("Buffer for tx frame has incorrect size"); + } + + if (burstRequest.state == BurstRequestState::SaveTransmitFrame) { + const size_t n = std::min( + burstRequest.num_samples * sizeof(complexf), buf.size()); + + burstRequest.num_samples = n / sizeof(complexf); + + burstRequest.tx_samples.clear(); + burstRequest.tx_samples.resize(n); + // A frame will always begin with the NULL symbol, which contains + // no power. Instead of taking n samples at the beginning of the + // frame, we take them at the end and adapt the timestamp accordingly. + + const size_t start_ix = buf.size() - n; + copy(buf.begin() + start_ix, buf.end(), burstRequest.tx_samples.begin()); + + frame_timestamp ts = buf_ts; + ts += (1.0 * start_ix) / (sizeof(complexf) * m_sampleRate); + + burstRequest.tx_second = ts.timestamp_sec; + burstRequest.tx_pps = ts.timestamp_pps; + + // Prepare the next state + burstRequest.rx_second = ts.timestamp_sec; + burstRequest.rx_pps = ts.timestamp_pps; + burstRequest.state = BurstRequestState::SaveReceiveFrame; + + lock.unlock(); + burstRequest.mutex_notification.notify_one(); + } + else { + lock.unlock(); + } +} + +void DPDFeedbackServer::ReceiveBurstThread() +{ + try { + set_thread_name("dpdreceiveburst"); + + while (m_running) { + boost::mutex::scoped_lock lock(burstRequest.mutex); + while (burstRequest.state != BurstRequestState::SaveReceiveFrame) { + if (not m_running) break; + burstRequest.mutex_notification.wait(lock); + } + + if (not m_running) break; + + const size_t num_samps = burstRequest.num_samples; + + frame_timestamp ts; + ts.timestamp_sec = burstRequest.rx_second; + ts.timestamp_pps = burstRequest.rx_pps; + ts.timestamp_valid = true; + + // We need to free the mutex while we recv(), because otherwise we block the + // TX thread + lock.unlock(); + + const double device_time = m_device->get_real_secs(); + const double cmd_time = ts.get_real_secs(); + + std::vector buf(num_samps * sizeof(complexf)); + + const double timeout = 60; + size_t samples_read = m_device->receive_frame( + reinterpret_cast(buf.data()), + num_samps, ts, timeout); + + lock.lock(); + burstRequest.rx_samples = std::move(buf); + burstRequest.rx_samples.resize(samples_read * sizeof(complexf)); + + // The recv might have happened at another time than requested + burstRequest.rx_second = ts.timestamp_sec; + burstRequest.rx_pps = ts.timestamp_pps; + + etiLog.level(debug) << "DPD: acquired " << samples_read << + " RX feedback samples " << + "at time " << burstRequest.tx_second << " + " << + std::fixed << burstRequest.tx_pps / 16384000.0 << + " Delta=" << cmd_time - device_time; + + burstRequest.state = BurstRequestState::Acquired; + + lock.unlock(); + burstRequest.mutex_notification.notify_one(); + } + } + catch (const runtime_error &e) { + etiLog.level(error) << "DPD Feedback RX runtime error: " << e.what(); + } + catch (const std::exception &e) { + etiLog.level(error) << "DPD Feedback RX exception: " << e.what(); + } + catch (...) { + etiLog.level(error) << "DPD Feedback RX unknown exception!"; + } + + m_running.store(false); +} + +void DPDFeedbackServer::ServeFeedback() +{ + TCPSocket m_server_sock; + m_server_sock.listen(m_port); + + etiLog.level(info) << "DPD Feedback server listening on port " << m_port; + + while (m_running) { + struct sockaddr_in client; + TCPSocket client_sock = m_server_sock.accept_with_timeout(1000, &client); + + if (not client_sock.valid()) { + // No connection request received + continue; + } + + uint8_t request_version = 0; + ssize_t read = client_sock.recv(&request_version, 1, 0); + if (!read) break; // done reading + if (read < 0) { + etiLog.level(info) << + "DPD Feedback Server Client read request version failed: " << strerror(errno); + break; + } + + if (request_version != 1) { + etiLog.level(info) << "DPD Feedback Server wrong request version"; + break; + } + + uint32_t num_samples = 0; + read = client_sock.recv(&num_samples, 4, 0); + if (!read) break; // done reading + if (read < 0) { + etiLog.level(info) << + "DPD Feedback Server Client read num samples failed"; + break; + } + + // We are ready to issue the request now + { + boost::mutex::scoped_lock lock(burstRequest.mutex); + burstRequest.num_samples = num_samples; + burstRequest.state = BurstRequestState::SaveTransmitFrame; + + lock.unlock(); + } + + // Wait for the result to be ready + boost::mutex::scoped_lock lock(burstRequest.mutex); + while (burstRequest.state != BurstRequestState::Acquired) { + if (not m_running) break; + burstRequest.mutex_notification.wait(lock); + } + + burstRequest.state = BurstRequestState::None; + lock.unlock(); + + burstRequest.num_samples = std::min(burstRequest.num_samples, + std::min( + burstRequest.tx_samples.size() / sizeof(complexf), + burstRequest.rx_samples.size() / sizeof(complexf))); + + uint32_t num_samples_32 = burstRequest.num_samples; + if (client_sock.sendall(&num_samples_32, sizeof(num_samples_32)) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send num_samples failed"; + break; + } + + if (client_sock.sendall( + &burstRequest.tx_second, + sizeof(burstRequest.tx_second)) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send tx_second failed"; + break; + } + + if (client_sock.sendall( + &burstRequest.tx_pps, + sizeof(burstRequest.tx_pps)) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send tx_pps failed"; + break; + } + + const size_t frame_bytes = burstRequest.num_samples * sizeof(complexf); + + if (burstRequest.tx_samples.size() < frame_bytes) { + throw logic_error("DPD Feedback burstRequest invalid: not enough TX samples"); + } + + if (client_sock.sendall( + &burstRequest.tx_samples[0], + frame_bytes) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send tx_frame failed"; + break; + } + + if (client_sock.sendall( + &burstRequest.rx_second, + sizeof(burstRequest.rx_second)) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send rx_second failed"; + break; + } + + if (client_sock.sendall( + &burstRequest.rx_pps, + sizeof(burstRequest.rx_pps)) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send rx_pps failed"; + break; + } + + if (burstRequest.rx_samples.size() < frame_bytes) { + throw logic_error("DPD Feedback burstRequest invalid: not enough RX samples"); + } + + if (client_sock.sendall( + &burstRequest.rx_samples[0], + frame_bytes) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send rx_frame failed"; + break; + } + } +} + +void DPDFeedbackServer::ServeFeedbackThread() +{ + set_thread_name("dpdfeedbackserver"); + + while (m_running) { + try { + ServeFeedback(); + } + catch (const runtime_error &e) { + etiLog.level(error) << "DPD Feedback Server runtime error: " << e.what(); + } + catch (const std::exception &e) { + etiLog.level(error) << "DPD Feedback Server exception: " << e.what(); + } + catch (...) { + etiLog.level(error) << "DPD Feedback Server unknown exception!"; + } + + boost::this_thread::sleep(boost::posix_time::seconds(5)); + } + + m_running.store(false); +} + +} // namespace Output -- cgit v1.2.3