diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2023-11-21 22:12:14 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2023-11-21 22:12:14 +0100 |
commit | 5fe36a627405b8fc65bdb212a6d505b9a6c8e489 (patch) | |
tree | 8646d654e0467be8603c5ad37fb6cd89656dfd26 /lib/ThreadsafeQueue.h | |
parent | 477ac4639a7c7f74f07a6164096fc7de102528ff (diff) | |
parent | f84065c3cc6fff0edb771f85190f7228f4d740b6 (diff) | |
download | dabmod-5fe36a627405b8fc65bdb212a6d505b9a6c8e489.tar.gz dabmod-5fe36a627405b8fc65bdb212a6d505b9a6c8e489.tar.bz2 dabmod-5fe36a627405b8fc65bdb212a6d505b9a6c8e489.zip |
Merge branch 'dexter' into next
Diffstat (limited to 'lib/ThreadsafeQueue.h')
-rw-r--r-- | lib/ThreadsafeQueue.h | 54 |
1 files changed, 50 insertions, 4 deletions
diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index 815dfe0..8b385d6 100644 --- a/lib/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -2,7 +2,7 @@ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2023 Matthias P. Braendli, matthias.braendli@mpb.li An implementation for a threadsafe queue, depends on C++11 @@ -32,6 +32,7 @@ #include <condition_variable> #include <queue> #include <utility> +#include <cassert> /* This queue is meant to be used by two threads. One producer * that pushes elements into the queue, and one consumer that @@ -69,7 +70,6 @@ public: } size_t queue_size = the_queue.size(); lock.unlock(); - the_rx_notification.notify_one(); return queue_size; @@ -93,11 +93,57 @@ public: return queue_size; } + struct push_overflow_result { bool overflowed; size_t new_size; }; + + /* Push one element into the queue, and if queue is + * full remove one element from the other end. + * + * max_size == 0 is not allowed. + * + * returns the new queue size and a flag if overflow occurred. + */ + push_overflow_result push_overflow(T const& val, size_t max_size) + { + assert(max_size > 0); + std::unique_lock<std::mutex> lock(the_mutex); + + bool overflow = false; + while (the_queue.size() >= max_size) { + overflow = true; + the_queue.pop(); + } + the_queue.push(val); + const size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return {overflow, queue_size}; + } + + push_overflow_result push_overflow(T&& val, size_t max_size) + { + assert(max_size > 0); + std::unique_lock<std::mutex> lock(the_mutex); + + bool overflow = false; + while (the_queue.size() >= max_size) { + overflow = true; + the_queue.pop(); + } + the_queue.emplace(std::move(val)); + const size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return {overflow, queue_size}; + } + + /* Push one element into the queue, but wait until the * queue size goes below the threshold. * - * Notify waiting thread. - * * returns the new queue size. */ size_t push_wait_if_full(T const& val, size_t threshold) |