diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-03-23 14:10:45 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-03-23 14:10:45 +0100 |
commit | e3f85ddc8ea565bc81b538269298a9f97b055c8c (patch) | |
tree | 34b8c42021261bbb4e6cbc0b9d94e59ce834ee08 | |
parent | 8048e2e242c6029337bfe6cc056d2830b127044b (diff) | |
download | dabmod-e3f85ddc8ea565bc81b538269298a9f97b055c8c.tar.gz dabmod-e3f85ddc8ea565bc81b538269298a9f97b055c8c.tar.bz2 dabmod-e3f85ddc8ea565bc81b538269298a9f97b055c8c.zip |
Fix livelock in synchronous ZeroMQ scenario
-rw-r--r-- | src/InputZeroMQReader.cpp | 11 | ||||
-rw-r--r-- | src/OutputUHD.cpp | 2 | ||||
-rw-r--r-- | src/ThreadsafeQueue.h | 11 |
3 files changed, 18 insertions, 6 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index afb6b7d..483429b 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyrigth (C) 2013 + Copyrigth (C) 2013, 2014 Matthias P. Braendli, matthias.braendli@mpb.li */ /* @@ -117,12 +117,15 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } else { + workerdata->in_messages->notify(); + if (!buffer_full) { - workerdata->in_messages->notify(); fprintf(stderr, "ZeroMQ buffer overfull !\n"); buffer_full = true; } + + queue_size = workerdata->in_messages->size(); } if (queue_size < 5) { @@ -132,9 +135,11 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } } catch (zmq::error_t& err) { - printf("ZeroMQ error in RecvProcess: '%s'\n", err.what()); + fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what()); } + fprintf(stderr, "ZeroMQ input worker terminated\n"); + subscriber.close(); } diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index bd4231e..73bd38e 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -590,6 +590,8 @@ loopend: // swap buffers workerbuffer = (workerbuffer + 1) % 2; } + + uwd->logger->level(warn) << "UHD worker terminated"; } diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index 3dd5450..78e9ef0 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -54,7 +54,8 @@ public: /* Create a queue where it has to contain at least * required_size elements before pop is possible */ - ThreadsafeQueue(size_t required_size) : the_required_size(required_size) {} + ThreadsafeQueue(size_t required_size) : the_required_size(required_size) { + } /* Push one element into the queue, and notify another thread that * might be waiting. @@ -84,6 +85,11 @@ public: return the_queue.empty(); } + size_t size() const + { + return the_queue.size(); + } + bool try_pop(T& popped_value) { boost::mutex::scoped_lock lock(the_mutex); @@ -100,8 +106,7 @@ public: void wait_and_pop(T& popped_value) { boost::mutex::scoped_lock lock(the_mutex); - while(the_queue.size() < the_required_size) - { + while(the_queue.size() < the_required_size) { the_condition_variable.wait(lock); } |