summaryrefslogtreecommitdiffstats
path: root/src/ThreadsafeQueue.h
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-06-07 10:14:51 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-06-07 10:14:51 +0200
commit8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213 (patch)
treec0296c305409bb7fc373625aea05c2f57054eb5c /src/ThreadsafeQueue.h
parent43f4a3a2a695c303bd4fdfbd7fec6def29284f2e (diff)
downloaddabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.tar.gz
dabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.tar.bz2
dabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.zip
Work on STI-D/EDI input
Diffstat (limited to 'src/ThreadsafeQueue.h')
-rw-r--r--src/ThreadsafeQueue.h178
1 files changed, 0 insertions, 178 deletions
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
deleted file mode 100644
index ab287b2..0000000
--- a/src/ThreadsafeQueue.h
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
- Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2018
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- An implementation for a threadsafe queue, depends on C++11
-
- When creating a ThreadsafeQueue, one can specify the minimal number
- of elements it must contain before it is possible to take one
- element out.
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include <mutex>
-#include <condition_variable>
-#include <queue>
-#include <utility>
-
-/* This queue is meant to be used by two threads. One producer
- * that pushes elements into the queue, and one consumer that
- * retrieves the elements.
- *
- * The queue can make the consumer block until an element
- * is available, or a wakeup requested.
- */
-
-/* Class thrown by blocking pop to tell the consumer
- * that there's a wakeup requested. */
-class ThreadsafeQueueWakeup {};
-
-template<typename T>
-class ThreadsafeQueue
-{
-public:
- /* Push one element into the queue, and notify another thread that
- * might be waiting.
- *
- * returns the new queue size.
- */
- size_t push(T const& val)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- the_queue.push(val);
- size_t queue_size = the_queue.size();
- lock.unlock();
-
- the_rx_notification.notify_one();
-
- return queue_size;
- }
-
- size_t push(T&& val)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- the_queue.emplace(std::move(val));
- size_t queue_size = the_queue.size();
- lock.unlock();
-
- the_rx_notification.notify_one();
-
- return queue_size;
- }
-
- /* Push one element into the queue, but wait until the
- * queue size goes below the threshold.
- *
- * Notify waiting thread.
- *
- * returns the new queue size.
- */
- size_t push_wait_if_full(T const& val, size_t threshold)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- while (the_queue.size() >= threshold) {
- the_tx_notification.wait(lock);
- }
- the_queue.push(val);
- size_t queue_size = the_queue.size();
- lock.unlock();
-
- the_rx_notification.notify_one();
-
- return queue_size;
- }
-
- /* Trigger a wakeup event on a blocking consumer, which
- * will receive a ThreadsafeQueueWakeup exception.
- */
- void trigger_wakeup(void)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- wakeup_requested = true;
- lock.unlock();
- the_rx_notification.notify_one();
- }
-
- /* Send a notification for the receiver thread */
- void notify(void)
- {
- the_rx_notification.notify_one();
- }
-
- bool empty() const
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- return the_queue.empty();
- }
-
- size_t size() const
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- return the_queue.size();
- }
-
- bool try_pop(T& popped_value)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- if (the_queue.empty()) {
- return false;
- }
-
- popped_value = the_queue.front();
- the_queue.pop();
-
- lock.unlock();
- the_tx_notification.notify_one();
-
- return true;
- }
-
- void wait_and_pop(T& popped_value, size_t prebuffering = 1)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- while (the_queue.size() < prebuffering and
- not wakeup_requested) {
- the_rx_notification.wait(lock);
- }
-
- if (wakeup_requested) {
- wakeup_requested = false;
- throw ThreadsafeQueueWakeup();
- }
- else {
- std::swap(popped_value, the_queue.front());
- the_queue.pop();
-
- lock.unlock();
- the_tx_notification.notify_one();
- }
- }
-
-private:
- std::queue<T> the_queue;
- mutable std::mutex the_mutex;
- std::condition_variable the_rx_notification;
- std::condition_variable the_tx_notification;
- bool wakeup_requested = false;
-};
-