diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-09-23 14:44:45 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-09-23 14:44:45 +0200 |
commit | 907534bccee046c0499e9f936873c229b247ef95 (patch) | |
tree | 07f6995d0c8614f80a920f86660809b2fbd79b2b /src | |
parent | 015427d9e74f34dc7d0f7fbad4ad1eaad6537cce (diff) | |
download | dabmux-907534bccee046c0499e9f936873c229b247ef95.tar.gz dabmux-907534bccee046c0499e9f936873c229b247ef95.tar.bz2 dabmux-907534bccee046c0499e9f936873c229b247ef95.zip |
Add buffermanagement setting to RC
Diffstat (limited to 'src')
-rw-r--r-- | src/ConfigParser.cpp | 8 | ||||
-rw-r--r-- | src/DabMultiplexer.cpp | 12 | ||||
-rw-r--r-- | src/MuxElements.cpp | 11 | ||||
-rw-r--r-- | src/MuxElements.h | 12 | ||||
-rw-r--r-- | src/input/Edi.cpp | 91 | ||||
-rw-r--r-- | src/input/Edi.h | 25 | ||||
-rw-r--r-- | src/input/inputs.h | 15 |
7 files changed, 130 insertions, 44 deletions
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 2fb375b..d45cfef 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -956,7 +956,9 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan, } } else if (proto == "edi") { - subchan->input = make_shared<Inputs::Edi>(subchanuid); + auto inedi = make_shared<Inputs::Edi>(subchanuid); + rcs.enrol(inedi.get()); + subchan->input = inedi; } else if (proto == "stp") { subchan->input = make_shared<Inputs::Sti_d_Rtp>(); @@ -1023,10 +1025,10 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan, const string bufferManagement = pt.get("buffer-management", "prebuffering"); if (bufferManagement == "prebuffering") { - subchan->bufferManagement = BufferManagement::Prebuffering; + subchan->input->setBufferManagement(Inputs::BufferManagement::Prebuffering); } else if (bufferManagement == "timestamped") { - subchan->bufferManagement = BufferManagement::Timestamped; + subchan->input->setBufferManagement(Inputs::BufferManagement::Timestamped); } else { throw runtime_error("Subchannel with uid " + subchanuid + " has invalid buffer-management !"); diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp index 549850a..9bd3c3f 100644 --- a/src/DabMultiplexer.cpp +++ b/src/DabMultiplexer.cpp @@ -592,17 +592,9 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs edi::TagESTn& tag = edi_subchannelToTag[subchannel.get()]; int sizeSubchannel = subchannel->getSizeByte(); - int result = -1; - switch (subchannel->bufferManagement) { - case BufferManagement::Prebuffering: - result = subchannel->input->readFrame(&etiFrame[index], sizeSubchannel); - break; - case BufferManagement::Timestamped: - // no need to check enableTist because we always increment the timestamp - result = subchannel->input->readFrame(&etiFrame[index], + // no need to check enableTist because we always increment the timestamp + int result = subchannel->readFrame(&etiFrame[index], sizeSubchannel, edi_time + m_tist_offset, tai_utc_offset, timestamp); - break; - } if (result < 0) { etiLog.log(info, diff --git a/src/MuxElements.cpp b/src/MuxElements.cpp index ad1fcd4..81466a8 100644 --- a/src/MuxElements.cpp +++ b/src/MuxElements.cpp @@ -784,6 +784,17 @@ unsigned short DabSubchannel::getSizeDWord() const return (bitrate * 3) >> 3; } +size_t DabSubchannel::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta) +{ + switch (input->getBufferManagement()) { + case Inputs::BufferManagement::Prebuffering: + return input->readFrame(buffer, size); + case Inputs::BufferManagement::Timestamped: + return input->readFrame(buffer, size, seconds, utco, tsta); + } + throw logic_error("Unhandled case"); +} + LinkageSet::LinkageSet(const std::string& name, uint16_t lsn, bool active, diff --git a/src/MuxElements.h b/src/MuxElements.h index f691093..0f7e621 100644 --- a/src/MuxElements.h +++ b/src/MuxElements.h @@ -338,14 +338,6 @@ struct dabProtection { }; }; -enum class BufferManagement { - // Use a buffer in the input that doesn't consider timestamps - Prebuffering, - - // Buffer incoming data until a given timestamp is reached - Timestamped, -}; - class DabSubchannel { public: @@ -364,10 +356,12 @@ public: // Calculate subchannel size in number of uint64_t unsigned short getSizeDWord(void) const; + // Read from the input, using the correct buffer management + size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta); + std::string uid; std::string inputUri; - BufferManagement bufferManagement = BufferManagement::Prebuffering; std::shared_ptr<Inputs::InputBase> input; unsigned char id = 0; subchannel_type_t type = subchannel_type_t::DABAudio; diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index f6d4c07..98e8e9c 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -44,27 +44,23 @@ namespace Inputs { constexpr bool VERBOSE = false; constexpr size_t TCP_BLOCKSIZE = 2048; -/* Absolute max number of frames to be queued, both with and without timestamping */ -constexpr size_t MAX_FRAMES_QUEUED = 1000; - -/* When using timestamping, start discarding the front of the queue once the queue - * is this full. Must be smaller than MAX_FRAMES_QUEUED. */ -constexpr size_t MAX_FRAMES_QUEUED_PREBUFFERING = 500; - -/* When not using timestamping, how many frames to prebuffer. - * TODO should be configurable as ZMQ. */ -constexpr size_t NUM_FRAMES_PREBUFFERING = 10; -/* Consider the buffer to be full on the receive side if it's above the overrun threshold. */ -constexpr size_t MAX_FRAMES_OVERRUN = 900; - - Edi::Edi(const std::string& name) : + RemoteControllable(name), m_tcp_receive_server(TCP_BLOCKSIZE), m_sti_writer(), m_sti_decoder(m_sti_writer, VERBOSE), m_name(name), m_stats(name) -{ } +{ + RC_ADD_PARAMETER(buffermanagement, + "Set type of buffer management to use [prebuffering, timestamped]"); + + RC_ADD_PARAMETER(buffer, + "Size of the input buffer [24ms frames]"); + + RC_ADD_PARAMETER(prebuffering, + "Min buffer level before streaming starts [24ms frames]"); +} Edi::~Edi() { m_running = false; @@ -116,7 +112,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) EdiDecoder::sti_frame_t sti; if (m_is_prebuffering) { - m_is_prebuffering = m_frames.size() < NUM_FRAMES_PREBUFFERING; + m_is_prebuffering = m_frames.size() < m_num_frames_prebuffering; if (not m_is_prebuffering) { etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete."; } @@ -153,13 +149,13 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) } else if (sti.frame.size() == size) { // Steady-state when everything works well - if (m_frames.size() > MAX_FRAMES_OVERRUN) { + if (m_frames.size() > m_max_frames_overrun) { m_stats.notifyOverrun(); /* If the buffer is too full, we drop as many frames as needed * to get down to the prebuffering size. We would like to have our buffer * filled to the prebuffering length. */ - size_t over_max = m_frames.size() - NUM_FRAMES_PREBUFFERING; + size_t over_max = m_frames.size() - m_num_frames_prebuffering; while (over_max--) { EdiDecoder::sti_frame_t discard; @@ -234,7 +230,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc else { // Wait more, but erase the front of the frame queue to avoid // stalling on one frame with incorrect timestamp - if (m_frames.size() >= MAX_FRAMES_QUEUED_PREBUFFERING) { + if (m_frames.size() >= m_max_frames_overrun) { m_pending_sti_frame.frame.clear(); } m_stats.notifyUnderrun(); @@ -338,7 +334,10 @@ void Edi::m_run() const auto sti = m_sti_writer.getFrame(); if (not sti.frame.empty()) { - m_frames.push_wait_if_full(move(sti), MAX_FRAMES_QUEUED); + // We should not wait here, because we want the complete input buffering + // happening inside m_frames. Using the blocking function is only a protection + // against runaway memory usage if something goes wrong in the consumer. + m_frames.push_wait_if_full(move(sti), m_max_frames_overrun * 2); work_done = true; } @@ -363,4 +362,56 @@ void Edi::close() m_udp_sock.close(); } + +void Edi::set_parameter(const std::string& parameter, const std::string& value) +{ + if (parameter == "buffer") { + size_t new_limit = atol(value.c_str()); + m_max_frames_overrun = new_limit; + } + else if (parameter == "prebuffering") { + size_t new_limit = atol(value.c_str()); + m_num_frames_prebuffering = new_limit; + } + else if (parameter == "buffermanagement") { + if (value == "prebuffering") { + setBufferManagement(Inputs::BufferManagement::Prebuffering); + } + else if (value == "timestamped") { + setBufferManagement(Inputs::BufferManagement::Timestamped); + } + else { + throw ParameterError("Invalid value for '" + parameter + "' in controllable " + get_rc_name()); + } + } + else { + throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name()); + } +} + +const std::string Edi::get_parameter(const std::string& parameter) const +{ + stringstream ss; + if (parameter == "buffer") { + ss << m_max_frames_overrun; + } + else if (parameter == "prebuffering") { + ss << m_num_frames_prebuffering; + } + else if (parameter == "buffermanagement") { + switch (getBufferManagement()) { + case Inputs::BufferManagement::Prebuffering: + ss << "prebuffering"; + break; + case Inputs::BufferManagement::Timestamped: + ss << "Timestamped"; + break; + } + } + else { + throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name()); + } + return ss.str(); +} + } diff --git a/src/input/Edi.h b/src/input/Edi.h index 56bbf65..0a44139 100644 --- a/src/input/Edi.h +++ b/src/input/Edi.h @@ -46,7 +46,7 @@ namespace Inputs { * * This way, the EDI decoding happens in a separate thread. */ -class Edi : public InputBase { +class Edi : public InputBase, public RemoteControllable { public: Edi(const std::string& name); Edi(const Edi&) = delete; @@ -59,6 +59,10 @@ class Edi : public InputBase { virtual int setBitrate(int bitrate); virtual void close(); + /* Remote control */ + virtual void set_parameter(const std::string& parameter, const std::string& value); + virtual const std::string get_parameter(const std::string& parameter) const; + protected: void m_run(); @@ -75,12 +79,29 @@ class Edi : public InputBase { std::atomic<bool> m_running = ATOMIC_VAR_INIT(false); ThreadsafeQueue<EdiDecoder::sti_frame_t> m_frames; + std::mutex m_rc_params_mutex; + // InputBase defines bufferManagement, which must also be guarded by that mutex + // Used in timestamp-based buffer management EdiDecoder::sti_frame_t m_pending_sti_frame; - // Used in prebuffering-based buffer management + // State variable used in prebuffering-based buffer management bool m_is_prebuffering = true; + /* When using prebuffering, consider the buffer to be full on the + * receive side if it's above the overrun threshold. + * + * When using timestamping, start discarding the front of the queue once the queue + * is this full. Must be smaller than m_max_frames_queued. + * + * Parameter 'buffer' inside RC. */ + std::atomic<size_t> m_max_frames_overrun = ATOMIC_VAR_INIT(1000); + + /* When not using timestamping, how many frames to prebuffer. + * Parameter 'prebuffering' inside RC. */ + std::atomic<size_t> m_num_frames_prebuffering = ATOMIC_VAR_INIT(10); + + std::string m_name; InputStat m_stats; }; diff --git a/src/input/inputs.h b/src/input/inputs.h index b99a88f..5d4fc60 100644 --- a/src/input/inputs.h +++ b/src/input/inputs.h @@ -35,6 +35,15 @@ namespace Inputs { +enum class BufferManagement { + // Use a buffer in the input that doesn't consider timestamps + Prebuffering, + + // Buffer incoming data until a given timestamp is reached + Timestamped, +}; + + /* New input object base */ class InputBase { public: @@ -73,8 +82,14 @@ class InputBase { virtual void close() = 0; virtual ~InputBase() {} + + void setBufferManagement(BufferManagement bm) { m_bufferManagement = bm; } + BufferManagement getBufferManagement() const { return m_bufferManagement; } + protected: InputBase() {} + + std::atomic<BufferManagement> m_bufferManagement = ATOMIC_VAR_INIT(BufferManagement::Prebuffering); }; }; |