diff options
-rwxr-xr-x | doc/show_dabmux_stats.py | 16 | ||||
-rw-r--r-- | lib/edi/STIDecoder.cpp | 35 | ||||
-rw-r--r-- | lib/edi/STIDecoder.hpp | 13 | ||||
-rw-r--r-- | lib/edi/STIWriter.cpp | 13 | ||||
-rw-r--r-- | lib/edi/STIWriter.hpp | 7 | ||||
-rw-r--r-- | lib/edi/common.cpp | 13 | ||||
-rw-r--r-- | lib/edi/common.hpp | 8 | ||||
-rw-r--r-- | src/ConfigParser.cpp | 2 | ||||
-rw-r--r-- | src/ManagementServer.cpp | 20 | ||||
-rw-r--r-- | src/ManagementServer.h | 4 | ||||
-rw-r--r-- | src/input/Edi.cpp | 74 | ||||
-rw-r--r-- | src/input/Edi.h | 4 |
12 files changed, 196 insertions, 13 deletions
diff --git a/doc/show_dabmux_stats.py b/doc/show_dabmux_stats.py index d226208..a451ef2 100755 --- a/doc/show_dabmux_stats.py +++ b/doc/show_dabmux_stats.py @@ -46,7 +46,7 @@ if len(sys.argv) == 1: data = sock.recv() values = json.loads(data)['values'] - tmpl = "{ident:20}{maxfill:>8}{minfill:>8}{under:>8}{over:>8}{audioleft:>8}{audioright:>8}{peakleft:>8}{peakright:>8}{state:>16}" + tmpl = "{ident:20}{maxfill:>8}{minfill:>8}{under:>8}{over:>8}{audioleft:>8}{audioright:>8}{peakleft:>8}{peakright:>8}{state:>16}{version:>48}{uptime:>8}" print(tmpl.format( ident="id", maxfill="max", @@ -57,7 +57,9 @@ if len(sys.argv) == 1: audioright="audio R", peakleft="peak L", peakright="peak R", - state="state")) + state="state", + version="version", + uptime="uptime")) for ident in values: v = values[ident]['inputstat'] @@ -65,6 +67,12 @@ if len(sys.argv) == 1: if 'state' not in v: v['state'] = None + if 'version' not in v: + v['version'] = "Unknown" + + if 'uptime' not in v: + v['uptime'] = "?" + print(tmpl.format( ident=ident, maxfill=v['max_fill'], @@ -75,7 +83,9 @@ if len(sys.argv) == 1: audioright=v['peak_right'], peakleft=v['peak_left_slow'], peakright=v['peak_right_slow'], - state=v['state'])) + state=v['state'], + version=v['version'], + uptime=v['uptime'])) elif len(sys.argv) == 2 and sys.argv[1] == "config": diff --git a/lib/edi/STIDecoder.cpp b/lib/edi/STIDecoder.cpp index aab93e0..1f5d45e 100644 --- a/lib/edi/STIDecoder.cpp +++ b/lib/edi/STIDecoder.cpp @@ -44,6 +44,10 @@ STIDecoder::STIDecoder(STIDataCollector& data_collector, bool verbose) : std::bind(&STIDecoder::decode_ssn, this, _1, _2)); m_dispatcher.register_tag("*dmy", std::bind(&STIDecoder::decode_stardmy, this, _1, _2)); + m_dispatcher.register_tag("ODRa", + std::bind(&STIDecoder::decode_odraudiolevel, this, _1, _2)); + m_dispatcher.register_tag("ODRv", + std::bind(&STIDecoder::decode_odrversion, this, _1, _2)); } void STIDecoder::push_bytes(const vector<uint8_t> &buf) @@ -183,6 +187,37 @@ bool STIDecoder::decode_stardmy(const vector<uint8_t>& /*value*/, uint16_t) return true; } +bool STIDecoder::decode_odraudiolevel(const vector<uint8_t>& value, uint16_t) +{ + constexpr size_t expected_length = 2 * sizeof(int16_t); + + audio_level_data audio_level; + + if (value.size() == expected_length) { + audio_level.left = read_16b(value.begin()); + audio_level.right = read_16b(value.begin() + 2); + } + else { + audio_level.left = 0; + audio_level.right = 0; + etiLog.level(warn) << "EDI: ODR AudioLevel TAG has wrong length!"; + } + + m_data_collector.update_audio_levels(audio_level); + + // Not being able to decode the audio level is a soft-failure, it should + // not disrupt decoding the actual audio data. + return true; +} + +bool STIDecoder::decode_odrversion(const vector<uint8_t>& value, uint16_t) +{ + const auto vd = parse_odr_version_data(value); + m_data_collector.update_odr_version(vd); + + return true; +} + void STIDecoder::packet_completed() { m_data_collector.assemble(); diff --git a/lib/edi/STIDecoder.hpp b/lib/edi/STIDecoder.hpp index 3f6f729..e2aa850 100644 --- a/lib/edi/STIDecoder.hpp +++ b/lib/edi/STIDecoder.hpp @@ -53,6 +53,12 @@ struct sti_payload_data { uint16_t stl(void) const { return istd.size(); } }; +struct audio_level_data { + int16_t left = 0; + int16_t right = 0; +}; + + /* A class that receives STI data must implement the interface described * in the STIDataCollector. This can be e.g. a converter to ETI, or something that * prepares data structures for a modulator. @@ -78,6 +84,9 @@ class STIDataCollector { virtual void add_payload(sti_payload_data&& payload) = 0; + virtual void update_audio_levels(const audio_level_data& data) = 0; + virtual void update_odr_version(const odr_version_data& data) = 0; + virtual void assemble() = 0; }; @@ -113,11 +122,13 @@ class STIDecoder { bool decode_ssn(const std::vector<uint8_t> &value, uint16_t n); bool decode_stardmy(const std::vector<uint8_t> &value, uint16_t); + bool decode_odraudiolevel(const std::vector<uint8_t> &value, uint16_t); + bool decode_odrversion(const std::vector<uint8_t> &value, uint16_t); + void packet_completed(); STIDataCollector& m_data_collector; TagDispatcher m_dispatcher; - }; } diff --git a/lib/edi/STIWriter.cpp b/lib/edi/STIWriter.cpp index 399922a..ea3bfe8 100644 --- a/lib/edi/STIWriter.cpp +++ b/lib/edi/STIWriter.cpp @@ -53,6 +53,7 @@ void STIWriter::reinit() m_stat_valid = false; m_time_valid = false; m_payload_valid = false; + m_audio_levels = audio_level_data(); m_stiFrame.frame.clear(); } @@ -84,6 +85,16 @@ void STIWriter::add_payload(sti_payload_data&& payload) m_payload_valid = true; } +void STIWriter::update_audio_levels(const audio_level_data& data) +{ + m_audio_levels = data; +} + +void STIWriter::update_odr_version(const odr_version_data& data) +{ + m_version_data = data; +} + void STIWriter::update_edi_time( uint32_t utco, uint32_t seconds) @@ -118,6 +129,8 @@ void STIWriter::assemble() // Do copies so as to preserve existing payload data m_stiFrame.frame = m_payload.istd; + m_stiFrame.audio_levels = m_audio_levels; + m_stiFrame.version_data = m_version_data; m_stiFrame.timestamp.seconds = m_seconds; m_stiFrame.timestamp.utco = m_utco; m_stiFrame.timestamp.tsta = m_management_data.tsta; diff --git a/lib/edi/STIWriter.hpp b/lib/edi/STIWriter.hpp index a75cb69..16cbfe8 100644 --- a/lib/edi/STIWriter.hpp +++ b/lib/edi/STIWriter.hpp @@ -32,6 +32,8 @@ namespace EdiDecoder { struct sti_frame_t { std::vector<uint8_t> frame; frame_timestamp_t timestamp; + audio_level_data audio_levels; + odr_version_data version_data; }; class STIWriter : public STIDataCollector { @@ -53,6 +55,9 @@ class STIWriter : public STIDataCollector { virtual void update_sti_management(const sti_management_data& data); virtual void add_payload(sti_payload_data&& payload); + virtual void update_audio_levels(const audio_level_data& data); + virtual void update_odr_version(const odr_version_data& data); + virtual void assemble(void); // Return the assembled frame or an empty frame if not ready @@ -77,6 +82,8 @@ class STIWriter : public STIDataCollector { bool m_payload_valid = false; sti_payload_data m_payload; + audio_level_data m_audio_levels; + odr_version_data m_version_data; sti_frame_t m_stiFrame; }; diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index 4629b55..ac8ec0c 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -339,4 +339,17 @@ bool TagDispatcher::decode_tagpacket(const vector<uint8_t> &payload) return success; } +odr_version_data parse_odr_version_data(const std::vector<uint8_t>& data) +{ + if (data.size() < sizeof(uint32_t)) { + return {}; + } + + const size_t versionstr_length = data.size() - sizeof(uint32_t); + string version(data.begin(), data.begin() + versionstr_length); + uint32_t uptime_s = read_32b(data.begin() + versionstr_length); + + return {version, uptime_s}; +} + } diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp index 1aa2cb6..5d15f8d 100644 --- a/lib/edi/common.hpp +++ b/lib/edi/common.hpp @@ -94,4 +94,12 @@ class TagDispatcher { std::function<void()> m_af_packet_completed; }; +// Data carried inside the ODRv EDI TAG +struct odr_version_data { + std::string version; + uint32_t uptime_s; +}; + +odr_version_data parse_odr_version_data(const std::vector<uint8_t>& data); + } diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index b1e785a..2fb375b 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -956,7 +956,7 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan, } } else if (proto == "edi") { - subchan->input = make_shared<Inputs::Edi>(); + subchan->input = make_shared<Inputs::Edi>(subchanuid); } else if (proto == "stp") { subchan->input = make_shared<Inputs::Sti_d_Rtp>(); diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp index 201fc7b..783a40b 100644 --- a/src/ManagementServer.cpp +++ b/src/ManagementServer.cpp @@ -485,6 +485,14 @@ void InputStat::notifyOverrun(void) } } +void InputStat::notifyVersion(const std::string& version, uint32_t uptime_s) +{ + unique_lock<mutex> lock(m_mutex); + + m_version = version; + m_uptime_s = uptime_s; +} + std::string InputStat::encodeValuesJSON() { std::ostringstream ss; @@ -548,6 +556,13 @@ std::string InputStat::encodeValuesJSON() return dB; }; + auto version = m_version; + size_t pos = 0; + while ((pos = version.find("\"", pos)) != std::string::npos) { + version.replace(pos, 1, "\\\""); + pos++; + } + ss << "{ \"inputstat\" : {" "\"min_fill\": " << min_fill_buffer << ", " @@ -557,7 +572,10 @@ std::string InputStat::encodeValuesJSON() "\"peak_left_slow\": " << to_dB(peak_left) << ", " "\"peak_right_slow\": " << to_dB(peak_right) << ", " "\"num_underruns\": " << m_num_underruns << ", " - "\"num_overruns\": " << m_num_overruns << ", "; + "\"num_overruns\": " << m_num_overruns << ", " + "\"version\": \"" << version << "\", " + "\"uptime\": " << m_uptime_s << ", " + ; ss << "\"state\": "; diff --git a/src/ManagementServer.h b/src/ManagementServer.h index 18af48c..5b52957 100644 --- a/src/ManagementServer.h +++ b/src/ManagementServer.h @@ -100,6 +100,7 @@ class InputStat void notifyPeakLevels(int peak_left, int peak_right); void notifyUnderrun(void); void notifyOverrun(void); + void notifyVersion(const std::string& version, uint32_t uptime_s); std::string encodeValuesJSON(void); input_state_t determineState(void); @@ -131,6 +132,9 @@ class InputStat size_t m_short_window_length = 0; + std::string m_version; + uint32_t m_uptime_s = 0; + /************* STATE ***************/ /* Variables used for determining the input state */ int m_glitch_counter = 0; // saturating counter diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index 247c16f..f6d4c07 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -52,14 +52,18 @@ constexpr size_t MAX_FRAMES_QUEUED = 1000; constexpr size_t MAX_FRAMES_QUEUED_PREBUFFERING = 500; /* When not using timestamping, how many frames to prebuffer. - * TODO should be configurable as ZMQ. - */ + * TODO should be configurable as ZMQ. */ constexpr size_t NUM_FRAMES_PREBUFFERING = 10; +/* Consider the buffer to be full on the receive side if it's above the overrun threshold. */ +constexpr size_t MAX_FRAMES_OVERRUN = 900; -Edi::Edi() : + +Edi::Edi(const std::string& name) : m_tcp_receive_server(TCP_BLOCKSIZE), m_sti_writer(), - m_sti_decoder(m_sti_writer, VERBOSE) + m_sti_decoder(m_sti_writer, VERBOSE), + m_name(name), + m_stats(name) { } Edi::~Edi() { @@ -99,7 +103,7 @@ void Edi::open(const std::string& name) throw runtime_error("Cannot parse EDI input URI"); } - m_name = name; + m_stats.registerAtServer(); m_running = true; m_thread = std::thread(&Edi::m_run, this); @@ -107,6 +111,9 @@ void Edi::open(const std::string& name) size_t Edi::readFrame(uint8_t *buffer, size_t size) { + // Save stats data in bytes, not in frames + m_stats.notifyBuffer(m_frames.size() * size); + EdiDecoder::sti_frame_t sti; if (m_is_prebuffering) { m_is_prebuffering = m_frames.size() < NUM_FRAMES_PREBUFFERING; @@ -117,6 +124,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) return 0; } else if (not m_pending_sti_frame.frame.empty()) { + // Can only happen when switching from timestamp-based buffer management! if (m_pending_sti_frame.frame.size() != size) { etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << m_pending_sti_frame.frame.size() << " received, " << size << " requested"; @@ -124,6 +132,13 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) return 0; } else { + if (not m_pending_sti_frame.version_data.version.empty()) { + m_stats.notifyVersion( + m_pending_sti_frame.version_data.version, + m_pending_sti_frame.version_data.uptime_s); + } + m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, m_pending_sti_frame.audio_levels.right); + copy(m_pending_sti_frame.frame.begin(), m_pending_sti_frame.frame.end(), buffer); @@ -137,6 +152,28 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) return 0; } else if (sti.frame.size() == size) { + // Steady-state when everything works well + if (m_frames.size() > MAX_FRAMES_OVERRUN) { + m_stats.notifyOverrun(); + + /* If the buffer is too full, we drop as many frames as needed + * to get down to the prebuffering size. We would like to have our buffer + * filled to the prebuffering length. */ + size_t over_max = m_frames.size() - NUM_FRAMES_PREBUFFERING; + + while (over_max--) { + EdiDecoder::sti_frame_t discard; + m_frames.try_pop(discard); + } + } + + if (not sti.version_data.version.empty()) { + m_stats.notifyVersion( + sti.version_data.version, + sti.version_data.uptime_s); + } + m_stats.notifyPeakLevels(sti.audio_levels.left, sti.audio_levels.right); + copy(sti.frame.cbegin(), sti.frame.cend(), buffer); return size; } @@ -151,6 +188,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) memset(buffer, 0, size * sizeof(*buffer)); m_is_prebuffering = true; etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering"; + m_stats.notifyUnderrun(); return 0; } } @@ -161,6 +199,8 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc m_frames.try_pop(m_pending_sti_frame); } + m_stats.notifyBuffer(m_frames.size() * size); + if (m_is_prebuffering) { if (m_pending_sti_frame.frame.empty()) { memset(buffer, 0, size); @@ -178,6 +218,15 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc m_is_prebuffering = false; etiLog.level(warn) << "EDI input " << m_name << " valid timestamp, pre-buffering complete"; + + if (not m_pending_sti_frame.version_data.version.empty()) { + m_stats.notifyVersion( + m_pending_sti_frame.version_data.version, + m_pending_sti_frame.version_data.uptime_s); + } + + m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, + m_pending_sti_frame.audio_levels.right); copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer); m_pending_sti_frame.frame.clear(); return size; @@ -188,6 +237,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc if (m_frames.size() >= MAX_FRAMES_QUEUED_PREBUFFERING) { m_pending_sti_frame.frame.clear(); } + m_stats.notifyUnderrun(); memset(buffer, 0, size); return 0; } @@ -215,12 +265,15 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc etiLog.level(warn) << "EDI input " << m_name << " empty, re-enabling pre-buffering"; memset(buffer, 0, size); + m_stats.notifyUnderrun(); + m_is_prebuffering = true; return 0; } else if (not sti_frame.timestamp.valid()) { etiLog.level(warn) << "EDI input " << m_name << - " invalid timestamp, re-enabling pre-buffering"; + " invalid timestamp, ignoring"; memset(buffer, 0, size); + m_stats.notifyUnderrun(); return 0; } else { @@ -228,6 +281,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc const double offset = m_pending_sti_frame.timestamp.diff_ms(ts_req); if (offset > 24e-3) { + m_stats.notifyUnderrun(); m_is_prebuffering = true; etiLog.level(warn) << "EDI input " << m_name << " timestamp out of bounds, re-enabling pre-buffering"; @@ -235,6 +289,14 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc return 0; } else { + if (not sti_frame.version_data.version.empty()) { + m_stats.notifyVersion( + sti_frame.version_data.version, + sti_frame.version_data.uptime_s); + } + + m_stats.notifyPeakLevels(sti_frame.audio_levels.left, + sti_frame.audio_levels.right); copy(sti_frame.frame.cbegin(), sti_frame.frame.cend(), buffer); return size; } diff --git a/src/input/Edi.h b/src/input/Edi.h index 8f270d0..56bbf65 100644 --- a/src/input/Edi.h +++ b/src/input/Edi.h @@ -36,6 +36,7 @@ #include "edi/STIDecoder.hpp" #include "edi/STIWriter.hpp" #include "ThreadsafeQueue.h" +#include "ManagementServer.h" namespace Inputs { @@ -47,7 +48,7 @@ namespace Inputs { */ class Edi : public InputBase { public: - Edi(); + Edi(const std::string& name); Edi(const Edi&) = delete; Edi& operator=(const Edi&) = delete; ~Edi(); @@ -81,6 +82,7 @@ class Edi : public InputBase { bool m_is_prebuffering = true; std::string m_name; + InputStat m_stats; }; }; |