diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-04-20 12:29:24 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-04-20 12:29:24 +0200 |
commit | 411d03ac6b8ee1a8c06f952b9378c90516a715b7 (patch) | |
tree | 3236a6121eb9137a79b82699006df877e3876c32 /src | |
parent | 4f9b087a578fac9dffef83cdcb41573468a4ae17 (diff) | |
download | dabmod-411d03ac6b8ee1a8c06f952b9378c90516a715b7.tar.gz dabmod-411d03ac6b8ee1a8c06f952b9378c90516a715b7.tar.bz2 dabmod-411d03ac6b8ee1a8c06f952b9378c90516a715b7.zip |
ThreadsafeQueue: add wakeup event instead of custom termination markers
This avoids the issue that the ~SDR termination marker doesn't reach the
consumer because it's still prebuffering
Diffstat (limited to 'src')
-rw-r--r-- | src/EtiReader.h | 1 | ||||
-rw-r--r-- | src/FIRFilter.h | 1 | ||||
-rw-r--r-- | src/Log.cpp | 12 | ||||
-rw-r--r-- | src/Log.h | 5 | ||||
-rw-r--r-- | src/MemlessPoly.cpp | 8 | ||||
-rw-r--r-- | src/MemlessPoly.h | 6 | ||||
-rw-r--r-- | src/ThreadsafeQueue.h | 35 | ||||
-rw-r--r-- | src/output/SDR.cpp | 10 | ||||
-rw-r--r-- | src/output/Soapy.h | 1 | ||||
-rw-r--r-- | src/output/USRPTime.h | 1 |
10 files changed, 46 insertions, 34 deletions
diff --git a/src/EtiReader.h b/src/EtiReader.h index dfbaaa9..554231e 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -40,7 +40,6 @@ #ifdef HAVE_EDI # include "lib/UdpSocket.h" #endif -#include "ThreadsafeQueue.h" #include <vector> #include <memory> diff --git a/src/FIRFilter.h b/src/FIRFilter.h index d04c456..e03321a 100644 --- a/src/FIRFilter.h +++ b/src/FIRFilter.h @@ -34,7 +34,6 @@ #include "RemoteControl.h" #include "ModPlugin.h" #include "PcDebug.h" -#include "ThreadsafeQueue.h" #include <sys/types.h> #include <complex> diff --git a/src/Log.cpp b/src/Log.cpp index ee16880..81f7955 100644 --- a/src/Log.cpp +++ b/src/Log.cpp @@ -82,15 +82,15 @@ void Logger::io_process() set_thread_name("logger"); while (1) { log_message_t m; - m_message_queue.wait_and_pop(m); - - auto message = m.message; - - if (m.level == trace and m.message.empty()) { - // Special message to stop thread + try { + m_message_queue.wait_and_pop(m); + } + catch (const ThreadsafeQueueWakeup&) { break; } + auto message = m.message; + /* Remove a potential trailing newline. * It doesn't look good in syslog */ @@ -138,10 +138,7 @@ class Logger { Logger(const Logger& other) = delete; const Logger& operator=(const Logger& other) = delete; ~Logger() { - // Special message to stop the thread - log_message_t m(trace, ""); - - m_message_queue.push(m); + m_message_queue.trigger_wakeup(); m_io_thread.join(); } diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp index 1e19071..88a68bc 100644 --- a/src/MemlessPoly.cpp +++ b/src/MemlessPoly.cpp @@ -280,9 +280,10 @@ void MemlessPoly::worker_thread(MemlessPoly::worker_t *workerdata) while (true) { worker_t::input_data_t in_data; - workerdata->in_queue.wait_and_pop(in_data); - - if (in_data.terminate) { + try { + workerdata->in_queue.wait_and_pop(in_data); + } + catch (const ThreadsafeQueueWakeup&) { break; } @@ -322,7 +323,6 @@ int MemlessPoly::internal_process(Buffer* const dataIn, Buffer* dataOut) size_t start = 0; for (auto& worker : m_workers) { worker_t::input_data_t dat; - dat.terminate = false; dat.dpd_type = m_dpd_type; dat.lut_scalefactor = m_lut_scalefactor; dat.lut = m_lut.data(); diff --git a/src/MemlessPoly.h b/src/MemlessPoly.h index 4c67d46..7f00261 100644 --- a/src/MemlessPoly.h +++ b/src/MemlessPoly.h @@ -78,8 +78,6 @@ private: struct worker_t { struct input_data_t { - bool terminate = false; - dpd_type_t dpd_type; // Valid for polynomial types @@ -112,9 +110,7 @@ private: ~worker_t() { if (thread.joinable()) { - input_data_t terminate_tag; - terminate_tag.terminate = true; - in_queue.push(terminate_tag); + in_queue.trigger_wakeup(); thread.join(); } } diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index edda490..433eae3 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -40,9 +40,13 @@ * retrieves the elements. * * The queue can make the consumer block until an element - * is available. + * is available, or a wakeup requested. */ +/* Class thrown by blocking pop to tell the consumer + * that there's a wakeup requested. */ +class ThreadsafeQueueWakeup {}; + template<typename T> class ThreadsafeQueue { @@ -98,6 +102,17 @@ public: return queue_size; } + /* Trigger a wakeup event on a blocking consumer, which + * will receive a ThreadsafeQueueWakeup exception. + */ + void trigger_wakeup(void) + { + std::unique_lock<std::mutex> lock(the_mutex); + wakeup_requested = true; + lock.unlock(); + the_rx_notification.notify_one(); + } + /* Send a notification for the receiver thread */ void notify(void) { @@ -135,15 +150,22 @@ public: void wait_and_pop(T& popped_value, size_t prebuffering = 1) { std::unique_lock<std::mutex> lock(the_mutex); - while (the_queue.size() < prebuffering) { + while (the_queue.size() < prebuffering and + not wakeup_requested) { the_rx_notification.wait(lock); } - std::swap(popped_value, the_queue.front()); - the_queue.pop(); + if (wakeup_requested) { + wakeup_requested = false; + throw ThreadsafeQueueWakeup(); + } + else { + std::swap(popped_value, the_queue.front()); + the_queue.pop(); - lock.unlock(); - the_tx_notification.notify_one(); + lock.unlock(); + the_tx_notification.notify_one(); + } } private: @@ -151,5 +173,6 @@ private: mutable std::mutex the_mutex; std::condition_variable the_rx_notification; std::condition_variable the_tx_notification; + bool wakeup_requested = false; }; diff --git a/src/output/SDR.cpp b/src/output/SDR.cpp index b47433d..7c1b585 100644 --- a/src/output/SDR.cpp +++ b/src/output/SDR.cpp @@ -84,9 +84,7 @@ SDR::~SDR() { m_running.store(false); - FrameData end_marker; - end_marker.buf.clear(); - m_queue.push(end_marker); + m_queue.trigger_wakeup(); if (m_device_thread.joinable()) { m_device_thread.join(); @@ -181,7 +179,7 @@ void SDR::process_thread_entry() m_queue.wait_and_pop(frame, pop_prebuffering); etiLog.log(trace, "SDR,pop"); - if (m_running.load() == false or frame.buf.empty()) { + if (m_running.load() == false) { break; } @@ -203,8 +201,10 @@ void SDR::process_thread_entry() } } } + catch (const ThreadsafeQueueWakeup& e) { } catch (const runtime_error& e) { - etiLog.level(error) << "SDR output thread caught runtime error: " << e.what(); + etiLog.level(error) << "SDR output thread caught runtime error: " << + e.what(); } m_running.store(false); diff --git a/src/output/Soapy.h b/src/output/Soapy.h index 5c20156..3045420 100644 --- a/src/output/Soapy.h +++ b/src/output/Soapy.h @@ -48,7 +48,6 @@ DESCRIPTION: #include "ModPlugin.h" #include "EtiReader.h" #include "RemoteControl.h" -#include "ThreadsafeQueue.h" namespace Output { diff --git a/src/output/USRPTime.h b/src/output/USRPTime.h index 70e55ae..51ca800 100644 --- a/src/output/USRPTime.h +++ b/src/output/USRPTime.h @@ -48,7 +48,6 @@ DESCRIPTION: #include "output/SDR.h" #include "TimestampDecoder.h" #include "RemoteControl.h" -#include "ThreadsafeQueue.h" #include <stdio.h> #include <sys/types.h> |