summaryrefslogtreecommitdiffstats
path: root/src/input
diff options
context:
space:
mode:
Diffstat (limited to 'src/input')
-rw-r--r--src/input/Edi.cpp427
-rw-r--r--src/input/Edi.h126
-rw-r--r--src/input/File.cpp37
-rw-r--r--src/input/File.h13
-rw-r--r--src/input/Prbs.cpp18
-rw-r--r--src/input/Prbs.h7
-rw-r--r--src/input/Udp.cpp89
-rw-r--r--src/input/Udp.h15
-rw-r--r--src/input/Zmq.cpp34
-rw-r--r--src/input/Zmq.h13
-rw-r--r--src/input/inputs.h54
11 files changed, 721 insertions, 112 deletions
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
new file mode 100644
index 0000000..b5301d2
--- /dev/null
+++ b/src/input/Edi.cpp
@@ -0,0 +1,427 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "input/Edi.h"
+
+#include <regex>
+#include <chrono>
+#include <stdexcept>
+#include <sstream>
+#include <cstring>
+#include <cmath>
+#include <cstdlib>
+#include <cerrno>
+#include <climits>
+#include "utils.h"
+
+using namespace std;
+
+namespace Inputs {
+
+constexpr bool VERBOSE = false;
+constexpr size_t TCP_BLOCKSIZE = 2048;
+
+Edi::Edi(const std::string& name, const dab_input_edi_config_t& config) :
+ RemoteControllable(name),
+ m_tcp_receive_server(TCP_BLOCKSIZE),
+ m_sti_writer(bind(&Edi::m_new_sti_frame_callback, this, placeholders::_1)),
+ m_sti_decoder(m_sti_writer, VERBOSE),
+ m_max_frames_overrun(config.buffer_size),
+ m_num_frames_prebuffering(config.prebuffering),
+ m_name(name),
+ m_stats(name)
+{
+ RC_ADD_PARAMETER(buffermanagement,
+ "Set type of buffer management to use [prebuffering, timestamped]");
+
+ RC_ADD_PARAMETER(buffer,
+ "Size of the input buffer [24ms frames]");
+
+ RC_ADD_PARAMETER(prebuffering,
+ "Min buffer level before streaming starts [24ms frames]");
+
+ RC_ADD_PARAMETER(tistdelay, "TIST delay to add [ms]");
+}
+
+Edi::~Edi() {
+ m_running = false;
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+void Edi::open(const std::string& name)
+{
+ const std::regex re_udp("udp://:([0-9]+)");
+ const std::regex re_tcp("tcp://(.*):([0-9]+)");
+
+ lock_guard<mutex> lock(m_mutex);
+
+ m_running = false;
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+
+ std::smatch m;
+ if (std::regex_match(name, m, re_udp)) {
+ const int udp_port = std::stoi(m[1].str());
+ m_input_used = InputUsed::UDP;
+ m_udp_sock.reinit(udp_port);
+ m_udp_sock.setBlocking(false);
+ // TODO multicast
+ }
+ else if (std::regex_match(name, m, re_tcp)) {
+ m_input_used = InputUsed::TCP;
+ const string addr = m[1].str();
+ const int tcp_port = std::stoi(m[2].str());
+ m_tcp_receive_server.start(tcp_port, addr);
+ }
+ else {
+ throw runtime_error("Cannot parse EDI input URI");
+ }
+
+ m_stats.registerAtServer();
+
+ m_running = true;
+ m_thread = std::thread(&Edi::m_run, this);
+}
+
+size_t Edi::readFrame(uint8_t *buffer, size_t size)
+{
+ // Save stats data in bytes, not in frames
+ m_stats.notifyBuffer(m_frames.size() * size);
+
+ EdiDecoder::sti_frame_t sti;
+ if (m_is_prebuffering) {
+ m_is_prebuffering = m_frames.size() < m_num_frames_prebuffering;
+ if (not m_is_prebuffering) {
+ etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete.";
+ }
+ memset(buffer, 0, size * sizeof(*buffer));
+ return 0;
+ }
+ else if (not m_pending_sti_frame.frame.empty()) {
+ // Can only happen when switching from timestamp-based buffer management!
+ if (m_pending_sti_frame.frame.size() != size) {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
+ m_pending_sti_frame.frame.size() << " received, " << size << " requested";
+ memset(buffer, 0, size * sizeof(*buffer));
+ return 0;
+ }
+ else {
+ if (not m_pending_sti_frame.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ m_pending_sti_frame.version_data.version,
+ m_pending_sti_frame.version_data.uptime_s);
+ }
+ m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, m_pending_sti_frame.audio_levels.right);
+
+ copy(m_pending_sti_frame.frame.begin(),
+ m_pending_sti_frame.frame.end(),
+ buffer);
+ m_pending_sti_frame.frame.clear();
+ return size;
+ }
+ }
+ else if (m_frames.try_pop(sti)) {
+ if (sti.frame.size() == 0) {
+ etiLog.level(debug) << "EDI input " << m_name << " empty frame";
+ return 0;
+ }
+ else if (sti.frame.size() == size) {
+ // Steady-state when everything works well
+ if (m_frames.size() > m_max_frames_overrun) {
+ m_stats.notifyOverrun();
+
+ /* If the buffer is too full, we drop as many frames as needed
+ * to get down to the prebuffering size. We would like to have our buffer
+ * filled to the prebuffering length. */
+ size_t over_max = m_frames.size() - m_num_frames_prebuffering;
+
+ while (over_max--) {
+ EdiDecoder::sti_frame_t discard;
+ m_frames.try_pop(discard);
+ }
+ }
+
+ if (not sti.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ sti.version_data.version,
+ sti.version_data.uptime_s);
+ }
+ m_stats.notifyPeakLevels(sti.audio_levels.left, sti.audio_levels.right);
+
+ copy(sti.frame.cbegin(), sti.frame.cend(), buffer);
+ return size;
+ }
+ else {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
+ sti.frame.size() << " received, " << size << " requested";
+ memset(buffer, 0, size * sizeof(*buffer));
+ return 0;
+ }
+ }
+ else {
+ memset(buffer, 0, size * sizeof(*buffer));
+ m_is_prebuffering = true;
+ etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering";
+ m_stats.notifyUnderrun();
+ return 0;
+ }
+}
+
+size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta)
+{
+ if (m_pending_sti_frame.frame.empty()) {
+ m_frames.try_pop(m_pending_sti_frame);
+ }
+
+ m_stats.notifyBuffer(m_frames.size() * size);
+
+ if (m_is_prebuffering) {
+ if (m_pending_sti_frame.frame.empty()) {
+ memset(buffer, 0, size);
+ return 0;
+ }
+ else if (m_pending_sti_frame.frame.size() == size) {
+ // readFrame gets called every 24ms, so we allow max 24ms
+ // difference between the input frame timestamp and the requested
+ // timestamp.
+ if (m_pending_sti_frame.timestamp.valid()) {
+ auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta);
+ ts_req += m_tist_delay;
+ const double offset = m_pending_sti_frame.timestamp.diff_ms(ts_req);
+
+ if (offset < 24e-3) {
+ m_is_prebuffering = false;
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " valid timestamp, pre-buffering complete";
+
+ if (not m_pending_sti_frame.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ m_pending_sti_frame.version_data.version,
+ m_pending_sti_frame.version_data.uptime_s);
+ }
+
+ m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left,
+ m_pending_sti_frame.audio_levels.right);
+ copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
+ m_pending_sti_frame.frame.clear();
+ return size;
+ }
+ else {
+ // Wait more, but erase the front of the frame queue to avoid
+ // stalling on one frame with incorrect timestamp
+ if (m_frames.size() >= m_max_frames_overrun) {
+ m_pending_sti_frame.frame.clear();
+ }
+ m_stats.notifyUnderrun();
+ memset(buffer, 0, size);
+ return 0;
+ }
+ }
+ else {
+ etiLog.level(debug) << "EDI input " << m_name <<
+ " skipping frame without timestamp";
+ m_pending_sti_frame.frame.clear();
+ memset(buffer, 0, size);
+ return 0;
+ }
+ }
+ else {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
+ m_pending_sti_frame.frame.size() << " received, " << size << " requested";
+ m_pending_sti_frame.frame.clear();
+ memset(buffer, 0, size);
+ return 0;
+ }
+ }
+ else {
+ if (m_pending_sti_frame.frame.empty()) {
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " empty, re-enabling pre-buffering";
+ memset(buffer, 0, size);
+ m_stats.notifyUnderrun();
+ m_is_prebuffering = true;
+ return 0;
+ }
+ else if (not m_pending_sti_frame.timestamp.valid()) {
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " invalid timestamp, ignoring";
+ memset(buffer, 0, size);
+ m_pending_sti_frame.frame.clear();
+ m_stats.notifyUnderrun();
+ return 0;
+ }
+ else {
+ auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta);
+ ts_req += m_tist_delay;
+ const double offset = m_pending_sti_frame.timestamp.diff_ms(ts_req);
+
+ if (offset > 24e-3) {
+ m_stats.notifyUnderrun();
+ m_is_prebuffering = true;
+ m_pending_sti_frame.frame.clear();
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " timestamp out of bounds, re-enabling pre-buffering";
+ memset(buffer, 0, size);
+ return 0;
+ }
+ else {
+ if (not m_pending_sti_frame.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ m_pending_sti_frame.version_data.version,
+ m_pending_sti_frame.version_data.uptime_s);
+ }
+
+ m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left,
+ m_pending_sti_frame.audio_levels.right);
+ copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
+ m_pending_sti_frame.frame.clear();
+ return size;
+ }
+ }
+ }
+}
+
+void Edi::m_run()
+{
+ while (m_running) {
+ switch (m_input_used) {
+ case InputUsed::UDP:
+ {
+ constexpr size_t packsize = 2048;
+ const auto packet = m_udp_sock.receive(packsize);
+ if (packet.buffer.size() == packsize) {
+ fprintf(stderr, "Warning, possible UDP truncation\n");
+ }
+ if (not packet.buffer.empty()) {
+ m_sti_decoder.push_packet(packet.buffer);
+ }
+ else {
+ this_thread::sleep_for(chrono::milliseconds(12));
+ }
+ }
+ break;
+ case InputUsed::TCP:
+ {
+ auto packet = m_tcp_receive_server.receive();
+ if (not packet.empty()) {
+ m_sti_decoder.push_bytes(packet);
+ }
+ else {
+ this_thread::sleep_for(chrono::milliseconds(12));
+ }
+ }
+ break;
+ default:
+ throw logic_error("unimplemented input");
+ }
+ }
+}
+
+void Edi::m_new_sti_frame_callback(EdiDecoder::sti_frame_t&& sti) {
+ if (not sti.frame.empty()) {
+ // We should not wait here, because we want the complete input buffering
+ // happening inside m_frames. Using the blocking function is only a protection
+ // against runaway memory usage if something goes wrong in the consumer.
+ m_frames.push_wait_if_full(move(sti), m_max_frames_overrun * 2);
+ }
+}
+
+int Edi::setBitrate(int bitrate)
+{
+ if (bitrate <= 0) {
+ throw invalid_argument("Invalid bitrate " + to_string(bitrate) + " for " + m_name);
+ }
+
+ return bitrate;
+}
+
+void Edi::close()
+{
+ m_udp_sock.close();
+}
+
+
+void Edi::set_parameter(const std::string& parameter, const std::string& value)
+{
+ if (parameter == "buffer") {
+ size_t new_limit = atol(value.c_str());
+ m_max_frames_overrun = new_limit;
+ }
+ else if (parameter == "prebuffering") {
+ size_t new_limit = atol(value.c_str());
+ m_num_frames_prebuffering = new_limit;
+ }
+ else if (parameter == "buffermanagement") {
+ if (value == "prebuffering") {
+ setBufferManagement(Inputs::BufferManagement::Prebuffering);
+ }
+ else if (value == "timestamped") {
+ setBufferManagement(Inputs::BufferManagement::Timestamped);
+ }
+ else {
+ throw ParameterError("Invalid value for '" + parameter + "' in controllable " + get_rc_name());
+ }
+ }
+ else if (parameter == "tistdelay") {
+ m_tist_delay = chrono::milliseconds(stoi(value));
+ }
+ else {
+ throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name());
+ }
+}
+
+const std::string Edi::get_parameter(const std::string& parameter) const
+{
+ stringstream ss;
+ if (parameter == "buffer") {
+ ss << m_max_frames_overrun;
+ }
+ else if (parameter == "prebuffering") {
+ ss << m_num_frames_prebuffering;
+ }
+ else if (parameter == "buffermanagement") {
+ switch (getBufferManagement()) {
+ case Inputs::BufferManagement::Prebuffering:
+ ss << "prebuffering";
+ break;
+ case Inputs::BufferManagement::Timestamped:
+ ss << "Timestamped";
+ break;
+ }
+ }
+ else if (parameter == "tistdelay") {
+ ss << m_tist_delay.count();
+ }
+ else {
+ throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name());
+ }
+ return ss.str();
+}
+
+}
diff --git a/src/input/Edi.h b/src/input/Edi.h
new file mode 100644
index 0000000..ca465bd
--- /dev/null
+++ b/src/input/Edi.h
@@ -0,0 +1,126 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+#include <deque>
+#include <thread>
+#include <mutex>
+#include "Socket.h"
+#include "input/inputs.h"
+#include "edi/STIDecoder.hpp"
+#include "edi/STIWriter.hpp"
+#include "ThreadsafeQueue.h"
+#include "ManagementServer.h"
+
+namespace Inputs {
+
+struct dab_input_edi_config_t
+{
+ /* The size of the internal buffer, measured in number
+ * of elements.
+ *
+ * Each element corresponds to one frame, i.e. 24ms
+ */
+ size_t buffer_size = 100;
+
+ /* The amount of prebuffering to do before we start streaming
+ *
+ * Same units as buffer_size
+ */
+ size_t prebuffering = 30;
+};
+
+/*
+ * Receives EDI from UDP or TCP in a separate thread and pushes that data
+ * into the STIDecoder. Complete frames are then put into a queue for the consumer.
+ *
+ * This way, the EDI decoding happens in a separate thread.
+ */
+class Edi : public InputBase, public RemoteControllable {
+ public:
+ Edi(const std::string& name, const dab_input_edi_config_t& config);
+ Edi(const Edi&) = delete;
+ Edi& operator=(const Edi&) = delete;
+ ~Edi();
+
+ virtual void open(const std::string& name);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta);
+ virtual int setBitrate(int bitrate);
+ virtual void close();
+
+ /* Remote control */
+ virtual void set_parameter(const std::string& parameter, const std::string& value);
+ virtual const std::string get_parameter(const std::string& parameter) const;
+
+ protected:
+ void m_run();
+
+ void m_new_sti_frame_callback(EdiDecoder::sti_frame_t&& frame);
+
+ std::mutex m_mutex;
+
+ enum class InputUsed { Invalid, UDP, TCP };
+ InputUsed m_input_used = InputUsed::Invalid;
+ Socket::UDPSocket m_udp_sock;
+ Socket::TCPReceiveServer m_tcp_receive_server;
+
+ EdiDecoder::STIWriter m_sti_writer;
+ EdiDecoder::STIDecoder m_sti_decoder;
+ std::thread m_thread;
+ std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
+ ThreadsafeQueue<EdiDecoder::sti_frame_t> m_frames;
+
+ // InputBase defines bufferManagement and tist delay
+
+ // Used in timestamp-based buffer management
+ EdiDecoder::sti_frame_t m_pending_sti_frame;
+
+ // State variable used in prebuffering-based buffer management
+ bool m_is_prebuffering = true;
+
+ /* When using prebuffering, consider the buffer to be full on the
+ * receive side if it's above the overrun threshold.
+ *
+ * When using timestamping, start discarding the front of the queue once the queue
+ * is this full. Must be smaller than m_max_frames_queued.
+ *
+ * Parameter 'buffer' inside RC. */
+ std::atomic<size_t> m_max_frames_overrun = ATOMIC_VAR_INIT(1000);
+
+ /* When not using timestamping, how many frames to prebuffer.
+ * Parameter 'prebuffering' inside RC. */
+ std::atomic<size_t> m_num_frames_prebuffering = ATOMIC_VAR_INIT(10);
+
+ std::string m_name;
+ InputStat m_stats;
+};
+
+};
+
diff --git a/src/input/File.cpp b/src/input/File.cpp
index 20036ae..46bfb59 100644
--- a/src/input/File.cpp
+++ b/src/input/File.cpp
@@ -35,6 +35,8 @@
#include "mpeg.h"
#include "ReedSolomon.h"
+using namespace std;
+
namespace Inputs {
#ifdef _WIN32
@@ -58,7 +60,7 @@ __attribute((packed))
;
-int FileBase::open(const std::string& name)
+void FileBase::open(const std::string& name)
{
int flags = O_RDONLY | O_BINARY;
if (m_nonblock) {
@@ -67,30 +69,35 @@ int FileBase::open(const std::string& name)
m_fd = ::open(name.c_str(), flags);
if (m_fd == -1) {
- throw std::runtime_error("Could not open input file " + name + ": " +
+ throw runtime_error("Could not open input file " + name + ": " +
strerror(errno));
}
+}
+
+size_t FileBase::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta)
+{
+ // Will not be implemented, as there is no obvious way to carry timestamps
+ // in files.
+ memset(buffer, 0, size);
return 0;
}
int FileBase::setBitrate(int bitrate)
{
if (bitrate <= 0) {
- etiLog.log(error, "Invalid bitrate (%i)", bitrate);
- return -1;
+ throw invalid_argument("Invalid bitrate " + to_string(bitrate));
}
return bitrate;
}
-int FileBase::close()
+void FileBase::close()
{
if (m_fd != -1) {
::close(m_fd);
m_fd = -1;
}
- return 0;
}
void FileBase::setNonblocking(bool nonblock)
@@ -182,7 +189,7 @@ ssize_t FileBase::readFromFile(uint8_t* buffer, size_t size)
return size;
}
-int MPEGFile::readFrame(uint8_t* buffer, size_t size)
+size_t MPEGFile::readFrame(uint8_t *buffer, size_t size)
{
int result;
bool do_rewind = false;
@@ -275,12 +282,18 @@ MUTE_SUBCHANNEL:
}
}
}
+
+ // TODO this is probably wrong, because it should return
+ // the number of bytes written.
return result;
}
int MPEGFile::setBitrate(int bitrate)
{
- if (bitrate == 0) {
+ if (bitrate < 0) {
+ throw invalid_argument("Invalid bitrate " + to_string(bitrate));
+ }
+ else if (bitrate == 0) {
uint8_t buffer[4];
if (readFrame(buffer, 4) == 0) {
@@ -294,7 +307,7 @@ int MPEGFile::setBitrate(int bitrate)
return bitrate;
}
-int RawFile::readFrame(uint8_t* buffer, size_t size)
+size_t RawFile::readFrame(uint8_t *buffer, size_t size)
{
return readFromFile(buffer, size);
}
@@ -304,7 +317,7 @@ PacketFile::PacketFile(bool enhancedPacketMode)
m_enhancedPacketEnabled = enhancedPacketMode;
}
-int PacketFile::readFrame(uint8_t* buffer, size_t size)
+size_t PacketFile::readFrame(uint8_t *buffer, size_t size)
{
size_t written = 0;
int length;
@@ -357,7 +370,7 @@ int PacketFile::readFrame(uint8_t* buffer, size_t size)
length = 24;
}
else {
- std::copy(m_packetData.begin(),
+ copy(m_packetData.begin(),
m_packetData.begin() + m_packetLength,
buffer);
length = m_packetLength;
@@ -365,7 +378,7 @@ int PacketFile::readFrame(uint8_t* buffer, size_t size)
}
}
else {
- std::copy(m_packetData.begin(),
+ copy(m_packetData.begin(),
m_packetData.begin() + m_packetLength,
buffer);
length = m_packetLength;
diff --git a/src/input/File.h b/src/input/File.h
index b574c39..39ce7fd 100644
--- a/src/input/File.h
+++ b/src/input/File.h
@@ -36,10 +36,11 @@ namespace Inputs {
class FileBase : public InputBase {
public:
- virtual int open(const std::string& name);
- virtual int readFrame(uint8_t* buffer, size_t size) = 0;
+ virtual void open(const std::string& name);
+ virtual size_t readFrame(uint8_t *buffer, size_t size) = 0;
+ virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta);
virtual int setBitrate(int bitrate);
- virtual int close();
+ virtual void close();
virtual void setNonblocking(bool nonblock);
@@ -63,7 +64,7 @@ class FileBase : public InputBase {
class MPEGFile : public FileBase {
public:
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
virtual int setBitrate(int bitrate);
private:
@@ -72,13 +73,13 @@ class MPEGFile : public FileBase {
class RawFile : public FileBase {
public:
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
};
class PacketFile : public FileBase {
public:
PacketFile(bool enhancedPacketMode);
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
protected:
std::array<uint8_t, 96> m_packetData;
diff --git a/src/input/Prbs.cpp b/src/input/Prbs.cpp
index 7856a46..155e625 100644
--- a/src/input/Prbs.cpp
+++ b/src/input/Prbs.cpp
@@ -44,7 +44,7 @@ namespace Inputs {
// Preferred polynomial is G(x) = x^20 + x^17 + 1
const uint32_t PRBS_DEFAULT_POLY = (1 << 20) | (1 << 17) | (1 << 0);
-int Prbs::open(const string& name)
+void Prbs::open(const string& name)
{
if (name.substr(0, 7) != "prbs://") {
throw logic_error("Invalid PRBS name");
@@ -73,11 +73,9 @@ int Prbs::open(const string& name)
m_prbs.setup(polynomial);
}
rewind();
-
- return 0;
}
-int Prbs::readFrame(uint8_t* buffer, size_t size)
+size_t Prbs::readFrame(uint8_t *buffer, size_t size)
{
for (size_t i = 0; i < size; ++i) {
buffer[i] = m_prbs.step();
@@ -86,14 +84,22 @@ int Prbs::readFrame(uint8_t* buffer, size_t size)
return size;
}
+size_t Prbs::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta)
+{
+ memset(buffer, 0, size);
+ return 0;
+}
+
int Prbs::setBitrate(int bitrate)
{
+ if (bitrate <= 0) {
+ throw invalid_argument("Invalid bitrate " + to_string(bitrate));
+ }
return bitrate;
}
-int Prbs::close()
+void Prbs::close()
{
- return 0;
}
int Prbs::rewind()
diff --git a/src/input/Prbs.h b/src/input/Prbs.h
index 51b7756..e2b94ec 100644
--- a/src/input/Prbs.h
+++ b/src/input/Prbs.h
@@ -37,10 +37,11 @@ namespace Inputs {
class Prbs : public InputBase {
public:
- virtual int open(const std::string& name);
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual void open(const std::string& name);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta);
virtual int setBitrate(int bitrate);
- virtual int close();
+ virtual void close();
private:
virtual int rewind();
diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp
index 2cb49e7..a37ee21 100644
--- a/src/input/Udp.cpp
+++ b/src/input/Udp.cpp
@@ -38,7 +38,7 @@ using namespace std;
namespace Inputs {
-int Udp::open(const std::string& name)
+void Udp::open(const std::string& name)
{
// Skip the udp:// part if it is present
const string endpoint = (name.substr(0, 6) == "udp://") ?
@@ -57,8 +57,6 @@ int Udp::open(const std::string& name)
m_name = name;
openUdpSocket(endpoint);
-
- return 0;
}
void Udp::openUdpSocket(const std::string& endpoint)
@@ -82,61 +80,50 @@ void Udp::openUdpSocket(const std::string& endpoint)
throw out_of_range("can't use port number 0 in udp address");
}
- if (m_sock.reinit(port, address) == -1) {
- stringstream ss;
- ss << "Could not init UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
-
- if (m_sock.setBlocking(false) == -1) {
- stringstream ss;
- ss << "Could not set non-blocking UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
+ m_sock.reinit(port, address);
+ m_sock.setBlocking(false);
etiLog.level(info) << "Opened UDP port " << address << ":" << port;
}
-int Udp::readFrame(uint8_t* buffer, size_t size)
+size_t Udp::readFrame(uint8_t *buffer, size_t size)
{
// Regardless of buffer contents, try receiving data.
- UdpPacket packet(32768);
- int ret = m_sock.receive(packet);
-
- if (ret == -1) {
- stringstream ss;
- ss << "Could not read from UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
+ auto packet = m_sock.receive(32768);
- std::copy(packet.getData(), packet.getData() + packet.getSize(),
- back_inserter(m_buffer));
+ std::copy(packet.buffer.cbegin(), packet.buffer.cend(), back_inserter(m_buffer));
// Take data from the buffer if it contains enough data,
// in any case write the buffer
if (m_buffer.size() >= (size_t)size) {
std::copy(m_buffer.begin(), m_buffer.begin() + size, buffer);
+ return size;
}
else {
memset(buffer, 0x0, size);
+ return 0;
}
+}
- return size;
+size_t Udp::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta)
+{
+ // Maybe there's a way to carry timestamps, but we don't need it.
+ memset(buffer, 0x0, size);
+ return 0;
}
int Udp::setBitrate(int bitrate)
{
if (bitrate <= 0) {
- etiLog.log(error, "Invalid bitrate (%i)\n", bitrate);
- return -1;
+ throw invalid_argument("Invalid bitrate " + to_string(bitrate) + " for " + m_name);
}
return bitrate;
}
-int Udp::close()
+void Udp::close()
{
- return m_sock.close();
+ m_sock.close();
}
@@ -165,10 +152,10 @@ static uint16_t unpack2(const uint8_t *buf)
return (((uint16_t)buf[0]) << 8) | buf[1];
}
-int Sti_d_Rtp::open(const std::string& name)
+void Sti_d_Rtp::open(const std::string& name)
{
- // Skip the sti-rtp:// part if it is present
- const string endpoint = (name.substr(0, 10) == "sti-rtp://") ?
+ // Skip the rtp:// part if it is present
+ const string endpoint = (name.substr(0, 10) == "rtp://") ?
name.substr(10) : name;
// The endpoint should be address:port
@@ -176,43 +163,34 @@ int Sti_d_Rtp::open(const std::string& name)
if (colon_pos == string::npos) {
stringstream ss;
ss << "'" << name <<
- " is an invalid format for sti-rtp address: "
- "expected [sti-rtp://]address:port";
+ " is an invalid format for rtp address: "
+ "expected [rtp://]address:port";
throw invalid_argument(ss.str());
}
m_name = name;
openUdpSocket(endpoint);
-
- return 0;
}
void Sti_d_Rtp::receive_packet()
{
- UdpPacket packet(32768);
- int ret = m_sock.receive(packet);
+ auto packet = m_sock.receive(32768);
- if (ret == -1) {
- stringstream ss;
- ss << "Could not read from UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
-
- if (packet.getSize() == 0) {
+ if (packet.buffer.empty()) {
// No packet was received
return;
}
const size_t STI_FC_LEN = 8;
- if (packet.getSize() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) {
+ if (packet.buffer.size() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) {
etiLog.level(info) << "Received too small RTP packet for " <<
m_name;
return;
}
- if (not rtpHeaderValid(packet.getData())) {
+ if (not rtpHeaderValid(packet.buffer.data())) {
etiLog.level(info) << "Received invalid RTP header for " <<
m_name;
return;
@@ -220,7 +198,7 @@ void Sti_d_Rtp::receive_packet()
// STI(PI, X)
size_t index = RTP_HEADER_LEN;
- const uint8_t *buf = packet.getData();
+ const uint8_t *buf = packet.buffer.data();
// SYNC
index++; // Advance over STAT
@@ -242,7 +220,7 @@ void Sti_d_Rtp::receive_packet()
m_name;
return;
}
- if (packet.getSize() < index + DFS) {
+ if (packet.buffer.size() < index + DFS) {
etiLog.level(info) << "Received STI too small for given DFS for " <<
m_name;
return;
@@ -270,9 +248,9 @@ void Sti_d_Rtp::receive_packet()
uint16_t NST = unpack2(buf+index) & 0x7FF; // 11 bits
index += 2;
- if (packet.getSize() < index + 4*NST) {
+ if (packet.buffer.size() < index + 4*NST) {
etiLog.level(info) << "Received STI too small to contain NST for " <<
- m_name << " packet: " << packet.getSize() << " need " <<
+ m_name << " packet: " << packet.buffer.size() << " need " <<
index + 4*NST;
return;
}
@@ -307,7 +285,7 @@ void Sti_d_Rtp::receive_packet()
}
}
-int Sti_d_Rtp::readFrame(uint8_t* buffer, size_t size)
+size_t Sti_d_Rtp::readFrame(uint8_t *buffer, size_t size)
{
// Make sure we fill faster than we consume in case there
// are pending packets.
@@ -316,19 +294,20 @@ int Sti_d_Rtp::readFrame(uint8_t* buffer, size_t size)
if (m_queue.empty()) {
memset(buffer, 0x0, size);
+ return 0;
}
else if (m_queue.front().size() != size) {
etiLog.level(warn) << "Invalid input data size for STI " << m_name <<
" : RX " << m_queue.front().size() << " expected " << size;
memset(buffer, 0x0, size);
m_queue.pop_front();
+ return 0;
}
else {
copy(m_queue.front().begin(), m_queue.front().end(), buffer);
m_queue.pop_front();
+ return size;
}
-
- return 0;
}
}
diff --git a/src/input/Udp.h b/src/input/Udp.h
index dc01486..e5961c7 100644
--- a/src/input/Udp.h
+++ b/src/input/Udp.h
@@ -31,7 +31,7 @@
#include <deque>
#include <boost/thread.hpp>
#include "input/inputs.h"
-#include "UdpSocket.h"
+#include "Socket.h"
namespace Inputs {
@@ -40,13 +40,14 @@ namespace Inputs {
*/
class Udp : public InputBase {
public:
- virtual int open(const std::string& name);
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual void open(const std::string& name);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta);
virtual int setBitrate(int bitrate);
- virtual int close();
+ virtual void close();
protected:
- UdpSocket m_sock;
+ Socket::UDPSocket m_sock;
std::string m_name;
void openUdpSocket(const std::string& endpoint);
@@ -67,8 +68,8 @@ class Sti_d_Rtp : public Udp {
using vec_u8 = std::vector<uint8_t>;
public:
- virtual int open(const std::string& name);
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual void open(const std::string& name);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
private:
void receive_packet(void);
diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp
index 2e35907..0a9d59d 100644
--- a/src/input/Zmq.cpp
+++ b/src/input/Zmq.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2018 Matthias P. Braendli
+ Copyright (C) 2019 Matthias P. Braendli
http://www.opendigitalradio.org
ZeroMQ input. see www.zeromq.org for more info
@@ -220,7 +220,7 @@ void ZmqBase::rebind()
}
}
-int ZmqBase::open(const std::string& inputUri)
+void ZmqBase::open(const std::string& inputUri)
{
m_inputUri = inputUri;
@@ -229,33 +229,32 @@ int ZmqBase::open(const std::string& inputUri)
// We want to appear in the statistics !
m_stats.registerAtServer();
-
- return 0;
}
-int ZmqBase::close()
+void ZmqBase::close()
{
m_zmq_sock.close();
- return 0;
}
int ZmqBase::setBitrate(int bitrate)
{
+ if (bitrate <= 0) {
+ throw invalid_argument("Invalid bitrate " + to_string(bitrate) + " for " + m_name);
+ }
+
m_bitrate = bitrate;
- return bitrate; // TODO do a nice check here
+ return bitrate;
}
// size corresponds to a frame size. It is constant for a given bitrate
-int ZmqBase::readFrame(uint8_t* buffer, size_t size)
+size_t ZmqBase::readFrame(uint8_t* buffer, size_t size)
{
- int rc;
-
/* We must *always* read data from the ZMQ socket,
* to make sure that ZMQ internal buffers are emptied
* quickly. It's the only way to control the buffers
* of the whole path from encoder to our frame_buffer.
*/
- rc = readFromSocket(size);
+ const auto readsize = readFromSocket(size);
/* Notify of a buffer overrun, and drop some frames */
if (m_frame_buffer.size() >= m_config.buffer_size) {
@@ -296,10 +295,10 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size)
}
if (m_prebuf_current > 0) {
- if (rc > 0)
+ if (readsize > 0)
m_prebuf_current--;
if (m_prebuf_current == 0)
- etiLog.log(info, "inputZMQ %s input pre-buffering complete\n",
+ etiLog.log(info, "inputZMQ %s input pre-buffering complete",
m_rc_name.c_str());
/* During prebuffering, give a zeroed frame to the mux */
@@ -312,7 +311,7 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size)
m_stats.notifyBuffer(m_frame_buffer.size() * size);
if (m_frame_buffer.empty()) {
- etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n",
+ etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering",
m_rc_name.c_str());
// reset prebuffering
m_prebuf_current = m_config.prebuffering;
@@ -332,6 +331,13 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size)
}
}
+size_t ZmqBase::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta)
+{
+ // TODO add timestamps into the metadata and implement this
+ memset(buffer, 0, size);
+ return 0;
+}
+
/******** MPEG input *******/
diff --git a/src/input/Zmq.h b/src/input/Zmq.h
index eb67fe5..2e37b5f 100644
--- a/src/input/Zmq.h
+++ b/src/input/Zmq.h
@@ -2,7 +2,7 @@
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2017 Matthias P. Braendli
+ Copyright (C) 2019 Matthias P. Braendli
http://www.opendigitalradio.org
ZeroMQ input. see www.zeromq.org for more info
@@ -45,7 +45,7 @@
#include <list>
#include <string>
-#include <stdint.h>
+#include <cstdint>
#include "zmq.hpp"
#include "input/inputs.h"
#include "ManagementServer.h"
@@ -156,6 +156,7 @@ class ZmqBase : public InputBase, public RemoteControllable {
m_bitrate(0),
m_enable_input(true),
m_config(config),
+ m_name(name),
m_stats(name),
m_prebuf_current(config.prebuffering) {
RC_ADD_PARAMETER(enable,
@@ -180,10 +181,11 @@ class ZmqBase : public InputBase, public RemoteControllable {
INVALIDATE_KEY(m_curve_encoder_key);
}
- virtual int open(const std::string& inputUri);
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual void open(const std::string& inputUri);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta);
virtual int setBitrate(int bitrate);
- virtual int close();
+ virtual void close();
/* Remote control */
virtual void set_parameter(const std::string& parameter,
@@ -220,6 +222,7 @@ class ZmqBase : public InputBase, public RemoteControllable {
char m_curve_encoder_key[CURVE_KEYLEN+1];
std::string m_inputUri;
+ std::string m_name;
InputStat m_stats;
diff --git a/src/input/inputs.h b/src/input/inputs.h
index bfb1fb6..83cdbf2 100644
--- a/src/input/inputs.h
+++ b/src/input/inputs.h
@@ -2,7 +2,7 @@
Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -35,17 +35,63 @@
namespace Inputs {
+enum class BufferManagement {
+ // Use a buffer in the input that doesn't consider timestamps
+ Prebuffering,
+
+ // Buffer incoming data until a given timestamp is reached
+ Timestamped,
+};
+
+
/* New input object base */
class InputBase {
public:
- virtual int open(const std::string& name) = 0;
- virtual int readFrame(uint8_t* buffer, size_t size) = 0;
+ /* Throws runtime_error or invalid_argument on failure */
+ virtual void open(const std::string& name) = 0;
+
+ /* read a frame from the input. Buffer management is either not necessary
+ * (e.g. File input) or done with pre-buffering (network-based inputs).
+ *
+ * This ignores timestamps. All inputs support this.
+ *
+ * Returns number of data bytes written to the buffer. May clear the buffer
+ * if no data bytes available, in which case it will return 0.
+ *
+ * Returns negative on error.
+ */
+ virtual size_t readFrame(uint8_t *buffer, size_t size) = 0;
+
+ /* read a frame from the input, taking into account timestamp. The timestamp of the data
+ * returned is not more recent than the timestamp specified in seconds and tsta.
+ *
+ * seconds is in UNIX epoch, utco is the TAI-UTC offset, tsta is in the format used by ETI.
+ *
+ * Returns number of data bytes written to the buffer. May clear the buffer
+ * if no data bytes available, in which case it will return 0.
+ *
+ * Returns negative on error.
+ *
+ * Calling this function on inputs that do not support timestamps returns 0. This allows
+ * changing the buffer management at runtime without risking an crash due to an exception.
+ */
+ virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta) = 0;
+
+ /* Returns the effectively used bitrate, or throws invalid_argument on invalid bitrate */
virtual int setBitrate(int bitrate) = 0;
- virtual int close() = 0;
+ virtual void close() = 0;
virtual ~InputBase() {}
+
+ void setTistDelay(const std::chrono::milliseconds& ms) { m_tist_delay = ms; }
+ void setBufferManagement(BufferManagement bm) { m_bufferManagement = bm; }
+ BufferManagement getBufferManagement() const { return m_bufferManagement; }
+
protected:
InputBase() {}
+
+ std::atomic<BufferManagement> m_bufferManagement = ATOMIC_VAR_INIT(BufferManagement::Prebuffering);
+ std::chrono::milliseconds m_tist_delay;
};
};