aboutsummaryrefslogtreecommitdiffstats
path: root/src/InputZeroMQReader.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2014-03-23 21:51:13 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2014-03-23 22:09:12 +0100
commitd43c72adb60395ea20550d49d0310aebbdca53a3 (patch)
treedd6481427b2160fcb14074117daa878df2d60b15 /src/InputZeroMQReader.cpp
parente3f85ddc8ea565bc81b538269298a9f97b055c8c (diff)
downloaddabmod-d43c72adb60395ea20550d49d0310aebbdca53a3.tar.gz
dabmod-d43c72adb60395ea20550d49d0310aebbdca53a3.tar.bz2
dabmod-d43c72adb60395ea20550d49d0310aebbdca53a3.zip
Maintain transmission frame FP relationship in ZeroMQ input
Diffstat (limited to 'src/InputZeroMQReader.cpp')
-rw-r--r--src/InputZeroMQReader.cpp19
1 files changed, 16 insertions, 3 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index 483429b..52f651b 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -104,7 +104,14 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
zmq::message_t incoming;
subscriber.recv(&incoming);
- if (queue_size < MAX_QUEUE_SIZE) {
+ if (m_to_drop) {
+ queue_size = workerdata->in_messages->size();
+ if (queue_size > 4) {
+ workerdata->in_messages->notify();
+ }
+ m_to_drop--;
+ }
+ else if (queue_size < MAX_QUEUE_SIZE) {
if (buffer_full) {
fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n",
queue_size);
@@ -115,8 +122,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
holder->move(&incoming); // move the message into the holder
queue_size = workerdata->in_messages->push(holder);
}
- else
- {
+ else {
workerdata->in_messages->notify();
if (!buffer_full) {
@@ -126,6 +132,13 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
queue_size = workerdata->in_messages->size();
+
+ /* Drop three more incoming ETI frames before
+ * we start accepting them again, to guarantee
+ * that we keep transmission frame vs. ETI frame
+ * phase.
+ */
+ m_to_drop = 3;
}
if (queue_size < 5) {