diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-03-23 21:51:13 +0100 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-03-23 22:09:12 +0100 | 
| commit | d43c72adb60395ea20550d49d0310aebbdca53a3 (patch) | |
| tree | dd6481427b2160fcb14074117daa878df2d60b15 | |
| parent | e3f85ddc8ea565bc81b538269298a9f97b055c8c (diff) | |
| download | dabmod-d43c72adb60395ea20550d49d0310aebbdca53a3.tar.gz dabmod-d43c72adb60395ea20550d49d0310aebbdca53a3.tar.bz2 dabmod-d43c72adb60395ea20550d49d0310aebbdca53a3.zip | |
Maintain transmission frame FP relationship in ZeroMQ input
| -rw-r--r-- | src/InputReader.h | 13 | ||||
| -rw-r--r-- | src/InputZeroMQReader.cpp | 19 | 
2 files changed, 28 insertions, 4 deletions
| diff --git a/src/InputReader.h b/src/InputReader.h index de8fb78..6a7d7c3 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -143,15 +143,26 @@ class InputZeroMQWorker  {      public:          InputZeroMQWorker() : -            zmqcontext(1)  {} +            running(false), +            zmqcontext(1), +            m_to_drop(0) { }          void Start(struct InputZeroMQThreadData* workerdata);          void Stop();      private:          void RecvProcess(struct InputZeroMQThreadData* workerdata); +          bool running;          zmq::context_t zmqcontext; // is thread-safe          boost::thread recv_thread; + +        /* We must be careful to keep frame phase consistent. If we +         * drop a single ETI frame, we will break the transmission +         * frame vs. ETI frame phase. +         * +         * Here we keep track of how many ETI frames we must drop +         */ +        int m_to_drop;  };  class InputZeroMQReader : public InputReader diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 483429b..52f651b 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -104,7 +104,14 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)              zmq::message_t incoming;              subscriber.recv(&incoming); -            if (queue_size < MAX_QUEUE_SIZE) { +            if (m_to_drop) { +                queue_size = workerdata->in_messages->size(); +                if (queue_size > 4) { +                    workerdata->in_messages->notify(); +                } +                m_to_drop--; +            } +            else if (queue_size < MAX_QUEUE_SIZE) {                  if (buffer_full) {                      fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n",                              queue_size); @@ -115,8 +122,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)                  holder->move(&incoming); // move the message into the holder                  queue_size = workerdata->in_messages->push(holder);              } -            else -            { +            else {                  workerdata->in_messages->notify();                  if (!buffer_full) { @@ -126,6 +132,13 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)                  }                  queue_size = workerdata->in_messages->size(); + +                /* Drop three more incoming ETI frames before +                 * we start accepting them again, to guarantee +                 * that we keep transmission frame vs. ETI frame +                 * phase. +                 */ +                m_to_drop = 3;              }              if (queue_size < 5) { | 
