diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-04-20 12:29:24 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-04-20 12:29:24 +0200 |
commit | 411d03ac6b8ee1a8c06f952b9378c90516a715b7 (patch) | |
tree | 3236a6121eb9137a79b82699006df877e3876c32 /src/ThreadsafeQueue.h | |
parent | 4f9b087a578fac9dffef83cdcb41573468a4ae17 (diff) | |
download | dabmod-411d03ac6b8ee1a8c06f952b9378c90516a715b7.tar.gz dabmod-411d03ac6b8ee1a8c06f952b9378c90516a715b7.tar.bz2 dabmod-411d03ac6b8ee1a8c06f952b9378c90516a715b7.zip |
ThreadsafeQueue: add wakeup event instead of custom termination markers
This avoids the issue that the ~SDR termination marker doesn't reach the
consumer because it's still prebuffering
Diffstat (limited to 'src/ThreadsafeQueue.h')
-rw-r--r-- | src/ThreadsafeQueue.h | 35 |
1 files changed, 29 insertions, 6 deletions
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index edda490..433eae3 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -40,9 +40,13 @@ * retrieves the elements. * * The queue can make the consumer block until an element - * is available. + * is available, or a wakeup requested. */ +/* Class thrown by blocking pop to tell the consumer + * that there's a wakeup requested. */ +class ThreadsafeQueueWakeup {}; + template<typename T> class ThreadsafeQueue { @@ -98,6 +102,17 @@ public: return queue_size; } + /* Trigger a wakeup event on a blocking consumer, which + * will receive a ThreadsafeQueueWakeup exception. + */ + void trigger_wakeup(void) + { + std::unique_lock<std::mutex> lock(the_mutex); + wakeup_requested = true; + lock.unlock(); + the_rx_notification.notify_one(); + } + /* Send a notification for the receiver thread */ void notify(void) { @@ -135,15 +150,22 @@ public: void wait_and_pop(T& popped_value, size_t prebuffering = 1) { std::unique_lock<std::mutex> lock(the_mutex); - while (the_queue.size() < prebuffering) { + while (the_queue.size() < prebuffering and + not wakeup_requested) { the_rx_notification.wait(lock); } - std::swap(popped_value, the_queue.front()); - the_queue.pop(); + if (wakeup_requested) { + wakeup_requested = false; + throw ThreadsafeQueueWakeup(); + } + else { + std::swap(popped_value, the_queue.front()); + the_queue.pop(); - lock.unlock(); - the_tx_notification.notify_one(); + lock.unlock(); + the_tx_notification.notify_one(); + } } private: @@ -151,5 +173,6 @@ private: mutable std::mutex the_mutex; std::condition_variable the_rx_notification; std::condition_variable the_tx_notification; + bool wakeup_requested = false; }; |