aboutsummaryrefslogtreecommitdiffstats
path: root/src/ThreadsafeQueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/ThreadsafeQueue.h')
-rw-r--r--src/ThreadsafeQueue.h41
1 files changed, 36 insertions, 5 deletions
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
index e5e83ef..e27c100 100644
--- a/src/ThreadsafeQueue.h
+++ b/src/ThreadsafeQueue.h
@@ -58,14 +58,37 @@ public:
size_t queue_size = the_queue.size();
lock.unlock();
- notify();
+ the_rx_notification.notify_one();
return queue_size;
}
- void notify()
+ /* Push one element into the queue, but wait until the
+ * queue size goes below the threshold.
+ *
+ * Notify waiting thread.
+ *
+ * returns the new queue size.
+ */
+ size_t push_wait_if_full(T const& val, size_t threshold)
+ {
+ boost::mutex::scoped_lock lock(the_mutex);
+ while (the_queue.size() >= threshold) {
+ the_tx_notification.wait(lock);
+ }
+ the_queue.push(val);
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ /* Send a notification for the receiver thread */
+ void notify(void)
{
- the_condition_variable.notify_one();
+ the_rx_notification.notify_one();
}
bool empty() const
@@ -89,6 +112,10 @@ public:
popped_value = the_queue.front();
the_queue.pop();
+
+ lock.unlock();
+ the_tx_notification.notify_one();
+
return true;
}
@@ -96,17 +123,21 @@ public:
{
boost::mutex::scoped_lock lock(the_mutex);
while (the_queue.size() < prebuffering) {
- the_condition_variable.wait(lock);
+ the_rx_notification.wait(lock);
}
popped_value = the_queue.front();
the_queue.pop();
+
+ lock.unlock();
+ the_tx_notification.notify_one();
}
private:
std::queue<T> the_queue;
mutable boost::mutex the_mutex;
- boost::condition_variable the_condition_variable;
+ boost::condition_variable the_rx_notification;
+ boost::condition_variable the_tx_notification;
};
#endif