aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am1
-rw-r--r--src/ThreadsafeQueue.h144
2 files changed, 145 insertions, 0 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 562c6db..87d3eec 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -95,6 +95,7 @@ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \
RemoteControl.cpp RemoteControl.h \
TcpSocket.h TcpSocket.cpp \
UdpSocket.h UdpSocket.cpp \
+ ThreadsafeQueue.h \
bridge.h bridge.c \
crc.h crc.c \
fig/FIG.h fig/FIG.cpp \
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
new file mode 100644
index 0000000..d40c472
--- /dev/null
+++ b/src/ThreadsafeQueue.h
@@ -0,0 +1,144 @@
+/*
+ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
+ Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2013, 2014
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ An implementation for a threadsafe queue using boost thread library
+
+ When creating a ThreadsafeQueue, one can specify the minimal number
+ of elements it must contain before it is possible to take one
+ element out.
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef THREADSAFE_QUEUE_H
+#define THREADSAFE_QUEUE_H
+
+#include <boost/thread.hpp>
+#include <queue>
+
+/* This queue is meant to be used by two threads. One producer
+ * that pushes elements into the queue, and one consumer that
+ * retrieves the elements.
+ *
+ * The queue can make the consumer block until an element
+ * is available.
+ */
+
+template<typename T>
+class ThreadsafeQueue
+{
+public:
+ /* Push one element into the queue, and notify another thread that
+ * might be waiting.
+ *
+ * returns the new queue size.
+ */
+ size_t push(T const& val)
+ {
+ boost::mutex::scoped_lock lock(the_mutex);
+ the_queue.push(val);
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ /* Push one element into the queue, but wait until the
+ * queue size goes below the threshold.
+ *
+ * Notify waiting thread.
+ *
+ * returns the new queue size.
+ */
+ size_t push_wait_if_full(T const& val, size_t threshold)
+ {
+ boost::mutex::scoped_lock lock(the_mutex);
+ while (the_queue.size() >= threshold) {
+ the_tx_notification.wait(lock);
+ }
+ the_queue.push(val);
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ /* Send a notification for the receiver thread */
+ void notify(void)
+ {
+ the_rx_notification.notify_one();
+ }
+
+ bool empty() const
+ {
+ boost::mutex::scoped_lock lock(the_mutex);
+ return the_queue.empty();
+ }
+
+ size_t size() const
+ {
+ boost::mutex::scoped_lock lock(the_mutex);
+ return the_queue.size();
+ }
+
+ bool try_pop(T& popped_value)
+ {
+ boost::mutex::scoped_lock lock(the_mutex);
+ if (the_queue.empty()) {
+ return false;
+ }
+
+ popped_value = the_queue.front();
+ the_queue.pop();
+
+ lock.unlock();
+ the_tx_notification.notify_one();
+
+ return true;
+ }
+
+ void wait_and_pop(T& popped_value, size_t prebuffering = 1)
+ {
+ boost::mutex::scoped_lock lock(the_mutex);
+ while (the_queue.size() < prebuffering) {
+ the_rx_notification.wait(lock);
+ }
+
+ popped_value = the_queue.front();
+ the_queue.pop();
+
+ lock.unlock();
+ the_tx_notification.notify_one();
+ }
+
+private:
+ std::queue<T> the_queue;
+ mutable boost::mutex the_mutex;
+ boost::condition_variable the_rx_notification;
+ boost::condition_variable the_tx_notification;
+};
+
+#endif
+