diff options
Diffstat (limited to 'src/input')
-rw-r--r-- | src/input/Edi.cpp | 74 | ||||
-rw-r--r-- | src/input/Edi.h | 4 |
2 files changed, 71 insertions, 7 deletions
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index 247c16f..f6d4c07 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -52,14 +52,18 @@ constexpr size_t MAX_FRAMES_QUEUED = 1000; constexpr size_t MAX_FRAMES_QUEUED_PREBUFFERING = 500; /* When not using timestamping, how many frames to prebuffer. - * TODO should be configurable as ZMQ. - */ + * TODO should be configurable as ZMQ. */ constexpr size_t NUM_FRAMES_PREBUFFERING = 10; +/* Consider the buffer to be full on the receive side if it's above the overrun threshold. */ +constexpr size_t MAX_FRAMES_OVERRUN = 900; -Edi::Edi() : + +Edi::Edi(const std::string& name) : m_tcp_receive_server(TCP_BLOCKSIZE), m_sti_writer(), - m_sti_decoder(m_sti_writer, VERBOSE) + m_sti_decoder(m_sti_writer, VERBOSE), + m_name(name), + m_stats(name) { } Edi::~Edi() { @@ -99,7 +103,7 @@ void Edi::open(const std::string& name) throw runtime_error("Cannot parse EDI input URI"); } - m_name = name; + m_stats.registerAtServer(); m_running = true; m_thread = std::thread(&Edi::m_run, this); @@ -107,6 +111,9 @@ void Edi::open(const std::string& name) size_t Edi::readFrame(uint8_t *buffer, size_t size) { + // Save stats data in bytes, not in frames + m_stats.notifyBuffer(m_frames.size() * size); + EdiDecoder::sti_frame_t sti; if (m_is_prebuffering) { m_is_prebuffering = m_frames.size() < NUM_FRAMES_PREBUFFERING; @@ -117,6 +124,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) return 0; } else if (not m_pending_sti_frame.frame.empty()) { + // Can only happen when switching from timestamp-based buffer management! if (m_pending_sti_frame.frame.size() != size) { etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << m_pending_sti_frame.frame.size() << " received, " << size << " requested"; @@ -124,6 +132,13 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) return 0; } else { + if (not m_pending_sti_frame.version_data.version.empty()) { + m_stats.notifyVersion( + m_pending_sti_frame.version_data.version, + m_pending_sti_frame.version_data.uptime_s); + } + m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, m_pending_sti_frame.audio_levels.right); + copy(m_pending_sti_frame.frame.begin(), m_pending_sti_frame.frame.end(), buffer); @@ -137,6 +152,28 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) return 0; } else if (sti.frame.size() == size) { + // Steady-state when everything works well + if (m_frames.size() > MAX_FRAMES_OVERRUN) { + m_stats.notifyOverrun(); + + /* If the buffer is too full, we drop as many frames as needed + * to get down to the prebuffering size. We would like to have our buffer + * filled to the prebuffering length. */ + size_t over_max = m_frames.size() - NUM_FRAMES_PREBUFFERING; + + while (over_max--) { + EdiDecoder::sti_frame_t discard; + m_frames.try_pop(discard); + } + } + + if (not sti.version_data.version.empty()) { + m_stats.notifyVersion( + sti.version_data.version, + sti.version_data.uptime_s); + } + m_stats.notifyPeakLevels(sti.audio_levels.left, sti.audio_levels.right); + copy(sti.frame.cbegin(), sti.frame.cend(), buffer); return size; } @@ -151,6 +188,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) memset(buffer, 0, size * sizeof(*buffer)); m_is_prebuffering = true; etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering"; + m_stats.notifyUnderrun(); return 0; } } @@ -161,6 +199,8 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc m_frames.try_pop(m_pending_sti_frame); } + m_stats.notifyBuffer(m_frames.size() * size); + if (m_is_prebuffering) { if (m_pending_sti_frame.frame.empty()) { memset(buffer, 0, size); @@ -178,6 +218,15 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc m_is_prebuffering = false; etiLog.level(warn) << "EDI input " << m_name << " valid timestamp, pre-buffering complete"; + + if (not m_pending_sti_frame.version_data.version.empty()) { + m_stats.notifyVersion( + m_pending_sti_frame.version_data.version, + m_pending_sti_frame.version_data.uptime_s); + } + + m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, + m_pending_sti_frame.audio_levels.right); copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer); m_pending_sti_frame.frame.clear(); return size; @@ -188,6 +237,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc if (m_frames.size() >= MAX_FRAMES_QUEUED_PREBUFFERING) { m_pending_sti_frame.frame.clear(); } + m_stats.notifyUnderrun(); memset(buffer, 0, size); return 0; } @@ -215,12 +265,15 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc etiLog.level(warn) << "EDI input " << m_name << " empty, re-enabling pre-buffering"; memset(buffer, 0, size); + m_stats.notifyUnderrun(); + m_is_prebuffering = true; return 0; } else if (not sti_frame.timestamp.valid()) { etiLog.level(warn) << "EDI input " << m_name << - " invalid timestamp, re-enabling pre-buffering"; + " invalid timestamp, ignoring"; memset(buffer, 0, size); + m_stats.notifyUnderrun(); return 0; } else { @@ -228,6 +281,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc const double offset = m_pending_sti_frame.timestamp.diff_ms(ts_req); if (offset > 24e-3) { + m_stats.notifyUnderrun(); m_is_prebuffering = true; etiLog.level(warn) << "EDI input " << m_name << " timestamp out of bounds, re-enabling pre-buffering"; @@ -235,6 +289,14 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc return 0; } else { + if (not sti_frame.version_data.version.empty()) { + m_stats.notifyVersion( + sti_frame.version_data.version, + sti_frame.version_data.uptime_s); + } + + m_stats.notifyPeakLevels(sti_frame.audio_levels.left, + sti_frame.audio_levels.right); copy(sti_frame.frame.cbegin(), sti_frame.frame.cend(), buffer); return size; } diff --git a/src/input/Edi.h b/src/input/Edi.h index 8f270d0..56bbf65 100644 --- a/src/input/Edi.h +++ b/src/input/Edi.h @@ -36,6 +36,7 @@ #include "edi/STIDecoder.hpp" #include "edi/STIWriter.hpp" #include "ThreadsafeQueue.h" +#include "ManagementServer.h" namespace Inputs { @@ -47,7 +48,7 @@ namespace Inputs { */ class Edi : public InputBase { public: - Edi(); + Edi(const std::string& name); Edi(const Edi&) = delete; Edi& operator=(const Edi&) = delete; ~Edi(); @@ -81,6 +82,7 @@ class Edi : public InputBase { bool m_is_prebuffering = true; std::string m_name; + InputStat m_stats; }; }; |