From 907534bccee046c0499e9f936873c229b247ef95 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 23 Sep 2019 14:44:45 +0200 Subject: Add buffermanagement setting to RC --- src/input/Edi.cpp | 91 ++++++++++++++++++++++++++++++++++++++++++------------ src/input/Edi.h | 25 +++++++++++++-- src/input/inputs.h | 15 +++++++++ 3 files changed, 109 insertions(+), 22 deletions(-) (limited to 'src/input') diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index f6d4c07..98e8e9c 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -44,27 +44,23 @@ namespace Inputs { constexpr bool VERBOSE = false; constexpr size_t TCP_BLOCKSIZE = 2048; -/* Absolute max number of frames to be queued, both with and without timestamping */ -constexpr size_t MAX_FRAMES_QUEUED = 1000; - -/* When using timestamping, start discarding the front of the queue once the queue - * is this full. Must be smaller than MAX_FRAMES_QUEUED. */ -constexpr size_t MAX_FRAMES_QUEUED_PREBUFFERING = 500; - -/* When not using timestamping, how many frames to prebuffer. - * TODO should be configurable as ZMQ. */ -constexpr size_t NUM_FRAMES_PREBUFFERING = 10; -/* Consider the buffer to be full on the receive side if it's above the overrun threshold. */ -constexpr size_t MAX_FRAMES_OVERRUN = 900; - - Edi::Edi(const std::string& name) : + RemoteControllable(name), m_tcp_receive_server(TCP_BLOCKSIZE), m_sti_writer(), m_sti_decoder(m_sti_writer, VERBOSE), m_name(name), m_stats(name) -{ } +{ + RC_ADD_PARAMETER(buffermanagement, + "Set type of buffer management to use [prebuffering, timestamped]"); + + RC_ADD_PARAMETER(buffer, + "Size of the input buffer [24ms frames]"); + + RC_ADD_PARAMETER(prebuffering, + "Min buffer level before streaming starts [24ms frames]"); +} Edi::~Edi() { m_running = false; @@ -116,7 +112,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) EdiDecoder::sti_frame_t sti; if (m_is_prebuffering) { - m_is_prebuffering = m_frames.size() < NUM_FRAMES_PREBUFFERING; + m_is_prebuffering = m_frames.size() < m_num_frames_prebuffering; if (not m_is_prebuffering) { etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete."; } @@ -153,13 +149,13 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) } else if (sti.frame.size() == size) { // Steady-state when everything works well - if (m_frames.size() > MAX_FRAMES_OVERRUN) { + if (m_frames.size() > m_max_frames_overrun) { m_stats.notifyOverrun(); /* If the buffer is too full, we drop as many frames as needed * to get down to the prebuffering size. We would like to have our buffer * filled to the prebuffering length. */ - size_t over_max = m_frames.size() - NUM_FRAMES_PREBUFFERING; + size_t over_max = m_frames.size() - m_num_frames_prebuffering; while (over_max--) { EdiDecoder::sti_frame_t discard; @@ -234,7 +230,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc else { // Wait more, but erase the front of the frame queue to avoid // stalling on one frame with incorrect timestamp - if (m_frames.size() >= MAX_FRAMES_QUEUED_PREBUFFERING) { + if (m_frames.size() >= m_max_frames_overrun) { m_pending_sti_frame.frame.clear(); } m_stats.notifyUnderrun(); @@ -338,7 +334,10 @@ void Edi::m_run() const auto sti = m_sti_writer.getFrame(); if (not sti.frame.empty()) { - m_frames.push_wait_if_full(move(sti), MAX_FRAMES_QUEUED); + // We should not wait here, because we want the complete input buffering + // happening inside m_frames. Using the blocking function is only a protection + // against runaway memory usage if something goes wrong in the consumer. + m_frames.push_wait_if_full(move(sti), m_max_frames_overrun * 2); work_done = true; } @@ -363,4 +362,56 @@ void Edi::close() m_udp_sock.close(); } + +void Edi::set_parameter(const std::string& parameter, const std::string& value) +{ + if (parameter == "buffer") { + size_t new_limit = atol(value.c_str()); + m_max_frames_overrun = new_limit; + } + else if (parameter == "prebuffering") { + size_t new_limit = atol(value.c_str()); + m_num_frames_prebuffering = new_limit; + } + else if (parameter == "buffermanagement") { + if (value == "prebuffering") { + setBufferManagement(Inputs::BufferManagement::Prebuffering); + } + else if (value == "timestamped") { + setBufferManagement(Inputs::BufferManagement::Timestamped); + } + else { + throw ParameterError("Invalid value for '" + parameter + "' in controllable " + get_rc_name()); + } + } + else { + throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name()); + } +} + +const std::string Edi::get_parameter(const std::string& parameter) const +{ + stringstream ss; + if (parameter == "buffer") { + ss << m_max_frames_overrun; + } + else if (parameter == "prebuffering") { + ss << m_num_frames_prebuffering; + } + else if (parameter == "buffermanagement") { + switch (getBufferManagement()) { + case Inputs::BufferManagement::Prebuffering: + ss << "prebuffering"; + break; + case Inputs::BufferManagement::Timestamped: + ss << "Timestamped"; + break; + } + } + else { + throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name()); + } + return ss.str(); +} + } diff --git a/src/input/Edi.h b/src/input/Edi.h index 56bbf65..0a44139 100644 --- a/src/input/Edi.h +++ b/src/input/Edi.h @@ -46,7 +46,7 @@ namespace Inputs { * * This way, the EDI decoding happens in a separate thread. */ -class Edi : public InputBase { +class Edi : public InputBase, public RemoteControllable { public: Edi(const std::string& name); Edi(const Edi&) = delete; @@ -59,6 +59,10 @@ class Edi : public InputBase { virtual int setBitrate(int bitrate); virtual void close(); + /* Remote control */ + virtual void set_parameter(const std::string& parameter, const std::string& value); + virtual const std::string get_parameter(const std::string& parameter) const; + protected: void m_run(); @@ -75,12 +79,29 @@ class Edi : public InputBase { std::atomic m_running = ATOMIC_VAR_INIT(false); ThreadsafeQueue m_frames; + std::mutex m_rc_params_mutex; + // InputBase defines bufferManagement, which must also be guarded by that mutex + // Used in timestamp-based buffer management EdiDecoder::sti_frame_t m_pending_sti_frame; - // Used in prebuffering-based buffer management + // State variable used in prebuffering-based buffer management bool m_is_prebuffering = true; + /* When using prebuffering, consider the buffer to be full on the + * receive side if it's above the overrun threshold. + * + * When using timestamping, start discarding the front of the queue once the queue + * is this full. Must be smaller than m_max_frames_queued. + * + * Parameter 'buffer' inside RC. */ + std::atomic m_max_frames_overrun = ATOMIC_VAR_INIT(1000); + + /* When not using timestamping, how many frames to prebuffer. + * Parameter 'prebuffering' inside RC. */ + std::atomic m_num_frames_prebuffering = ATOMIC_VAR_INIT(10); + + std::string m_name; InputStat m_stats; }; diff --git a/src/input/inputs.h b/src/input/inputs.h index b99a88f..5d4fc60 100644 --- a/src/input/inputs.h +++ b/src/input/inputs.h @@ -35,6 +35,15 @@ namespace Inputs { +enum class BufferManagement { + // Use a buffer in the input that doesn't consider timestamps + Prebuffering, + + // Buffer incoming data until a given timestamp is reached + Timestamped, +}; + + /* New input object base */ class InputBase { public: @@ -73,8 +82,14 @@ class InputBase { virtual void close() = 0; virtual ~InputBase() {} + + void setBufferManagement(BufferManagement bm) { m_bufferManagement = bm; } + BufferManagement getBufferManagement() const { return m_bufferManagement; } + protected: InputBase() {} + + std::atomic m_bufferManagement = ATOMIC_VAR_INIT(BufferManagement::Prebuffering); }; }; -- cgit v1.2.3