From 411d03ac6b8ee1a8c06f952b9378c90516a715b7 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 20 Apr 2018 12:29:24 +0200 Subject: ThreadsafeQueue: add wakeup event instead of custom termination markers This avoids the issue that the ~SDR termination marker doesn't reach the consumer because it's still prebuffering --- src/EtiReader.h | 1 - src/FIRFilter.h | 1 - src/Log.cpp | 12 ++++++------ src/Log.h | 5 +---- src/MemlessPoly.cpp | 8 ++++---- src/MemlessPoly.h | 6 +----- src/ThreadsafeQueue.h | 35 +++++++++++++++++++++++++++++------ src/output/SDR.cpp | 10 +++++----- src/output/Soapy.h | 1 - src/output/USRPTime.h | 1 - 10 files changed, 46 insertions(+), 34 deletions(-) (limited to 'src') diff --git a/src/EtiReader.h b/src/EtiReader.h index dfbaaa9..554231e 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -40,7 +40,6 @@ #ifdef HAVE_EDI # include "lib/UdpSocket.h" #endif -#include "ThreadsafeQueue.h" #include #include diff --git a/src/FIRFilter.h b/src/FIRFilter.h index d04c456..e03321a 100644 --- a/src/FIRFilter.h +++ b/src/FIRFilter.h @@ -34,7 +34,6 @@ #include "RemoteControl.h" #include "ModPlugin.h" #include "PcDebug.h" -#include "ThreadsafeQueue.h" #include #include diff --git a/src/Log.cpp b/src/Log.cpp index ee16880..81f7955 100644 --- a/src/Log.cpp +++ b/src/Log.cpp @@ -82,15 +82,15 @@ void Logger::io_process() set_thread_name("logger"); while (1) { log_message_t m; - m_message_queue.wait_and_pop(m); - - auto message = m.message; - - if (m.level == trace and m.message.empty()) { - // Special message to stop thread + try { + m_message_queue.wait_and_pop(m); + } + catch (const ThreadsafeQueueWakeup&) { break; } + auto message = m.message; + /* Remove a potential trailing newline. * It doesn't look good in syslog */ diff --git a/src/Log.h b/src/Log.h index 0e09bc9..df6e07b 100644 --- a/src/Log.h +++ b/src/Log.h @@ -138,10 +138,7 @@ class Logger { Logger(const Logger& other) = delete; const Logger& operator=(const Logger& other) = delete; ~Logger() { - // Special message to stop the thread - log_message_t m(trace, ""); - - m_message_queue.push(m); + m_message_queue.trigger_wakeup(); m_io_thread.join(); } diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp index 1e19071..88a68bc 100644 --- a/src/MemlessPoly.cpp +++ b/src/MemlessPoly.cpp @@ -280,9 +280,10 @@ void MemlessPoly::worker_thread(MemlessPoly::worker_t *workerdata) while (true) { worker_t::input_data_t in_data; - workerdata->in_queue.wait_and_pop(in_data); - - if (in_data.terminate) { + try { + workerdata->in_queue.wait_and_pop(in_data); + } + catch (const ThreadsafeQueueWakeup&) { break; } @@ -322,7 +323,6 @@ int MemlessPoly::internal_process(Buffer* const dataIn, Buffer* dataOut) size_t start = 0; for (auto& worker : m_workers) { worker_t::input_data_t dat; - dat.terminate = false; dat.dpd_type = m_dpd_type; dat.lut_scalefactor = m_lut_scalefactor; dat.lut = m_lut.data(); diff --git a/src/MemlessPoly.h b/src/MemlessPoly.h index 4c67d46..7f00261 100644 --- a/src/MemlessPoly.h +++ b/src/MemlessPoly.h @@ -78,8 +78,6 @@ private: struct worker_t { struct input_data_t { - bool terminate = false; - dpd_type_t dpd_type; // Valid for polynomial types @@ -112,9 +110,7 @@ private: ~worker_t() { if (thread.joinable()) { - input_data_t terminate_tag; - terminate_tag.terminate = true; - in_queue.push(terminate_tag); + in_queue.trigger_wakeup(); thread.join(); } } diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index edda490..433eae3 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -40,9 +40,13 @@ * retrieves the elements. * * The queue can make the consumer block until an element - * is available. + * is available, or a wakeup requested. */ +/* Class thrown by blocking pop to tell the consumer + * that there's a wakeup requested. */ +class ThreadsafeQueueWakeup {}; + template class ThreadsafeQueue { @@ -98,6 +102,17 @@ public: return queue_size; } + /* Trigger a wakeup event on a blocking consumer, which + * will receive a ThreadsafeQueueWakeup exception. + */ + void trigger_wakeup(void) + { + std::unique_lock lock(the_mutex); + wakeup_requested = true; + lock.unlock(); + the_rx_notification.notify_one(); + } + /* Send a notification for the receiver thread */ void notify(void) { @@ -135,15 +150,22 @@ public: void wait_and_pop(T& popped_value, size_t prebuffering = 1) { std::unique_lock lock(the_mutex); - while (the_queue.size() < prebuffering) { + while (the_queue.size() < prebuffering and + not wakeup_requested) { the_rx_notification.wait(lock); } - std::swap(popped_value, the_queue.front()); - the_queue.pop(); + if (wakeup_requested) { + wakeup_requested = false; + throw ThreadsafeQueueWakeup(); + } + else { + std::swap(popped_value, the_queue.front()); + the_queue.pop(); - lock.unlock(); - the_tx_notification.notify_one(); + lock.unlock(); + the_tx_notification.notify_one(); + } } private: @@ -151,5 +173,6 @@ private: mutable std::mutex the_mutex; std::condition_variable the_rx_notification; std::condition_variable the_tx_notification; + bool wakeup_requested = false; }; diff --git a/src/output/SDR.cpp b/src/output/SDR.cpp index b47433d..7c1b585 100644 --- a/src/output/SDR.cpp +++ b/src/output/SDR.cpp @@ -84,9 +84,7 @@ SDR::~SDR() { m_running.store(false); - FrameData end_marker; - end_marker.buf.clear(); - m_queue.push(end_marker); + m_queue.trigger_wakeup(); if (m_device_thread.joinable()) { m_device_thread.join(); @@ -181,7 +179,7 @@ void SDR::process_thread_entry() m_queue.wait_and_pop(frame, pop_prebuffering); etiLog.log(trace, "SDR,pop"); - if (m_running.load() == false or frame.buf.empty()) { + if (m_running.load() == false) { break; } @@ -203,8 +201,10 @@ void SDR::process_thread_entry() } } } + catch (const ThreadsafeQueueWakeup& e) { } catch (const runtime_error& e) { - etiLog.level(error) << "SDR output thread caught runtime error: " << e.what(); + etiLog.level(error) << "SDR output thread caught runtime error: " << + e.what(); } m_running.store(false); diff --git a/src/output/Soapy.h b/src/output/Soapy.h index 5c20156..3045420 100644 --- a/src/output/Soapy.h +++ b/src/output/Soapy.h @@ -48,7 +48,6 @@ DESCRIPTION: #include "ModPlugin.h" #include "EtiReader.h" #include "RemoteControl.h" -#include "ThreadsafeQueue.h" namespace Output { diff --git a/src/output/USRPTime.h b/src/output/USRPTime.h index 70e55ae..51ca800 100644 --- a/src/output/USRPTime.h +++ b/src/output/USRPTime.h @@ -48,7 +48,6 @@ DESCRIPTION: #include "output/SDR.h" #include "TimestampDecoder.h" #include "RemoteControl.h" -#include "ThreadsafeQueue.h" #include #include -- cgit v1.2.3