From ad311a62d4020e3fd7a7048e9d25bd5735206c81 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 17 Oct 2017 06:29:29 +0200 Subject: Recover OutputUHDFeedback failure --- src/OutputUHD.cpp | 16 ++++-- src/OutputUHD.h | 2 +- src/OutputUHDFeedback.cpp | 125 ++++++++++++++++++++++++++-------------------- src/OutputUHDFeedback.h | 19 +++---- 4 files changed, 96 insertions(+), 66 deletions(-) (limited to 'src') diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 10e605f..e1fe9dd 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -281,7 +281,8 @@ OutputUHD::OutputUHD( myUsrp->set_rx_gain(myConf.rxgain); etiLog.log(debug, "OutputUHD:Actual RX Gain: %f", myUsrp->get_rx_gain()); - uhdFeedback.setup(myUsrp, myConf.dpdFeedbackServerPort, myConf.sampleRate); + uhdFeedback = std::make_shared( + myUsrp, myConf.dpdFeedbackServerPort, myConf.sampleRate); MDEBUG("OutputUHD:UHD ready.\n"); } @@ -451,7 +452,7 @@ int OutputUHD::process(Buffer* dataIn) async_rx_thread.join(); first_run = true; - etiLog.level(error) << "OutputUHD: Error, UHD worker failed"; + etiLog.level(error) << "OutputUHD UHD worker failed"; throw std::runtime_error("UHD worker failed"); } @@ -460,7 +461,16 @@ int OutputUHD::process(Buffer* dataIn) "OutputUHD: dropping one frame with invalid FCT"; } else { - uhdFeedback.set_tx_frame(frame.buf, frame.ts); + try { + uhdFeedback->set_tx_frame(frame.buf, frame.ts); + } + catch (const runtime_error& e) { + etiLog.level(warn) << + "OutputUHD: Feedback server failed, restarting..."; + + uhdFeedback = std::make_shared( + myUsrp, myConf.dpdFeedbackServerPort, myConf.sampleRate); + } size_t num_frames = frames.push_wait_if_full(frame, FRAMES_MAX_SIZE); diff --git a/src/OutputUHD.h b/src/OutputUHD.h index f1ae09c..dfa471d 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -165,7 +165,7 @@ class OutputUHD: public ModOutput, public RemoteControllable { std::shared_ptr mySyncBarrier; bool first_run = true; bool gps_fix_verified = false; - OutputUHDFeedback uhdFeedback; + std::shared_ptr uhdFeedback; private: // Resize the internal delay buffer according to the dabMode and diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp index cfb74b7..68783f2 100644 --- a/src/OutputUHDFeedback.cpp +++ b/src/OutputUHDFeedback.cpp @@ -51,19 +51,16 @@ DESCRIPTION: using namespace std; typedef std::complex complexf; -OutputUHDFeedback::OutputUHDFeedback() +OutputUHDFeedback::OutputUHDFeedback( + uhd::usrp::multi_usrp::sptr usrp, + uint16_t port, + uint32_t sampleRate) { - m_running.store(false); -} - -void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, uint32_t sampleRate) -{ - m_usrp = usrp; + m_port = port; m_sampleRate = sampleRate; - burstRequest.state = BurstRequestState::None; + m_usrp = usrp; - if (port) { - m_port = port; + if (m_port) { m_running.store(true); rx_burst_thread = boost::thread(&OutputUHDFeedback::ReceiveBurstThread, this); @@ -76,16 +73,24 @@ OutputUHDFeedback::~OutputUHDFeedback() m_running.store(false); rx_burst_thread.interrupt(); - burst_tcp_thread.interrupt(); + if (rx_burst_thread.joinable()) { + rx_burst_thread.join(); + } - rx_burst_thread.join(); - burst_tcp_thread.join(); + burst_tcp_thread.interrupt(); + if (burst_tcp_thread.joinable()) { + burst_tcp_thread.join(); + } } void OutputUHDFeedback::set_tx_frame( const std::vector &buf, const struct frame_timestamp &buf_ts) { + if (not m_running) { + throw runtime_error("OutputUHDFeedback not running"); + } + boost::mutex::scoped_lock lock(burstRequest.mutex); if (buf.size() % sizeof(complexf) != 0) { @@ -128,62 +133,76 @@ void OutputUHDFeedback::set_tx_frame( void OutputUHDFeedback::ReceiveBurstThread() { - set_thread_name("uhdreceiveburst"); + try { + set_thread_name("uhdreceiveburst"); - uhd::stream_args_t stream_args("fc32"); //complex floats - auto rxStream = m_usrp->get_rx_stream(stream_args); + uhd::stream_args_t stream_args("fc32"); //complex floats + auto rxStream = m_usrp->get_rx_stream(stream_args); - while (m_running) { - boost::mutex::scoped_lock lock(burstRequest.mutex); - while (burstRequest.state != BurstRequestState::SaveReceiveFrame) { - if (not m_running) break; - burstRequest.mutex_notification.wait(lock); - } + while (m_running) { + boost::mutex::scoped_lock lock(burstRequest.mutex); + while (burstRequest.state != BurstRequestState::SaveReceiveFrame) { + if (not m_running) break; + burstRequest.mutex_notification.wait(lock); + } - if (not m_running) break; + if (not m_running) break; - uhd::stream_cmd_t cmd( - uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE); - cmd.num_samps = burstRequest.num_samples; - cmd.stream_now = false; + uhd::stream_cmd_t cmd( + uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE); + cmd.num_samps = burstRequest.num_samples; + cmd.stream_now = false; - double pps = burstRequest.rx_pps / 16384000.0; - cmd.time_spec = uhd::time_spec_t(burstRequest.rx_second, pps); + double pps = burstRequest.rx_pps / 16384000.0; + cmd.time_spec = uhd::time_spec_t(burstRequest.rx_second, pps); - // We need to free the mutex while we recv(), because otherwise we block the - // TX thread - lock.unlock(); + // We need to free the mutex while we recv(), because otherwise we block the + // TX thread + lock.unlock(); - const double usrp_time = m_usrp->get_time_now().get_real_secs(); - const double cmd_time = cmd.time_spec.get_real_secs(); + const double usrp_time = m_usrp->get_time_now().get_real_secs(); + const double cmd_time = cmd.time_spec.get_real_secs(); - rxStream->issue_stream_cmd(cmd); + rxStream->issue_stream_cmd(cmd); - uhd::rx_metadata_t md; + uhd::rx_metadata_t md; - std::vector buf(cmd.num_samps * sizeof(complexf)); + std::vector buf(cmd.num_samps * sizeof(complexf)); - const double timeout = 60; - size_t samples_read = rxStream->recv(&buf[0], cmd.num_samps, md, timeout); + const double timeout = 60; + size_t samples_read = rxStream->recv(&buf[0], cmd.num_samps, md, timeout); - lock.lock(); - burstRequest.rx_samples = std::move(buf); - burstRequest.rx_samples.resize(samples_read * sizeof(complexf)); + lock.lock(); + burstRequest.rx_samples = std::move(buf); + burstRequest.rx_samples.resize(samples_read * sizeof(complexf)); - // The recv might have happened at another time than requested - burstRequest.rx_second = md.time_spec.get_full_secs(); - burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0; + // The recv might have happened at another time than requested + burstRequest.rx_second = md.time_spec.get_full_secs(); + burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0; - etiLog.level(debug) << "DPD: acquired " << samples_read << " RX feedback samples " << - "at time " << burstRequest.tx_second << " + " << - std::fixed << burstRequest.tx_pps / 16384000.0 << - " Delta=" << cmd_time - usrp_time; + etiLog.level(debug) << "DPD: acquired " << samples_read << + " RX feedback samples " << + "at time " << burstRequest.tx_second << " + " << + std::fixed << burstRequest.tx_pps / 16384000.0 << + " Delta=" << cmd_time - usrp_time; - burstRequest.state = BurstRequestState::Acquired; + burstRequest.state = BurstRequestState::Acquired; - lock.unlock(); - burstRequest.mutex_notification.notify_one(); + lock.unlock(); + burstRequest.mutex_notification.notify_one(); + } + } + catch (const runtime_error &e) { + etiLog.level(error) << "DPD Feedback RX runtime error: " << e.what(); + } + catch (const std::exception &e) { + etiLog.level(error) << "DPD Feedback RX exception: " << e.what(); + } + catch (...) { + etiLog.level(error) << "DPD Feedback RX unknown exception!"; } + + m_running.store(false); } void OutputUHDFeedback::ServeFeedback() @@ -337,7 +356,7 @@ void OutputUHDFeedback::ServeFeedbackThread() boost::this_thread::sleep(boost::posix_time::seconds(5)); } - m_running = false; + m_running.store(false); } #endif diff --git a/src/OutputUHDFeedback.h b/src/OutputUHDFeedback.h index da0d487..80d287f 100644 --- a/src/OutputUHDFeedback.h +++ b/src/OutputUHDFeedback.h @@ -61,23 +61,23 @@ struct UHDReceiveBurstRequest { mutable boost::mutex mutex; boost::condition_variable mutex_notification; - BurstRequestState state; + BurstRequestState state = BurstRequestState::None; // In the SaveTransmit states, num_samples complexf samples are saved into // the vectors - size_t num_samples; + size_t num_samples = 0; // The timestamp of the first sample of the TX buffers - uint32_t tx_second; - uint32_t tx_pps; // in units of 1/16384000s + uint32_t tx_second = 0; + uint32_t tx_pps = 0; // in units of 1/16384000s // Samples contain complexf, but since our internal representation is uint8_t // we keep it like that std::vector tx_samples; // The timestamp of the first sample of the RX buffers - uint32_t rx_second; - uint32_t rx_pps; + uint32_t rx_second = 0; + uint32_t rx_pps = 0; std::vector rx_samples; // Also, actually complexf }; @@ -85,13 +85,14 @@ struct UHDReceiveBurstRequest { // Serve TX samples and RX feedback samples over a TCP connection class OutputUHDFeedback { public: - OutputUHDFeedback(); + OutputUHDFeedback( + uhd::usrp::multi_usrp::sptr usrp, + uint16_t port, + uint32_t sampleRate); OutputUHDFeedback(const OutputUHDFeedback& other) = delete; OutputUHDFeedback& operator=(const OutputUHDFeedback& other) = delete; ~OutputUHDFeedback(); - void setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, uint32_t sampleRate); - void set_tx_frame(const std::vector &buf, const struct frame_timestamp& ts); -- cgit v1.2.3