From 0298076eea4f92685f9a01974261da41c2a01e5b Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 27 Aug 2019 11:02:23 +0200 Subject: EDI input: add new buffer management --- src/input/Edi.cpp | 158 +++++++++++++++++++++++++++++++++++++++++------------ src/input/Edi.h | 6 +- src/input/File.cpp | 17 +++++- src/input/File.h | 9 +-- src/input/Prbs.cpp | 8 ++- src/input/Prbs.h | 3 +- src/input/Udp.cpp | 18 ++++-- src/input/Udp.h | 5 +- src/input/Zmq.cpp | 21 ++++--- src/input/Zmq.h | 5 +- src/input/inputs.h | 27 ++++++++- 11 files changed, 215 insertions(+), 62 deletions(-) diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index 2d82902..95fac53 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -42,8 +42,19 @@ namespace Inputs { constexpr bool VERBOSE = false; constexpr size_t TCP_BLOCKSIZE = 2048; + +/* Absolute max number of frames to be queued, both with and without timestamping */ constexpr size_t MAX_FRAMES_QUEUED = 1000; +/* When using timestamping, start discarding the front of the queue once the queue + * is this full. Must be smaller than MAX_FRAMES_QUEUED. */ +constexpr size_t MAX_FRAMES_QUEUED_PREBUFFERING = 500; + +/* When not using timestamping, how many frames to prebuffer. + * TODO should be configurable as ZMQ. + */ +constexpr size_t NUM_FRAMES_PREBUFFERING = 10; + Edi::Edi() : m_tcp_receive_server(TCP_BLOCKSIZE), m_sti_writer(), @@ -93,43 +104,30 @@ void Edi::open(const std::string& name) m_thread = std::thread(&Edi::m_run, this); } -int Edi::readFrame(uint8_t* buffer, size_t size) +size_t Edi::readFrame(uint8_t *buffer, size_t size) { - if (m_pending_sti_frame.frame.empty()) { - m_frames.try_pop(m_pending_sti_frame); + EdiDecoder::sti_frame_t sti; + if (m_is_prebuffering) { + m_is_prebuffering = m_frames.size() < NUM_FRAMES_PREBUFFERING; + if (not m_is_prebuffering) { + etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete."; + } + memset(buffer, 0, size * sizeof(*buffer)); + return 0; } - - if (not m_pending_sti_frame.frame.empty()) { + else if (not m_pending_sti_frame.frame.empty()) { if (m_pending_sti_frame.frame.size() != size) { etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << m_pending_sti_frame.frame.size() << " received, " << size << " requested"; memset(buffer, 0, size * sizeof(*buffer)); + return 0; } else { - const auto now = chrono::system_clock::now(); - - if (m_pending_sti_frame.timestamp.to_system_clock() <= now) { - etiLog.level(debug) << "EDI input take frame with TS " << - m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size(); - - std::copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer); - m_pending_sti_frame.frame.clear(); - } - else { - etiLog.level(debug) << "EDI input skip frame with TS " << - m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size(); - } - } - } - - return size; - -#if 0 - EdiDecoder::sti_frame_t sti; - if (m_is_prebuffering) { - m_is_prebuffering = m_frames.size() < 10; - if (not m_is_prebuffering) { - etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete."; + copy(m_pending_sti_frame.frame.begin(), + m_pending_sti_frame.frame.end(), + buffer); + m_pending_sti_frame.frame.clear(); + return size; } } else if (m_frames.try_pop(sti)) { @@ -138,21 +136,113 @@ int Edi::readFrame(uint8_t* buffer, size_t size) return 0; } else if (sti.frame.size() == size) { - std::copy(sti.frame.cbegin(), sti.frame.cend(), buffer); + copy(sti.frame.cbegin(), sti.frame.cend(), buffer); + return size; } else { - etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << sti.frame.size() << - " received, " << size << " requested"; + etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << + sti.frame.size() << " received, " << size << " requested"; memset(buffer, 0, size * sizeof(*buffer)); + return 0; } } else { memset(buffer, 0, size * sizeof(*buffer)); m_is_prebuffering = true; etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering"; + return 0; + } +} + +size_t Edi::readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta) +{ + if (m_pending_sti_frame.frame.empty()) { + m_frames.try_pop(m_pending_sti_frame); + } + + if (m_is_prebuffering) { + if (m_pending_sti_frame.frame.empty()) { + memset(buffer, 0, size); + return 0; + } + else if (m_pending_sti_frame.frame.size() == size) { + // readFrame gets called every 24ms, so we allow max 24ms + // difference between the input frame timestamp and the requested + // timestamp. + if (m_pending_sti_frame.timestamp.valid()) { + double ts_frame = (double)m_pending_sti_frame.timestamp.seconds + + (m_pending_sti_frame.timestamp.tsta / 16384000.0); + + double ts_req = (double)seconds + (tsta / 16384000.0); + + if (abs(ts_frame - ts_req) < 24e-3) { + m_is_prebuffering = false; + etiLog.level(warn) << "EDI input " << m_name << + " valid timestamp, pre-buffering complete"; + copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer); + m_pending_sti_frame.frame.clear(); + return size; + } + else { + // Wait more, but erase the front of the frame queue to avoid + // stalling on one frame with incorrect timestamp + if (m_frames.size() >= MAX_FRAMES_QUEUED_PREBUFFERING) { + m_pending_sti_frame.frame.clear(); + } + memset(buffer, 0, size); + return 0; + } + } + else { + etiLog.level(debug) << "EDI input " << m_name << + " skipping frame without timestamp"; + m_pending_sti_frame.frame.clear(); + memset(buffer, 0, size); + return 0; + } + } + else { + etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << + m_pending_sti_frame.frame.size() << " received, " << size << " requested"; + m_pending_sti_frame.frame.clear(); + memset(buffer, 0, size); + return 0; + } + } + else { + EdiDecoder::sti_frame_t sti_frame; + m_frames.try_pop(sti_frame); + if (sti_frame.frame.empty()) { + etiLog.level(warn) << "EDI input " << m_name << + " empty, re-enabling pre-buffering"; + memset(buffer, 0, size); + return 0; + } + else if (not sti_frame.timestamp.valid()) { + etiLog.level(warn) << "EDI input " << m_name << + " invalid timestamp, re-enabling pre-buffering"; + memset(buffer, 0, size); + return 0; + } + else { + double ts_frame = (double)sti_frame.timestamp.seconds + + (sti_frame.timestamp.tsta / 16384000.0); + + double ts_req = (double)seconds + (tsta / 16384000.0); + + if (abs(ts_frame - ts_req) > 24e-3) { + m_is_prebuffering = true; + etiLog.level(warn) << "EDI input " << m_name << + " timestamp out of bounds, re-enabling pre-buffering"; + memset(buffer, 0, size); + return 0; + } + else { + copy(sti_frame.frame.cbegin(), sti_frame.frame.cend(), buffer); + return size; + } + } } - return size; -#endif } void Edi::m_run() diff --git a/src/input/Edi.h b/src/input/Edi.h index 9542e28..bf65ac9 100644 --- a/src/input/Edi.h +++ b/src/input/Edi.h @@ -53,7 +53,8 @@ class Edi : public InputBase { ~Edi(); virtual void open(const std::string& name); - virtual int readFrame(uint8_t* buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta); virtual int setBitrate(int bitrate); virtual void close(); @@ -72,8 +73,11 @@ class Edi : public InputBase { std::thread m_thread; std::atomic m_running = ATOMIC_VAR_INIT(false); ThreadsafeQueue m_frames; + + // Used in timestamp-based buffer management EdiDecoder::sti_frame_t m_pending_sti_frame; + // Used in prebuffering-based buffer management bool m_is_prebuffering = true; std::string m_name; diff --git a/src/input/File.cpp b/src/input/File.cpp index 99dd68d..9c36263 100644 --- a/src/input/File.cpp +++ b/src/input/File.cpp @@ -74,6 +74,14 @@ void FileBase::open(const std::string& name) } } +size_t FileBase::readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta) +{ + // Will not be implemented, as there is no obvious way to carry timestamps + // in files. + memset(buffer, 0, size); + return 0; +} + int FileBase::setBitrate(int bitrate) { if (bitrate <= 0) { @@ -181,7 +189,7 @@ ssize_t FileBase::readFromFile(uint8_t* buffer, size_t size) return size; } -int MPEGFile::readFrame(uint8_t* buffer, size_t size) +size_t MPEGFile::readFrame(uint8_t *buffer, size_t size) { int result; bool do_rewind = false; @@ -274,6 +282,9 @@ MUTE_SUBCHANNEL: } } } + + // TODO this is probably wrong, because it should return + // the number of bytes written. return result; } @@ -296,7 +307,7 @@ int MPEGFile::setBitrate(int bitrate) return bitrate; } -int RawFile::readFrame(uint8_t* buffer, size_t size) +size_t RawFile::readFrame(uint8_t *buffer, size_t size) { return readFromFile(buffer, size); } @@ -306,7 +317,7 @@ PacketFile::PacketFile(bool enhancedPacketMode) m_enhancedPacketEnabled = enhancedPacketMode; } -int PacketFile::readFrame(uint8_t* buffer, size_t size) +size_t PacketFile::readFrame(uint8_t *buffer, size_t size) { size_t written = 0; int length; diff --git a/src/input/File.h b/src/input/File.h index 429a1ce..3e96ad4 100644 --- a/src/input/File.h +++ b/src/input/File.h @@ -37,7 +37,8 @@ namespace Inputs { class FileBase : public InputBase { public: virtual void open(const std::string& name); - virtual int readFrame(uint8_t* buffer, size_t size) = 0; + virtual size_t readFrame(uint8_t *buffer, size_t size) = 0; + virtual size_t readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta); virtual int setBitrate(int bitrate); virtual void close(); @@ -63,7 +64,7 @@ class FileBase : public InputBase { class MPEGFile : public FileBase { public: - virtual int readFrame(uint8_t* buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size); virtual int setBitrate(int bitrate); private: @@ -72,13 +73,13 @@ class MPEGFile : public FileBase { class RawFile : public FileBase { public: - virtual int readFrame(uint8_t* buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size); }; class PacketFile : public FileBase { public: PacketFile(bool enhancedPacketMode); - virtual int readFrame(uint8_t* buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size); protected: std::array m_packetData; diff --git a/src/input/Prbs.cpp b/src/input/Prbs.cpp index 9f4f5dd..148e919 100644 --- a/src/input/Prbs.cpp +++ b/src/input/Prbs.cpp @@ -75,7 +75,7 @@ void Prbs::open(const string& name) rewind(); } -int Prbs::readFrame(uint8_t* buffer, size_t size) +size_t Prbs::readFrame(uint8_t *buffer, size_t size) { for (size_t i = 0; i < size; ++i) { buffer[i] = m_prbs.step(); @@ -84,6 +84,12 @@ int Prbs::readFrame(uint8_t* buffer, size_t size) return size; } +size_t Prbs::readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta) +{ + memset(buffer, 0, size); + return 0; +} + int Prbs::setBitrate(int bitrate) { if (bitrate <= 0) { diff --git a/src/input/Prbs.h b/src/input/Prbs.h index b76ffc7..600fd89 100644 --- a/src/input/Prbs.h +++ b/src/input/Prbs.h @@ -38,7 +38,8 @@ namespace Inputs { class Prbs : public InputBase { public: virtual void open(const std::string& name); - virtual int readFrame(uint8_t* buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta); virtual int setBitrate(int bitrate); virtual void close(); diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp index b909c5a..5ddc366 100644 --- a/src/input/Udp.cpp +++ b/src/input/Udp.cpp @@ -86,7 +86,7 @@ void Udp::openUdpSocket(const std::string& endpoint) etiLog.level(info) << "Opened UDP port " << address << ":" << port; } -int Udp::readFrame(uint8_t* buffer, size_t size) +size_t Udp::readFrame(uint8_t *buffer, size_t size) { // Regardless of buffer contents, try receiving data. auto packet = m_sock.receive(32768); @@ -97,12 +97,19 @@ int Udp::readFrame(uint8_t* buffer, size_t size) // in any case write the buffer if (m_buffer.size() >= (size_t)size) { std::copy(m_buffer.begin(), m_buffer.begin() + size, buffer); + return size; } else { memset(buffer, 0x0, size); + return 0; } +} - return size; +size_t Udp::readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta) +{ + // Maybe there's a way to carry timestamps, but we don't need it. + memset(buffer, 0x0, size); + return 0; } int Udp::setBitrate(int bitrate) @@ -278,7 +285,7 @@ void Sti_d_Rtp::receive_packet() } } -int Sti_d_Rtp::readFrame(uint8_t* buffer, size_t size) +size_t Sti_d_Rtp::readFrame(uint8_t *buffer, size_t size) { // Make sure we fill faster than we consume in case there // are pending packets. @@ -287,19 +294,20 @@ int Sti_d_Rtp::readFrame(uint8_t* buffer, size_t size) if (m_queue.empty()) { memset(buffer, 0x0, size); + return 0; } else if (m_queue.front().size() != size) { etiLog.level(warn) << "Invalid input data size for STI " << m_name << " : RX " << m_queue.front().size() << " expected " << size; memset(buffer, 0x0, size); m_queue.pop_front(); + return 0; } else { copy(m_queue.front().begin(), m_queue.front().end(), buffer); m_queue.pop_front(); + return size; } - - return 0; } } diff --git a/src/input/Udp.h b/src/input/Udp.h index 58e0dfd..81956f9 100644 --- a/src/input/Udp.h +++ b/src/input/Udp.h @@ -41,7 +41,8 @@ namespace Inputs { class Udp : public InputBase { public: virtual void open(const std::string& name); - virtual int readFrame(uint8_t* buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta); virtual int setBitrate(int bitrate); virtual void close(); @@ -68,7 +69,7 @@ class Sti_d_Rtp : public Udp { public: virtual void open(const std::string& name); - virtual int readFrame(uint8_t* buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size); private: void receive_packet(void); diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp index c5cc1b2..352c95d 100644 --- a/src/input/Zmq.cpp +++ b/src/input/Zmq.cpp @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 Matthias P. Braendli + Copyright (C) 2019 Matthias P. Braendli http://www.opendigitalradio.org ZeroMQ input. see www.zeromq.org for more info @@ -247,16 +247,14 @@ int ZmqBase::setBitrate(int bitrate) } // size corresponds to a frame size. It is constant for a given bitrate -int ZmqBase::readFrame(uint8_t* buffer, size_t size) +size_t ZmqBase::readFrame(uint8_t* buffer, size_t size) { - int rc; - /* We must *always* read data from the ZMQ socket, * to make sure that ZMQ internal buffers are emptied * quickly. It's the only way to control the buffers * of the whole path from encoder to our frame_buffer. */ - rc = readFromSocket(size); + const auto readsize = readFromSocket(size); /* Notify of a buffer overrun, and drop some frames */ if (m_frame_buffer.size() >= m_config.buffer_size) { @@ -297,10 +295,10 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size) } if (m_prebuf_current > 0) { - if (rc > 0) + if (readsize > 0) m_prebuf_current--; if (m_prebuf_current == 0) - etiLog.log(info, "inputZMQ %s input pre-buffering complete\n", + etiLog.log(info, "inputZMQ %s input pre-buffering complete", m_rc_name.c_str()); /* During prebuffering, give a zeroed frame to the mux */ @@ -313,7 +311,7 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size) m_stats.notifyBuffer(m_frame_buffer.size() * size); if (m_frame_buffer.empty()) { - etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n", + etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering", m_rc_name.c_str()); // reset prebuffering m_prebuf_current = m_config.prebuffering; @@ -333,6 +331,13 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size) } } +size_t ZmqBase::readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta) +{ + // TODO add timestamps into the metadata and implement this + memset(buffer, 0, size); + return 0; +} + /******** MPEG input *******/ diff --git a/src/input/Zmq.h b/src/input/Zmq.h index 2b42872..899d6f2 100644 --- a/src/input/Zmq.h +++ b/src/input/Zmq.h @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2017 Matthias P. Braendli + Copyright (C) 2019 Matthias P. Braendli http://www.opendigitalradio.org ZeroMQ input. see www.zeromq.org for more info @@ -182,7 +182,8 @@ class ZmqBase : public InputBase, public RemoteControllable { } virtual void open(const std::string& inputUri); - virtual int readFrame(uint8_t* buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta); virtual int setBitrate(int bitrate); virtual void close(); diff --git a/src/input/inputs.h b/src/input/inputs.h index 9a396e0..b4bb00b 100644 --- a/src/input/inputs.h +++ b/src/input/inputs.h @@ -41,7 +41,32 @@ class InputBase { /* Throws runtime_error or invalid_argument on failure */ virtual void open(const std::string& name) = 0; - virtual int readFrame(uint8_t* buffer, size_t size) = 0; + /* read a frame from the input. Buffer management is either not necessary + * (e.g. File input) or done with pre-buffering (network-based inputs). + * + * This ignores timestamps. All inputs support this. + * + * Returns number of data bytes written to the buffer. May clear the buffer + * if no data bytes available, in which case it will return 0. + * + * Returns negative on error. + */ + virtual size_t readFrame(uint8_t *buffer, size_t size) = 0; + + /* read a frame from the input, taking into account timestamp. The timestamp of the data + * returned is not more recent than the timestamp specified in seconds and tsta. + * + * seconds and tsta are in the format used by EDI. + * + * Returns number of data bytes written to the buffer. May clear the buffer + * if no data bytes available, in which case it will return 0. + * + * Returns negative on error. + * + * Calling this function on inputs that do not support timestamps returns 0. This allows + * changing the buffer management at runtime without risking an crash due to an exception. + */ + virtual size_t readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta) = 0; /* Returns the effectively used bitrate, or throws invalid_argument on invalid bitrate */ virtual int setBitrate(int bitrate) = 0; -- cgit v1.2.3