summaryrefslogtreecommitdiffstats
path: root/lib/ThreadsafeQueue.h
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-06-07 10:14:51 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-06-07 10:14:51 +0200
commit8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213 (patch)
treec0296c305409bb7fc373625aea05c2f57054eb5c /lib/ThreadsafeQueue.h
parent43f4a3a2a695c303bd4fdfbd7fec6def29284f2e (diff)
downloaddabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.tar.gz
dabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.tar.bz2
dabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.zip
Work on STI-D/EDI input
Diffstat (limited to 'lib/ThreadsafeQueue.h')
-rw-r--r--lib/ThreadsafeQueue.h176
1 files changed, 176 insertions, 0 deletions
diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h
new file mode 100644
index 0000000..62f4c96
--- /dev/null
+++ b/lib/ThreadsafeQueue.h
@@ -0,0 +1,176 @@
+/*
+ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
+ Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ An implementation for a threadsafe queue, depends on C++11
+
+ When creating a ThreadsafeQueue, one can specify the minimal number
+ of elements it must contain before it is possible to take one
+ element out.
+ */
+/*
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <mutex>
+#include <condition_variable>
+#include <queue>
+#include <utility>
+
+/* This queue is meant to be used by two threads. One producer
+ * that pushes elements into the queue, and one consumer that
+ * retrieves the elements.
+ *
+ * The queue can make the consumer block until an element
+ * is available, or a wakeup requested.
+ */
+
+/* Class thrown by blocking pop to tell the consumer
+ * that there's a wakeup requested. */
+class ThreadsafeQueueWakeup {};
+
+template<typename T>
+class ThreadsafeQueue
+{
+public:
+ /* Push one element into the queue, and notify another thread that
+ * might be waiting.
+ *
+ * returns the new queue size.
+ */
+ size_t push(T const& val)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ the_queue.push(val);
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ size_t push(T&& val)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ the_queue.emplace(std::move(val));
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ /* Push one element into the queue, but wait until the
+ * queue size goes below the threshold.
+ *
+ * Notify waiting thread.
+ *
+ * returns the new queue size.
+ */
+ size_t push_wait_if_full(T const& val, size_t threshold)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ while (the_queue.size() >= threshold) {
+ the_tx_notification.wait(lock);
+ }
+ the_queue.push(val);
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ /* Trigger a wakeup event on a blocking consumer, which
+ * will receive a ThreadsafeQueueWakeup exception.
+ */
+ void trigger_wakeup(void)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ wakeup_requested = true;
+ lock.unlock();
+ the_rx_notification.notify_one();
+ }
+
+ /* Send a notification for the receiver thread */
+ void notify(void)
+ {
+ the_rx_notification.notify_one();
+ }
+
+ bool empty() const
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ return the_queue.empty();
+ }
+
+ size_t size() const
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ return the_queue.size();
+ }
+
+ bool try_pop(T& popped_value)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ if (the_queue.empty()) {
+ return false;
+ }
+
+ popped_value = the_queue.front();
+ the_queue.pop();
+
+ lock.unlock();
+ the_tx_notification.notify_one();
+
+ return true;
+ }
+
+ void wait_and_pop(T& popped_value, size_t prebuffering = 1)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ while (the_queue.size() < prebuffering and
+ not wakeup_requested) {
+ the_rx_notification.wait(lock);
+ }
+
+ if (wakeup_requested) {
+ wakeup_requested = false;
+ throw ThreadsafeQueueWakeup();
+ }
+ else {
+ std::swap(popped_value, the_queue.front());
+ the_queue.pop();
+
+ lock.unlock();
+ the_tx_notification.notify_one();
+ }
+ }
+
+private:
+ std::queue<T> the_queue;
+ mutable std::mutex the_mutex;
+ std::condition_variable the_rx_notification;
+ std::condition_variable the_tx_notification;
+ bool wakeup_requested = false;
+};
+