summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2014-03-23 21:51:13 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2014-03-23 22:09:12 +0100
commitd43c72adb60395ea20550d49d0310aebbdca53a3 (patch)
treedd6481427b2160fcb14074117daa878df2d60b15
parente3f85ddc8ea565bc81b538269298a9f97b055c8c (diff)
downloaddabmod-d43c72adb60395ea20550d49d0310aebbdca53a3.tar.gz
dabmod-d43c72adb60395ea20550d49d0310aebbdca53a3.tar.bz2
dabmod-d43c72adb60395ea20550d49d0310aebbdca53a3.zip
Maintain transmission frame FP relationship in ZeroMQ input
-rw-r--r--src/InputReader.h13
-rw-r--r--src/InputZeroMQReader.cpp19
2 files changed, 28 insertions, 4 deletions
diff --git a/src/InputReader.h b/src/InputReader.h
index de8fb78..6a7d7c3 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -143,15 +143,26 @@ class InputZeroMQWorker
{
public:
InputZeroMQWorker() :
- zmqcontext(1) {}
+ running(false),
+ zmqcontext(1),
+ m_to_drop(0) { }
void Start(struct InputZeroMQThreadData* workerdata);
void Stop();
private:
void RecvProcess(struct InputZeroMQThreadData* workerdata);
+
bool running;
zmq::context_t zmqcontext; // is thread-safe
boost::thread recv_thread;
+
+ /* We must be careful to keep frame phase consistent. If we
+ * drop a single ETI frame, we will break the transmission
+ * frame vs. ETI frame phase.
+ *
+ * Here we keep track of how many ETI frames we must drop
+ */
+ int m_to_drop;
};
class InputZeroMQReader : public InputReader
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index 483429b..52f651b 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -104,7 +104,14 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
zmq::message_t incoming;
subscriber.recv(&incoming);
- if (queue_size < MAX_QUEUE_SIZE) {
+ if (m_to_drop) {
+ queue_size = workerdata->in_messages->size();
+ if (queue_size > 4) {
+ workerdata->in_messages->notify();
+ }
+ m_to_drop--;
+ }
+ else if (queue_size < MAX_QUEUE_SIZE) {
if (buffer_full) {
fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n",
queue_size);
@@ -115,8 +122,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
holder->move(&incoming); // move the message into the holder
queue_size = workerdata->in_messages->push(holder);
}
- else
- {
+ else {
workerdata->in_messages->notify();
if (!buffer_full) {
@@ -126,6 +132,13 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
queue_size = workerdata->in_messages->size();
+
+ /* Drop three more incoming ETI frames before
+ * we start accepting them again, to guarantee
+ * that we keep transmission frame vs. ETI frame
+ * phase.
+ */
+ m_to_drop = 3;
}
if (queue_size < 5) {