summaryrefslogtreecommitdiffstats
path: root/src/InputZeroMQReader.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/InputZeroMQReader.cpp')
-rw-r--r--src/InputZeroMQReader.cpp19
1 files changed, 16 insertions, 3 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index 483429b..52f651b 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -104,7 +104,14 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
zmq::message_t incoming;
subscriber.recv(&incoming);
- if (queue_size < MAX_QUEUE_SIZE) {
+ if (m_to_drop) {
+ queue_size = workerdata->in_messages->size();
+ if (queue_size > 4) {
+ workerdata->in_messages->notify();
+ }
+ m_to_drop--;
+ }
+ else if (queue_size < MAX_QUEUE_SIZE) {
if (buffer_full) {
fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n",
queue_size);
@@ -115,8 +122,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
holder->move(&incoming); // move the message into the holder
queue_size = workerdata->in_messages->push(holder);
}
- else
- {
+ else {
workerdata->in_messages->notify();
if (!buffer_full) {
@@ -126,6 +132,13 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
queue_size = workerdata->in_messages->size();
+
+ /* Drop three more incoming ETI frames before
+ * we start accepting them again, to guarantee
+ * that we keep transmission frame vs. ETI frame
+ * phase.
+ */
+ m_to_drop = 3;
}
if (queue_size < 5) {