diff options
Diffstat (limited to 'src/ThreadsafeQueue.h')
-rw-r--r-- | src/ThreadsafeQueue.h | 43 |
1 files changed, 27 insertions, 16 deletions
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index e27c100..6f3808c 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -2,7 +2,7 @@ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013, 2014 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li An implementation for a threadsafe queue using boost thread library @@ -28,11 +28,12 @@ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef THREADSAFE_QUEUE_H -#define THREADSAFE_QUEUE_H +#pragma once -#include <boost/thread.hpp> +#include <mutex> +#include <condition_variable> #include <queue> +#include <utility> /* This queue is meant to be used by two threads. One producer * that pushes elements into the queue, and one consumer that @@ -53,7 +54,7 @@ public: */ size_t push(T const& val) { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock<std::mutex> lock(the_mutex); the_queue.push(val); size_t queue_size = the_queue.size(); lock.unlock(); @@ -63,6 +64,18 @@ public: return queue_size; } + size_t push(T&& val) + { + std::unique_lock<std::mutex> lock(the_mutex); + the_queue.emplace(std::move(val)); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + /* Push one element into the queue, but wait until the * queue size goes below the threshold. * @@ -72,7 +85,7 @@ public: */ size_t push_wait_if_full(T const& val, size_t threshold) { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock<std::mutex> lock(the_mutex); while (the_queue.size() >= threshold) { the_tx_notification.wait(lock); } @@ -93,19 +106,19 @@ public: bool empty() const { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock<std::mutex> lock(the_mutex); return the_queue.empty(); } size_t size() const { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock<std::mutex> lock(the_mutex); return the_queue.size(); } bool try_pop(T& popped_value) { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock<std::mutex> lock(the_mutex); if (the_queue.empty()) { return false; } @@ -121,12 +134,12 @@ public: void wait_and_pop(T& popped_value, size_t prebuffering = 1) { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock<std::mutex> lock(the_mutex); while (the_queue.size() < prebuffering) { the_rx_notification.wait(lock); } - popped_value = the_queue.front(); + std::swap(popped_value, the_queue.front()); the_queue.pop(); lock.unlock(); @@ -135,10 +148,8 @@ public: private: std::queue<T> the_queue; - mutable boost::mutex the_mutex; - boost::condition_variable the_rx_notification; - boost::condition_variable the_tx_notification; + mutable std::mutex the_mutex; + std::condition_variable the_rx_notification; + std::condition_variable the_tx_notification; }; -#endif - |