aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2017-10-17 06:29:29 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2017-10-17 06:29:29 +0200
commitad311a62d4020e3fd7a7048e9d25bd5735206c81 (patch)
tree9ca1308c636129d6c1ff1bccfe6fad630cb5c37a
parent3446e66971ed340146c1885b7768eefbaecac1bc (diff)
downloaddabmod-ad311a62d4020e3fd7a7048e9d25bd5735206c81.tar.gz
dabmod-ad311a62d4020e3fd7a7048e9d25bd5735206c81.tar.bz2
dabmod-ad311a62d4020e3fd7a7048e9d25bd5735206c81.zip
Recover OutputUHDFeedback failure
-rw-r--r--src/OutputUHD.cpp16
-rw-r--r--src/OutputUHD.h2
-rw-r--r--src/OutputUHDFeedback.cpp125
-rw-r--r--src/OutputUHDFeedback.h19
4 files changed, 96 insertions, 66 deletions
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp
index 10e605f..e1fe9dd 100644
--- a/src/OutputUHD.cpp
+++ b/src/OutputUHD.cpp
@@ -281,7 +281,8 @@ OutputUHD::OutputUHD(
myUsrp->set_rx_gain(myConf.rxgain);
etiLog.log(debug, "OutputUHD:Actual RX Gain: %f", myUsrp->get_rx_gain());
- uhdFeedback.setup(myUsrp, myConf.dpdFeedbackServerPort, myConf.sampleRate);
+ uhdFeedback = std::make_shared<OutputUHDFeedback>(
+ myUsrp, myConf.dpdFeedbackServerPort, myConf.sampleRate);
MDEBUG("OutputUHD:UHD ready.\n");
}
@@ -451,7 +452,7 @@ int OutputUHD::process(Buffer* dataIn)
async_rx_thread.join();
first_run = true;
- etiLog.level(error) << "OutputUHD: Error, UHD worker failed";
+ etiLog.level(error) << "OutputUHD UHD worker failed";
throw std::runtime_error("UHD worker failed");
}
@@ -460,7 +461,16 @@ int OutputUHD::process(Buffer* dataIn)
"OutputUHD: dropping one frame with invalid FCT";
}
else {
- uhdFeedback.set_tx_frame(frame.buf, frame.ts);
+ try {
+ uhdFeedback->set_tx_frame(frame.buf, frame.ts);
+ }
+ catch (const runtime_error& e) {
+ etiLog.level(warn) <<
+ "OutputUHD: Feedback server failed, restarting...";
+
+ uhdFeedback = std::make_shared<OutputUHDFeedback>(
+ myUsrp, myConf.dpdFeedbackServerPort, myConf.sampleRate);
+ }
size_t num_frames = frames.push_wait_if_full(frame,
FRAMES_MAX_SIZE);
diff --git a/src/OutputUHD.h b/src/OutputUHD.h
index f1ae09c..dfa471d 100644
--- a/src/OutputUHD.h
+++ b/src/OutputUHD.h
@@ -165,7 +165,7 @@ class OutputUHD: public ModOutput, public RemoteControllable {
std::shared_ptr<boost::barrier> mySyncBarrier;
bool first_run = true;
bool gps_fix_verified = false;
- OutputUHDFeedback uhdFeedback;
+ std::shared_ptr<OutputUHDFeedback> uhdFeedback;
private:
// Resize the internal delay buffer according to the dabMode and
diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp
index cfb74b7..68783f2 100644
--- a/src/OutputUHDFeedback.cpp
+++ b/src/OutputUHDFeedback.cpp
@@ -51,19 +51,16 @@ DESCRIPTION:
using namespace std;
typedef std::complex<float> complexf;
-OutputUHDFeedback::OutputUHDFeedback()
+OutputUHDFeedback::OutputUHDFeedback(
+ uhd::usrp::multi_usrp::sptr usrp,
+ uint16_t port,
+ uint32_t sampleRate)
{
- m_running.store(false);
-}
-
-void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, uint32_t sampleRate)
-{
- m_usrp = usrp;
+ m_port = port;
m_sampleRate = sampleRate;
- burstRequest.state = BurstRequestState::None;
+ m_usrp = usrp;
- if (port) {
- m_port = port;
+ if (m_port) {
m_running.store(true);
rx_burst_thread = boost::thread(&OutputUHDFeedback::ReceiveBurstThread, this);
@@ -76,16 +73,24 @@ OutputUHDFeedback::~OutputUHDFeedback()
m_running.store(false);
rx_burst_thread.interrupt();
- burst_tcp_thread.interrupt();
+ if (rx_burst_thread.joinable()) {
+ rx_burst_thread.join();
+ }
- rx_burst_thread.join();
- burst_tcp_thread.join();
+ burst_tcp_thread.interrupt();
+ if (burst_tcp_thread.joinable()) {
+ burst_tcp_thread.join();
+ }
}
void OutputUHDFeedback::set_tx_frame(
const std::vector<uint8_t> &buf,
const struct frame_timestamp &buf_ts)
{
+ if (not m_running) {
+ throw runtime_error("OutputUHDFeedback not running");
+ }
+
boost::mutex::scoped_lock lock(burstRequest.mutex);
if (buf.size() % sizeof(complexf) != 0) {
@@ -128,62 +133,76 @@ void OutputUHDFeedback::set_tx_frame(
void OutputUHDFeedback::ReceiveBurstThread()
{
- set_thread_name("uhdreceiveburst");
+ try {
+ set_thread_name("uhdreceiveburst");
- uhd::stream_args_t stream_args("fc32"); //complex floats
- auto rxStream = m_usrp->get_rx_stream(stream_args);
+ uhd::stream_args_t stream_args("fc32"); //complex floats
+ auto rxStream = m_usrp->get_rx_stream(stream_args);
- while (m_running) {
- boost::mutex::scoped_lock lock(burstRequest.mutex);
- while (burstRequest.state != BurstRequestState::SaveReceiveFrame) {
- if (not m_running) break;
- burstRequest.mutex_notification.wait(lock);
- }
+ while (m_running) {
+ boost::mutex::scoped_lock lock(burstRequest.mutex);
+ while (burstRequest.state != BurstRequestState::SaveReceiveFrame) {
+ if (not m_running) break;
+ burstRequest.mutex_notification.wait(lock);
+ }
- if (not m_running) break;
+ if (not m_running) break;
- uhd::stream_cmd_t cmd(
- uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE);
- cmd.num_samps = burstRequest.num_samples;
- cmd.stream_now = false;
+ uhd::stream_cmd_t cmd(
+ uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE);
+ cmd.num_samps = burstRequest.num_samples;
+ cmd.stream_now = false;
- double pps = burstRequest.rx_pps / 16384000.0;
- cmd.time_spec = uhd::time_spec_t(burstRequest.rx_second, pps);
+ double pps = burstRequest.rx_pps / 16384000.0;
+ cmd.time_spec = uhd::time_spec_t(burstRequest.rx_second, pps);
- // We need to free the mutex while we recv(), because otherwise we block the
- // TX thread
- lock.unlock();
+ // We need to free the mutex while we recv(), because otherwise we block the
+ // TX thread
+ lock.unlock();
- const double usrp_time = m_usrp->get_time_now().get_real_secs();
- const double cmd_time = cmd.time_spec.get_real_secs();
+ const double usrp_time = m_usrp->get_time_now().get_real_secs();
+ const double cmd_time = cmd.time_spec.get_real_secs();
- rxStream->issue_stream_cmd(cmd);
+ rxStream->issue_stream_cmd(cmd);
- uhd::rx_metadata_t md;
+ uhd::rx_metadata_t md;
- std::vector<uint8_t> buf(cmd.num_samps * sizeof(complexf));
+ std::vector<uint8_t> buf(cmd.num_samps * sizeof(complexf));
- const double timeout = 60;
- size_t samples_read = rxStream->recv(&buf[0], cmd.num_samps, md, timeout);
+ const double timeout = 60;
+ size_t samples_read = rxStream->recv(&buf[0], cmd.num_samps, md, timeout);
- lock.lock();
- burstRequest.rx_samples = std::move(buf);
- burstRequest.rx_samples.resize(samples_read * sizeof(complexf));
+ lock.lock();
+ burstRequest.rx_samples = std::move(buf);
+ burstRequest.rx_samples.resize(samples_read * sizeof(complexf));
- // The recv might have happened at another time than requested
- burstRequest.rx_second = md.time_spec.get_full_secs();
- burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0;
+ // The recv might have happened at another time than requested
+ burstRequest.rx_second = md.time_spec.get_full_secs();
+ burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0;
- etiLog.level(debug) << "DPD: acquired " << samples_read << " RX feedback samples " <<
- "at time " << burstRequest.tx_second << " + " <<
- std::fixed << burstRequest.tx_pps / 16384000.0 <<
- " Delta=" << cmd_time - usrp_time;
+ etiLog.level(debug) << "DPD: acquired " << samples_read <<
+ " RX feedback samples " <<
+ "at time " << burstRequest.tx_second << " + " <<
+ std::fixed << burstRequest.tx_pps / 16384000.0 <<
+ " Delta=" << cmd_time - usrp_time;
- burstRequest.state = BurstRequestState::Acquired;
+ burstRequest.state = BurstRequestState::Acquired;
- lock.unlock();
- burstRequest.mutex_notification.notify_one();
+ lock.unlock();
+ burstRequest.mutex_notification.notify_one();
+ }
+ }
+ catch (const runtime_error &e) {
+ etiLog.level(error) << "DPD Feedback RX runtime error: " << e.what();
+ }
+ catch (const std::exception &e) {
+ etiLog.level(error) << "DPD Feedback RX exception: " << e.what();
+ }
+ catch (...) {
+ etiLog.level(error) << "DPD Feedback RX unknown exception!";
}
+
+ m_running.store(false);
}
void OutputUHDFeedback::ServeFeedback()
@@ -337,7 +356,7 @@ void OutputUHDFeedback::ServeFeedbackThread()
boost::this_thread::sleep(boost::posix_time::seconds(5));
}
- m_running = false;
+ m_running.store(false);
}
#endif
diff --git a/src/OutputUHDFeedback.h b/src/OutputUHDFeedback.h
index da0d487..80d287f 100644
--- a/src/OutputUHDFeedback.h
+++ b/src/OutputUHDFeedback.h
@@ -61,23 +61,23 @@ struct UHDReceiveBurstRequest {
mutable boost::mutex mutex;
boost::condition_variable mutex_notification;
- BurstRequestState state;
+ BurstRequestState state = BurstRequestState::None;
// In the SaveTransmit states, num_samples complexf samples are saved into
// the vectors
- size_t num_samples;
+ size_t num_samples = 0;
// The timestamp of the first sample of the TX buffers
- uint32_t tx_second;
- uint32_t tx_pps; // in units of 1/16384000s
+ uint32_t tx_second = 0;
+ uint32_t tx_pps = 0; // in units of 1/16384000s
// Samples contain complexf, but since our internal representation is uint8_t
// we keep it like that
std::vector<uint8_t> tx_samples;
// The timestamp of the first sample of the RX buffers
- uint32_t rx_second;
- uint32_t rx_pps;
+ uint32_t rx_second = 0;
+ uint32_t rx_pps = 0;
std::vector<uint8_t> rx_samples; // Also, actually complexf
};
@@ -85,13 +85,14 @@ struct UHDReceiveBurstRequest {
// Serve TX samples and RX feedback samples over a TCP connection
class OutputUHDFeedback {
public:
- OutputUHDFeedback();
+ OutputUHDFeedback(
+ uhd::usrp::multi_usrp::sptr usrp,
+ uint16_t port,
+ uint32_t sampleRate);
OutputUHDFeedback(const OutputUHDFeedback& other) = delete;
OutputUHDFeedback& operator=(const OutputUHDFeedback& other) = delete;
~OutputUHDFeedback();
- void setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, uint32_t sampleRate);
-
void set_tx_frame(const std::vector<uint8_t> &buf,
const struct frame_timestamp& ts);