aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-08-27 11:02:23 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-08-27 11:02:23 +0200
commit0298076eea4f92685f9a01974261da41c2a01e5b (patch)
treef72831cd212da64c1a3ea124bb9b590ced8558f2
parentcac39dedee89d62ebf5d0135b84ccaa2e387a7cb (diff)
downloaddabmux-0298076eea4f92685f9a01974261da41c2a01e5b.tar.gz
dabmux-0298076eea4f92685f9a01974261da41c2a01e5b.tar.bz2
dabmux-0298076eea4f92685f9a01974261da41c2a01e5b.zip
EDI input: add new buffer management
-rw-r--r--src/input/Edi.cpp158
-rw-r--r--src/input/Edi.h6
-rw-r--r--src/input/File.cpp17
-rw-r--r--src/input/File.h9
-rw-r--r--src/input/Prbs.cpp8
-rw-r--r--src/input/Prbs.h3
-rw-r--r--src/input/Udp.cpp18
-rw-r--r--src/input/Udp.h5
-rw-r--r--src/input/Zmq.cpp21
-rw-r--r--src/input/Zmq.h5
-rw-r--r--src/input/inputs.h27
11 files changed, 215 insertions, 62 deletions
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
index 2d82902..95fac53 100644
--- a/src/input/Edi.cpp
+++ b/src/input/Edi.cpp
@@ -42,8 +42,19 @@ namespace Inputs {
constexpr bool VERBOSE = false;
constexpr size_t TCP_BLOCKSIZE = 2048;
+
+/* Absolute max number of frames to be queued, both with and without timestamping */
constexpr size_t MAX_FRAMES_QUEUED = 1000;
+/* When using timestamping, start discarding the front of the queue once the queue
+ * is this full. Must be smaller than MAX_FRAMES_QUEUED. */
+constexpr size_t MAX_FRAMES_QUEUED_PREBUFFERING = 500;
+
+/* When not using timestamping, how many frames to prebuffer.
+ * TODO should be configurable as ZMQ.
+ */
+constexpr size_t NUM_FRAMES_PREBUFFERING = 10;
+
Edi::Edi() :
m_tcp_receive_server(TCP_BLOCKSIZE),
m_sti_writer(),
@@ -93,43 +104,30 @@ void Edi::open(const std::string& name)
m_thread = std::thread(&Edi::m_run, this);
}
-int Edi::readFrame(uint8_t* buffer, size_t size)
+size_t Edi::readFrame(uint8_t *buffer, size_t size)
{
- if (m_pending_sti_frame.frame.empty()) {
- m_frames.try_pop(m_pending_sti_frame);
+ EdiDecoder::sti_frame_t sti;
+ if (m_is_prebuffering) {
+ m_is_prebuffering = m_frames.size() < NUM_FRAMES_PREBUFFERING;
+ if (not m_is_prebuffering) {
+ etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete.";
+ }
+ memset(buffer, 0, size * sizeof(*buffer));
+ return 0;
}
-
- if (not m_pending_sti_frame.frame.empty()) {
+ else if (not m_pending_sti_frame.frame.empty()) {
if (m_pending_sti_frame.frame.size() != size) {
etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
m_pending_sti_frame.frame.size() << " received, " << size << " requested";
memset(buffer, 0, size * sizeof(*buffer));
+ return 0;
}
else {
- const auto now = chrono::system_clock::now();
-
- if (m_pending_sti_frame.timestamp.to_system_clock() <= now) {
- etiLog.level(debug) << "EDI input take frame with TS " <<
- m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size();
-
- std::copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
- m_pending_sti_frame.frame.clear();
- }
- else {
- etiLog.level(debug) << "EDI input skip frame with TS " <<
- m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size();
- }
- }
- }
-
- return size;
-
-#if 0
- EdiDecoder::sti_frame_t sti;
- if (m_is_prebuffering) {
- m_is_prebuffering = m_frames.size() < 10;
- if (not m_is_prebuffering) {
- etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete.";
+ copy(m_pending_sti_frame.frame.begin(),
+ m_pending_sti_frame.frame.end(),
+ buffer);
+ m_pending_sti_frame.frame.clear();
+ return size;
}
}
else if (m_frames.try_pop(sti)) {
@@ -138,21 +136,113 @@ int Edi::readFrame(uint8_t* buffer, size_t size)
return 0;
}
else if (sti.frame.size() == size) {
- std::copy(sti.frame.cbegin(), sti.frame.cend(), buffer);
+ copy(sti.frame.cbegin(), sti.frame.cend(), buffer);
+ return size;
}
else {
- etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << sti.frame.size() <<
- " received, " << size << " requested";
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
+ sti.frame.size() << " received, " << size << " requested";
memset(buffer, 0, size * sizeof(*buffer));
+ return 0;
}
}
else {
memset(buffer, 0, size * sizeof(*buffer));
m_is_prebuffering = true;
etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering";
+ return 0;
+ }
+}
+
+size_t Edi::readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta)
+{
+ if (m_pending_sti_frame.frame.empty()) {
+ m_frames.try_pop(m_pending_sti_frame);
+ }
+
+ if (m_is_prebuffering) {
+ if (m_pending_sti_frame.frame.empty()) {
+ memset(buffer, 0, size);
+ return 0;
+ }
+ else if (m_pending_sti_frame.frame.size() == size) {
+ // readFrame gets called every 24ms, so we allow max 24ms
+ // difference between the input frame timestamp and the requested
+ // timestamp.
+ if (m_pending_sti_frame.timestamp.valid()) {
+ double ts_frame = (double)m_pending_sti_frame.timestamp.seconds +
+ (m_pending_sti_frame.timestamp.tsta / 16384000.0);
+
+ double ts_req = (double)seconds + (tsta / 16384000.0);
+
+ if (abs(ts_frame - ts_req) < 24e-3) {
+ m_is_prebuffering = false;
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " valid timestamp, pre-buffering complete";
+ copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
+ m_pending_sti_frame.frame.clear();
+ return size;
+ }
+ else {
+ // Wait more, but erase the front of the frame queue to avoid
+ // stalling on one frame with incorrect timestamp
+ if (m_frames.size() >= MAX_FRAMES_QUEUED_PREBUFFERING) {
+ m_pending_sti_frame.frame.clear();
+ }
+ memset(buffer, 0, size);
+ return 0;
+ }
+ }
+ else {
+ etiLog.level(debug) << "EDI input " << m_name <<
+ " skipping frame without timestamp";
+ m_pending_sti_frame.frame.clear();
+ memset(buffer, 0, size);
+ return 0;
+ }
+ }
+ else {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
+ m_pending_sti_frame.frame.size() << " received, " << size << " requested";
+ m_pending_sti_frame.frame.clear();
+ memset(buffer, 0, size);
+ return 0;
+ }
+ }
+ else {
+ EdiDecoder::sti_frame_t sti_frame;
+ m_frames.try_pop(sti_frame);
+ if (sti_frame.frame.empty()) {
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " empty, re-enabling pre-buffering";
+ memset(buffer, 0, size);
+ return 0;
+ }
+ else if (not sti_frame.timestamp.valid()) {
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " invalid timestamp, re-enabling pre-buffering";
+ memset(buffer, 0, size);
+ return 0;
+ }
+ else {
+ double ts_frame = (double)sti_frame.timestamp.seconds +
+ (sti_frame.timestamp.tsta / 16384000.0);
+
+ double ts_req = (double)seconds + (tsta / 16384000.0);
+
+ if (abs(ts_frame - ts_req) > 24e-3) {
+ m_is_prebuffering = true;
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " timestamp out of bounds, re-enabling pre-buffering";
+ memset(buffer, 0, size);
+ return 0;
+ }
+ else {
+ copy(sti_frame.frame.cbegin(), sti_frame.frame.cend(), buffer);
+ return size;
+ }
+ }
}
- return size;
-#endif
}
void Edi::m_run()
diff --git a/src/input/Edi.h b/src/input/Edi.h
index 9542e28..bf65ac9 100644
--- a/src/input/Edi.h
+++ b/src/input/Edi.h
@@ -53,7 +53,8 @@ class Edi : public InputBase {
~Edi();
virtual void open(const std::string& name);
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta);
virtual int setBitrate(int bitrate);
virtual void close();
@@ -72,8 +73,11 @@ class Edi : public InputBase {
std::thread m_thread;
std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
ThreadsafeQueue<EdiDecoder::sti_frame_t> m_frames;
+
+ // Used in timestamp-based buffer management
EdiDecoder::sti_frame_t m_pending_sti_frame;
+ // Used in prebuffering-based buffer management
bool m_is_prebuffering = true;
std::string m_name;
diff --git a/src/input/File.cpp b/src/input/File.cpp
index 99dd68d..9c36263 100644
--- a/src/input/File.cpp
+++ b/src/input/File.cpp
@@ -74,6 +74,14 @@ void FileBase::open(const std::string& name)
}
}
+size_t FileBase::readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta)
+{
+ // Will not be implemented, as there is no obvious way to carry timestamps
+ // in files.
+ memset(buffer, 0, size);
+ return 0;
+}
+
int FileBase::setBitrate(int bitrate)
{
if (bitrate <= 0) {
@@ -181,7 +189,7 @@ ssize_t FileBase::readFromFile(uint8_t* buffer, size_t size)
return size;
}
-int MPEGFile::readFrame(uint8_t* buffer, size_t size)
+size_t MPEGFile::readFrame(uint8_t *buffer, size_t size)
{
int result;
bool do_rewind = false;
@@ -274,6 +282,9 @@ MUTE_SUBCHANNEL:
}
}
}
+
+ // TODO this is probably wrong, because it should return
+ // the number of bytes written.
return result;
}
@@ -296,7 +307,7 @@ int MPEGFile::setBitrate(int bitrate)
return bitrate;
}
-int RawFile::readFrame(uint8_t* buffer, size_t size)
+size_t RawFile::readFrame(uint8_t *buffer, size_t size)
{
return readFromFile(buffer, size);
}
@@ -306,7 +317,7 @@ PacketFile::PacketFile(bool enhancedPacketMode)
m_enhancedPacketEnabled = enhancedPacketMode;
}
-int PacketFile::readFrame(uint8_t* buffer, size_t size)
+size_t PacketFile::readFrame(uint8_t *buffer, size_t size)
{
size_t written = 0;
int length;
diff --git a/src/input/File.h b/src/input/File.h
index 429a1ce..3e96ad4 100644
--- a/src/input/File.h
+++ b/src/input/File.h
@@ -37,7 +37,8 @@ namespace Inputs {
class FileBase : public InputBase {
public:
virtual void open(const std::string& name);
- virtual int readFrame(uint8_t* buffer, size_t size) = 0;
+ virtual size_t readFrame(uint8_t *buffer, size_t size) = 0;
+ virtual size_t readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta);
virtual int setBitrate(int bitrate);
virtual void close();
@@ -63,7 +64,7 @@ class FileBase : public InputBase {
class MPEGFile : public FileBase {
public:
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
virtual int setBitrate(int bitrate);
private:
@@ -72,13 +73,13 @@ class MPEGFile : public FileBase {
class RawFile : public FileBase {
public:
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
};
class PacketFile : public FileBase {
public:
PacketFile(bool enhancedPacketMode);
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
protected:
std::array<uint8_t, 96> m_packetData;
diff --git a/src/input/Prbs.cpp b/src/input/Prbs.cpp
index 9f4f5dd..148e919 100644
--- a/src/input/Prbs.cpp
+++ b/src/input/Prbs.cpp
@@ -75,7 +75,7 @@ void Prbs::open(const string& name)
rewind();
}
-int Prbs::readFrame(uint8_t* buffer, size_t size)
+size_t Prbs::readFrame(uint8_t *buffer, size_t size)
{
for (size_t i = 0; i < size; ++i) {
buffer[i] = m_prbs.step();
@@ -84,6 +84,12 @@ int Prbs::readFrame(uint8_t* buffer, size_t size)
return size;
}
+size_t Prbs::readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta)
+{
+ memset(buffer, 0, size);
+ return 0;
+}
+
int Prbs::setBitrate(int bitrate)
{
if (bitrate <= 0) {
diff --git a/src/input/Prbs.h b/src/input/Prbs.h
index b76ffc7..600fd89 100644
--- a/src/input/Prbs.h
+++ b/src/input/Prbs.h
@@ -38,7 +38,8 @@ namespace Inputs {
class Prbs : public InputBase {
public:
virtual void open(const std::string& name);
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta);
virtual int setBitrate(int bitrate);
virtual void close();
diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp
index b909c5a..5ddc366 100644
--- a/src/input/Udp.cpp
+++ b/src/input/Udp.cpp
@@ -86,7 +86,7 @@ void Udp::openUdpSocket(const std::string& endpoint)
etiLog.level(info) << "Opened UDP port " << address << ":" << port;
}
-int Udp::readFrame(uint8_t* buffer, size_t size)
+size_t Udp::readFrame(uint8_t *buffer, size_t size)
{
// Regardless of buffer contents, try receiving data.
auto packet = m_sock.receive(32768);
@@ -97,12 +97,19 @@ int Udp::readFrame(uint8_t* buffer, size_t size)
// in any case write the buffer
if (m_buffer.size() >= (size_t)size) {
std::copy(m_buffer.begin(), m_buffer.begin() + size, buffer);
+ return size;
}
else {
memset(buffer, 0x0, size);
+ return 0;
}
+}
- return size;
+size_t Udp::readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta)
+{
+ // Maybe there's a way to carry timestamps, but we don't need it.
+ memset(buffer, 0x0, size);
+ return 0;
}
int Udp::setBitrate(int bitrate)
@@ -278,7 +285,7 @@ void Sti_d_Rtp::receive_packet()
}
}
-int Sti_d_Rtp::readFrame(uint8_t* buffer, size_t size)
+size_t Sti_d_Rtp::readFrame(uint8_t *buffer, size_t size)
{
// Make sure we fill faster than we consume in case there
// are pending packets.
@@ -287,19 +294,20 @@ int Sti_d_Rtp::readFrame(uint8_t* buffer, size_t size)
if (m_queue.empty()) {
memset(buffer, 0x0, size);
+ return 0;
}
else if (m_queue.front().size() != size) {
etiLog.level(warn) << "Invalid input data size for STI " << m_name <<
" : RX " << m_queue.front().size() << " expected " << size;
memset(buffer, 0x0, size);
m_queue.pop_front();
+ return 0;
}
else {
copy(m_queue.front().begin(), m_queue.front().end(), buffer);
m_queue.pop_front();
+ return size;
}
-
- return 0;
}
}
diff --git a/src/input/Udp.h b/src/input/Udp.h
index 58e0dfd..81956f9 100644
--- a/src/input/Udp.h
+++ b/src/input/Udp.h
@@ -41,7 +41,8 @@ namespace Inputs {
class Udp : public InputBase {
public:
virtual void open(const std::string& name);
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta);
virtual int setBitrate(int bitrate);
virtual void close();
@@ -68,7 +69,7 @@ class Sti_d_Rtp : public Udp {
public:
virtual void open(const std::string& name);
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
private:
void receive_packet(void);
diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp
index c5cc1b2..352c95d 100644
--- a/src/input/Zmq.cpp
+++ b/src/input/Zmq.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2018 Matthias P. Braendli
+ Copyright (C) 2019 Matthias P. Braendli
http://www.opendigitalradio.org
ZeroMQ input. see www.zeromq.org for more info
@@ -247,16 +247,14 @@ int ZmqBase::setBitrate(int bitrate)
}
// size corresponds to a frame size. It is constant for a given bitrate
-int ZmqBase::readFrame(uint8_t* buffer, size_t size)
+size_t ZmqBase::readFrame(uint8_t* buffer, size_t size)
{
- int rc;
-
/* We must *always* read data from the ZMQ socket,
* to make sure that ZMQ internal buffers are emptied
* quickly. It's the only way to control the buffers
* of the whole path from encoder to our frame_buffer.
*/
- rc = readFromSocket(size);
+ const auto readsize = readFromSocket(size);
/* Notify of a buffer overrun, and drop some frames */
if (m_frame_buffer.size() >= m_config.buffer_size) {
@@ -297,10 +295,10 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size)
}
if (m_prebuf_current > 0) {
- if (rc > 0)
+ if (readsize > 0)
m_prebuf_current--;
if (m_prebuf_current == 0)
- etiLog.log(info, "inputZMQ %s input pre-buffering complete\n",
+ etiLog.log(info, "inputZMQ %s input pre-buffering complete",
m_rc_name.c_str());
/* During prebuffering, give a zeroed frame to the mux */
@@ -313,7 +311,7 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size)
m_stats.notifyBuffer(m_frame_buffer.size() * size);
if (m_frame_buffer.empty()) {
- etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n",
+ etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering",
m_rc_name.c_str());
// reset prebuffering
m_prebuf_current = m_config.prebuffering;
@@ -333,6 +331,13 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size)
}
}
+size_t ZmqBase::readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta)
+{
+ // TODO add timestamps into the metadata and implement this
+ memset(buffer, 0, size);
+ return 0;
+}
+
/******** MPEG input *******/
diff --git a/src/input/Zmq.h b/src/input/Zmq.h
index 2b42872..899d6f2 100644
--- a/src/input/Zmq.h
+++ b/src/input/Zmq.h
@@ -2,7 +2,7 @@
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2017 Matthias P. Braendli
+ Copyright (C) 2019 Matthias P. Braendli
http://www.opendigitalradio.org
ZeroMQ input. see www.zeromq.org for more info
@@ -182,7 +182,8 @@ class ZmqBase : public InputBase, public RemoteControllable {
}
virtual void open(const std::string& inputUri);
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta);
virtual int setBitrate(int bitrate);
virtual void close();
diff --git a/src/input/inputs.h b/src/input/inputs.h
index 9a396e0..b4bb00b 100644
--- a/src/input/inputs.h
+++ b/src/input/inputs.h
@@ -41,7 +41,32 @@ class InputBase {
/* Throws runtime_error or invalid_argument on failure */
virtual void open(const std::string& name) = 0;
- virtual int readFrame(uint8_t* buffer, size_t size) = 0;
+ /* read a frame from the input. Buffer management is either not necessary
+ * (e.g. File input) or done with pre-buffering (network-based inputs).
+ *
+ * This ignores timestamps. All inputs support this.
+ *
+ * Returns number of data bytes written to the buffer. May clear the buffer
+ * if no data bytes available, in which case it will return 0.
+ *
+ * Returns negative on error.
+ */
+ virtual size_t readFrame(uint8_t *buffer, size_t size) = 0;
+
+ /* read a frame from the input, taking into account timestamp. The timestamp of the data
+ * returned is not more recent than the timestamp specified in seconds and tsta.
+ *
+ * seconds and tsta are in the format used by EDI.
+ *
+ * Returns number of data bytes written to the buffer. May clear the buffer
+ * if no data bytes available, in which case it will return 0.
+ *
+ * Returns negative on error.
+ *
+ * Calling this function on inputs that do not support timestamps returns 0. This allows
+ * changing the buffer management at runtime without risking an crash due to an exception.
+ */
+ virtual size_t readFrame(uint8_t *buffer, size_t size, uint32_t seconds, uint32_t tsta) = 0;
/* Returns the effectively used bitrate, or throws invalid_argument on invalid bitrate */
virtual int setBitrate(int bitrate) = 0;