diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2016-06-24 19:51:18 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2016-06-24 19:51:18 +0200 |
commit | c93a726d8fffdca2b90aa22eed13ce58a0e8009f (patch) | |
tree | a0c39945367851b40dfd73d86508ceae3ce08acb /src/ThreadsafeQueue.h | |
parent | 0a89de68defe14fb166a0a24b59ef495113a8b9d (diff) | |
download | dabmod-c93a726d8fffdca2b90aa22eed13ce58a0e8009f.tar.gz dabmod-c93a726d8fffdca2b90aa22eed13ce58a0e8009f.tar.bz2 dabmod-c93a726d8fffdca2b90aa22eed13ce58a0e8009f.zip |
Fix UHD buffering leading to out of memory
Since commit d9ef93e, UHD does not backpressure the modulator anymore.
If a pipe input is used, the ODR-DabMux before also doesn't get any
back-pressure, the modulation thread and the mux run a very high rate.
This high rate fills the buffer between OutputUHD and its worker thread,
until the out-of-memory killer kills ODR-DabMod.
Less impact on ZMQ input, because that is throttled at the mux, but
we still have a buffer that can grow in an uncontrolled way
Diffstat (limited to 'src/ThreadsafeQueue.h')
-rw-r--r-- | src/ThreadsafeQueue.h | 41 |
1 files changed, 36 insertions, 5 deletions
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index e5e83ef..e27c100 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -58,14 +58,37 @@ public: size_t queue_size = the_queue.size(); lock.unlock(); - notify(); + the_rx_notification.notify_one(); return queue_size; } - void notify() + /* Push one element into the queue, but wait until the + * queue size goes below the threshold. + * + * Notify waiting thread. + * + * returns the new queue size. + */ + size_t push_wait_if_full(T const& val, size_t threshold) + { + boost::mutex::scoped_lock lock(the_mutex); + while (the_queue.size() >= threshold) { + the_tx_notification.wait(lock); + } + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + /* Send a notification for the receiver thread */ + void notify(void) { - the_condition_variable.notify_one(); + the_rx_notification.notify_one(); } bool empty() const @@ -89,6 +112,10 @@ public: popped_value = the_queue.front(); the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); + return true; } @@ -96,17 +123,21 @@ public: { boost::mutex::scoped_lock lock(the_mutex); while (the_queue.size() < prebuffering) { - the_condition_variable.wait(lock); + the_rx_notification.wait(lock); } popped_value = the_queue.front(); the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); } private: std::queue<T> the_queue; mutable boost::mutex the_mutex; - boost::condition_variable the_condition_variable; + boost::condition_variable the_rx_notification; + boost::condition_variable the_tx_notification; }; #endif |