summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xdoc/show_dabmux_stats.py16
-rw-r--r--lib/edi/STIDecoder.cpp35
-rw-r--r--lib/edi/STIDecoder.hpp13
-rw-r--r--lib/edi/STIWriter.cpp13
-rw-r--r--lib/edi/STIWriter.hpp7
-rw-r--r--lib/edi/common.cpp13
-rw-r--r--lib/edi/common.hpp8
-rw-r--r--src/ConfigParser.cpp2
-rw-r--r--src/ManagementServer.cpp20
-rw-r--r--src/ManagementServer.h4
-rw-r--r--src/input/Edi.cpp74
-rw-r--r--src/input/Edi.h4
12 files changed, 196 insertions, 13 deletions
diff --git a/doc/show_dabmux_stats.py b/doc/show_dabmux_stats.py
index d226208..a451ef2 100755
--- a/doc/show_dabmux_stats.py
+++ b/doc/show_dabmux_stats.py
@@ -46,7 +46,7 @@ if len(sys.argv) == 1:
data = sock.recv()
values = json.loads(data)['values']
- tmpl = "{ident:20}{maxfill:>8}{minfill:>8}{under:>8}{over:>8}{audioleft:>8}{audioright:>8}{peakleft:>8}{peakright:>8}{state:>16}"
+ tmpl = "{ident:20}{maxfill:>8}{minfill:>8}{under:>8}{over:>8}{audioleft:>8}{audioright:>8}{peakleft:>8}{peakright:>8}{state:>16}{version:>48}{uptime:>8}"
print(tmpl.format(
ident="id",
maxfill="max",
@@ -57,7 +57,9 @@ if len(sys.argv) == 1:
audioright="audio R",
peakleft="peak L",
peakright="peak R",
- state="state"))
+ state="state",
+ version="version",
+ uptime="uptime"))
for ident in values:
v = values[ident]['inputstat']
@@ -65,6 +67,12 @@ if len(sys.argv) == 1:
if 'state' not in v:
v['state'] = None
+ if 'version' not in v:
+ v['version'] = "Unknown"
+
+ if 'uptime' not in v:
+ v['uptime'] = "?"
+
print(tmpl.format(
ident=ident,
maxfill=v['max_fill'],
@@ -75,7 +83,9 @@ if len(sys.argv) == 1:
audioright=v['peak_right'],
peakleft=v['peak_left_slow'],
peakright=v['peak_right_slow'],
- state=v['state']))
+ state=v['state'],
+ version=v['version'],
+ uptime=v['uptime']))
elif len(sys.argv) == 2 and sys.argv[1] == "config":
diff --git a/lib/edi/STIDecoder.cpp b/lib/edi/STIDecoder.cpp
index aab93e0..1f5d45e 100644
--- a/lib/edi/STIDecoder.cpp
+++ b/lib/edi/STIDecoder.cpp
@@ -44,6 +44,10 @@ STIDecoder::STIDecoder(STIDataCollector& data_collector, bool verbose) :
std::bind(&STIDecoder::decode_ssn, this, _1, _2));
m_dispatcher.register_tag("*dmy",
std::bind(&STIDecoder::decode_stardmy, this, _1, _2));
+ m_dispatcher.register_tag("ODRa",
+ std::bind(&STIDecoder::decode_odraudiolevel, this, _1, _2));
+ m_dispatcher.register_tag("ODRv",
+ std::bind(&STIDecoder::decode_odrversion, this, _1, _2));
}
void STIDecoder::push_bytes(const vector<uint8_t> &buf)
@@ -183,6 +187,37 @@ bool STIDecoder::decode_stardmy(const vector<uint8_t>& /*value*/, uint16_t)
return true;
}
+bool STIDecoder::decode_odraudiolevel(const vector<uint8_t>& value, uint16_t)
+{
+ constexpr size_t expected_length = 2 * sizeof(int16_t);
+
+ audio_level_data audio_level;
+
+ if (value.size() == expected_length) {
+ audio_level.left = read_16b(value.begin());
+ audio_level.right = read_16b(value.begin() + 2);
+ }
+ else {
+ audio_level.left = 0;
+ audio_level.right = 0;
+ etiLog.level(warn) << "EDI: ODR AudioLevel TAG has wrong length!";
+ }
+
+ m_data_collector.update_audio_levels(audio_level);
+
+ // Not being able to decode the audio level is a soft-failure, it should
+ // not disrupt decoding the actual audio data.
+ return true;
+}
+
+bool STIDecoder::decode_odrversion(const vector<uint8_t>& value, uint16_t)
+{
+ const auto vd = parse_odr_version_data(value);
+ m_data_collector.update_odr_version(vd);
+
+ return true;
+}
+
void STIDecoder::packet_completed()
{
m_data_collector.assemble();
diff --git a/lib/edi/STIDecoder.hpp b/lib/edi/STIDecoder.hpp
index 3f6f729..e2aa850 100644
--- a/lib/edi/STIDecoder.hpp
+++ b/lib/edi/STIDecoder.hpp
@@ -53,6 +53,12 @@ struct sti_payload_data {
uint16_t stl(void) const { return istd.size(); }
};
+struct audio_level_data {
+ int16_t left = 0;
+ int16_t right = 0;
+};
+
+
/* A class that receives STI data must implement the interface described
* in the STIDataCollector. This can be e.g. a converter to ETI, or something that
* prepares data structures for a modulator.
@@ -78,6 +84,9 @@ class STIDataCollector {
virtual void add_payload(sti_payload_data&& payload) = 0;
+ virtual void update_audio_levels(const audio_level_data& data) = 0;
+ virtual void update_odr_version(const odr_version_data& data) = 0;
+
virtual void assemble() = 0;
};
@@ -113,11 +122,13 @@ class STIDecoder {
bool decode_ssn(const std::vector<uint8_t> &value, uint16_t n);
bool decode_stardmy(const std::vector<uint8_t> &value, uint16_t);
+ bool decode_odraudiolevel(const std::vector<uint8_t> &value, uint16_t);
+ bool decode_odrversion(const std::vector<uint8_t> &value, uint16_t);
+
void packet_completed();
STIDataCollector& m_data_collector;
TagDispatcher m_dispatcher;
-
};
}
diff --git a/lib/edi/STIWriter.cpp b/lib/edi/STIWriter.cpp
index 399922a..ea3bfe8 100644
--- a/lib/edi/STIWriter.cpp
+++ b/lib/edi/STIWriter.cpp
@@ -53,6 +53,7 @@ void STIWriter::reinit()
m_stat_valid = false;
m_time_valid = false;
m_payload_valid = false;
+ m_audio_levels = audio_level_data();
m_stiFrame.frame.clear();
}
@@ -84,6 +85,16 @@ void STIWriter::add_payload(sti_payload_data&& payload)
m_payload_valid = true;
}
+void STIWriter::update_audio_levels(const audio_level_data& data)
+{
+ m_audio_levels = data;
+}
+
+void STIWriter::update_odr_version(const odr_version_data& data)
+{
+ m_version_data = data;
+}
+
void STIWriter::update_edi_time(
uint32_t utco,
uint32_t seconds)
@@ -118,6 +129,8 @@ void STIWriter::assemble()
// Do copies so as to preserve existing payload data
m_stiFrame.frame = m_payload.istd;
+ m_stiFrame.audio_levels = m_audio_levels;
+ m_stiFrame.version_data = m_version_data;
m_stiFrame.timestamp.seconds = m_seconds;
m_stiFrame.timestamp.utco = m_utco;
m_stiFrame.timestamp.tsta = m_management_data.tsta;
diff --git a/lib/edi/STIWriter.hpp b/lib/edi/STIWriter.hpp
index a75cb69..16cbfe8 100644
--- a/lib/edi/STIWriter.hpp
+++ b/lib/edi/STIWriter.hpp
@@ -32,6 +32,8 @@ namespace EdiDecoder {
struct sti_frame_t {
std::vector<uint8_t> frame;
frame_timestamp_t timestamp;
+ audio_level_data audio_levels;
+ odr_version_data version_data;
};
class STIWriter : public STIDataCollector {
@@ -53,6 +55,9 @@ class STIWriter : public STIDataCollector {
virtual void update_sti_management(const sti_management_data& data);
virtual void add_payload(sti_payload_data&& payload);
+ virtual void update_audio_levels(const audio_level_data& data);
+ virtual void update_odr_version(const odr_version_data& data);
+
virtual void assemble(void);
// Return the assembled frame or an empty frame if not ready
@@ -77,6 +82,8 @@ class STIWriter : public STIDataCollector {
bool m_payload_valid = false;
sti_payload_data m_payload;
+ audio_level_data m_audio_levels;
+ odr_version_data m_version_data;
sti_frame_t m_stiFrame;
};
diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp
index 4629b55..ac8ec0c 100644
--- a/lib/edi/common.cpp
+++ b/lib/edi/common.cpp
@@ -339,4 +339,17 @@ bool TagDispatcher::decode_tagpacket(const vector<uint8_t> &payload)
return success;
}
+odr_version_data parse_odr_version_data(const std::vector<uint8_t>& data)
+{
+ if (data.size() < sizeof(uint32_t)) {
+ return {};
+ }
+
+ const size_t versionstr_length = data.size() - sizeof(uint32_t);
+ string version(data.begin(), data.begin() + versionstr_length);
+ uint32_t uptime_s = read_32b(data.begin() + versionstr_length);
+
+ return {version, uptime_s};
+}
+
}
diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp
index 1aa2cb6..5d15f8d 100644
--- a/lib/edi/common.hpp
+++ b/lib/edi/common.hpp
@@ -94,4 +94,12 @@ class TagDispatcher {
std::function<void()> m_af_packet_completed;
};
+// Data carried inside the ODRv EDI TAG
+struct odr_version_data {
+ std::string version;
+ uint32_t uptime_s;
+};
+
+odr_version_data parse_odr_version_data(const std::vector<uint8_t>& data);
+
}
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index b1e785a..2fb375b 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -956,7 +956,7 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,
}
}
else if (proto == "edi") {
- subchan->input = make_shared<Inputs::Edi>();
+ subchan->input = make_shared<Inputs::Edi>(subchanuid);
}
else if (proto == "stp") {
subchan->input = make_shared<Inputs::Sti_d_Rtp>();
diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp
index 201fc7b..783a40b 100644
--- a/src/ManagementServer.cpp
+++ b/src/ManagementServer.cpp
@@ -485,6 +485,14 @@ void InputStat::notifyOverrun(void)
}
}
+void InputStat::notifyVersion(const std::string& version, uint32_t uptime_s)
+{
+ unique_lock<mutex> lock(m_mutex);
+
+ m_version = version;
+ m_uptime_s = uptime_s;
+}
+
std::string InputStat::encodeValuesJSON()
{
std::ostringstream ss;
@@ -548,6 +556,13 @@ std::string InputStat::encodeValuesJSON()
return dB;
};
+ auto version = m_version;
+ size_t pos = 0;
+ while ((pos = version.find("\"", pos)) != std::string::npos) {
+ version.replace(pos, 1, "\\\"");
+ pos++;
+ }
+
ss <<
"{ \"inputstat\" : {"
"\"min_fill\": " << min_fill_buffer << ", "
@@ -557,7 +572,10 @@ std::string InputStat::encodeValuesJSON()
"\"peak_left_slow\": " << to_dB(peak_left) << ", "
"\"peak_right_slow\": " << to_dB(peak_right) << ", "
"\"num_underruns\": " << m_num_underruns << ", "
- "\"num_overruns\": " << m_num_overruns << ", ";
+ "\"num_overruns\": " << m_num_overruns << ", "
+ "\"version\": \"" << version << "\", "
+ "\"uptime\": " << m_uptime_s << ", "
+ ;
ss << "\"state\": ";
diff --git a/src/ManagementServer.h b/src/ManagementServer.h
index 18af48c..5b52957 100644
--- a/src/ManagementServer.h
+++ b/src/ManagementServer.h
@@ -100,6 +100,7 @@ class InputStat
void notifyPeakLevels(int peak_left, int peak_right);
void notifyUnderrun(void);
void notifyOverrun(void);
+ void notifyVersion(const std::string& version, uint32_t uptime_s);
std::string encodeValuesJSON(void);
input_state_t determineState(void);
@@ -131,6 +132,9 @@ class InputStat
size_t m_short_window_length = 0;
+ std::string m_version;
+ uint32_t m_uptime_s = 0;
+
/************* STATE ***************/
/* Variables used for determining the input state */
int m_glitch_counter = 0; // saturating counter
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
index 247c16f..f6d4c07 100644
--- a/src/input/Edi.cpp
+++ b/src/input/Edi.cpp
@@ -52,14 +52,18 @@ constexpr size_t MAX_FRAMES_QUEUED = 1000;
constexpr size_t MAX_FRAMES_QUEUED_PREBUFFERING = 500;
/* When not using timestamping, how many frames to prebuffer.
- * TODO should be configurable as ZMQ.
- */
+ * TODO should be configurable as ZMQ. */
constexpr size_t NUM_FRAMES_PREBUFFERING = 10;
+/* Consider the buffer to be full on the receive side if it's above the overrun threshold. */
+constexpr size_t MAX_FRAMES_OVERRUN = 900;
-Edi::Edi() :
+
+Edi::Edi(const std::string& name) :
m_tcp_receive_server(TCP_BLOCKSIZE),
m_sti_writer(),
- m_sti_decoder(m_sti_writer, VERBOSE)
+ m_sti_decoder(m_sti_writer, VERBOSE),
+ m_name(name),
+ m_stats(name)
{ }
Edi::~Edi() {
@@ -99,7 +103,7 @@ void Edi::open(const std::string& name)
throw runtime_error("Cannot parse EDI input URI");
}
- m_name = name;
+ m_stats.registerAtServer();
m_running = true;
m_thread = std::thread(&Edi::m_run, this);
@@ -107,6 +111,9 @@ void Edi::open(const std::string& name)
size_t Edi::readFrame(uint8_t *buffer, size_t size)
{
+ // Save stats data in bytes, not in frames
+ m_stats.notifyBuffer(m_frames.size() * size);
+
EdiDecoder::sti_frame_t sti;
if (m_is_prebuffering) {
m_is_prebuffering = m_frames.size() < NUM_FRAMES_PREBUFFERING;
@@ -117,6 +124,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size)
return 0;
}
else if (not m_pending_sti_frame.frame.empty()) {
+ // Can only happen when switching from timestamp-based buffer management!
if (m_pending_sti_frame.frame.size() != size) {
etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
m_pending_sti_frame.frame.size() << " received, " << size << " requested";
@@ -124,6 +132,13 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size)
return 0;
}
else {
+ if (not m_pending_sti_frame.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ m_pending_sti_frame.version_data.version,
+ m_pending_sti_frame.version_data.uptime_s);
+ }
+ m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, m_pending_sti_frame.audio_levels.right);
+
copy(m_pending_sti_frame.frame.begin(),
m_pending_sti_frame.frame.end(),
buffer);
@@ -137,6 +152,28 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size)
return 0;
}
else if (sti.frame.size() == size) {
+ // Steady-state when everything works well
+ if (m_frames.size() > MAX_FRAMES_OVERRUN) {
+ m_stats.notifyOverrun();
+
+ /* If the buffer is too full, we drop as many frames as needed
+ * to get down to the prebuffering size. We would like to have our buffer
+ * filled to the prebuffering length. */
+ size_t over_max = m_frames.size() - NUM_FRAMES_PREBUFFERING;
+
+ while (over_max--) {
+ EdiDecoder::sti_frame_t discard;
+ m_frames.try_pop(discard);
+ }
+ }
+
+ if (not sti.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ sti.version_data.version,
+ sti.version_data.uptime_s);
+ }
+ m_stats.notifyPeakLevels(sti.audio_levels.left, sti.audio_levels.right);
+
copy(sti.frame.cbegin(), sti.frame.cend(), buffer);
return size;
}
@@ -151,6 +188,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size)
memset(buffer, 0, size * sizeof(*buffer));
m_is_prebuffering = true;
etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering";
+ m_stats.notifyUnderrun();
return 0;
}
}
@@ -161,6 +199,8 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc
m_frames.try_pop(m_pending_sti_frame);
}
+ m_stats.notifyBuffer(m_frames.size() * size);
+
if (m_is_prebuffering) {
if (m_pending_sti_frame.frame.empty()) {
memset(buffer, 0, size);
@@ -178,6 +218,15 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc
m_is_prebuffering = false;
etiLog.level(warn) << "EDI input " << m_name <<
" valid timestamp, pre-buffering complete";
+
+ if (not m_pending_sti_frame.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ m_pending_sti_frame.version_data.version,
+ m_pending_sti_frame.version_data.uptime_s);
+ }
+
+ m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left,
+ m_pending_sti_frame.audio_levels.right);
copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
m_pending_sti_frame.frame.clear();
return size;
@@ -188,6 +237,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc
if (m_frames.size() >= MAX_FRAMES_QUEUED_PREBUFFERING) {
m_pending_sti_frame.frame.clear();
}
+ m_stats.notifyUnderrun();
memset(buffer, 0, size);
return 0;
}
@@ -215,12 +265,15 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc
etiLog.level(warn) << "EDI input " << m_name <<
" empty, re-enabling pre-buffering";
memset(buffer, 0, size);
+ m_stats.notifyUnderrun();
+ m_is_prebuffering = true;
return 0;
}
else if (not sti_frame.timestamp.valid()) {
etiLog.level(warn) << "EDI input " << m_name <<
- " invalid timestamp, re-enabling pre-buffering";
+ " invalid timestamp, ignoring";
memset(buffer, 0, size);
+ m_stats.notifyUnderrun();
return 0;
}
else {
@@ -228,6 +281,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc
const double offset = m_pending_sti_frame.timestamp.diff_ms(ts_req);
if (offset > 24e-3) {
+ m_stats.notifyUnderrun();
m_is_prebuffering = true;
etiLog.level(warn) << "EDI input " << m_name <<
" timestamp out of bounds, re-enabling pre-buffering";
@@ -235,6 +289,14 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc
return 0;
}
else {
+ if (not sti_frame.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ sti_frame.version_data.version,
+ sti_frame.version_data.uptime_s);
+ }
+
+ m_stats.notifyPeakLevels(sti_frame.audio_levels.left,
+ sti_frame.audio_levels.right);
copy(sti_frame.frame.cbegin(), sti_frame.frame.cend(), buffer);
return size;
}
diff --git a/src/input/Edi.h b/src/input/Edi.h
index 8f270d0..56bbf65 100644
--- a/src/input/Edi.h
+++ b/src/input/Edi.h
@@ -36,6 +36,7 @@
#include "edi/STIDecoder.hpp"
#include "edi/STIWriter.hpp"
#include "ThreadsafeQueue.h"
+#include "ManagementServer.h"
namespace Inputs {
@@ -47,7 +48,7 @@ namespace Inputs {
*/
class Edi : public InputBase {
public:
- Edi();
+ Edi(const std::string& name);
Edi(const Edi&) = delete;
Edi& operator=(const Edi&) = delete;
~Edi();
@@ -81,6 +82,7 @@ class Edi : public InputBase {
bool m_is_prebuffering = true;
std::string m_name;
+ InputStat m_stats;
};
};