diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-06-05 09:15:56 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-06-05 09:22:22 +0200 |
commit | 71eb84d5f483af8d22402de3d2ec70b08b5802d3 (patch) | |
tree | 589fd978bdc357a0c67179d83c5c04bb0d732082 /src/InputZeroMQReader.cpp | |
parent | c9ea7fb88809d935f3eaeee108415ff5abd17ead (diff) | |
download | dabmod-71eb84d5f483af8d22402de3d2ec70b08b5802d3.tar.gz dabmod-71eb84d5f483af8d22402de3d2ec70b08b5802d3.tar.bz2 dabmod-71eb84d5f483af8d22402de3d2ec70b08b5802d3.zip |
Fix intermittent underruns after a restart
When using UHD without synchronous=1:
After a modulator restart, the input ZMQ buffer is nearly empty, and
the modulator actually gets blocked by the ThreadsafeQueue. This is
visible by looking at the time deltas printed in the debugging code.
This commit adds a minimal form of prebuffering (10 ETI frames) to the
ZeroMQ input, and removes the useless minimum required size for the
ThreadsafeQueue.
In synchronous=1, the issue is not visible because the TIST defines the
time to transmit, which will cause the ZMQ buffer to fill.
Diffstat (limited to 'src/InputZeroMQReader.cpp')
-rw-r--r-- | src/InputZeroMQReader.cpp | 32 |
1 files changed, 30 insertions, 2 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 8706e1e..e95644a 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -41,6 +41,7 @@ #include "porting.h" #include "InputReader.h" #include "PcDebug.h" +#include "Utils.h" #define NUM_FRAMES_PER_ZMQ_MESSAGE 4 /* A concatenation of four ETI frames, @@ -86,7 +87,34 @@ int InputZeroMQReader::GetNextFrame(void* buffer) const size_t framesize = 6144; boost::shared_ptr<std::vector<uint8_t> > incoming; - in_messages_.wait_and_pop(incoming); + + struct timespec time_before; + int time_before_ret = clock_gettime(CLOCK_MONOTONIC, &time_before); + + /* Do some prebuffering because reads will happen in bursts + * (4 ETI frames in TM1) and we should make sure that + * we can serve the data required for a full transmission frame. + */ + if (in_messages_.size() < 4) { + const size_t prebuffering = 10; + in_messages_.wait_and_pop(incoming, prebuffering); + } + else { + in_messages_.wait_and_pop(incoming); + } + + struct timespec time_after; + int time_after_ret = clock_gettime(CLOCK_MONOTONIC, &time_after); + + if (time_before_ret == 0 and time_after_ret == 0) { + etiLog.level(debug) << "ZMQ Time delta : " << + timespecdiff_us(time_before, time_after) << " us, queue " << + in_messages_.size(); + } + else { + etiLog.level(error) << "ZMQ Time delta failed " << + time_before_ret << " " << time_after_ret; + } if (! workerdata_.running) { throw zmq_input_overflow(); @@ -193,7 +221,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } if (queue_size < 5) { - etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << "elements !"; + etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << " elements !"; } } } |