diff options
Diffstat (limited to 'src/input')
-rw-r--r-- | src/input/Edi.cpp | 48 | ||||
-rw-r--r-- | src/input/Edi.h | 3 |
2 files changed, 42 insertions, 9 deletions
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index 8aee296..765a355 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -27,6 +27,7 @@ #include "input/Edi.h" #include <regex> +#include <chrono> #include <stdexcept> #include <sstream> #include <cstring> @@ -41,7 +42,7 @@ namespace Inputs { constexpr bool VERBOSE = false; constexpr size_t TCP_BLOCKSIZE = 2048; -constexpr size_t MAX_FRAMES_QUEUED = 10; +constexpr size_t MAX_FRAMES_QUEUED = 1000; Edi::Edi() : m_tcp_receive_server(TCP_BLOCKSIZE), @@ -96,23 +97,53 @@ int Edi::open(const std::string& name) int Edi::readFrame(uint8_t* buffer, size_t size) { - vector<uint8_t> frame; + if (m_pending_sti_frame.frame.empty()) { + m_frames.try_pop(m_pending_sti_frame); + } + + if (not m_pending_sti_frame.frame.empty()) { + if (m_pending_sti_frame.frame.size() != size) { + etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << + m_pending_sti_frame.frame.size() << " received, " << size << " requested"; + memset(buffer, 0, size * sizeof(*buffer)); + } + else { + const auto now = chrono::system_clock::now(); + + if (m_pending_sti_frame.timestamp.to_system_clock() <= now) { + etiLog.level(debug) << "EDI input take frame with TS " << + m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size(); + + std::copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer); + m_pending_sti_frame.frame.clear(); + } + else { + etiLog.level(debug) << "EDI input skip frame with TS " << + m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size(); + } + } + } + + return size; + +#if 0 + EdiDecoder::sti_frame_t sti; if (m_is_prebuffering) { m_is_prebuffering = m_frames.size() < 10; if (not m_is_prebuffering) { etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete."; } } - else if (m_frames.try_pop(frame)) { - if (frame.size() == 0) { + else if (m_frames.try_pop(sti)) { + if (sti.frame.size() == 0) { etiLog.level(debug) << "EDI input " << m_name << " empty frame"; return 0; } - else if (frame.size() == size) { - std::copy(frame.cbegin(), frame.cend(), buffer); + else if (sti.frame.size() == size) { + std::copy(sti.frame.cbegin(), sti.frame.cend(), buffer); } else { - etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << frame.size() << + etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << sti.frame.size() << " received, " << size << " requested"; memset(buffer, 0, size * sizeof(*buffer)); } @@ -123,6 +154,7 @@ int Edi::readFrame(uint8_t* buffer, size_t size) etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering"; } return size; +#endif } void Edi::m_run() @@ -159,7 +191,7 @@ void Edi::m_run() const auto sti = m_sti_writer.getFrame(); if (not sti.frame.empty()) { - m_frames.push_wait_if_full(move(sti.frame), MAX_FRAMES_QUEUED); + m_frames.push_wait_if_full(move(sti), MAX_FRAMES_QUEUED); work_done = true; } diff --git a/src/input/Edi.h b/src/input/Edi.h index 7b3dc04..66ff682 100644 --- a/src/input/Edi.h +++ b/src/input/Edi.h @@ -71,7 +71,8 @@ class Edi : public InputBase { EdiDecoder::STIDecoder m_sti_decoder; std::thread m_thread; std::atomic<bool> m_running = ATOMIC_VAR_INIT(false); - ThreadsafeQueue<std::vector<uint8_t> > m_frames; + ThreadsafeQueue<EdiDecoder::sti_frame_t> m_frames; + EdiDecoder::sti_frame_t m_pending_sti_frame; bool m_is_prebuffering = true; |