From f353cc3a990946ad6f4649b9491aaf3f5344a83e Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 6 Mar 2018 23:06:14 +0100 Subject: Update ThreadSafequeue --- src/ThreadsafeQueue.h | 45 ++++++++++++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 17 deletions(-) (limited to 'src/ThreadsafeQueue.h') diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index d40c472..4524559 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -2,10 +2,10 @@ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013, 2014 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li - An implementation for a threadsafe queue using boost thread library + An implementation for a threadsafe queue using std::thread When creating a ThreadsafeQueue, one can specify the minimal number of elements it must contain before it is possible to take one @@ -28,11 +28,12 @@ along with ODR-DabMux. If not, see . */ -#ifndef THREADSAFE_QUEUE_H -#define THREADSAFE_QUEUE_H +#pragma once -#include +#include +#include #include +#include /* This queue is meant to be used by two threads. One producer * that pushes elements into the queue, and one consumer that @@ -53,7 +54,7 @@ public: */ size_t push(T const& val) { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock lock(the_mutex); the_queue.push(val); size_t queue_size = the_queue.size(); lock.unlock(); @@ -63,6 +64,18 @@ public: return queue_size; } + size_t push(T&& val) + { + std::unique_lock lock(the_mutex); + the_queue.emplace(std::move(val)); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + /* Push one element into the queue, but wait until the * queue size goes below the threshold. * @@ -72,7 +85,7 @@ public: */ size_t push_wait_if_full(T const& val, size_t threshold) { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock lock(the_mutex); while (the_queue.size() >= threshold) { the_tx_notification.wait(lock); } @@ -93,19 +106,19 @@ public: bool empty() const { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock lock(the_mutex); return the_queue.empty(); } size_t size() const { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock lock(the_mutex); return the_queue.size(); } bool try_pop(T& popped_value) { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock lock(the_mutex); if (the_queue.empty()) { return false; } @@ -121,12 +134,12 @@ public: void wait_and_pop(T& popped_value, size_t prebuffering = 1) { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock lock(the_mutex); while (the_queue.size() < prebuffering) { the_rx_notification.wait(lock); } - popped_value = the_queue.front(); + std::swap(popped_value, the_queue.front()); the_queue.pop(); lock.unlock(); @@ -135,10 +148,8 @@ public: private: std::queue the_queue; - mutable boost::mutex the_mutex; - boost::condition_variable the_rx_notification; - boost::condition_variable the_tx_notification; + mutable std::mutex the_mutex; + std::condition_variable the_rx_notification; + std::condition_variable the_tx_notification; }; -#endif - -- cgit v1.2.3