From 2b877e304d52c406720050aa55eed97b6f7869be Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 7 May 2017 14:22:21 +0200 Subject: Add WIP for OutputUHDFeedback --- src/OutputUHDFeedback.h | 114 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 src/OutputUHDFeedback.h (limited to 'src/OutputUHDFeedback.h') diff --git a/src/OutputUHDFeedback.h b/src/OutputUHDFeedback.h new file mode 100644 index 0000000..31f7547 --- /dev/null +++ b/src/OutputUHDFeedback.h @@ -0,0 +1,114 @@ +/* + Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the + Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2017 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org + +DESCRIPTION: + This presents a TCP socket to an external tool which calculates + a Digital Predistortion model from a short sequence of transmit + samples and corresponding receive samples. +*/ + +/* + This file is part of ODR-DabMod. + + ODR-DabMod is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMod is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMod. If not, see . + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +# include +#endif + +#ifdef HAVE_OUTPUT_UHD + +#include +#include +#include +#include +#include +#include + +#include "Log.h" +#include "TimestampDecoder.h" + +enum class BurstRequestState { + None, // To pending request + SaveTransmitFrame, // The TX thread has to save an outgoing frame + SaveReceiveFrame, // The RX thread has to save an incoming frame + Acquired, // Both TX and RX frames are ready +}; + +struct UHDReceiveBurstRequest { + // All fields in this struct are protected + mutable boost::mutex mutex; + boost::condition_variable mutex_notification; + + BurstRequestState state; + + // In the SaveTransmit states, frame_length samples are saved into + // the vectors + size_t frame_length; + + // The timestamp of the first sample of the TX buffers + uint32_t tx_second; + uint32_t tx_pps; // in units of 1/16384000s + + std::vector tx_samples; + + // The timestamp of the first sample of the RX buffers + uint32_t rx_second; + uint32_t rx_pps; + + std::vector rx_samples; +}; + + +class OutputUHDFeedback { + public: + OutputUHDFeedback(); + OutputUHDFeedback(const OutputUHDFeedback& other) = delete; + OutputUHDFeedback& operator=(const OutputUHDFeedback& other) = delete; + ~OutputUHDFeedback(); + + void setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port); + + void set_tx_frame(const std::vector &buf, + const struct frame_timestamp& ts); + + + private: + // Thread that reacts to burstRequests and receives from the USRP + void ReceiveBurstThread(void); + + // Thread that listens for requests over TCP to get TX and RX feedback + void ServeFeedbackThread(void); + + boost::thread rx_burst_thread; + boost::thread burst_tcp_thread; + + UHDReceiveBurstRequest burstRequest; + + bool running = false; + uint16_t m_port = 0; + uhd::usrp::multi_usrp::sptr myUsrp; +}; + + +#endif // HAVE_OUTPUT_UHD -- cgit v1.2.3 From 4fad4de6ec39b2741f8545ed78aa58ea0a6edc6c Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 12 May 2017 08:28:47 +0200 Subject: UHD Feedback: Do not send the beginning of the frame --- src/OutputUHD.cpp | 2 +- src/OutputUHDFeedback.cpp | 48 ++++++++++++++++++++++++++++++++--------------- src/OutputUHDFeedback.h | 10 +++++----- 3 files changed, 39 insertions(+), 21 deletions(-) (limited to 'src/OutputUHDFeedback.h') diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 6dc8878..6edf7df 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -284,7 +284,7 @@ OutputUHD::OutputUHD( SetDelayBuffer(myConf.dabMode); - uhdFeedback.setup(myUsrp, myConf.dpdFeedbackServerPort); + uhdFeedback.setup(myUsrp, myConf.dpdFeedbackServerPort, myConf.sampleRate); MDEBUG("OutputUHD:UHD ready.\n"); } diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp index dfe0f74..73497a1 100644 --- a/src/OutputUHDFeedback.cpp +++ b/src/OutputUHDFeedback.cpp @@ -30,7 +30,14 @@ DESCRIPTION: along with ODR-DabMod. If not, see . */ +#ifdef HAVE_CONFIG_H +# include +#endif + +#ifdef HAVE_OUTPUT_UHD + #include +#include #include #include #include "OutputUHDFeedback.h" @@ -41,17 +48,18 @@ typedef std::complex complexf; OutputUHDFeedback::OutputUHDFeedback() { - running = false; + m_running = false; } -void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port) +void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, uint32_t sampleRate) { - myUsrp = usrp; + m_usrp = usrp; + m_sampleRate = sampleRate; burstRequest.state = BurstRequestState::None; if (port) { m_port = port; - running = true; + m_running = true; rx_burst_thread = boost::thread(&OutputUHDFeedback::ReceiveBurstThread, this); burst_tcp_thread = boost::thread(&OutputUHDFeedback::ServeFeedbackThread, this); @@ -60,14 +68,14 @@ void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port) OutputUHDFeedback::~OutputUHDFeedback() { - running = false; + m_running = false; rx_burst_thread.join(); burst_tcp_thread.join(); } void OutputUHDFeedback::set_tx_frame( const std::vector &buf, - const struct frame_timestamp& ts) + const struct frame_timestamp &buf_ts) { boost::mutex::scoped_lock lock(burstRequest.mutex); @@ -77,7 +85,15 @@ void OutputUHDFeedback::set_tx_frame( burstRequest.tx_samples.clear(); burstRequest.tx_samples.resize(n); - copy(buf.begin(), buf.begin() + n, burstRequest.tx_samples.begin()); + // A frame will always begin with the NULL symbol, which contains + // no power. Instead of taking n samples at the beginning of the + // frame, we take them at the end and adapt the timestamp accordingly. + + const size_t start_ix = buf.size() - n; + copy(buf.begin() + start_ix, buf.end(), burstRequest.tx_samples.begin()); + + frame_timestamp ts = buf_ts; + ts += (1.0 * start_ix) / (sizeof(complexf) * m_sampleRate); burstRequest.tx_second = ts.timestamp_sec; burstRequest.tx_pps = ts.timestamp_pps; @@ -100,16 +116,16 @@ void OutputUHDFeedback::ReceiveBurstThread() set_thread_name("uhdreceiveburst"); uhd::stream_args_t stream_args("fc32"); //complex floats - auto rxStream = myUsrp->get_rx_stream(stream_args); + auto rxStream = m_usrp->get_rx_stream(stream_args); - while (running) { + while (m_running) { boost::mutex::scoped_lock lock(burstRequest.mutex); while (burstRequest.state != BurstRequestState::SaveReceiveFrame) { - if (not running) break; + if (not m_running) break; burstRequest.mutex_notification.wait(lock); } - if (not running) break; + if (not m_running) break; uhd::stream_cmd_t cmd( uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE); @@ -158,7 +174,7 @@ void OutputUHDFeedback::ServeFeedbackThread() throw std::runtime_error("Can't listen TCP socket"); } - while (running) { + while (m_running) { struct sockaddr_in client; socklen_t client_len = sizeof(client); int client_sock = accept(server_sock, @@ -168,7 +184,7 @@ void OutputUHDFeedback::ServeFeedbackThread() throw runtime_error("Could not establish new connection"); } - while (running) { + while (m_running) { uint8_t request_version = 0; int read = recv(client_sock, &request_version, 1, 0); if (!read) break; // done reading @@ -202,7 +218,7 @@ void OutputUHDFeedback::ServeFeedbackThread() // Wait for the result to be ready boost::mutex::scoped_lock lock(burstRequest.mutex); while (burstRequest.state != BurstRequestState::Acquired) { - if (not running) break; + if (not m_running) break; burstRequest.mutex_notification.wait(lock); } @@ -237,9 +253,11 @@ void OutputUHDFeedback::ServeFeedbackThread() etiLog.level(error) << "DPD Feedback Server fault: " << e.what(); } - running = false; + m_running = false; if (server_sock != -1) { close(server_sock); } } + +#endif diff --git a/src/OutputUHDFeedback.h b/src/OutputUHDFeedback.h index 31f7547..afc06b0 100644 --- a/src/OutputUHDFeedback.h +++ b/src/OutputUHDFeedback.h @@ -79,7 +79,7 @@ struct UHDReceiveBurstRequest { std::vector rx_samples; }; - +// Serve TX samples and RX feedback samples over a TCP connection class OutputUHDFeedback { public: OutputUHDFeedback(); @@ -87,12 +87,11 @@ class OutputUHDFeedback { OutputUHDFeedback& operator=(const OutputUHDFeedback& other) = delete; ~OutputUHDFeedback(); - void setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port); + void setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, uint32_t sampleRate); void set_tx_frame(const std::vector &buf, const struct frame_timestamp& ts); - private: // Thread that reacts to burstRequests and receives from the USRP void ReceiveBurstThread(void); @@ -105,9 +104,10 @@ class OutputUHDFeedback { UHDReceiveBurstRequest burstRequest; - bool running = false; + bool m_running = false; uint16_t m_port = 0; - uhd::usrp::multi_usrp::sptr myUsrp; + uint32_t m_sampleRate = 0; + uhd::usrp::multi_usrp::sptr m_usrp; }; -- cgit v1.2.3 From 5ac10e37d07cfe723ea4b396f08563889dff5a2b Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 12 May 2017 10:20:23 +0200 Subject: Properly terminate dpd server on ctrl-c --- src/OutputUHDFeedback.cpp | 120 ++++++++++++++++++++++++++++++++++++++-------- src/OutputUHDFeedback.h | 12 +++-- 2 files changed, 107 insertions(+), 25 deletions(-) (limited to 'src/OutputUHDFeedback.h') diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp index 73497a1..8584839 100644 --- a/src/OutputUHDFeedback.cpp +++ b/src/OutputUHDFeedback.cpp @@ -40,6 +40,7 @@ DESCRIPTION: #include #include #include +#include #include "OutputUHDFeedback.h" #include "Utils.h" @@ -48,7 +49,7 @@ typedef std::complex complexf; OutputUHDFeedback::OutputUHDFeedback() { - m_running = false; + m_running.store(false); } void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, uint32_t sampleRate) @@ -59,7 +60,7 @@ void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, u if (port) { m_port = port; - m_running = true; + m_running.store(true); rx_burst_thread = boost::thread(&OutputUHDFeedback::ReceiveBurstThread, this); burst_tcp_thread = boost::thread(&OutputUHDFeedback::ServeFeedbackThread, this); @@ -68,7 +69,11 @@ void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, u OutputUHDFeedback::~OutputUHDFeedback() { - m_running = false; + m_running.store(false); + + rx_burst_thread.interrupt(); + burst_tcp_thread.interrupt(); + rx_burst_thread.join(); burst_tcp_thread.join(); } @@ -79,9 +84,13 @@ void OutputUHDFeedback::set_tx_frame( { boost::mutex::scoped_lock lock(burstRequest.mutex); + assert(buf.size() % sizeof(complexf) == 0); + if (burstRequest.state == BurstRequestState::SaveTransmitFrame) { const size_t n = std::min( - burstRequest.frame_length * sizeof(complexf), buf.size()); + burstRequest.num_samples * sizeof(complexf), buf.size()); + + burstRequest.num_samples = n / sizeof(complexf); burstRequest.tx_samples.clear(); burstRequest.tx_samples.resize(n); @@ -129,7 +138,7 @@ void OutputUHDFeedback::ReceiveBurstThread() uhd::stream_cmd_t cmd( uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE); - cmd.num_samps = burstRequest.frame_length; + cmd.num_samps = burstRequest.num_samples; cmd.stream_now = false; double pps = burstRequest.rx_pps / 16384000.0; @@ -138,9 +147,10 @@ void OutputUHDFeedback::ReceiveBurstThread() rxStream->issue_stream_cmd(cmd); uhd::rx_metadata_t md; - burstRequest.rx_samples.resize(burstRequest.frame_length * sizeof(complexf)); - rxStream->recv(&burstRequest.rx_samples[0], burstRequest.frame_length, md); + burstRequest.rx_samples.resize(burstRequest.num_samples * sizeof(complexf)); + rxStream->recv(&burstRequest.rx_samples[0], burstRequest.num_samples, md); + // The recv might have happened at another time than requested burstRequest.rx_second = md.time_spec.get_full_secs(); burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0; @@ -151,13 +161,32 @@ void OutputUHDFeedback::ReceiveBurstThread() } } +static int accept_with_timeout(int server_socket, int timeout_ms, struct sockaddr_in *client) +{ + struct pollfd fds[1]; + fds[0].fd = server_socket; + fds[0].events = POLLIN | POLLOUT; + + int retval = poll(fds, 1, timeout_ms); + + if (retval == -1) { + throw std::runtime_error("TCP Socket accept error: " + to_string(errno)); + } + else if (retval) { + socklen_t client_len = sizeof(struct sockaddr_in); + return accept(server_socket, (struct sockaddr*)&client, &client_len); + } + else { + return -2; + } +} + void OutputUHDFeedback::ServeFeedbackThread() { set_thread_name("uhdservefeedback"); - int server_sock = -1; try { - if ((server_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { + if ((m_server_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { throw std::runtime_error("Can't create TCP socket"); } @@ -166,31 +195,32 @@ void OutputUHDFeedback::ServeFeedbackThread() addr.sin_port = htons(m_port); addr.sin_addr.s_addr = htonl(INADDR_ANY); - if (bind(server_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { + if (bind(m_server_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { throw std::runtime_error("Can't bind TCP socket"); } - if (listen(server_sock, 1) < 0) { + if (listen(m_server_sock, 1) < 0) { throw std::runtime_error("Can't listen TCP socket"); } while (m_running) { struct sockaddr_in client; - socklen_t client_len = sizeof(client); - int client_sock = accept(server_sock, - (struct sockaddr*)&client, &client_len); + int client_sock = accept_with_timeout(m_server_sock, 1000, &client); - if (client_sock < 0) { + if (client_sock == -1) { throw runtime_error("Could not establish new connection"); } + else if (client_sock == -2) { + continue; + } while (m_running) { uint8_t request_version = 0; - int read = recv(client_sock, &request_version, 1, 0); + ssize_t read = recv(client_sock, &request_version, 1, 0); if (!read) break; // done reading if (read < 0) { etiLog.level(info) << - "DPD Feedback Server Client read request verson failed"; + "DPD Feedback Server Client read request version failed"; } if (request_version != 1) { @@ -209,7 +239,7 @@ void OutputUHDFeedback::ServeFeedbackThread() // We are ready to issue the request now { boost::mutex::scoped_lock lock(burstRequest.mutex); - burstRequest.frame_length = num_samples; + burstRequest.num_samples = num_samples; burstRequest.state = BurstRequestState::SaveTransmitFrame; lock.unlock(); @@ -225,6 +255,15 @@ void OutputUHDFeedback::ServeFeedbackThread() burstRequest.state = BurstRequestState::None; lock.unlock(); + if (send(client_sock, + &burstRequest.num_samples, + sizeof(burstRequest.num_samples), + 0) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send num_samples failed"; + break; + } + if (send(client_sock, &burstRequest.tx_second, sizeof(burstRequest.tx_second), @@ -243,7 +282,45 @@ void OutputUHDFeedback::ServeFeedbackThread() break; } -#warning "Send buf" + const size_t frame_bytes = burstRequest.num_samples * sizeof(complexf); + + assert(burstRequest.tx_samples.size() == frame_bytes); + if (send(client_sock, + &burstRequest.tx_samples[0], + frame_bytes, + 0) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send tx_frame failed"; + break; + } + + if (send(client_sock, + &burstRequest.rx_second, + sizeof(burstRequest.rx_second), + 0) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send rx_second failed"; + break; + } + + if (send(client_sock, + &burstRequest.rx_pps, + sizeof(burstRequest.rx_pps), + 0) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send rx_pps failed"; + break; + } + + assert(burstRequest.rx_samples.size() == frame_bytes); + if (send(client_sock, + &burstRequest.rx_samples[0], + frame_bytes, + 0) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send rx_frame failed"; + break; + } } close(client_sock); @@ -255,8 +332,9 @@ void OutputUHDFeedback::ServeFeedbackThread() m_running = false; - if (server_sock != -1) { - close(server_sock); + if (m_server_sock != -1) { + close(m_server_sock); + m_server_sock = -1; } } diff --git a/src/OutputUHDFeedback.h b/src/OutputUHDFeedback.h index afc06b0..32668b6 100644 --- a/src/OutputUHDFeedback.h +++ b/src/OutputUHDFeedback.h @@ -44,6 +44,7 @@ DESCRIPTION: #include #include #include +#include #include "Log.h" #include "TimestampDecoder.h" @@ -62,21 +63,23 @@ struct UHDReceiveBurstRequest { BurstRequestState state; - // In the SaveTransmit states, frame_length samples are saved into + // In the SaveTransmit states, num_samples complexf samples are saved into // the vectors - size_t frame_length; + size_t num_samples; // The timestamp of the first sample of the TX buffers uint32_t tx_second; uint32_t tx_pps; // in units of 1/16384000s + // Samples contain complexf, but since our internal representation is uint8_t + // we keep it like that std::vector tx_samples; // The timestamp of the first sample of the RX buffers uint32_t rx_second; uint32_t rx_pps; - std::vector rx_samples; + std::vector rx_samples; // Also, actually complexf }; // Serve TX samples and RX feedback samples over a TCP connection @@ -104,7 +107,8 @@ class OutputUHDFeedback { UHDReceiveBurstRequest burstRequest; - bool m_running = false; + std::atomic_bool m_running; + int m_server_sock = -1; uint16_t m_port = 0; uint32_t m_sampleRate = 0; uhd::usrp::multi_usrp::sptr m_usrp; -- cgit v1.2.3