From 907534bccee046c0499e9f936873c229b247ef95 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 23 Sep 2019 14:44:45 +0200 Subject: Add buffermanagement setting to RC --- src/input/Edi.cpp | 91 +++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 71 insertions(+), 20 deletions(-) (limited to 'src/input/Edi.cpp') diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index f6d4c07..98e8e9c 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -44,27 +44,23 @@ namespace Inputs { constexpr bool VERBOSE = false; constexpr size_t TCP_BLOCKSIZE = 2048; -/* Absolute max number of frames to be queued, both with and without timestamping */ -constexpr size_t MAX_FRAMES_QUEUED = 1000; - -/* When using timestamping, start discarding the front of the queue once the queue - * is this full. Must be smaller than MAX_FRAMES_QUEUED. */ -constexpr size_t MAX_FRAMES_QUEUED_PREBUFFERING = 500; - -/* When not using timestamping, how many frames to prebuffer. - * TODO should be configurable as ZMQ. */ -constexpr size_t NUM_FRAMES_PREBUFFERING = 10; -/* Consider the buffer to be full on the receive side if it's above the overrun threshold. */ -constexpr size_t MAX_FRAMES_OVERRUN = 900; - - Edi::Edi(const std::string& name) : + RemoteControllable(name), m_tcp_receive_server(TCP_BLOCKSIZE), m_sti_writer(), m_sti_decoder(m_sti_writer, VERBOSE), m_name(name), m_stats(name) -{ } +{ + RC_ADD_PARAMETER(buffermanagement, + "Set type of buffer management to use [prebuffering, timestamped]"); + + RC_ADD_PARAMETER(buffer, + "Size of the input buffer [24ms frames]"); + + RC_ADD_PARAMETER(prebuffering, + "Min buffer level before streaming starts [24ms frames]"); +} Edi::~Edi() { m_running = false; @@ -116,7 +112,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) EdiDecoder::sti_frame_t sti; if (m_is_prebuffering) { - m_is_prebuffering = m_frames.size() < NUM_FRAMES_PREBUFFERING; + m_is_prebuffering = m_frames.size() < m_num_frames_prebuffering; if (not m_is_prebuffering) { etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete."; } @@ -153,13 +149,13 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) } else if (sti.frame.size() == size) { // Steady-state when everything works well - if (m_frames.size() > MAX_FRAMES_OVERRUN) { + if (m_frames.size() > m_max_frames_overrun) { m_stats.notifyOverrun(); /* If the buffer is too full, we drop as many frames as needed * to get down to the prebuffering size. We would like to have our buffer * filled to the prebuffering length. */ - size_t over_max = m_frames.size() - NUM_FRAMES_PREBUFFERING; + size_t over_max = m_frames.size() - m_num_frames_prebuffering; while (over_max--) { EdiDecoder::sti_frame_t discard; @@ -234,7 +230,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc else { // Wait more, but erase the front of the frame queue to avoid // stalling on one frame with incorrect timestamp - if (m_frames.size() >= MAX_FRAMES_QUEUED_PREBUFFERING) { + if (m_frames.size() >= m_max_frames_overrun) { m_pending_sti_frame.frame.clear(); } m_stats.notifyUnderrun(); @@ -338,7 +334,10 @@ void Edi::m_run() const auto sti = m_sti_writer.getFrame(); if (not sti.frame.empty()) { - m_frames.push_wait_if_full(move(sti), MAX_FRAMES_QUEUED); + // We should not wait here, because we want the complete input buffering + // happening inside m_frames. Using the blocking function is only a protection + // against runaway memory usage if something goes wrong in the consumer. + m_frames.push_wait_if_full(move(sti), m_max_frames_overrun * 2); work_done = true; } @@ -363,4 +362,56 @@ void Edi::close() m_udp_sock.close(); } + +void Edi::set_parameter(const std::string& parameter, const std::string& value) +{ + if (parameter == "buffer") { + size_t new_limit = atol(value.c_str()); + m_max_frames_overrun = new_limit; + } + else if (parameter == "prebuffering") { + size_t new_limit = atol(value.c_str()); + m_num_frames_prebuffering = new_limit; + } + else if (parameter == "buffermanagement") { + if (value == "prebuffering") { + setBufferManagement(Inputs::BufferManagement::Prebuffering); + } + else if (value == "timestamped") { + setBufferManagement(Inputs::BufferManagement::Timestamped); + } + else { + throw ParameterError("Invalid value for '" + parameter + "' in controllable " + get_rc_name()); + } + } + else { + throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name()); + } +} + +const std::string Edi::get_parameter(const std::string& parameter) const +{ + stringstream ss; + if (parameter == "buffer") { + ss << m_max_frames_overrun; + } + else if (parameter == "prebuffering") { + ss << m_num_frames_prebuffering; + } + else if (parameter == "buffermanagement") { + switch (getBufferManagement()) { + case Inputs::BufferManagement::Prebuffering: + ss << "prebuffering"; + break; + case Inputs::BufferManagement::Timestamped: + ss << "Timestamped"; + break; + } + } + else { + throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name()); + } + return ss.str(); +} + } -- cgit v1.2.3