diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-10-07 04:52:50 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-10-07 04:52:50 +0200 |
commit | 0aa6201a490bfabc0ae021ceb9f1fb0f46727c5d (patch) | |
tree | a71d934d8151dd43e7c16555fef17fd749c5da70 /src/ThreadsafeQueue.h | |
parent | 558d74bffd9f069955af52c0b308a1d6169bcff0 (diff) | |
parent | 0330221d51421caa110b8c5dcb567cc3d0620eb9 (diff) | |
download | dabmod-0aa6201a490bfabc0ae021ceb9f1fb0f46727c5d.tar.gz dabmod-0aa6201a490bfabc0ae021ceb9f1fb0f46727c5d.tar.bz2 dabmod-0aa6201a490bfabc0ae021ceb9f1fb0f46727c5d.zip |
Merge lime output into next branch
Diffstat (limited to 'src/ThreadsafeQueue.h')
-rw-r--r-- | src/ThreadsafeQueue.h | 178 |
1 files changed, 0 insertions, 178 deletions
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h deleted file mode 100644 index ee26ca0..0000000 --- a/src/ThreadsafeQueue.h +++ /dev/null @@ -1,178 +0,0 @@ -/* - Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in - Right of Canada (Communications Research Center Canada) - - Copyright (C) 2018 - Matthias P. Braendli, matthias.braendli@mpb.li - - An implementation for a threadsafe queue, depends on C++11 - - When creating a ThreadsafeQueue, one can specify the minimal number - of elements it must contain before it is possible to take one - element out. - */ -/* - This file is part of ODR-DabMod. - - ODR-DabMod is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMod is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#include <mutex> -#include <condition_variable> -#include <queue> -#include <utility> - -/* This queue is meant to be used by two threads. One producer - * that pushes elements into the queue, and one consumer that - * retrieves the elements. - * - * The queue can make the consumer block until an element - * is available, or a wakeup requested. - */ - -/* Class thrown by blocking pop to tell the consumer - * that there's a wakeup requested. */ -class ThreadsafeQueueWakeup {}; - -template<typename T> -class ThreadsafeQueue -{ -public: - /* Push one element into the queue, and notify another thread that - * might be waiting. - * - * returns the new queue size. - */ - size_t push(T const& val) - { - std::unique_lock<std::mutex> lock(the_mutex); - the_queue.push(val); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - size_t push(T&& val) - { - std::unique_lock<std::mutex> lock(the_mutex); - the_queue.emplace(std::move(val)); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - /* Push one element into the queue, but wait until the - * queue size goes below the threshold. - * - * Notify waiting thread. - * - * returns the new queue size. - */ - size_t push_wait_if_full(T const& val, size_t threshold) - { - std::unique_lock<std::mutex> lock(the_mutex); - while (the_queue.size() >= threshold) { - the_tx_notification.wait(lock); - } - the_queue.push(val); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - /* Trigger a wakeup event on a blocking consumer, which - * will receive a ThreadsafeQueueWakeup exception. - */ - void trigger_wakeup(void) - { - std::unique_lock<std::mutex> lock(the_mutex); - wakeup_requested = true; - lock.unlock(); - the_rx_notification.notify_one(); - } - - /* Send a notification for the receiver thread */ - void notify(void) - { - the_rx_notification.notify_one(); - } - - bool empty() const - { - std::unique_lock<std::mutex> lock(the_mutex); - return the_queue.empty(); - } - - size_t size() const - { - std::unique_lock<std::mutex> lock(the_mutex); - return the_queue.size(); - } - - bool try_pop(T& popped_value) - { - std::unique_lock<std::mutex> lock(the_mutex); - if (the_queue.empty()) { - return false; - } - - popped_value = the_queue.front(); - the_queue.pop(); - - lock.unlock(); - the_tx_notification.notify_one(); - - return true; - } - - void wait_and_pop(T& popped_value, size_t prebuffering = 1) - { - std::unique_lock<std::mutex> lock(the_mutex); - while (the_queue.size() < prebuffering and - not wakeup_requested) { - the_rx_notification.wait(lock); - } - - if (wakeup_requested) { - wakeup_requested = false; - throw ThreadsafeQueueWakeup(); - } - else { - std::swap(popped_value, the_queue.front()); - the_queue.pop(); - - lock.unlock(); - the_tx_notification.notify_one(); - } - } - -private: - std::queue<T> the_queue; - mutable std::mutex the_mutex; - std::condition_variable the_rx_notification; - std::condition_variable the_tx_notification; - bool wakeup_requested = false; -}; - |