diff options
Diffstat (limited to 'src/OutputUHDFeedback.cpp')
-rw-r--r-- | src/OutputUHDFeedback.cpp | 125 |
1 files changed, 72 insertions, 53 deletions
diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp index cfb74b7..68783f2 100644 --- a/src/OutputUHDFeedback.cpp +++ b/src/OutputUHDFeedback.cpp @@ -51,19 +51,16 @@ DESCRIPTION: using namespace std; typedef std::complex<float> complexf; -OutputUHDFeedback::OutputUHDFeedback() +OutputUHDFeedback::OutputUHDFeedback( + uhd::usrp::multi_usrp::sptr usrp, + uint16_t port, + uint32_t sampleRate) { - m_running.store(false); -} - -void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, uint32_t sampleRate) -{ - m_usrp = usrp; + m_port = port; m_sampleRate = sampleRate; - burstRequest.state = BurstRequestState::None; + m_usrp = usrp; - if (port) { - m_port = port; + if (m_port) { m_running.store(true); rx_burst_thread = boost::thread(&OutputUHDFeedback::ReceiveBurstThread, this); @@ -76,16 +73,24 @@ OutputUHDFeedback::~OutputUHDFeedback() m_running.store(false); rx_burst_thread.interrupt(); - burst_tcp_thread.interrupt(); + if (rx_burst_thread.joinable()) { + rx_burst_thread.join(); + } - rx_burst_thread.join(); - burst_tcp_thread.join(); + burst_tcp_thread.interrupt(); + if (burst_tcp_thread.joinable()) { + burst_tcp_thread.join(); + } } void OutputUHDFeedback::set_tx_frame( const std::vector<uint8_t> &buf, const struct frame_timestamp &buf_ts) { + if (not m_running) { + throw runtime_error("OutputUHDFeedback not running"); + } + boost::mutex::scoped_lock lock(burstRequest.mutex); if (buf.size() % sizeof(complexf) != 0) { @@ -128,62 +133,76 @@ void OutputUHDFeedback::set_tx_frame( void OutputUHDFeedback::ReceiveBurstThread() { - set_thread_name("uhdreceiveburst"); + try { + set_thread_name("uhdreceiveburst"); - uhd::stream_args_t stream_args("fc32"); //complex floats - auto rxStream = m_usrp->get_rx_stream(stream_args); + uhd::stream_args_t stream_args("fc32"); //complex floats + auto rxStream = m_usrp->get_rx_stream(stream_args); - while (m_running) { - boost::mutex::scoped_lock lock(burstRequest.mutex); - while (burstRequest.state != BurstRequestState::SaveReceiveFrame) { - if (not m_running) break; - burstRequest.mutex_notification.wait(lock); - } + while (m_running) { + boost::mutex::scoped_lock lock(burstRequest.mutex); + while (burstRequest.state != BurstRequestState::SaveReceiveFrame) { + if (not m_running) break; + burstRequest.mutex_notification.wait(lock); + } - if (not m_running) break; + if (not m_running) break; - uhd::stream_cmd_t cmd( - uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE); - cmd.num_samps = burstRequest.num_samples; - cmd.stream_now = false; + uhd::stream_cmd_t cmd( + uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE); + cmd.num_samps = burstRequest.num_samples; + cmd.stream_now = false; - double pps = burstRequest.rx_pps / 16384000.0; - cmd.time_spec = uhd::time_spec_t(burstRequest.rx_second, pps); + double pps = burstRequest.rx_pps / 16384000.0; + cmd.time_spec = uhd::time_spec_t(burstRequest.rx_second, pps); - // We need to free the mutex while we recv(), because otherwise we block the - // TX thread - lock.unlock(); + // We need to free the mutex while we recv(), because otherwise we block the + // TX thread + lock.unlock(); - const double usrp_time = m_usrp->get_time_now().get_real_secs(); - const double cmd_time = cmd.time_spec.get_real_secs(); + const double usrp_time = m_usrp->get_time_now().get_real_secs(); + const double cmd_time = cmd.time_spec.get_real_secs(); - rxStream->issue_stream_cmd(cmd); + rxStream->issue_stream_cmd(cmd); - uhd::rx_metadata_t md; + uhd::rx_metadata_t md; - std::vector<uint8_t> buf(cmd.num_samps * sizeof(complexf)); + std::vector<uint8_t> buf(cmd.num_samps * sizeof(complexf)); - const double timeout = 60; - size_t samples_read = rxStream->recv(&buf[0], cmd.num_samps, md, timeout); + const double timeout = 60; + size_t samples_read = rxStream->recv(&buf[0], cmd.num_samps, md, timeout); - lock.lock(); - burstRequest.rx_samples = std::move(buf); - burstRequest.rx_samples.resize(samples_read * sizeof(complexf)); + lock.lock(); + burstRequest.rx_samples = std::move(buf); + burstRequest.rx_samples.resize(samples_read * sizeof(complexf)); - // The recv might have happened at another time than requested - burstRequest.rx_second = md.time_spec.get_full_secs(); - burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0; + // The recv might have happened at another time than requested + burstRequest.rx_second = md.time_spec.get_full_secs(); + burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0; - etiLog.level(debug) << "DPD: acquired " << samples_read << " RX feedback samples " << - "at time " << burstRequest.tx_second << " + " << - std::fixed << burstRequest.tx_pps / 16384000.0 << - " Delta=" << cmd_time - usrp_time; + etiLog.level(debug) << "DPD: acquired " << samples_read << + " RX feedback samples " << + "at time " << burstRequest.tx_second << " + " << + std::fixed << burstRequest.tx_pps / 16384000.0 << + " Delta=" << cmd_time - usrp_time; - burstRequest.state = BurstRequestState::Acquired; + burstRequest.state = BurstRequestState::Acquired; - lock.unlock(); - burstRequest.mutex_notification.notify_one(); + lock.unlock(); + burstRequest.mutex_notification.notify_one(); + } + } + catch (const runtime_error &e) { + etiLog.level(error) << "DPD Feedback RX runtime error: " << e.what(); + } + catch (const std::exception &e) { + etiLog.level(error) << "DPD Feedback RX exception: " << e.what(); + } + catch (...) { + etiLog.level(error) << "DPD Feedback RX unknown exception!"; } + + m_running.store(false); } void OutputUHDFeedback::ServeFeedback() @@ -337,7 +356,7 @@ void OutputUHDFeedback::ServeFeedbackThread() boost::this_thread::sleep(boost::posix_time::seconds(5)); } - m_running = false; + m_running.store(false); } #endif |