diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ThreadsafeQueue.h | 45 | 
1 files changed, 28 insertions, 17 deletions
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index d40c472..4524559 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -2,10 +2,10 @@     Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in     Right of Canada (Communications Research Center Canada) -   Copyright (C) 2013, 2014 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li -   An implementation for a threadsafe queue using boost thread library +   An implementation for a threadsafe queue using std::thread     When creating a ThreadsafeQueue, one can specify the minimal number     of elements it must contain before it is possible to take one @@ -28,11 +28,12 @@     along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>.   */ -#ifndef THREADSAFE_QUEUE_H -#define THREADSAFE_QUEUE_H +#pragma once -#include <boost/thread.hpp> +#include <mutex> +#include <condition_variable>  #include <queue> +#include <utility>  /* This queue is meant to be used by two threads. One producer   * that pushes elements into the queue, and one consumer that @@ -53,7 +54,7 @@ public:       */      size_t push(T const& val)      { -        boost::mutex::scoped_lock lock(the_mutex); +        std::unique_lock<std::mutex> lock(the_mutex);          the_queue.push(val);          size_t queue_size = the_queue.size();          lock.unlock(); @@ -63,6 +64,18 @@ public:          return queue_size;      } +    size_t push(T&& val) +    { +        std::unique_lock<std::mutex> lock(the_mutex); +        the_queue.emplace(std::move(val)); +        size_t queue_size = the_queue.size(); +        lock.unlock(); + +        the_rx_notification.notify_one(); + +        return queue_size; +    } +      /* Push one element into the queue, but wait until the       * queue size goes below the threshold.       * @@ -72,7 +85,7 @@ public:       */      size_t push_wait_if_full(T const& val, size_t threshold)      { -        boost::mutex::scoped_lock lock(the_mutex); +        std::unique_lock<std::mutex> lock(the_mutex);          while (the_queue.size() >= threshold) {              the_tx_notification.wait(lock);          } @@ -93,19 +106,19 @@ public:      bool empty() const      { -        boost::mutex::scoped_lock lock(the_mutex); +        std::unique_lock<std::mutex> lock(the_mutex);          return the_queue.empty();      }      size_t size() const      { -        boost::mutex::scoped_lock lock(the_mutex); +        std::unique_lock<std::mutex> lock(the_mutex);          return the_queue.size();      }      bool try_pop(T& popped_value)      { -        boost::mutex::scoped_lock lock(the_mutex); +        std::unique_lock<std::mutex> lock(the_mutex);          if (the_queue.empty()) {              return false;          } @@ -121,12 +134,12 @@ public:      void wait_and_pop(T& popped_value, size_t prebuffering = 1)      { -        boost::mutex::scoped_lock lock(the_mutex); +        std::unique_lock<std::mutex> lock(the_mutex);          while (the_queue.size() < prebuffering) {              the_rx_notification.wait(lock);          } -        popped_value = the_queue.front(); +        std::swap(popped_value, the_queue.front());          the_queue.pop();          lock.unlock(); @@ -135,10 +148,8 @@ public:  private:      std::queue<T> the_queue; -    mutable boost::mutex the_mutex; -    boost::condition_variable the_rx_notification; -    boost::condition_variable the_tx_notification; +    mutable std::mutex the_mutex; +    std::condition_variable the_rx_notification; +    std::condition_variable the_tx_notification;  }; -#endif -  | 
