summaryrefslogtreecommitdiffstats
path: root/src/input/Edi.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-08-27 11:02:23 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-08-27 11:02:23 +0200
commit0298076eea4f92685f9a01974261da41c2a01e5b (patch)
treef72831cd212da64c1a3ea124bb9b590ced8558f2 /src/input/Edi.cpp
parentcac39dedee89d62ebf5d0135b84ccaa2e387a7cb (diff)
downloaddabmux-0298076eea4f92685f9a01974261da41c2a01e5b.tar.gz
dabmux-0298076eea4f92685f9a01974261da41c2a01e5b.tar.bz2
dabmux-0298076eea4f92685f9a01974261da41c2a01e5b.zip
EDI input: add new buffer management
Diffstat (limited to 'src/input/Edi.cpp')
-rw-r--r--src/input/Edi.cpp158
1 files changed, 124 insertions, 34 deletions
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
index 2d82902..95fac53 100644
--- a/src/input/Edi.cpp
+++ b/src/input/Edi.cpp
@@ -42,8 +42,19 @@ namespace Inputs {
constexpr bool VERBOSE = false;
constexpr size_t TCP_BLOCKSIZE = 2048;
+
+/* Absolute max number of frames to be queued, both with and without timestamping */
constexpr size_t MAX_FRAMES_QUEUED = 1000;
+/* When using timestamping, start discarding the front of the queue once the queue
+ * is this full. Must be smaller than MAX_FRAMES_QUEUED. */
+constexpr size_t MAX_FRAMES_QUEUED_PREBUFFERING = 500;
+
+/* When not using timestamping, how many frames to prebuffer.
+ * TODO should be configurable as ZMQ.
+ */
+constexpr size_t NUM_FRAMES_PREBUFFERING = 10;
+
Edi::Edi() :
m_tcp_receive_server(TCP_BLOCKSIZE),
m_sti_writer(),
@@ -93,43 +104,30 @@ void Edi::open(const std::string& name)
m_thread = std::thread(&Edi::m_run, this);
}
-int Edi::readFrame(uint8_t* buffer, size_t size)
+size_t Edi::readFrame(uint8_t *buffer, size_t size)
{
- if (m_pending_sti_frame.frame.empty()) {
- m_frames.try_pop(m_pending_sti_frame);
+ EdiDecoder::sti_frame_t sti;
+ if (m_is_prebuffering) {
+ m_is_prebuffering = m_frames.size() < NUM_FRAMES_PREBUFFERING;
+ if (not m_is_prebuffering) {
+ etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete.";
+ }
+ memset(buffer, 0, size * sizeof(*buffer));
+ return 0;
}
-
- if (not m_pending_sti_frame.frame.empty()) {
+ else if (not m_pending_sti_frame.frame.empty()) {
if (m_pending_sti_frame.frame.size() != size) {
etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
m_pending_sti_frame.frame.size() << " received, " << size << " requested";
memset(buffer, 0, size * sizeof(*buffer));
+ return 0;
}
else {
- const auto now = chrono::system_clock::now();
-
- if (m_pending_sti_frame.timestamp.to_system_clock() <= now) {
- etiLog.level(debug) << "EDI input take frame with TS " <<
- m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size();
-
- std::copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
- m_pending_sti_frame.frame.clear();
- }
- else {
- etiLog.level(debug) << "EDI input skip frame with TS " <<
- m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size();
- }
- }
- }
-
- return size;
-
-#if 0
- EdiDecoder::sti_frame_t sti;
- if (m_is_prebuffering) {
- m_is_prebuffering = m_frames.size() < 10;
- if (not m_is_prebuffering) {
- etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete.";
+ copy(m_pending_sti_frame.frame.begin(),
+ m_pending_sti_frame.frame.end(),
+ buffer);
+ m_pending_sti_frame.frame.clear();
+ return size;
}
}
else if (m_frames.try_pop(sti)) {
@@ -138,21 +136,113 @@ int Edi::readFrame(uint8_t* buffer, size_t size)
return 0;
}
else if (sti.frame.size() == size) {
- std::copy(sti.frame.cbegin(), sti.frame.cend(), buffer);
+ copy(sti.frame.cbegin(), sti.frame.cend(), buffer);
+ return size;
}
else {
- etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << sti.frame.size() <<
- " received, " << size << " requested";
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
+ sti.frame.size() << " received, " << size << " requested";
memset(buffer, 0, size * sizeof(*buffer));
+ return 0;
}
}
else {
memset(buffer, 0, size * sizeof(*buffer));
m_is_prebuffering = true;
etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering";
+ return 0;
+ }
+}
+
+size_t Edi::readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta)
+{
+ if (m_pending_sti_frame.frame.empty()) {
+ m_frames.try_pop(m_pending_sti_frame);
+ }
+
+ if (m_is_prebuffering) {
+ if (m_pending_sti_frame.frame.empty()) {
+ memset(buffer, 0, size);
+ return 0;
+ }
+ else if (m_pending_sti_frame.frame.size() == size) {
+ // readFrame gets called every 24ms, so we allow max 24ms
+ // difference between the input frame timestamp and the requested
+ // timestamp.
+ if (m_pending_sti_frame.timestamp.valid()) {
+ double ts_frame = (double)m_pending_sti_frame.timestamp.seconds +
+ (m_pending_sti_frame.timestamp.tsta / 16384000.0);
+
+ double ts_req = (double)seconds + (tsta / 16384000.0);
+
+ if (abs(ts_frame - ts_req) < 24e-3) {
+ m_is_prebuffering = false;
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " valid timestamp, pre-buffering complete";
+ copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
+ m_pending_sti_frame.frame.clear();
+ return size;
+ }
+ else {
+ // Wait more, but erase the front of the frame queue to avoid
+ // stalling on one frame with incorrect timestamp
+ if (m_frames.size() >= MAX_FRAMES_QUEUED_PREBUFFERING) {
+ m_pending_sti_frame.frame.clear();
+ }
+ memset(buffer, 0, size);
+ return 0;
+ }
+ }
+ else {
+ etiLog.level(debug) << "EDI input " << m_name <<
+ " skipping frame without timestamp";
+ m_pending_sti_frame.frame.clear();
+ memset(buffer, 0, size);
+ return 0;
+ }
+ }
+ else {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
+ m_pending_sti_frame.frame.size() << " received, " << size << " requested";
+ m_pending_sti_frame.frame.clear();
+ memset(buffer, 0, size);
+ return 0;
+ }
+ }
+ else {
+ EdiDecoder::sti_frame_t sti_frame;
+ m_frames.try_pop(sti_frame);
+ if (sti_frame.frame.empty()) {
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " empty, re-enabling pre-buffering";
+ memset(buffer, 0, size);
+ return 0;
+ }
+ else if (not sti_frame.timestamp.valid()) {
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " invalid timestamp, re-enabling pre-buffering";
+ memset(buffer, 0, size);
+ return 0;
+ }
+ else {
+ double ts_frame = (double)sti_frame.timestamp.seconds +
+ (sti_frame.timestamp.tsta / 16384000.0);
+
+ double ts_req = (double)seconds + (tsta / 16384000.0);
+
+ if (abs(ts_frame - ts_req) > 24e-3) {
+ m_is_prebuffering = true;
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " timestamp out of bounds, re-enabling pre-buffering";
+ memset(buffer, 0, size);
+ return 0;
+ }
+ else {
+ copy(sti_frame.frame.cbegin(), sti_frame.frame.cend(), buffer);
+ return size;
+ }
+ }
}
- return size;
-#endif
}
void Edi::m_run()