aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2018-04-20 12:29:24 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2018-04-20 12:29:24 +0200
commit411d03ac6b8ee1a8c06f952b9378c90516a715b7 (patch)
tree3236a6121eb9137a79b82699006df877e3876c32
parent4f9b087a578fac9dffef83cdcb41573468a4ae17 (diff)
downloaddabmod-411d03ac6b8ee1a8c06f952b9378c90516a715b7.tar.gz
dabmod-411d03ac6b8ee1a8c06f952b9378c90516a715b7.tar.bz2
dabmod-411d03ac6b8ee1a8c06f952b9378c90516a715b7.zip
ThreadsafeQueue: add wakeup event instead of custom termination markers
This avoids the issue that the ~SDR termination marker doesn't reach the consumer because it's still prebuffering
-rw-r--r--src/EtiReader.h1
-rw-r--r--src/FIRFilter.h1
-rw-r--r--src/Log.cpp12
-rw-r--r--src/Log.h5
-rw-r--r--src/MemlessPoly.cpp8
-rw-r--r--src/MemlessPoly.h6
-rw-r--r--src/ThreadsafeQueue.h35
-rw-r--r--src/output/SDR.cpp10
-rw-r--r--src/output/Soapy.h1
-rw-r--r--src/output/USRPTime.h1
10 files changed, 46 insertions, 34 deletions
diff --git a/src/EtiReader.h b/src/EtiReader.h
index dfbaaa9..554231e 100644
--- a/src/EtiReader.h
+++ b/src/EtiReader.h
@@ -40,7 +40,6 @@
#ifdef HAVE_EDI
# include "lib/UdpSocket.h"
#endif
-#include "ThreadsafeQueue.h"
#include <vector>
#include <memory>
diff --git a/src/FIRFilter.h b/src/FIRFilter.h
index d04c456..e03321a 100644
--- a/src/FIRFilter.h
+++ b/src/FIRFilter.h
@@ -34,7 +34,6 @@
#include "RemoteControl.h"
#include "ModPlugin.h"
#include "PcDebug.h"
-#include "ThreadsafeQueue.h"
#include <sys/types.h>
#include <complex>
diff --git a/src/Log.cpp b/src/Log.cpp
index ee16880..81f7955 100644
--- a/src/Log.cpp
+++ b/src/Log.cpp
@@ -82,15 +82,15 @@ void Logger::io_process()
set_thread_name("logger");
while (1) {
log_message_t m;
- m_message_queue.wait_and_pop(m);
-
- auto message = m.message;
-
- if (m.level == trace and m.message.empty()) {
- // Special message to stop thread
+ try {
+ m_message_queue.wait_and_pop(m);
+ }
+ catch (const ThreadsafeQueueWakeup&) {
break;
}
+ auto message = m.message;
+
/* Remove a potential trailing newline.
* It doesn't look good in syslog
*/
diff --git a/src/Log.h b/src/Log.h
index 0e09bc9..df6e07b 100644
--- a/src/Log.h
+++ b/src/Log.h
@@ -138,10 +138,7 @@ class Logger {
Logger(const Logger& other) = delete;
const Logger& operator=(const Logger& other) = delete;
~Logger() {
- // Special message to stop the thread
- log_message_t m(trace, "");
-
- m_message_queue.push(m);
+ m_message_queue.trigger_wakeup();
m_io_thread.join();
}
diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp
index 1e19071..88a68bc 100644
--- a/src/MemlessPoly.cpp
+++ b/src/MemlessPoly.cpp
@@ -280,9 +280,10 @@ void MemlessPoly::worker_thread(MemlessPoly::worker_t *workerdata)
while (true) {
worker_t::input_data_t in_data;
- workerdata->in_queue.wait_and_pop(in_data);
-
- if (in_data.terminate) {
+ try {
+ workerdata->in_queue.wait_and_pop(in_data);
+ }
+ catch (const ThreadsafeQueueWakeup&) {
break;
}
@@ -322,7 +323,6 @@ int MemlessPoly::internal_process(Buffer* const dataIn, Buffer* dataOut)
size_t start = 0;
for (auto& worker : m_workers) {
worker_t::input_data_t dat;
- dat.terminate = false;
dat.dpd_type = m_dpd_type;
dat.lut_scalefactor = m_lut_scalefactor;
dat.lut = m_lut.data();
diff --git a/src/MemlessPoly.h b/src/MemlessPoly.h
index 4c67d46..7f00261 100644
--- a/src/MemlessPoly.h
+++ b/src/MemlessPoly.h
@@ -78,8 +78,6 @@ private:
struct worker_t {
struct input_data_t {
- bool terminate = false;
-
dpd_type_t dpd_type;
// Valid for polynomial types
@@ -112,9 +110,7 @@ private:
~worker_t() {
if (thread.joinable()) {
- input_data_t terminate_tag;
- terminate_tag.terminate = true;
- in_queue.push(terminate_tag);
+ in_queue.trigger_wakeup();
thread.join();
}
}
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
index edda490..433eae3 100644
--- a/src/ThreadsafeQueue.h
+++ b/src/ThreadsafeQueue.h
@@ -40,9 +40,13 @@
* retrieves the elements.
*
* The queue can make the consumer block until an element
- * is available.
+ * is available, or a wakeup requested.
*/
+/* Class thrown by blocking pop to tell the consumer
+ * that there's a wakeup requested. */
+class ThreadsafeQueueWakeup {};
+
template<typename T>
class ThreadsafeQueue
{
@@ -98,6 +102,17 @@ public:
return queue_size;
}
+ /* Trigger a wakeup event on a blocking consumer, which
+ * will receive a ThreadsafeQueueWakeup exception.
+ */
+ void trigger_wakeup(void)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ wakeup_requested = true;
+ lock.unlock();
+ the_rx_notification.notify_one();
+ }
+
/* Send a notification for the receiver thread */
void notify(void)
{
@@ -135,15 +150,22 @@ public:
void wait_and_pop(T& popped_value, size_t prebuffering = 1)
{
std::unique_lock<std::mutex> lock(the_mutex);
- while (the_queue.size() < prebuffering) {
+ while (the_queue.size() < prebuffering and
+ not wakeup_requested) {
the_rx_notification.wait(lock);
}
- std::swap(popped_value, the_queue.front());
- the_queue.pop();
+ if (wakeup_requested) {
+ wakeup_requested = false;
+ throw ThreadsafeQueueWakeup();
+ }
+ else {
+ std::swap(popped_value, the_queue.front());
+ the_queue.pop();
- lock.unlock();
- the_tx_notification.notify_one();
+ lock.unlock();
+ the_tx_notification.notify_one();
+ }
}
private:
@@ -151,5 +173,6 @@ private:
mutable std::mutex the_mutex;
std::condition_variable the_rx_notification;
std::condition_variable the_tx_notification;
+ bool wakeup_requested = false;
};
diff --git a/src/output/SDR.cpp b/src/output/SDR.cpp
index b47433d..7c1b585 100644
--- a/src/output/SDR.cpp
+++ b/src/output/SDR.cpp
@@ -84,9 +84,7 @@ SDR::~SDR()
{
m_running.store(false);
- FrameData end_marker;
- end_marker.buf.clear();
- m_queue.push(end_marker);
+ m_queue.trigger_wakeup();
if (m_device_thread.joinable()) {
m_device_thread.join();
@@ -181,7 +179,7 @@ void SDR::process_thread_entry()
m_queue.wait_and_pop(frame, pop_prebuffering);
etiLog.log(trace, "SDR,pop");
- if (m_running.load() == false or frame.buf.empty()) {
+ if (m_running.load() == false) {
break;
}
@@ -203,8 +201,10 @@ void SDR::process_thread_entry()
}
}
}
+ catch (const ThreadsafeQueueWakeup& e) { }
catch (const runtime_error& e) {
- etiLog.level(error) << "SDR output thread caught runtime error: " << e.what();
+ etiLog.level(error) << "SDR output thread caught runtime error: " <<
+ e.what();
}
m_running.store(false);
diff --git a/src/output/Soapy.h b/src/output/Soapy.h
index 5c20156..3045420 100644
--- a/src/output/Soapy.h
+++ b/src/output/Soapy.h
@@ -48,7 +48,6 @@ DESCRIPTION:
#include "ModPlugin.h"
#include "EtiReader.h"
#include "RemoteControl.h"
-#include "ThreadsafeQueue.h"
namespace Output {
diff --git a/src/output/USRPTime.h b/src/output/USRPTime.h
index 70e55ae..51ca800 100644
--- a/src/output/USRPTime.h
+++ b/src/output/USRPTime.h
@@ -48,7 +48,6 @@ DESCRIPTION:
#include "output/SDR.h"
#include "TimestampDecoder.h"
#include "RemoteControl.h"
-#include "ThreadsafeQueue.h"
#include <stdio.h>
#include <sys/types.h>