aboutsummaryrefslogtreecommitdiffstats
path: root/src/ThreadsafeQueue.h
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2018-04-20 12:29:24 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2018-04-20 12:29:24 +0200
commit411d03ac6b8ee1a8c06f952b9378c90516a715b7 (patch)
tree3236a6121eb9137a79b82699006df877e3876c32 /src/ThreadsafeQueue.h
parent4f9b087a578fac9dffef83cdcb41573468a4ae17 (diff)
downloaddabmod-411d03ac6b8ee1a8c06f952b9378c90516a715b7.tar.gz
dabmod-411d03ac6b8ee1a8c06f952b9378c90516a715b7.tar.bz2
dabmod-411d03ac6b8ee1a8c06f952b9378c90516a715b7.zip
ThreadsafeQueue: add wakeup event instead of custom termination markers
This avoids the issue that the ~SDR termination marker doesn't reach the consumer because it's still prebuffering
Diffstat (limited to 'src/ThreadsafeQueue.h')
-rw-r--r--src/ThreadsafeQueue.h35
1 files changed, 29 insertions, 6 deletions
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
index edda490..433eae3 100644
--- a/src/ThreadsafeQueue.h
+++ b/src/ThreadsafeQueue.h
@@ -40,9 +40,13 @@
* retrieves the elements.
*
* The queue can make the consumer block until an element
- * is available.
+ * is available, or a wakeup requested.
*/
+/* Class thrown by blocking pop to tell the consumer
+ * that there's a wakeup requested. */
+class ThreadsafeQueueWakeup {};
+
template<typename T>
class ThreadsafeQueue
{
@@ -98,6 +102,17 @@ public:
return queue_size;
}
+ /* Trigger a wakeup event on a blocking consumer, which
+ * will receive a ThreadsafeQueueWakeup exception.
+ */
+ void trigger_wakeup(void)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ wakeup_requested = true;
+ lock.unlock();
+ the_rx_notification.notify_one();
+ }
+
/* Send a notification for the receiver thread */
void notify(void)
{
@@ -135,15 +150,22 @@ public:
void wait_and_pop(T& popped_value, size_t prebuffering = 1)
{
std::unique_lock<std::mutex> lock(the_mutex);
- while (the_queue.size() < prebuffering) {
+ while (the_queue.size() < prebuffering and
+ not wakeup_requested) {
the_rx_notification.wait(lock);
}
- std::swap(popped_value, the_queue.front());
- the_queue.pop();
+ if (wakeup_requested) {
+ wakeup_requested = false;
+ throw ThreadsafeQueueWakeup();
+ }
+ else {
+ std::swap(popped_value, the_queue.front());
+ the_queue.pop();
- lock.unlock();
- the_tx_notification.notify_one();
+ lock.unlock();
+ the_tx_notification.notify_one();
+ }
}
private:
@@ -151,5 +173,6 @@ private:
mutable std::mutex the_mutex;
std::condition_variable the_rx_notification;
std::condition_variable the_tx_notification;
+ bool wakeup_requested = false;
};