summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ThreadsafeQueue.h45
1 files changed, 28 insertions, 17 deletions
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
index d40c472..4524559 100644
--- a/src/ThreadsafeQueue.h
+++ b/src/ThreadsafeQueue.h
@@ -2,10 +2,10 @@
Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2013, 2014
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
- An implementation for a threadsafe queue using boost thread library
+ An implementation for a threadsafe queue using std::thread
When creating a ThreadsafeQueue, one can specify the minimal number
of elements it must contain before it is possible to take one
@@ -28,11 +28,12 @@
along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef THREADSAFE_QUEUE_H
-#define THREADSAFE_QUEUE_H
+#pragma once
-#include <boost/thread.hpp>
+#include <mutex>
+#include <condition_variable>
#include <queue>
+#include <utility>
/* This queue is meant to be used by two threads. One producer
* that pushes elements into the queue, and one consumer that
@@ -53,7 +54,7 @@ public:
*/
size_t push(T const& val)
{
- boost::mutex::scoped_lock lock(the_mutex);
+ std::unique_lock<std::mutex> lock(the_mutex);
the_queue.push(val);
size_t queue_size = the_queue.size();
lock.unlock();
@@ -63,6 +64,18 @@ public:
return queue_size;
}
+ size_t push(T&& val)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ the_queue.emplace(std::move(val));
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
/* Push one element into the queue, but wait until the
* queue size goes below the threshold.
*
@@ -72,7 +85,7 @@ public:
*/
size_t push_wait_if_full(T const& val, size_t threshold)
{
- boost::mutex::scoped_lock lock(the_mutex);
+ std::unique_lock<std::mutex> lock(the_mutex);
while (the_queue.size() >= threshold) {
the_tx_notification.wait(lock);
}
@@ -93,19 +106,19 @@ public:
bool empty() const
{
- boost::mutex::scoped_lock lock(the_mutex);
+ std::unique_lock<std::mutex> lock(the_mutex);
return the_queue.empty();
}
size_t size() const
{
- boost::mutex::scoped_lock lock(the_mutex);
+ std::unique_lock<std::mutex> lock(the_mutex);
return the_queue.size();
}
bool try_pop(T& popped_value)
{
- boost::mutex::scoped_lock lock(the_mutex);
+ std::unique_lock<std::mutex> lock(the_mutex);
if (the_queue.empty()) {
return false;
}
@@ -121,12 +134,12 @@ public:
void wait_and_pop(T& popped_value, size_t prebuffering = 1)
{
- boost::mutex::scoped_lock lock(the_mutex);
+ std::unique_lock<std::mutex> lock(the_mutex);
while (the_queue.size() < prebuffering) {
the_rx_notification.wait(lock);
}
- popped_value = the_queue.front();
+ std::swap(popped_value, the_queue.front());
the_queue.pop();
lock.unlock();
@@ -135,10 +148,8 @@ public:
private:
std::queue<T> the_queue;
- mutable boost::mutex the_mutex;
- boost::condition_variable the_rx_notification;
- boost::condition_variable the_tx_notification;
+ mutable std::mutex the_mutex;
+ std::condition_variable the_rx_notification;
+ std::condition_variable the_tx_notification;
};
-#endif
-