summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2018-04-20 12:41:43 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2018-04-20 12:41:43 +0200
commit4de1e587d973c9bfcc52155f778a33fa9c969c83 (patch)
treefb74edc65e0f77f8eddbc63161e6ca2bb8ecef91
parente00c0e0c13639f23e16f59ada7f0aca1b8be16be (diff)
downloaddabmux-4de1e587d973c9bfcc52155f778a33fa9c969c83.tar.gz
dabmux-4de1e587d973c9bfcc52155f778a33fa9c969c83.tar.bz2
dabmux-4de1e587d973c9bfcc52155f778a33fa9c969c83.zip
Update ThreadsafeQueue
-rw-r--r--src/ThreadsafeQueue.h37
1 files changed, 30 insertions, 7 deletions
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
index 4524559..ab287b2 100644
--- a/src/ThreadsafeQueue.h
+++ b/src/ThreadsafeQueue.h
@@ -5,7 +5,7 @@
Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
- An implementation for a threadsafe queue using std::thread
+ An implementation for a threadsafe queue, depends on C++11
When creating a ThreadsafeQueue, one can specify the minimal number
of elements it must contain before it is possible to take one
@@ -40,9 +40,13 @@
* retrieves the elements.
*
* The queue can make the consumer block until an element
- * is available.
+ * is available, or a wakeup requested.
*/
+/* Class thrown by blocking pop to tell the consumer
+ * that there's a wakeup requested. */
+class ThreadsafeQueueWakeup {};
+
template<typename T>
class ThreadsafeQueue
{
@@ -98,6 +102,17 @@ public:
return queue_size;
}
+ /* Trigger a wakeup event on a blocking consumer, which
+ * will receive a ThreadsafeQueueWakeup exception.
+ */
+ void trigger_wakeup(void)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ wakeup_requested = true;
+ lock.unlock();
+ the_rx_notification.notify_one();
+ }
+
/* Send a notification for the receiver thread */
void notify(void)
{
@@ -135,15 +150,22 @@ public:
void wait_and_pop(T& popped_value, size_t prebuffering = 1)
{
std::unique_lock<std::mutex> lock(the_mutex);
- while (the_queue.size() < prebuffering) {
+ while (the_queue.size() < prebuffering and
+ not wakeup_requested) {
the_rx_notification.wait(lock);
}
- std::swap(popped_value, the_queue.front());
- the_queue.pop();
+ if (wakeup_requested) {
+ wakeup_requested = false;
+ throw ThreadsafeQueueWakeup();
+ }
+ else {
+ std::swap(popped_value, the_queue.front());
+ the_queue.pop();
- lock.unlock();
- the_tx_notification.notify_one();
+ lock.unlock();
+ the_tx_notification.notify_one();
+ }
}
private:
@@ -151,5 +173,6 @@ private:
mutable std::mutex the_mutex;
std::condition_variable the_rx_notification;
std::condition_variable the_tx_notification;
+ bool wakeup_requested = false;
};