diff options
-rw-r--r-- | src/InputReader.h | 13 | ||||
-rw-r--r-- | src/InputZeroMQReader.cpp | 19 |
2 files changed, 28 insertions, 4 deletions
diff --git a/src/InputReader.h b/src/InputReader.h index de8fb78..6a7d7c3 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -143,15 +143,26 @@ class InputZeroMQWorker { public: InputZeroMQWorker() : - zmqcontext(1) {} + running(false), + zmqcontext(1), + m_to_drop(0) { } void Start(struct InputZeroMQThreadData* workerdata); void Stop(); private: void RecvProcess(struct InputZeroMQThreadData* workerdata); + bool running; zmq::context_t zmqcontext; // is thread-safe boost::thread recv_thread; + + /* We must be careful to keep frame phase consistent. If we + * drop a single ETI frame, we will break the transmission + * frame vs. ETI frame phase. + * + * Here we keep track of how many ETI frames we must drop + */ + int m_to_drop; }; class InputZeroMQReader : public InputReader diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 483429b..52f651b 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -104,7 +104,14 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) zmq::message_t incoming; subscriber.recv(&incoming); - if (queue_size < MAX_QUEUE_SIZE) { + if (m_to_drop) { + queue_size = workerdata->in_messages->size(); + if (queue_size > 4) { + workerdata->in_messages->notify(); + } + m_to_drop--; + } + else if (queue_size < MAX_QUEUE_SIZE) { if (buffer_full) { fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n", queue_size); @@ -115,8 +122,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) holder->move(&incoming); // move the message into the holder queue_size = workerdata->in_messages->push(holder); } - else - { + else { workerdata->in_messages->notify(); if (!buffer_full) { @@ -126,6 +132,13 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } queue_size = workerdata->in_messages->size(); + + /* Drop three more incoming ETI frames before + * we start accepting them again, to guarantee + * that we keep transmission frame vs. ETI frame + * phase. + */ + m_to_drop = 3; } if (queue_size < 5) { |