diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-03-23 14:10:45 +0100 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-03-23 14:10:45 +0100 | 
| commit | e3f85ddc8ea565bc81b538269298a9f97b055c8c (patch) | |
| tree | 34b8c42021261bbb4e6cbc0b9d94e59ce834ee08 | |
| parent | 8048e2e242c6029337bfe6cc056d2830b127044b (diff) | |
| download | dabmod-e3f85ddc8ea565bc81b538269298a9f97b055c8c.tar.gz dabmod-e3f85ddc8ea565bc81b538269298a9f97b055c8c.tar.bz2 dabmod-e3f85ddc8ea565bc81b538269298a9f97b055c8c.zip | |
Fix livelock in synchronous ZeroMQ scenario
| -rw-r--r-- | src/InputZeroMQReader.cpp | 11 | ||||
| -rw-r--r-- | src/OutputUHD.cpp | 2 | ||||
| -rw-r--r-- | src/ThreadsafeQueue.h | 11 | 
3 files changed, 18 insertions, 6 deletions
| diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index afb6b7d..483429b 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -3,7 +3,7 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Copyrigth (C) 2013 +   Copyrigth (C) 2013, 2014     Matthias P. Braendli, matthias.braendli@mpb.li   */  /* @@ -117,12 +117,15 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)              }              else              { +                workerdata->in_messages->notify(); +                  if (!buffer_full) { -                    workerdata->in_messages->notify();                      fprintf(stderr, "ZeroMQ buffer overfull !\n");                      buffer_full = true;                  } + +                queue_size = workerdata->in_messages->size();              }              if (queue_size < 5) { @@ -132,9 +135,11 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)          }      }      catch (zmq::error_t& err) { -        printf("ZeroMQ error in RecvProcess: '%s'\n", err.what()); +        fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what());      } +    fprintf(stderr, "ZeroMQ input worker terminated\n"); +      subscriber.close();  } diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index bd4231e..73bd38e 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -590,6 +590,8 @@ loopend:          // swap buffers          workerbuffer = (workerbuffer + 1) % 2;      } + +    uwd->logger->level(warn) << "UHD worker terminated";  } diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index 3dd5450..78e9ef0 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -54,7 +54,8 @@ public:      /* Create a queue where it has to contain at least       * required_size elements before pop is possible       */ -    ThreadsafeQueue(size_t required_size) : the_required_size(required_size) {} +    ThreadsafeQueue(size_t required_size) : the_required_size(required_size) { +    }      /* Push one element into the queue, and notify another thread that       * might be waiting. @@ -84,6 +85,11 @@ public:          return the_queue.empty();      } +    size_t size() const +    { +        return the_queue.size(); +    } +      bool try_pop(T& popped_value)      {          boost::mutex::scoped_lock lock(the_mutex); @@ -100,8 +106,7 @@ public:      void wait_and_pop(T& popped_value)      {          boost::mutex::scoped_lock lock(the_mutex); -        while(the_queue.size() < the_required_size) -        { +        while(the_queue.size() < the_required_size) {              the_condition_variable.wait(lock);          } | 
