aboutsummaryrefslogtreecommitdiffstats
path: root/src/input
diff options
context:
space:
mode:
Diffstat (limited to 'src/input')
-rw-r--r--src/input/Edi.cpp48
-rw-r--r--src/input/Edi.h3
2 files changed, 42 insertions, 9 deletions
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
index 8aee296..765a355 100644
--- a/src/input/Edi.cpp
+++ b/src/input/Edi.cpp
@@ -27,6 +27,7 @@
#include "input/Edi.h"
#include <regex>
+#include <chrono>
#include <stdexcept>
#include <sstream>
#include <cstring>
@@ -41,7 +42,7 @@ namespace Inputs {
constexpr bool VERBOSE = false;
constexpr size_t TCP_BLOCKSIZE = 2048;
-constexpr size_t MAX_FRAMES_QUEUED = 10;
+constexpr size_t MAX_FRAMES_QUEUED = 1000;
Edi::Edi() :
m_tcp_receive_server(TCP_BLOCKSIZE),
@@ -96,23 +97,53 @@ int Edi::open(const std::string& name)
int Edi::readFrame(uint8_t* buffer, size_t size)
{
- vector<uint8_t> frame;
+ if (m_pending_sti_frame.frame.empty()) {
+ m_frames.try_pop(m_pending_sti_frame);
+ }
+
+ if (not m_pending_sti_frame.frame.empty()) {
+ if (m_pending_sti_frame.frame.size() != size) {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
+ m_pending_sti_frame.frame.size() << " received, " << size << " requested";
+ memset(buffer, 0, size * sizeof(*buffer));
+ }
+ else {
+ const auto now = chrono::system_clock::now();
+
+ if (m_pending_sti_frame.timestamp.to_system_clock() <= now) {
+ etiLog.level(debug) << "EDI input take frame with TS " <<
+ m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size();
+
+ std::copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
+ m_pending_sti_frame.frame.clear();
+ }
+ else {
+ etiLog.level(debug) << "EDI input skip frame with TS " <<
+ m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size();
+ }
+ }
+ }
+
+ return size;
+
+#if 0
+ EdiDecoder::sti_frame_t sti;
if (m_is_prebuffering) {
m_is_prebuffering = m_frames.size() < 10;
if (not m_is_prebuffering) {
etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete.";
}
}
- else if (m_frames.try_pop(frame)) {
- if (frame.size() == 0) {
+ else if (m_frames.try_pop(sti)) {
+ if (sti.frame.size() == 0) {
etiLog.level(debug) << "EDI input " << m_name << " empty frame";
return 0;
}
- else if (frame.size() == size) {
- std::copy(frame.cbegin(), frame.cend(), buffer);
+ else if (sti.frame.size() == size) {
+ std::copy(sti.frame.cbegin(), sti.frame.cend(), buffer);
}
else {
- etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << frame.size() <<
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << sti.frame.size() <<
" received, " << size << " requested";
memset(buffer, 0, size * sizeof(*buffer));
}
@@ -123,6 +154,7 @@ int Edi::readFrame(uint8_t* buffer, size_t size)
etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering";
}
return size;
+#endif
}
void Edi::m_run()
@@ -159,7 +191,7 @@ void Edi::m_run()
const auto sti = m_sti_writer.getFrame();
if (not sti.frame.empty()) {
- m_frames.push_wait_if_full(move(sti.frame), MAX_FRAMES_QUEUED);
+ m_frames.push_wait_if_full(move(sti), MAX_FRAMES_QUEUED);
work_done = true;
}
diff --git a/src/input/Edi.h b/src/input/Edi.h
index 7b3dc04..66ff682 100644
--- a/src/input/Edi.h
+++ b/src/input/Edi.h
@@ -71,7 +71,8 @@ class Edi : public InputBase {
EdiDecoder::STIDecoder m_sti_decoder;
std::thread m_thread;
std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
- ThreadsafeQueue<std::vector<uint8_t> > m_frames;
+ ThreadsafeQueue<EdiDecoder::sti_frame_t> m_frames;
+ EdiDecoder::sti_frame_t m_pending_sti_frame;
bool m_is_prebuffering = true;