summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2014-03-23 14:10:45 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2014-03-23 14:10:45 +0100
commite3f85ddc8ea565bc81b538269298a9f97b055c8c (patch)
tree34b8c42021261bbb4e6cbc0b9d94e59ce834ee08
parent8048e2e242c6029337bfe6cc056d2830b127044b (diff)
downloaddabmod-e3f85ddc8ea565bc81b538269298a9f97b055c8c.tar.gz
dabmod-e3f85ddc8ea565bc81b538269298a9f97b055c8c.tar.bz2
dabmod-e3f85ddc8ea565bc81b538269298a9f97b055c8c.zip
Fix livelock in synchronous ZeroMQ scenario
-rw-r--r--src/InputZeroMQReader.cpp11
-rw-r--r--src/OutputUHD.cpp2
-rw-r--r--src/ThreadsafeQueue.h11
3 files changed, 18 insertions, 6 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index afb6b7d..483429b 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyrigth (C) 2013
+ Copyrigth (C) 2013, 2014
Matthias P. Braendli, matthias.braendli@mpb.li
*/
/*
@@ -117,12 +117,15 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
else
{
+ workerdata->in_messages->notify();
+
if (!buffer_full) {
- workerdata->in_messages->notify();
fprintf(stderr, "ZeroMQ buffer overfull !\n");
buffer_full = true;
}
+
+ queue_size = workerdata->in_messages->size();
}
if (queue_size < 5) {
@@ -132,9 +135,11 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
}
catch (zmq::error_t& err) {
- printf("ZeroMQ error in RecvProcess: '%s'\n", err.what());
+ fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what());
}
+ fprintf(stderr, "ZeroMQ input worker terminated\n");
+
subscriber.close();
}
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp
index bd4231e..73bd38e 100644
--- a/src/OutputUHD.cpp
+++ b/src/OutputUHD.cpp
@@ -590,6 +590,8 @@ loopend:
// swap buffers
workerbuffer = (workerbuffer + 1) % 2;
}
+
+ uwd->logger->level(warn) << "UHD worker terminated";
}
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
index 3dd5450..78e9ef0 100644
--- a/src/ThreadsafeQueue.h
+++ b/src/ThreadsafeQueue.h
@@ -54,7 +54,8 @@ public:
/* Create a queue where it has to contain at least
* required_size elements before pop is possible
*/
- ThreadsafeQueue(size_t required_size) : the_required_size(required_size) {}
+ ThreadsafeQueue(size_t required_size) : the_required_size(required_size) {
+ }
/* Push one element into the queue, and notify another thread that
* might be waiting.
@@ -84,6 +85,11 @@ public:
return the_queue.empty();
}
+ size_t size() const
+ {
+ return the_queue.size();
+ }
+
bool try_pop(T& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
@@ -100,8 +106,7 @@ public:
void wait_and_pop(T& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
- while(the_queue.size() < the_required_size)
- {
+ while(the_queue.size() < the_required_size) {
the_condition_variable.wait(lock);
}