diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-08-27 11:02:23 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-08-27 11:02:23 +0200 |
commit | 0298076eea4f92685f9a01974261da41c2a01e5b (patch) | |
tree | f72831cd212da64c1a3ea124bb9b590ced8558f2 /src/input/Edi.cpp | |
parent | cac39dedee89d62ebf5d0135b84ccaa2e387a7cb (diff) | |
download | dabmux-0298076eea4f92685f9a01974261da41c2a01e5b.tar.gz dabmux-0298076eea4f92685f9a01974261da41c2a01e5b.tar.bz2 dabmux-0298076eea4f92685f9a01974261da41c2a01e5b.zip |
EDI input: add new buffer management
Diffstat (limited to 'src/input/Edi.cpp')
-rw-r--r-- | src/input/Edi.cpp | 158 |
1 files changed, 124 insertions, 34 deletions
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index 2d82902..95fac53 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -42,8 +42,19 @@ namespace Inputs { constexpr bool VERBOSE = false; constexpr size_t TCP_BLOCKSIZE = 2048; + +/* Absolute max number of frames to be queued, both with and without timestamping */ constexpr size_t MAX_FRAMES_QUEUED = 1000; +/* When using timestamping, start discarding the front of the queue once the queue + * is this full. Must be smaller than MAX_FRAMES_QUEUED. */ +constexpr size_t MAX_FRAMES_QUEUED_PREBUFFERING = 500; + +/* When not using timestamping, how many frames to prebuffer. + * TODO should be configurable as ZMQ. + */ +constexpr size_t NUM_FRAMES_PREBUFFERING = 10; + Edi::Edi() : m_tcp_receive_server(TCP_BLOCKSIZE), m_sti_writer(), @@ -93,43 +104,30 @@ void Edi::open(const std::string& name) m_thread = std::thread(&Edi::m_run, this); } -int Edi::readFrame(uint8_t* buffer, size_t size) +size_t Edi::readFrame(uint8_t *buffer, size_t size) { - if (m_pending_sti_frame.frame.empty()) { - m_frames.try_pop(m_pending_sti_frame); + EdiDecoder::sti_frame_t sti; + if (m_is_prebuffering) { + m_is_prebuffering = m_frames.size() < NUM_FRAMES_PREBUFFERING; + if (not m_is_prebuffering) { + etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete."; + } + memset(buffer, 0, size * sizeof(*buffer)); + return 0; } - - if (not m_pending_sti_frame.frame.empty()) { + else if (not m_pending_sti_frame.frame.empty()) { if (m_pending_sti_frame.frame.size() != size) { etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << m_pending_sti_frame.frame.size() << " received, " << size << " requested"; memset(buffer, 0, size * sizeof(*buffer)); + return 0; } else { - const auto now = chrono::system_clock::now(); - - if (m_pending_sti_frame.timestamp.to_system_clock() <= now) { - etiLog.level(debug) << "EDI input take frame with TS " << - m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size(); - - std::copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer); - m_pending_sti_frame.frame.clear(); - } - else { - etiLog.level(debug) << "EDI input skip frame with TS " << - m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size(); - } - } - } - - return size; - -#if 0 - EdiDecoder::sti_frame_t sti; - if (m_is_prebuffering) { - m_is_prebuffering = m_frames.size() < 10; - if (not m_is_prebuffering) { - etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete."; + copy(m_pending_sti_frame.frame.begin(), + m_pending_sti_frame.frame.end(), + buffer); + m_pending_sti_frame.frame.clear(); + return size; } } else if (m_frames.try_pop(sti)) { @@ -138,21 +136,113 @@ int Edi::readFrame(uint8_t* buffer, size_t size) return 0; } else if (sti.frame.size() == size) { - std::copy(sti.frame.cbegin(), sti.frame.cend(), buffer); + copy(sti.frame.cbegin(), sti.frame.cend(), buffer); + return size; } else { - etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << sti.frame.size() << - " received, " << size << " requested"; + etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << + sti.frame.size() << " received, " << size << " requested"; memset(buffer, 0, size * sizeof(*buffer)); + return 0; } } else { memset(buffer, 0, size * sizeof(*buffer)); m_is_prebuffering = true; etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering"; + return 0; + } +} + +size_t Edi::readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta) +{ + if (m_pending_sti_frame.frame.empty()) { + m_frames.try_pop(m_pending_sti_frame); + } + + if (m_is_prebuffering) { + if (m_pending_sti_frame.frame.empty()) { + memset(buffer, 0, size); + return 0; + } + else if (m_pending_sti_frame.frame.size() == size) { + // readFrame gets called every 24ms, so we allow max 24ms + // difference between the input frame timestamp and the requested + // timestamp. + if (m_pending_sti_frame.timestamp.valid()) { + double ts_frame = (double)m_pending_sti_frame.timestamp.seconds + + (m_pending_sti_frame.timestamp.tsta / 16384000.0); + + double ts_req = (double)seconds + (tsta / 16384000.0); + + if (abs(ts_frame - ts_req) < 24e-3) { + m_is_prebuffering = false; + etiLog.level(warn) << "EDI input " << m_name << + " valid timestamp, pre-buffering complete"; + copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer); + m_pending_sti_frame.frame.clear(); + return size; + } + else { + // Wait more, but erase the front of the frame queue to avoid + // stalling on one frame with incorrect timestamp + if (m_frames.size() >= MAX_FRAMES_QUEUED_PREBUFFERING) { + m_pending_sti_frame.frame.clear(); + } + memset(buffer, 0, size); + return 0; + } + } + else { + etiLog.level(debug) << "EDI input " << m_name << + " skipping frame without timestamp"; + m_pending_sti_frame.frame.clear(); + memset(buffer, 0, size); + return 0; + } + } + else { + etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << + m_pending_sti_frame.frame.size() << " received, " << size << " requested"; + m_pending_sti_frame.frame.clear(); + memset(buffer, 0, size); + return 0; + } + } + else { + EdiDecoder::sti_frame_t sti_frame; + m_frames.try_pop(sti_frame); + if (sti_frame.frame.empty()) { + etiLog.level(warn) << "EDI input " << m_name << + " empty, re-enabling pre-buffering"; + memset(buffer, 0, size); + return 0; + } + else if (not sti_frame.timestamp.valid()) { + etiLog.level(warn) << "EDI input " << m_name << + " invalid timestamp, re-enabling pre-buffering"; + memset(buffer, 0, size); + return 0; + } + else { + double ts_frame = (double)sti_frame.timestamp.seconds + + (sti_frame.timestamp.tsta / 16384000.0); + + double ts_req = (double)seconds + (tsta / 16384000.0); + + if (abs(ts_frame - ts_req) > 24e-3) { + m_is_prebuffering = true; + etiLog.level(warn) << "EDI input " << m_name << + " timestamp out of bounds, re-enabling pre-buffering"; + memset(buffer, 0, size); + return 0; + } + else { + copy(sti_frame.frame.cbegin(), sti_frame.frame.cend(), buffer); + return size; + } + } } - return size; -#endif } void Edi::m_run() |