From d43c72adb60395ea20550d49d0310aebbdca53a3 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 23 Mar 2014 21:51:13 +0100 Subject: Maintain transmission frame FP relationship in ZeroMQ input --- src/InputReader.h | 13 ++++++++++++- src/InputZeroMQReader.cpp | 19 ++++++++++++++++--- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/src/InputReader.h b/src/InputReader.h index de8fb78..6a7d7c3 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -143,15 +143,26 @@ class InputZeroMQWorker { public: InputZeroMQWorker() : - zmqcontext(1) {} + running(false), + zmqcontext(1), + m_to_drop(0) { } void Start(struct InputZeroMQThreadData* workerdata); void Stop(); private: void RecvProcess(struct InputZeroMQThreadData* workerdata); + bool running; zmq::context_t zmqcontext; // is thread-safe boost::thread recv_thread; + + /* We must be careful to keep frame phase consistent. If we + * drop a single ETI frame, we will break the transmission + * frame vs. ETI frame phase. + * + * Here we keep track of how many ETI frames we must drop + */ + int m_to_drop; }; class InputZeroMQReader : public InputReader diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 483429b..52f651b 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -104,7 +104,14 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) zmq::message_t incoming; subscriber.recv(&incoming); - if (queue_size < MAX_QUEUE_SIZE) { + if (m_to_drop) { + queue_size = workerdata->in_messages->size(); + if (queue_size > 4) { + workerdata->in_messages->notify(); + } + m_to_drop--; + } + else if (queue_size < MAX_QUEUE_SIZE) { if (buffer_full) { fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n", queue_size); @@ -115,8 +122,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) holder->move(&incoming); // move the message into the holder queue_size = workerdata->in_messages->push(holder); } - else - { + else { workerdata->in_messages->notify(); if (!buffer_full) { @@ -126,6 +132,13 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } queue_size = workerdata->in_messages->size(); + + /* Drop three more incoming ETI frames before + * we start accepting them again, to guarantee + * that we keep transmission frame vs. ETI frame + * phase. + */ + m_to_drop = 3; } if (queue_size < 5) { -- cgit v1.2.3