From 23999721c627200ca17b0a090239315450affe75 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 11 Sep 2016 22:48:20 +0200 Subject: Add missing ThreadsafeQueue.h --- src/Makefile.am | 1 + src/ThreadsafeQueue.h | 144 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 145 insertions(+) create mode 100644 src/ThreadsafeQueue.h (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index 562c6db..87d3eec 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -95,6 +95,7 @@ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \ RemoteControl.cpp RemoteControl.h \ TcpSocket.h TcpSocket.cpp \ UdpSocket.h UdpSocket.cpp \ + ThreadsafeQueue.h \ bridge.h bridge.c \ crc.h crc.c \ fig/FIG.h fig/FIG.cpp \ diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h new file mode 100644 index 0000000..d40c472 --- /dev/null +++ b/src/ThreadsafeQueue.h @@ -0,0 +1,144 @@ +/* + Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in + Right of Canada (Communications Research Center Canada) + + Copyright (C) 2013, 2014 + Matthias P. Braendli, matthias.braendli@mpb.li + + An implementation for a threadsafe queue using boost thread library + + When creating a ThreadsafeQueue, one can specify the minimal number + of elements it must contain before it is possible to take one + element out. + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#ifndef THREADSAFE_QUEUE_H +#define THREADSAFE_QUEUE_H + +#include +#include + +/* This queue is meant to be used by two threads. One producer + * that pushes elements into the queue, and one consumer that + * retrieves the elements. + * + * The queue can make the consumer block until an element + * is available. + */ + +template +class ThreadsafeQueue +{ +public: + /* Push one element into the queue, and notify another thread that + * might be waiting. + * + * returns the new queue size. + */ + size_t push(T const& val) + { + boost::mutex::scoped_lock lock(the_mutex); + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + /* Push one element into the queue, but wait until the + * queue size goes below the threshold. + * + * Notify waiting thread. + * + * returns the new queue size. + */ + size_t push_wait_if_full(T const& val, size_t threshold) + { + boost::mutex::scoped_lock lock(the_mutex); + while (the_queue.size() >= threshold) { + the_tx_notification.wait(lock); + } + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + /* Send a notification for the receiver thread */ + void notify(void) + { + the_rx_notification.notify_one(); + } + + bool empty() const + { + boost::mutex::scoped_lock lock(the_mutex); + return the_queue.empty(); + } + + size_t size() const + { + boost::mutex::scoped_lock lock(the_mutex); + return the_queue.size(); + } + + bool try_pop(T& popped_value) + { + boost::mutex::scoped_lock lock(the_mutex); + if (the_queue.empty()) { + return false; + } + + popped_value = the_queue.front(); + the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); + + return true; + } + + void wait_and_pop(T& popped_value, size_t prebuffering = 1) + { + boost::mutex::scoped_lock lock(the_mutex); + while (the_queue.size() < prebuffering) { + the_rx_notification.wait(lock); + } + + popped_value = the_queue.front(); + the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); + } + +private: + std::queue the_queue; + mutable boost::mutex the_mutex; + boost::condition_variable the_rx_notification; + boost::condition_variable the_tx_notification; +}; + +#endif + -- cgit v1.2.3