aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2016-06-24 19:51:18 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2016-06-24 19:51:18 +0200
commitc93a726d8fffdca2b90aa22eed13ce58a0e8009f (patch)
treea0c39945367851b40dfd73d86508ceae3ce08acb
parent0a89de68defe14fb166a0a24b59ef495113a8b9d (diff)
downloaddabmod-c93a726d8fffdca2b90aa22eed13ce58a0e8009f.tar.gz
dabmod-c93a726d8fffdca2b90aa22eed13ce58a0e8009f.tar.bz2
dabmod-c93a726d8fffdca2b90aa22eed13ce58a0e8009f.zip
Fix UHD buffering leading to out of memory
Since commit d9ef93e, UHD does not backpressure the modulator anymore. If a pipe input is used, the ODR-DabMux before also doesn't get any back-pressure, the modulation thread and the mux run a very high rate. This high rate fills the buffer between OutputUHD and its worker thread, until the out-of-memory killer kills ODR-DabMod. Less impact on ZMQ input, because that is throttled at the mux, but we still have a buffer that can grow in an uncontrolled way
-rw-r--r--src/OutputUHD.cpp9
-rw-r--r--src/OutputUHD.h2
-rw-r--r--src/ThreadsafeQueue.h41
3 files changed, 40 insertions, 12 deletions
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp
index 1b84b7c..8f988f3 100644
--- a/src/OutputUHD.cpp
+++ b/src/OutputUHD.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2014, 2015
+ Copyright (C) 2016
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -404,11 +404,8 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
}
else {
while (true) {
- if (uwd.frames.size() > FRAMES_MAX_SIZE) {
- usleep(10000); // 10ms
- }
-
- size_t num_frames = uwd.frames.push(frame);
+ size_t num_frames = uwd.frames.push_wait_if_full(frame,
+ FRAMES_MAX_SIZE);
etiLog.log(trace, "UHD,push %zu", num_frames);
break;
}
diff --git a/src/OutputUHD.h b/src/OutputUHD.h
index a74f627..1c59136 100644
--- a/src/OutputUHD.h
+++ b/src/OutputUHD.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2014, 2015
+ Copyright (C) 2016
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
index e5e83ef..e27c100 100644
--- a/src/ThreadsafeQueue.h
+++ b/src/ThreadsafeQueue.h
@@ -58,14 +58,37 @@ public:
size_t queue_size = the_queue.size();
lock.unlock();
- notify();
+ the_rx_notification.notify_one();
return queue_size;
}
- void notify()
+ /* Push one element into the queue, but wait until the
+ * queue size goes below the threshold.
+ *
+ * Notify waiting thread.
+ *
+ * returns the new queue size.
+ */
+ size_t push_wait_if_full(T const& val, size_t threshold)
+ {
+ boost::mutex::scoped_lock lock(the_mutex);
+ while (the_queue.size() >= threshold) {
+ the_tx_notification.wait(lock);
+ }
+ the_queue.push(val);
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ /* Send a notification for the receiver thread */
+ void notify(void)
{
- the_condition_variable.notify_one();
+ the_rx_notification.notify_one();
}
bool empty() const
@@ -89,6 +112,10 @@ public:
popped_value = the_queue.front();
the_queue.pop();
+
+ lock.unlock();
+ the_tx_notification.notify_one();
+
return true;
}
@@ -96,17 +123,21 @@ public:
{
boost::mutex::scoped_lock lock(the_mutex);
while (the_queue.size() < prebuffering) {
- the_condition_variable.wait(lock);
+ the_rx_notification.wait(lock);
}
popped_value = the_queue.front();
the_queue.pop();
+
+ lock.unlock();
+ the_tx_notification.notify_one();
}
private:
std::queue<T> the_queue;
mutable boost::mutex the_mutex;
- boost::condition_variable the_condition_variable;
+ boost::condition_variable the_rx_notification;
+ boost::condition_variable the_tx_notification;
};
#endif