From 4de1e587d973c9bfcc52155f778a33fa9c969c83 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 20 Apr 2018 12:41:43 +0200 Subject: Update ThreadsafeQueue --- src/ThreadsafeQueue.h | 37 ++++++++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 7 deletions(-) (limited to 'src/ThreadsafeQueue.h') diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index 4524559..ab287b2 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -5,7 +5,7 @@ Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li - An implementation for a threadsafe queue using std::thread + An implementation for a threadsafe queue, depends on C++11 When creating a ThreadsafeQueue, one can specify the minimal number of elements it must contain before it is possible to take one @@ -40,9 +40,13 @@ * retrieves the elements. * * The queue can make the consumer block until an element - * is available. + * is available, or a wakeup requested. */ +/* Class thrown by blocking pop to tell the consumer + * that there's a wakeup requested. */ +class ThreadsafeQueueWakeup {}; + template class ThreadsafeQueue { @@ -98,6 +102,17 @@ public: return queue_size; } + /* Trigger a wakeup event on a blocking consumer, which + * will receive a ThreadsafeQueueWakeup exception. + */ + void trigger_wakeup(void) + { + std::unique_lock lock(the_mutex); + wakeup_requested = true; + lock.unlock(); + the_rx_notification.notify_one(); + } + /* Send a notification for the receiver thread */ void notify(void) { @@ -135,15 +150,22 @@ public: void wait_and_pop(T& popped_value, size_t prebuffering = 1) { std::unique_lock lock(the_mutex); - while (the_queue.size() < prebuffering) { + while (the_queue.size() < prebuffering and + not wakeup_requested) { the_rx_notification.wait(lock); } - std::swap(popped_value, the_queue.front()); - the_queue.pop(); + if (wakeup_requested) { + wakeup_requested = false; + throw ThreadsafeQueueWakeup(); + } + else { + std::swap(popped_value, the_queue.front()); + the_queue.pop(); - lock.unlock(); - the_tx_notification.notify_one(); + lock.unlock(); + the_tx_notification.notify_one(); + } } private: @@ -151,5 +173,6 @@ private: mutable std::mutex the_mutex; std::condition_variable the_rx_notification; std::condition_variable the_tx_notification; + bool wakeup_requested = false; }; -- cgit v1.2.3