diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-06-05 09:15:56 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-06-05 09:22:22 +0200 |
commit | 71eb84d5f483af8d22402de3d2ec70b08b5802d3 (patch) | |
tree | 589fd978bdc357a0c67179d83c5c04bb0d732082 /src/ThreadsafeQueue.h | |
parent | c9ea7fb88809d935f3eaeee108415ff5abd17ead (diff) | |
download | dabmod-71eb84d5f483af8d22402de3d2ec70b08b5802d3.tar.gz dabmod-71eb84d5f483af8d22402de3d2ec70b08b5802d3.tar.bz2 dabmod-71eb84d5f483af8d22402de3d2ec70b08b5802d3.zip |
Fix intermittent underruns after a restart
When using UHD without synchronous=1:
After a modulator restart, the input ZMQ buffer is nearly empty, and
the modulator actually gets blocked by the ThreadsafeQueue. This is
visible by looking at the time deltas printed in the debugging code.
This commit adds a minimal form of prebuffering (10 ETI frames) to the
ZeroMQ input, and removes the useless minimum required size for the
ThreadsafeQueue.
In synchronous=1, the issue is not visible because the TIST defines the
time to transmit, which will cause the ZMQ buffer to fill.
Diffstat (limited to 'src/ThreadsafeQueue.h')
-rw-r--r-- | src/ThreadsafeQueue.h | 24 |
1 files changed, 6 insertions, 18 deletions
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index 78e9ef0..e5e83ef 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -38,25 +38,14 @@ * that pushes elements into the queue, and one consumer that * retrieves the elements. * - * The queue can make the consumer block until enough elements - * are available. + * The queue can make the consumer block until an element + * is available. */ template<typename T> class ThreadsafeQueue { public: - /* Create a new queue without any minimum required - * fill before it is possible to pop an element - */ - ThreadsafeQueue() : the_required_size(1) {} - - /* Create a queue where it has to contain at least - * required_size elements before pop is possible - */ - ThreadsafeQueue(size_t required_size) : the_required_size(required_size) { - } - /* Push one element into the queue, and notify another thread that * might be waiting. * @@ -87,14 +76,14 @@ public: size_t size() const { + boost::mutex::scoped_lock lock(the_mutex); return the_queue.size(); } bool try_pop(T& popped_value) { boost::mutex::scoped_lock lock(the_mutex); - if(the_queue.size() < the_required_size) - { + if (the_queue.empty()) { return false; } @@ -103,10 +92,10 @@ public: return true; } - void wait_and_pop(T& popped_value) + void wait_and_pop(T& popped_value, size_t prebuffering = 1) { boost::mutex::scoped_lock lock(the_mutex); - while(the_queue.size() < the_required_size) { + while (the_queue.size() < prebuffering) { the_condition_variable.wait(lock); } @@ -118,7 +107,6 @@ private: std::queue<T> the_queue; mutable boost::mutex the_mutex; boost::condition_variable the_condition_variable; - size_t the_required_size; }; #endif |