diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-06-07 10:14:51 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-06-07 10:14:51 +0200 |
commit | 8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213 (patch) | |
tree | c0296c305409bb7fc373625aea05c2f57054eb5c /lib/ThreadsafeQueue.h | |
parent | 43f4a3a2a695c303bd4fdfbd7fec6def29284f2e (diff) | |
download | dabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.tar.gz dabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.tar.bz2 dabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.zip |
Work on STI-D/EDI input
Diffstat (limited to 'lib/ThreadsafeQueue.h')
-rw-r--r-- | lib/ThreadsafeQueue.h | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h new file mode 100644 index 0000000..62f4c96 --- /dev/null +++ b/lib/ThreadsafeQueue.h @@ -0,0 +1,176 @@ +/* + Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in + Right of Canada (Communications Research Center Canada) + + Copyright (C) 2018 + Matthias P. Braendli, matthias.braendli@mpb.li + + An implementation for a threadsafe queue, depends on C++11 + + When creating a ThreadsafeQueue, one can specify the minimal number + of elements it must contain before it is possible to take one + element out. + */ +/* + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <mutex> +#include <condition_variable> +#include <queue> +#include <utility> + +/* This queue is meant to be used by two threads. One producer + * that pushes elements into the queue, and one consumer that + * retrieves the elements. + * + * The queue can make the consumer block until an element + * is available, or a wakeup requested. + */ + +/* Class thrown by blocking pop to tell the consumer + * that there's a wakeup requested. */ +class ThreadsafeQueueWakeup {}; + +template<typename T> +class ThreadsafeQueue +{ +public: + /* Push one element into the queue, and notify another thread that + * might be waiting. + * + * returns the new queue size. + */ + size_t push(T const& val) + { + std::unique_lock<std::mutex> lock(the_mutex); + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + size_t push(T&& val) + { + std::unique_lock<std::mutex> lock(the_mutex); + the_queue.emplace(std::move(val)); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + /* Push one element into the queue, but wait until the + * queue size goes below the threshold. + * + * Notify waiting thread. + * + * returns the new queue size. + */ + size_t push_wait_if_full(T const& val, size_t threshold) + { + std::unique_lock<std::mutex> lock(the_mutex); + while (the_queue.size() >= threshold) { + the_tx_notification.wait(lock); + } + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + /* Trigger a wakeup event on a blocking consumer, which + * will receive a ThreadsafeQueueWakeup exception. + */ + void trigger_wakeup(void) + { + std::unique_lock<std::mutex> lock(the_mutex); + wakeup_requested = true; + lock.unlock(); + the_rx_notification.notify_one(); + } + + /* Send a notification for the receiver thread */ + void notify(void) + { + the_rx_notification.notify_one(); + } + + bool empty() const + { + std::unique_lock<std::mutex> lock(the_mutex); + return the_queue.empty(); + } + + size_t size() const + { + std::unique_lock<std::mutex> lock(the_mutex); + return the_queue.size(); + } + + bool try_pop(T& popped_value) + { + std::unique_lock<std::mutex> lock(the_mutex); + if (the_queue.empty()) { + return false; + } + + popped_value = the_queue.front(); + the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); + + return true; + } + + void wait_and_pop(T& popped_value, size_t prebuffering = 1) + { + std::unique_lock<std::mutex> lock(the_mutex); + while (the_queue.size() < prebuffering and + not wakeup_requested) { + the_rx_notification.wait(lock); + } + + if (wakeup_requested) { + wakeup_requested = false; + throw ThreadsafeQueueWakeup(); + } + else { + std::swap(popped_value, the_queue.front()); + the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); + } + } + +private: + std::queue<T> the_queue; + mutable std::mutex the_mutex; + std::condition_variable the_rx_notification; + std::condition_variable the_tx_notification; + bool wakeup_requested = false; +}; + |