diff options
| author | shunt010 <sam@maxxwave.co.uk> | 2025-12-31 13:19:28 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-12-31 13:19:28 +0000 |
| commit | f8eaf51f61cdae65e90675920e427d23b8da7027 (patch) | |
| tree | e70c92214ac05cf0a2001e0481e289343c094d65 /lib/ThreadsafeQueue.h | |
| parent | f8b5402727b7e94aecbfb663a601577f97bae5b9 (diff) | |
| parent | a5f80a99e0dad51c45e8511347f27d816ae92e20 (diff) | |
| download | dabmux-f8eaf51f61cdae65e90675920e427d23b8da7027.tar.gz dabmux-f8eaf51f61cdae65e90675920e427d23b8da7027.tar.bz2 dabmux-f8eaf51f61cdae65e90675920e427d23b8da7027.zip | |
Merge pull request #1 from Opendigitalradio/master
Bring up to date
Diffstat (limited to 'lib/ThreadsafeQueue.h')
| -rw-r--r-- | lib/ThreadsafeQueue.h | 84 |
1 files changed, 71 insertions, 13 deletions
diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index 815dfe0..a8d2e85 100644 --- a/lib/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -2,7 +2,7 @@ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li An implementation for a threadsafe queue, depends on C++11 @@ -28,10 +28,12 @@ #pragma once +#include <functional> #include <mutex> #include <condition_variable> -#include <queue> +#include <deque> #include <utility> +#include <cassert> /* This queue is meant to be used by two threads. One producer * that pushes elements into the queue, and one consumer that @@ -62,14 +64,13 @@ public: std::unique_lock<std::mutex> lock(the_mutex); size_t queue_size_before = the_queue.size(); if (max_size == 0) { - the_queue.push(val); + the_queue.push_back(val); } else if (queue_size_before < max_size) { - the_queue.push(val); + the_queue.push_back(val); } size_t queue_size = the_queue.size(); lock.unlock(); - the_rx_notification.notify_one(); return queue_size; @@ -80,10 +81,10 @@ public: std::unique_lock<std::mutex> lock(the_mutex); size_t queue_size_before = the_queue.size(); if (max_size == 0) { - the_queue.emplace(std::move(val)); + the_queue.emplace_back(std::move(val)); } else if (queue_size_before < max_size) { - the_queue.emplace(std::move(val)); + the_queue.emplace_back(std::move(val)); } size_t queue_size = the_queue.size(); lock.unlock(); @@ -93,11 +94,57 @@ public: return queue_size; } + struct push_overflow_result { bool overflowed; size_t new_size; }; + + /* Push one element into the queue, and if queue is + * full remove one element from the other end. + * + * max_size == 0 is not allowed. + * + * returns the new queue size and a flag if overflow occurred. + */ + push_overflow_result push_overflow(T const& val, size_t max_size) + { + assert(max_size > 0); + std::unique_lock<std::mutex> lock(the_mutex); + + bool overflow = false; + while (the_queue.size() >= max_size) { + overflow = true; + the_queue.pop_front(); + } + the_queue.push_back(val); + const size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return {overflow, queue_size}; + } + + push_overflow_result push_overflow(T&& val, size_t max_size) + { + assert(max_size > 0); + std::unique_lock<std::mutex> lock(the_mutex); + + bool overflow = false; + while (the_queue.size() >= max_size) { + overflow = true; + the_queue.pop_front(); + } + the_queue.emplace_back(std::move(val)); + const size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return {overflow, queue_size}; + } + + /* Push one element into the queue, but wait until the * queue size goes below the threshold. * - * Notify waiting thread. - * * returns the new queue size. */ size_t push_wait_if_full(T const& val, size_t threshold) @@ -106,7 +153,7 @@ public: while (the_queue.size() >= threshold) { the_tx_notification.wait(lock); } - the_queue.push(val); + the_queue.push_back(val); size_t queue_size = the_queue.size(); lock.unlock(); @@ -152,7 +199,7 @@ public: } popped_value = the_queue.front(); - the_queue.pop(); + the_queue.pop_front(); lock.unlock(); the_tx_notification.notify_one(); @@ -174,15 +221,26 @@ public: } else { std::swap(popped_value, the_queue.front()); - the_queue.pop(); + the_queue.pop_front(); lock.unlock(); the_tx_notification.notify_one(); } } + template<typename R> + std::vector<R> map(std::function<R(const T&)> func) const + { + std::vector<R> result; + std::unique_lock<std::mutex> lock(the_mutex); + for (const T& elem : the_queue) { + result.push_back(func(elem)); + } + return result; + } + private: - std::queue<T> the_queue; + std::deque<T> the_queue; mutable std::mutex the_mutex; std::condition_variable the_rx_notification; std::condition_variable the_tx_notification; |
