summaryrefslogtreecommitdiffstats
path: root/src/ThreadsafeQueue.h
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2016-06-24 19:51:18 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2016-06-24 19:51:18 +0200
commitc93a726d8fffdca2b90aa22eed13ce58a0e8009f (patch)
treea0c39945367851b40dfd73d86508ceae3ce08acb /src/ThreadsafeQueue.h
parent0a89de68defe14fb166a0a24b59ef495113a8b9d (diff)
downloaddabmod-c93a726d8fffdca2b90aa22eed13ce58a0e8009f.tar.gz
dabmod-c93a726d8fffdca2b90aa22eed13ce58a0e8009f.tar.bz2
dabmod-c93a726d8fffdca2b90aa22eed13ce58a0e8009f.zip
Fix UHD buffering leading to out of memory
Since commit d9ef93e, UHD does not backpressure the modulator anymore. If a pipe input is used, the ODR-DabMux before also doesn't get any back-pressure, the modulation thread and the mux run a very high rate. This high rate fills the buffer between OutputUHD and its worker thread, until the out-of-memory killer kills ODR-DabMod. Less impact on ZMQ input, because that is throttled at the mux, but we still have a buffer that can grow in an uncontrolled way
Diffstat (limited to 'src/ThreadsafeQueue.h')
-rw-r--r--src/ThreadsafeQueue.h41
1 files changed, 36 insertions, 5 deletions
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
index e5e83ef..e27c100 100644
--- a/src/ThreadsafeQueue.h
+++ b/src/ThreadsafeQueue.h
@@ -58,14 +58,37 @@ public:
size_t queue_size = the_queue.size();
lock.unlock();
- notify();
+ the_rx_notification.notify_one();
return queue_size;
}
- void notify()
+ /* Push one element into the queue, but wait until the
+ * queue size goes below the threshold.
+ *
+ * Notify waiting thread.
+ *
+ * returns the new queue size.
+ */
+ size_t push_wait_if_full(T const& val, size_t threshold)
+ {
+ boost::mutex::scoped_lock lock(the_mutex);
+ while (the_queue.size() >= threshold) {
+ the_tx_notification.wait(lock);
+ }
+ the_queue.push(val);
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ /* Send a notification for the receiver thread */
+ void notify(void)
{
- the_condition_variable.notify_one();
+ the_rx_notification.notify_one();
}
bool empty() const
@@ -89,6 +112,10 @@ public:
popped_value = the_queue.front();
the_queue.pop();
+
+ lock.unlock();
+ the_tx_notification.notify_one();
+
return true;
}
@@ -96,17 +123,21 @@ public:
{
boost::mutex::scoped_lock lock(the_mutex);
while (the_queue.size() < prebuffering) {
- the_condition_variable.wait(lock);
+ the_rx_notification.wait(lock);
}
popped_value = the_queue.front();
the_queue.pop();
+
+ lock.unlock();
+ the_tx_notification.notify_one();
}
private:
std::queue<T> the_queue;
mutable boost::mutex the_mutex;
- boost::condition_variable the_condition_variable;
+ boost::condition_variable the_rx_notification;
+ boost::condition_variable the_tx_notification;
};
#endif