diff options
Diffstat (limited to 'src/ThreadsafeQueue.h')
-rw-r--r-- | src/ThreadsafeQueue.h | 41 |
1 files changed, 36 insertions, 5 deletions
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index e5e83ef..e27c100 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -58,14 +58,37 @@ public: size_t queue_size = the_queue.size(); lock.unlock(); - notify(); + the_rx_notification.notify_one(); return queue_size; } - void notify() + /* Push one element into the queue, but wait until the + * queue size goes below the threshold. + * + * Notify waiting thread. + * + * returns the new queue size. + */ + size_t push_wait_if_full(T const& val, size_t threshold) + { + boost::mutex::scoped_lock lock(the_mutex); + while (the_queue.size() >= threshold) { + the_tx_notification.wait(lock); + } + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + /* Send a notification for the receiver thread */ + void notify(void) { - the_condition_variable.notify_one(); + the_rx_notification.notify_one(); } bool empty() const @@ -89,6 +112,10 @@ public: popped_value = the_queue.front(); the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); + return true; } @@ -96,17 +123,21 @@ public: { boost::mutex::scoped_lock lock(the_mutex); while (the_queue.size() < prebuffering) { - the_condition_variable.wait(lock); + the_rx_notification.wait(lock); } popped_value = the_queue.front(); the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); } private: std::queue<T> the_queue; mutable boost::mutex the_mutex; - boost::condition_variable the_condition_variable; + boost::condition_variable the_rx_notification; + boost::condition_variable the_tx_notification; }; #endif |