aboutsummaryrefslogtreecommitdiffstats
path: root/src/input
diff options
context:
space:
mode:
Diffstat (limited to 'src/input')
-rw-r--r--src/input/Edi.cpp74
-rw-r--r--src/input/Edi.h4
2 files changed, 71 insertions, 7 deletions
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
index 247c16f..f6d4c07 100644
--- a/src/input/Edi.cpp
+++ b/src/input/Edi.cpp
@@ -52,14 +52,18 @@ constexpr size_t MAX_FRAMES_QUEUED = 1000;
constexpr size_t MAX_FRAMES_QUEUED_PREBUFFERING = 500;
/* When not using timestamping, how many frames to prebuffer.
- * TODO should be configurable as ZMQ.
- */
+ * TODO should be configurable as ZMQ. */
constexpr size_t NUM_FRAMES_PREBUFFERING = 10;
+/* Consider the buffer to be full on the receive side if it's above the overrun threshold. */
+constexpr size_t MAX_FRAMES_OVERRUN = 900;
-Edi::Edi() :
+
+Edi::Edi(const std::string& name) :
m_tcp_receive_server(TCP_BLOCKSIZE),
m_sti_writer(),
- m_sti_decoder(m_sti_writer, VERBOSE)
+ m_sti_decoder(m_sti_writer, VERBOSE),
+ m_name(name),
+ m_stats(name)
{ }
Edi::~Edi() {
@@ -99,7 +103,7 @@ void Edi::open(const std::string& name)
throw runtime_error("Cannot parse EDI input URI");
}
- m_name = name;
+ m_stats.registerAtServer();
m_running = true;
m_thread = std::thread(&Edi::m_run, this);
@@ -107,6 +111,9 @@ void Edi::open(const std::string& name)
size_t Edi::readFrame(uint8_t *buffer, size_t size)
{
+ // Save stats data in bytes, not in frames
+ m_stats.notifyBuffer(m_frames.size() * size);
+
EdiDecoder::sti_frame_t sti;
if (m_is_prebuffering) {
m_is_prebuffering = m_frames.size() < NUM_FRAMES_PREBUFFERING;
@@ -117,6 +124,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size)
return 0;
}
else if (not m_pending_sti_frame.frame.empty()) {
+ // Can only happen when switching from timestamp-based buffer management!
if (m_pending_sti_frame.frame.size() != size) {
etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
m_pending_sti_frame.frame.size() << " received, " << size << " requested";
@@ -124,6 +132,13 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size)
return 0;
}
else {
+ if (not m_pending_sti_frame.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ m_pending_sti_frame.version_data.version,
+ m_pending_sti_frame.version_data.uptime_s);
+ }
+ m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, m_pending_sti_frame.audio_levels.right);
+
copy(m_pending_sti_frame.frame.begin(),
m_pending_sti_frame.frame.end(),
buffer);
@@ -137,6 +152,28 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size)
return 0;
}
else if (sti.frame.size() == size) {
+ // Steady-state when everything works well
+ if (m_frames.size() > MAX_FRAMES_OVERRUN) {
+ m_stats.notifyOverrun();
+
+ /* If the buffer is too full, we drop as many frames as needed
+ * to get down to the prebuffering size. We would like to have our buffer
+ * filled to the prebuffering length. */
+ size_t over_max = m_frames.size() - NUM_FRAMES_PREBUFFERING;
+
+ while (over_max--) {
+ EdiDecoder::sti_frame_t discard;
+ m_frames.try_pop(discard);
+ }
+ }
+
+ if (not sti.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ sti.version_data.version,
+ sti.version_data.uptime_s);
+ }
+ m_stats.notifyPeakLevels(sti.audio_levels.left, sti.audio_levels.right);
+
copy(sti.frame.cbegin(), sti.frame.cend(), buffer);
return size;
}
@@ -151,6 +188,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size)
memset(buffer, 0, size * sizeof(*buffer));
m_is_prebuffering = true;
etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering";
+ m_stats.notifyUnderrun();
return 0;
}
}
@@ -161,6 +199,8 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc
m_frames.try_pop(m_pending_sti_frame);
}
+ m_stats.notifyBuffer(m_frames.size() * size);
+
if (m_is_prebuffering) {
if (m_pending_sti_frame.frame.empty()) {
memset(buffer, 0, size);
@@ -178,6 +218,15 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc
m_is_prebuffering = false;
etiLog.level(warn) << "EDI input " << m_name <<
" valid timestamp, pre-buffering complete";
+
+ if (not m_pending_sti_frame.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ m_pending_sti_frame.version_data.version,
+ m_pending_sti_frame.version_data.uptime_s);
+ }
+
+ m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left,
+ m_pending_sti_frame.audio_levels.right);
copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
m_pending_sti_frame.frame.clear();
return size;
@@ -188,6 +237,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc
if (m_frames.size() >= MAX_FRAMES_QUEUED_PREBUFFERING) {
m_pending_sti_frame.frame.clear();
}
+ m_stats.notifyUnderrun();
memset(buffer, 0, size);
return 0;
}
@@ -215,12 +265,15 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc
etiLog.level(warn) << "EDI input " << m_name <<
" empty, re-enabling pre-buffering";
memset(buffer, 0, size);
+ m_stats.notifyUnderrun();
+ m_is_prebuffering = true;
return 0;
}
else if (not sti_frame.timestamp.valid()) {
etiLog.level(warn) << "EDI input " << m_name <<
- " invalid timestamp, re-enabling pre-buffering";
+ " invalid timestamp, ignoring";
memset(buffer, 0, size);
+ m_stats.notifyUnderrun();
return 0;
}
else {
@@ -228,6 +281,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc
const double offset = m_pending_sti_frame.timestamp.diff_ms(ts_req);
if (offset > 24e-3) {
+ m_stats.notifyUnderrun();
m_is_prebuffering = true;
etiLog.level(warn) << "EDI input " << m_name <<
" timestamp out of bounds, re-enabling pre-buffering";
@@ -235,6 +289,14 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc
return 0;
}
else {
+ if (not sti_frame.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ sti_frame.version_data.version,
+ sti_frame.version_data.uptime_s);
+ }
+
+ m_stats.notifyPeakLevels(sti_frame.audio_levels.left,
+ sti_frame.audio_levels.right);
copy(sti_frame.frame.cbegin(), sti_frame.frame.cend(), buffer);
return size;
}
diff --git a/src/input/Edi.h b/src/input/Edi.h
index 8f270d0..56bbf65 100644
--- a/src/input/Edi.h
+++ b/src/input/Edi.h
@@ -36,6 +36,7 @@
#include "edi/STIDecoder.hpp"
#include "edi/STIWriter.hpp"
#include "ThreadsafeQueue.h"
+#include "ManagementServer.h"
namespace Inputs {
@@ -47,7 +48,7 @@ namespace Inputs {
*/
class Edi : public InputBase {
public:
- Edi();
+ Edi(const std::string& name);
Edi(const Edi&) = delete;
Edi& operator=(const Edi&) = delete;
~Edi();
@@ -81,6 +82,7 @@ class Edi : public InputBase {
bool m_is_prebuffering = true;
std::string m_name;
+ InputStat m_stats;
};
};