diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-03-23 21:51:13 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-03-23 22:09:12 +0100 |
commit | d43c72adb60395ea20550d49d0310aebbdca53a3 (patch) | |
tree | dd6481427b2160fcb14074117daa878df2d60b15 /src/InputZeroMQReader.cpp | |
parent | e3f85ddc8ea565bc81b538269298a9f97b055c8c (diff) | |
download | dabmod-d43c72adb60395ea20550d49d0310aebbdca53a3.tar.gz dabmod-d43c72adb60395ea20550d49d0310aebbdca53a3.tar.bz2 dabmod-d43c72adb60395ea20550d49d0310aebbdca53a3.zip |
Maintain transmission frame FP relationship in ZeroMQ input
Diffstat (limited to 'src/InputZeroMQReader.cpp')
-rw-r--r-- | src/InputZeroMQReader.cpp | 19 |
1 files changed, 16 insertions, 3 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 483429b..52f651b 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -104,7 +104,14 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) zmq::message_t incoming; subscriber.recv(&incoming); - if (queue_size < MAX_QUEUE_SIZE) { + if (m_to_drop) { + queue_size = workerdata->in_messages->size(); + if (queue_size > 4) { + workerdata->in_messages->notify(); + } + m_to_drop--; + } + else if (queue_size < MAX_QUEUE_SIZE) { if (buffer_full) { fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n", queue_size); @@ -115,8 +122,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) holder->move(&incoming); // move the message into the holder queue_size = workerdata->in_messages->push(holder); } - else - { + else { workerdata->in_messages->notify(); if (!buffer_full) { @@ -126,6 +132,13 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } queue_size = workerdata->in_messages->size(); + + /* Drop three more incoming ETI frames before + * we start accepting them again, to guarantee + * that we keep transmission frame vs. ETI frame + * phase. + */ + m_to_drop = 3; } if (queue_size < 5) { |