From 411d03ac6b8ee1a8c06f952b9378c90516a715b7 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 20 Apr 2018 12:29:24 +0200 Subject: ThreadsafeQueue: add wakeup event instead of custom termination markers This avoids the issue that the ~SDR termination marker doesn't reach the consumer because it's still prebuffering --- src/ThreadsafeQueue.h | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) (limited to 'src/ThreadsafeQueue.h') diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index edda490..433eae3 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -40,9 +40,13 @@ * retrieves the elements. * * The queue can make the consumer block until an element - * is available. + * is available, or a wakeup requested. */ +/* Class thrown by blocking pop to tell the consumer + * that there's a wakeup requested. */ +class ThreadsafeQueueWakeup {}; + template class ThreadsafeQueue { @@ -98,6 +102,17 @@ public: return queue_size; } + /* Trigger a wakeup event on a blocking consumer, which + * will receive a ThreadsafeQueueWakeup exception. + */ + void trigger_wakeup(void) + { + std::unique_lock lock(the_mutex); + wakeup_requested = true; + lock.unlock(); + the_rx_notification.notify_one(); + } + /* Send a notification for the receiver thread */ void notify(void) { @@ -135,15 +150,22 @@ public: void wait_and_pop(T& popped_value, size_t prebuffering = 1) { std::unique_lock lock(the_mutex); - while (the_queue.size() < prebuffering) { + while (the_queue.size() < prebuffering and + not wakeup_requested) { the_rx_notification.wait(lock); } - std::swap(popped_value, the_queue.front()); - the_queue.pop(); + if (wakeup_requested) { + wakeup_requested = false; + throw ThreadsafeQueueWakeup(); + } + else { + std::swap(popped_value, the_queue.front()); + the_queue.pop(); - lock.unlock(); - the_tx_notification.notify_one(); + lock.unlock(); + the_tx_notification.notify_one(); + } } private: @@ -151,5 +173,6 @@ private: mutable std::mutex the_mutex; std::condition_variable the_rx_notification; std::condition_variable the_tx_notification; + bool wakeup_requested = false; }; -- cgit v1.2.3