aboutsummaryrefslogtreecommitdiffstats
path: root/src/OutputUHDFeedback.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/OutputUHDFeedback.cpp')
-rw-r--r--src/OutputUHDFeedback.cpp125
1 files changed, 72 insertions, 53 deletions
diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp
index cfb74b7..68783f2 100644
--- a/src/OutputUHDFeedback.cpp
+++ b/src/OutputUHDFeedback.cpp
@@ -51,19 +51,16 @@ DESCRIPTION:
using namespace std;
typedef std::complex<float> complexf;
-OutputUHDFeedback::OutputUHDFeedback()
+OutputUHDFeedback::OutputUHDFeedback(
+ uhd::usrp::multi_usrp::sptr usrp,
+ uint16_t port,
+ uint32_t sampleRate)
{
- m_running.store(false);
-}
-
-void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, uint32_t sampleRate)
-{
- m_usrp = usrp;
+ m_port = port;
m_sampleRate = sampleRate;
- burstRequest.state = BurstRequestState::None;
+ m_usrp = usrp;
- if (port) {
- m_port = port;
+ if (m_port) {
m_running.store(true);
rx_burst_thread = boost::thread(&OutputUHDFeedback::ReceiveBurstThread, this);
@@ -76,16 +73,24 @@ OutputUHDFeedback::~OutputUHDFeedback()
m_running.store(false);
rx_burst_thread.interrupt();
- burst_tcp_thread.interrupt();
+ if (rx_burst_thread.joinable()) {
+ rx_burst_thread.join();
+ }
- rx_burst_thread.join();
- burst_tcp_thread.join();
+ burst_tcp_thread.interrupt();
+ if (burst_tcp_thread.joinable()) {
+ burst_tcp_thread.join();
+ }
}
void OutputUHDFeedback::set_tx_frame(
const std::vector<uint8_t> &buf,
const struct frame_timestamp &buf_ts)
{
+ if (not m_running) {
+ throw runtime_error("OutputUHDFeedback not running");
+ }
+
boost::mutex::scoped_lock lock(burstRequest.mutex);
if (buf.size() % sizeof(complexf) != 0) {
@@ -128,62 +133,76 @@ void OutputUHDFeedback::set_tx_frame(
void OutputUHDFeedback::ReceiveBurstThread()
{
- set_thread_name("uhdreceiveburst");
+ try {
+ set_thread_name("uhdreceiveburst");
- uhd::stream_args_t stream_args("fc32"); //complex floats
- auto rxStream = m_usrp->get_rx_stream(stream_args);
+ uhd::stream_args_t stream_args("fc32"); //complex floats
+ auto rxStream = m_usrp->get_rx_stream(stream_args);
- while (m_running) {
- boost::mutex::scoped_lock lock(burstRequest.mutex);
- while (burstRequest.state != BurstRequestState::SaveReceiveFrame) {
- if (not m_running) break;
- burstRequest.mutex_notification.wait(lock);
- }
+ while (m_running) {
+ boost::mutex::scoped_lock lock(burstRequest.mutex);
+ while (burstRequest.state != BurstRequestState::SaveReceiveFrame) {
+ if (not m_running) break;
+ burstRequest.mutex_notification.wait(lock);
+ }
- if (not m_running) break;
+ if (not m_running) break;
- uhd::stream_cmd_t cmd(
- uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE);
- cmd.num_samps = burstRequest.num_samples;
- cmd.stream_now = false;
+ uhd::stream_cmd_t cmd(
+ uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE);
+ cmd.num_samps = burstRequest.num_samples;
+ cmd.stream_now = false;
- double pps = burstRequest.rx_pps / 16384000.0;
- cmd.time_spec = uhd::time_spec_t(burstRequest.rx_second, pps);
+ double pps = burstRequest.rx_pps / 16384000.0;
+ cmd.time_spec = uhd::time_spec_t(burstRequest.rx_second, pps);
- // We need to free the mutex while we recv(), because otherwise we block the
- // TX thread
- lock.unlock();
+ // We need to free the mutex while we recv(), because otherwise we block the
+ // TX thread
+ lock.unlock();
- const double usrp_time = m_usrp->get_time_now().get_real_secs();
- const double cmd_time = cmd.time_spec.get_real_secs();
+ const double usrp_time = m_usrp->get_time_now().get_real_secs();
+ const double cmd_time = cmd.time_spec.get_real_secs();
- rxStream->issue_stream_cmd(cmd);
+ rxStream->issue_stream_cmd(cmd);
- uhd::rx_metadata_t md;
+ uhd::rx_metadata_t md;
- std::vector<uint8_t> buf(cmd.num_samps * sizeof(complexf));
+ std::vector<uint8_t> buf(cmd.num_samps * sizeof(complexf));
- const double timeout = 60;
- size_t samples_read = rxStream->recv(&buf[0], cmd.num_samps, md, timeout);
+ const double timeout = 60;
+ size_t samples_read = rxStream->recv(&buf[0], cmd.num_samps, md, timeout);
- lock.lock();
- burstRequest.rx_samples = std::move(buf);
- burstRequest.rx_samples.resize(samples_read * sizeof(complexf));
+ lock.lock();
+ burstRequest.rx_samples = std::move(buf);
+ burstRequest.rx_samples.resize(samples_read * sizeof(complexf));
- // The recv might have happened at another time than requested
- burstRequest.rx_second = md.time_spec.get_full_secs();
- burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0;
+ // The recv might have happened at another time than requested
+ burstRequest.rx_second = md.time_spec.get_full_secs();
+ burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0;
- etiLog.level(debug) << "DPD: acquired " << samples_read << " RX feedback samples " <<
- "at time " << burstRequest.tx_second << " + " <<
- std::fixed << burstRequest.tx_pps / 16384000.0 <<
- " Delta=" << cmd_time - usrp_time;
+ etiLog.level(debug) << "DPD: acquired " << samples_read <<
+ " RX feedback samples " <<
+ "at time " << burstRequest.tx_second << " + " <<
+ std::fixed << burstRequest.tx_pps / 16384000.0 <<
+ " Delta=" << cmd_time - usrp_time;
- burstRequest.state = BurstRequestState::Acquired;
+ burstRequest.state = BurstRequestState::Acquired;
- lock.unlock();
- burstRequest.mutex_notification.notify_one();
+ lock.unlock();
+ burstRequest.mutex_notification.notify_one();
+ }
+ }
+ catch (const runtime_error &e) {
+ etiLog.level(error) << "DPD Feedback RX runtime error: " << e.what();
+ }
+ catch (const std::exception &e) {
+ etiLog.level(error) << "DPD Feedback RX exception: " << e.what();
+ }
+ catch (...) {
+ etiLog.level(error) << "DPD Feedback RX unknown exception!";
}
+
+ m_running.store(false);
}
void OutputUHDFeedback::ServeFeedback()
@@ -337,7 +356,7 @@ void OutputUHDFeedback::ServeFeedbackThread()
boost::this_thread::sleep(boost::posix_time::seconds(5));
}
- m_running = false;
+ m_running.store(false);
}
#endif