diff options
| -rw-r--r-- | src/OutputUHD.cpp | 16 | ||||
| -rw-r--r-- | src/OutputUHD.h | 2 | ||||
| -rw-r--r-- | src/OutputUHDFeedback.cpp | 125 | ||||
| -rw-r--r-- | src/OutputUHDFeedback.h | 19 | 
4 files changed, 96 insertions, 66 deletions
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 10e605f..e1fe9dd 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -281,7 +281,8 @@ OutputUHD::OutputUHD(      myUsrp->set_rx_gain(myConf.rxgain);      etiLog.log(debug, "OutputUHD:Actual RX Gain: %f", myUsrp->get_rx_gain()); -    uhdFeedback.setup(myUsrp, myConf.dpdFeedbackServerPort, myConf.sampleRate); +    uhdFeedback = std::make_shared<OutputUHDFeedback>( +            myUsrp, myConf.dpdFeedbackServerPort, myConf.sampleRate);      MDEBUG("OutputUHD:UHD ready.\n");  } @@ -451,7 +452,7 @@ int OutputUHD::process(Buffer* dataIn)              async_rx_thread.join();              first_run = true; -            etiLog.level(error) << "OutputUHD: Error, UHD worker failed"; +            etiLog.level(error) << "OutputUHD UHD worker failed";              throw std::runtime_error("UHD worker failed");          } @@ -460,7 +461,16 @@ int OutputUHD::process(Buffer* dataIn)                  "OutputUHD: dropping one frame with invalid FCT";          }          else { -            uhdFeedback.set_tx_frame(frame.buf, frame.ts); +            try { +                uhdFeedback->set_tx_frame(frame.buf, frame.ts); +            } +            catch (const runtime_error& e) { +                etiLog.level(warn) << +                    "OutputUHD: Feedback server failed, restarting..."; + +                uhdFeedback = std::make_shared<OutputUHDFeedback>( +                        myUsrp, myConf.dpdFeedbackServerPort, myConf.sampleRate); +            }              size_t num_frames = frames.push_wait_if_full(frame,                      FRAMES_MAX_SIZE); diff --git a/src/OutputUHD.h b/src/OutputUHD.h index f1ae09c..dfa471d 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -165,7 +165,7 @@ class OutputUHD: public ModOutput, public RemoteControllable {          std::shared_ptr<boost::barrier> mySyncBarrier;          bool first_run = true;          bool gps_fix_verified = false; -        OutputUHDFeedback uhdFeedback; +        std::shared_ptr<OutputUHDFeedback> uhdFeedback;      private:          // Resize the internal delay buffer according to the dabMode and diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp index cfb74b7..68783f2 100644 --- a/src/OutputUHDFeedback.cpp +++ b/src/OutputUHDFeedback.cpp @@ -51,19 +51,16 @@ DESCRIPTION:  using namespace std;  typedef std::complex<float> complexf; -OutputUHDFeedback::OutputUHDFeedback() +OutputUHDFeedback::OutputUHDFeedback( +        uhd::usrp::multi_usrp::sptr usrp, +        uint16_t port, +        uint32_t sampleRate)  { -    m_running.store(false); -} - -void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, uint32_t sampleRate) -{ -    m_usrp = usrp; +    m_port = port;      m_sampleRate = sampleRate; -    burstRequest.state = BurstRequestState::None; +    m_usrp = usrp; -    if (port) { -        m_port = port; +    if (m_port) {          m_running.store(true);          rx_burst_thread = boost::thread(&OutputUHDFeedback::ReceiveBurstThread, this); @@ -76,16 +73,24 @@ OutputUHDFeedback::~OutputUHDFeedback()      m_running.store(false);      rx_burst_thread.interrupt(); -    burst_tcp_thread.interrupt(); +    if (rx_burst_thread.joinable()) { +        rx_burst_thread.join(); +    } -    rx_burst_thread.join(); -    burst_tcp_thread.join(); +    burst_tcp_thread.interrupt(); +    if (burst_tcp_thread.joinable()) { +        burst_tcp_thread.join(); +    }  }  void OutputUHDFeedback::set_tx_frame(          const std::vector<uint8_t> &buf,          const struct frame_timestamp &buf_ts)  { +    if (not m_running) { +        throw runtime_error("OutputUHDFeedback not running"); +    } +      boost::mutex::scoped_lock lock(burstRequest.mutex);      if (buf.size() % sizeof(complexf) != 0) { @@ -128,62 +133,76 @@ void OutputUHDFeedback::set_tx_frame(  void OutputUHDFeedback::ReceiveBurstThread()  { -    set_thread_name("uhdreceiveburst"); +    try { +        set_thread_name("uhdreceiveburst"); -    uhd::stream_args_t stream_args("fc32"); //complex floats -    auto rxStream = m_usrp->get_rx_stream(stream_args); +        uhd::stream_args_t stream_args("fc32"); //complex floats +        auto rxStream = m_usrp->get_rx_stream(stream_args); -    while (m_running) { -        boost::mutex::scoped_lock lock(burstRequest.mutex); -        while (burstRequest.state != BurstRequestState::SaveReceiveFrame) { -            if (not m_running) break; -            burstRequest.mutex_notification.wait(lock); -        } +        while (m_running) { +            boost::mutex::scoped_lock lock(burstRequest.mutex); +            while (burstRequest.state != BurstRequestState::SaveReceiveFrame) { +                if (not m_running) break; +                burstRequest.mutex_notification.wait(lock); +            } -        if (not m_running) break; +            if (not m_running) break; -        uhd::stream_cmd_t cmd( -                uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE); -        cmd.num_samps = burstRequest.num_samples; -        cmd.stream_now = false; +            uhd::stream_cmd_t cmd( +                    uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE); +            cmd.num_samps = burstRequest.num_samples; +            cmd.stream_now = false; -        double pps = burstRequest.rx_pps / 16384000.0; -        cmd.time_spec = uhd::time_spec_t(burstRequest.rx_second, pps); +            double pps = burstRequest.rx_pps / 16384000.0; +            cmd.time_spec = uhd::time_spec_t(burstRequest.rx_second, pps); -        // We need to free the mutex while we recv(), because otherwise we block the -        // TX thread -        lock.unlock(); +            // We need to free the mutex while we recv(), because otherwise we block the +            // TX thread +            lock.unlock(); -        const double usrp_time = m_usrp->get_time_now().get_real_secs(); -        const double cmd_time = cmd.time_spec.get_real_secs(); +            const double usrp_time = m_usrp->get_time_now().get_real_secs(); +            const double cmd_time = cmd.time_spec.get_real_secs(); -        rxStream->issue_stream_cmd(cmd); +            rxStream->issue_stream_cmd(cmd); -        uhd::rx_metadata_t md; +            uhd::rx_metadata_t md; -        std::vector<uint8_t> buf(cmd.num_samps * sizeof(complexf)); +            std::vector<uint8_t> buf(cmd.num_samps * sizeof(complexf)); -        const double timeout = 60; -        size_t samples_read = rxStream->recv(&buf[0], cmd.num_samps, md, timeout); +            const double timeout = 60; +            size_t samples_read = rxStream->recv(&buf[0], cmd.num_samps, md, timeout); -        lock.lock(); -        burstRequest.rx_samples = std::move(buf); -        burstRequest.rx_samples.resize(samples_read * sizeof(complexf)); +            lock.lock(); +            burstRequest.rx_samples = std::move(buf); +            burstRequest.rx_samples.resize(samples_read * sizeof(complexf)); -        // The recv might have happened at another time than requested -        burstRequest.rx_second = md.time_spec.get_full_secs(); -        burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0; +            // The recv might have happened at another time than requested +            burstRequest.rx_second = md.time_spec.get_full_secs(); +            burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0; -        etiLog.level(debug) << "DPD: acquired " << samples_read << " RX feedback samples " << -            "at time " << burstRequest.tx_second << " + " << -            std::fixed << burstRequest.tx_pps / 16384000.0 << -            " Delta=" << cmd_time - usrp_time; +            etiLog.level(debug) << "DPD: acquired " << samples_read << +                " RX feedback samples " << +                "at time " << burstRequest.tx_second << " + " << +                std::fixed << burstRequest.tx_pps / 16384000.0 << +                " Delta=" << cmd_time - usrp_time; -        burstRequest.state = BurstRequestState::Acquired; +            burstRequest.state = BurstRequestState::Acquired; -        lock.unlock(); -        burstRequest.mutex_notification.notify_one(); +            lock.unlock(); +            burstRequest.mutex_notification.notify_one(); +        } +    } +    catch (const runtime_error &e) { +        etiLog.level(error) << "DPD Feedback RX runtime error: " << e.what(); +    } +    catch (const std::exception &e) { +        etiLog.level(error) << "DPD Feedback RX exception: " << e.what(); +    } +    catch (...) { +        etiLog.level(error) << "DPD Feedback RX unknown exception!";      } + +    m_running.store(false);  }  void OutputUHDFeedback::ServeFeedback() @@ -337,7 +356,7 @@ void OutputUHDFeedback::ServeFeedbackThread()          boost::this_thread::sleep(boost::posix_time::seconds(5));      } -    m_running = false; +    m_running.store(false);  }  #endif diff --git a/src/OutputUHDFeedback.h b/src/OutputUHDFeedback.h index da0d487..80d287f 100644 --- a/src/OutputUHDFeedback.h +++ b/src/OutputUHDFeedback.h @@ -61,23 +61,23 @@ struct UHDReceiveBurstRequest {      mutable boost::mutex mutex;      boost::condition_variable mutex_notification; -    BurstRequestState state; +    BurstRequestState state = BurstRequestState::None;      // In the SaveTransmit states, num_samples complexf samples are saved into      // the vectors -    size_t num_samples; +    size_t num_samples = 0;      // The timestamp of the first sample of the TX buffers -    uint32_t tx_second; -    uint32_t tx_pps; // in units of 1/16384000s +    uint32_t tx_second = 0; +    uint32_t tx_pps = 0; // in units of 1/16384000s      // Samples contain complexf, but since our internal representation is uint8_t      // we keep it like that      std::vector<uint8_t> tx_samples;      // The timestamp of the first sample of the RX buffers -    uint32_t rx_second; -    uint32_t rx_pps; +    uint32_t rx_second = 0; +    uint32_t rx_pps = 0;      std::vector<uint8_t> rx_samples; // Also, actually complexf  }; @@ -85,13 +85,14 @@ struct UHDReceiveBurstRequest {  // Serve TX samples and RX feedback samples over a TCP connection  class OutputUHDFeedback {      public: -        OutputUHDFeedback(); +        OutputUHDFeedback( +                uhd::usrp::multi_usrp::sptr usrp, +                uint16_t port, +                uint32_t sampleRate);          OutputUHDFeedback(const OutputUHDFeedback& other) = delete;          OutputUHDFeedback& operator=(const OutputUHDFeedback& other) = delete;          ~OutputUHDFeedback(); -        void setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, uint32_t sampleRate); -          void set_tx_frame(const std::vector<uint8_t> &buf,                  const struct frame_timestamp& ts);  | 
