diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2016-06-24 19:51:18 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2016-06-24 19:51:18 +0200 |
commit | c93a726d8fffdca2b90aa22eed13ce58a0e8009f (patch) | |
tree | a0c39945367851b40dfd73d86508ceae3ce08acb /src | |
parent | 0a89de68defe14fb166a0a24b59ef495113a8b9d (diff) | |
download | dabmod-c93a726d8fffdca2b90aa22eed13ce58a0e8009f.tar.gz dabmod-c93a726d8fffdca2b90aa22eed13ce58a0e8009f.tar.bz2 dabmod-c93a726d8fffdca2b90aa22eed13ce58a0e8009f.zip |
Fix UHD buffering leading to out of memory
Since commit d9ef93e, UHD does not backpressure the modulator anymore.
If a pipe input is used, the ODR-DabMux before also doesn't get any
back-pressure, the modulation thread and the mux run a very high rate.
This high rate fills the buffer between OutputUHD and its worker thread,
until the out-of-memory killer kills ODR-DabMod.
Less impact on ZMQ input, because that is throttled at the mux, but
we still have a buffer that can grow in an uncontrolled way
Diffstat (limited to 'src')
-rw-r--r-- | src/OutputUHD.cpp | 9 | ||||
-rw-r--r-- | src/OutputUHD.h | 2 | ||||
-rw-r--r-- | src/ThreadsafeQueue.h | 41 |
3 files changed, 40 insertions, 12 deletions
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 1b84b7c..8f988f3 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014, 2015 + Copyright (C) 2016 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -404,11 +404,8 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) } else { while (true) { - if (uwd.frames.size() > FRAMES_MAX_SIZE) { - usleep(10000); // 10ms - } - - size_t num_frames = uwd.frames.push(frame); + size_t num_frames = uwd.frames.push_wait_if_full(frame, + FRAMES_MAX_SIZE); etiLog.log(trace, "UHD,push %zu", num_frames); break; } diff --git a/src/OutputUHD.h b/src/OutputUHD.h index a74f627..1c59136 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014, 2015 + Copyright (C) 2016 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index e5e83ef..e27c100 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -58,14 +58,37 @@ public: size_t queue_size = the_queue.size(); lock.unlock(); - notify(); + the_rx_notification.notify_one(); return queue_size; } - void notify() + /* Push one element into the queue, but wait until the + * queue size goes below the threshold. + * + * Notify waiting thread. + * + * returns the new queue size. + */ + size_t push_wait_if_full(T const& val, size_t threshold) + { + boost::mutex::scoped_lock lock(the_mutex); + while (the_queue.size() >= threshold) { + the_tx_notification.wait(lock); + } + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + /* Send a notification for the receiver thread */ + void notify(void) { - the_condition_variable.notify_one(); + the_rx_notification.notify_one(); } bool empty() const @@ -89,6 +112,10 @@ public: popped_value = the_queue.front(); the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); + return true; } @@ -96,17 +123,21 @@ public: { boost::mutex::scoped_lock lock(the_mutex); while (the_queue.size() < prebuffering) { - the_condition_variable.wait(lock); + the_rx_notification.wait(lock); } popped_value = the_queue.front(); the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); } private: std::queue<T> the_queue; mutable boost::mutex the_mutex; - boost::condition_variable the_condition_variable; + boost::condition_variable the_rx_notification; + boost::condition_variable the_tx_notification; }; #endif |