diff options
Diffstat (limited to 'src/input')
-rw-r--r-- | src/input/Edi.cpp | 427 | ||||
-rw-r--r-- | src/input/Edi.h | 126 | ||||
-rw-r--r-- | src/input/File.cpp | 37 | ||||
-rw-r--r-- | src/input/File.h | 13 | ||||
-rw-r--r-- | src/input/Prbs.cpp | 18 | ||||
-rw-r--r-- | src/input/Prbs.h | 7 | ||||
-rw-r--r-- | src/input/Udp.cpp | 89 | ||||
-rw-r--r-- | src/input/Udp.h | 15 | ||||
-rw-r--r-- | src/input/Zmq.cpp | 34 | ||||
-rw-r--r-- | src/input/Zmq.h | 13 | ||||
-rw-r--r-- | src/input/inputs.h | 54 |
11 files changed, 721 insertions, 112 deletions
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp new file mode 100644 index 0000000..b5301d2 --- /dev/null +++ b/src/input/Edi.cpp @@ -0,0 +1,427 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "input/Edi.h" + +#include <regex> +#include <chrono> +#include <stdexcept> +#include <sstream> +#include <cstring> +#include <cmath> +#include <cstdlib> +#include <cerrno> +#include <climits> +#include "utils.h" + +using namespace std; + +namespace Inputs { + +constexpr bool VERBOSE = false; +constexpr size_t TCP_BLOCKSIZE = 2048; + +Edi::Edi(const std::string& name, const dab_input_edi_config_t& config) : + RemoteControllable(name), + m_tcp_receive_server(TCP_BLOCKSIZE), + m_sti_writer(bind(&Edi::m_new_sti_frame_callback, this, placeholders::_1)), + m_sti_decoder(m_sti_writer, VERBOSE), + m_max_frames_overrun(config.buffer_size), + m_num_frames_prebuffering(config.prebuffering), + m_name(name), + m_stats(name) +{ + RC_ADD_PARAMETER(buffermanagement, + "Set type of buffer management to use [prebuffering, timestamped]"); + + RC_ADD_PARAMETER(buffer, + "Size of the input buffer [24ms frames]"); + + RC_ADD_PARAMETER(prebuffering, + "Min buffer level before streaming starts [24ms frames]"); + + RC_ADD_PARAMETER(tistdelay, "TIST delay to add [ms]"); +} + +Edi::~Edi() { + m_running = false; + if (m_thread.joinable()) { + m_thread.join(); + } +} + +void Edi::open(const std::string& name) +{ + const std::regex re_udp("udp://:([0-9]+)"); + const std::regex re_tcp("tcp://(.*):([0-9]+)"); + + lock_guard<mutex> lock(m_mutex); + + m_running = false; + if (m_thread.joinable()) { + m_thread.join(); + } + + std::smatch m; + if (std::regex_match(name, m, re_udp)) { + const int udp_port = std::stoi(m[1].str()); + m_input_used = InputUsed::UDP; + m_udp_sock.reinit(udp_port); + m_udp_sock.setBlocking(false); + // TODO multicast + } + else if (std::regex_match(name, m, re_tcp)) { + m_input_used = InputUsed::TCP; + const string addr = m[1].str(); + const int tcp_port = std::stoi(m[2].str()); + m_tcp_receive_server.start(tcp_port, addr); + } + else { + throw runtime_error("Cannot parse EDI input URI"); + } + + m_stats.registerAtServer(); + + m_running = true; + m_thread = std::thread(&Edi::m_run, this); +} + +size_t Edi::readFrame(uint8_t *buffer, size_t size) +{ + // Save stats data in bytes, not in frames + m_stats.notifyBuffer(m_frames.size() * size); + + EdiDecoder::sti_frame_t sti; + if (m_is_prebuffering) { + m_is_prebuffering = m_frames.size() < m_num_frames_prebuffering; + if (not m_is_prebuffering) { + etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete."; + } + memset(buffer, 0, size * sizeof(*buffer)); + return 0; + } + else if (not m_pending_sti_frame.frame.empty()) { + // Can only happen when switching from timestamp-based buffer management! + if (m_pending_sti_frame.frame.size() != size) { + etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << + m_pending_sti_frame.frame.size() << " received, " << size << " requested"; + memset(buffer, 0, size * sizeof(*buffer)); + return 0; + } + else { + if (not m_pending_sti_frame.version_data.version.empty()) { + m_stats.notifyVersion( + m_pending_sti_frame.version_data.version, + m_pending_sti_frame.version_data.uptime_s); + } + m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, m_pending_sti_frame.audio_levels.right); + + copy(m_pending_sti_frame.frame.begin(), + m_pending_sti_frame.frame.end(), + buffer); + m_pending_sti_frame.frame.clear(); + return size; + } + } + else if (m_frames.try_pop(sti)) { + if (sti.frame.size() == 0) { + etiLog.level(debug) << "EDI input " << m_name << " empty frame"; + return 0; + } + else if (sti.frame.size() == size) { + // Steady-state when everything works well + if (m_frames.size() > m_max_frames_overrun) { + m_stats.notifyOverrun(); + + /* If the buffer is too full, we drop as many frames as needed + * to get down to the prebuffering size. We would like to have our buffer + * filled to the prebuffering length. */ + size_t over_max = m_frames.size() - m_num_frames_prebuffering; + + while (over_max--) { + EdiDecoder::sti_frame_t discard; + m_frames.try_pop(discard); + } + } + + if (not sti.version_data.version.empty()) { + m_stats.notifyVersion( + sti.version_data.version, + sti.version_data.uptime_s); + } + m_stats.notifyPeakLevels(sti.audio_levels.left, sti.audio_levels.right); + + copy(sti.frame.cbegin(), sti.frame.cend(), buffer); + return size; + } + else { + etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << + sti.frame.size() << " received, " << size << " requested"; + memset(buffer, 0, size * sizeof(*buffer)); + return 0; + } + } + else { + memset(buffer, 0, size * sizeof(*buffer)); + m_is_prebuffering = true; + etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering"; + m_stats.notifyUnderrun(); + return 0; + } +} + +size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta) +{ + if (m_pending_sti_frame.frame.empty()) { + m_frames.try_pop(m_pending_sti_frame); + } + + m_stats.notifyBuffer(m_frames.size() * size); + + if (m_is_prebuffering) { + if (m_pending_sti_frame.frame.empty()) { + memset(buffer, 0, size); + return 0; + } + else if (m_pending_sti_frame.frame.size() == size) { + // readFrame gets called every 24ms, so we allow max 24ms + // difference between the input frame timestamp and the requested + // timestamp. + if (m_pending_sti_frame.timestamp.valid()) { + auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta); + ts_req += m_tist_delay; + const double offset = m_pending_sti_frame.timestamp.diff_ms(ts_req); + + if (offset < 24e-3) { + m_is_prebuffering = false; + etiLog.level(warn) << "EDI input " << m_name << + " valid timestamp, pre-buffering complete"; + + if (not m_pending_sti_frame.version_data.version.empty()) { + m_stats.notifyVersion( + m_pending_sti_frame.version_data.version, + m_pending_sti_frame.version_data.uptime_s); + } + + m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, + m_pending_sti_frame.audio_levels.right); + copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer); + m_pending_sti_frame.frame.clear(); + return size; + } + else { + // Wait more, but erase the front of the frame queue to avoid + // stalling on one frame with incorrect timestamp + if (m_frames.size() >= m_max_frames_overrun) { + m_pending_sti_frame.frame.clear(); + } + m_stats.notifyUnderrun(); + memset(buffer, 0, size); + return 0; + } + } + else { + etiLog.level(debug) << "EDI input " << m_name << + " skipping frame without timestamp"; + m_pending_sti_frame.frame.clear(); + memset(buffer, 0, size); + return 0; + } + } + else { + etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << + m_pending_sti_frame.frame.size() << " received, " << size << " requested"; + m_pending_sti_frame.frame.clear(); + memset(buffer, 0, size); + return 0; + } + } + else { + if (m_pending_sti_frame.frame.empty()) { + etiLog.level(warn) << "EDI input " << m_name << + " empty, re-enabling pre-buffering"; + memset(buffer, 0, size); + m_stats.notifyUnderrun(); + m_is_prebuffering = true; + return 0; + } + else if (not m_pending_sti_frame.timestamp.valid()) { + etiLog.level(warn) << "EDI input " << m_name << + " invalid timestamp, ignoring"; + memset(buffer, 0, size); + m_pending_sti_frame.frame.clear(); + m_stats.notifyUnderrun(); + return 0; + } + else { + auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta); + ts_req += m_tist_delay; + const double offset = m_pending_sti_frame.timestamp.diff_ms(ts_req); + + if (offset > 24e-3) { + m_stats.notifyUnderrun(); + m_is_prebuffering = true; + m_pending_sti_frame.frame.clear(); + etiLog.level(warn) << "EDI input " << m_name << + " timestamp out of bounds, re-enabling pre-buffering"; + memset(buffer, 0, size); + return 0; + } + else { + if (not m_pending_sti_frame.version_data.version.empty()) { + m_stats.notifyVersion( + m_pending_sti_frame.version_data.version, + m_pending_sti_frame.version_data.uptime_s); + } + + m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, + m_pending_sti_frame.audio_levels.right); + copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer); + m_pending_sti_frame.frame.clear(); + return size; + } + } + } +} + +void Edi::m_run() +{ + while (m_running) { + switch (m_input_used) { + case InputUsed::UDP: + { + constexpr size_t packsize = 2048; + const auto packet = m_udp_sock.receive(packsize); + if (packet.buffer.size() == packsize) { + fprintf(stderr, "Warning, possible UDP truncation\n"); + } + if (not packet.buffer.empty()) { + m_sti_decoder.push_packet(packet.buffer); + } + else { + this_thread::sleep_for(chrono::milliseconds(12)); + } + } + break; + case InputUsed::TCP: + { + auto packet = m_tcp_receive_server.receive(); + if (not packet.empty()) { + m_sti_decoder.push_bytes(packet); + } + else { + this_thread::sleep_for(chrono::milliseconds(12)); + } + } + break; + default: + throw logic_error("unimplemented input"); + } + } +} + +void Edi::m_new_sti_frame_callback(EdiDecoder::sti_frame_t&& sti) { + if (not sti.frame.empty()) { + // We should not wait here, because we want the complete input buffering + // happening inside m_frames. Using the blocking function is only a protection + // against runaway memory usage if something goes wrong in the consumer. + m_frames.push_wait_if_full(move(sti), m_max_frames_overrun * 2); + } +} + +int Edi::setBitrate(int bitrate) +{ + if (bitrate <= 0) { + throw invalid_argument("Invalid bitrate " + to_string(bitrate) + " for " + m_name); + } + + return bitrate; +} + +void Edi::close() +{ + m_udp_sock.close(); +} + + +void Edi::set_parameter(const std::string& parameter, const std::string& value) +{ + if (parameter == "buffer") { + size_t new_limit = atol(value.c_str()); + m_max_frames_overrun = new_limit; + } + else if (parameter == "prebuffering") { + size_t new_limit = atol(value.c_str()); + m_num_frames_prebuffering = new_limit; + } + else if (parameter == "buffermanagement") { + if (value == "prebuffering") { + setBufferManagement(Inputs::BufferManagement::Prebuffering); + } + else if (value == "timestamped") { + setBufferManagement(Inputs::BufferManagement::Timestamped); + } + else { + throw ParameterError("Invalid value for '" + parameter + "' in controllable " + get_rc_name()); + } + } + else if (parameter == "tistdelay") { + m_tist_delay = chrono::milliseconds(stoi(value)); + } + else { + throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name()); + } +} + +const std::string Edi::get_parameter(const std::string& parameter) const +{ + stringstream ss; + if (parameter == "buffer") { + ss << m_max_frames_overrun; + } + else if (parameter == "prebuffering") { + ss << m_num_frames_prebuffering; + } + else if (parameter == "buffermanagement") { + switch (getBufferManagement()) { + case Inputs::BufferManagement::Prebuffering: + ss << "prebuffering"; + break; + case Inputs::BufferManagement::Timestamped: + ss << "Timestamped"; + break; + } + } + else if (parameter == "tistdelay") { + ss << m_tist_delay.count(); + } + else { + throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name()); + } + return ss.str(); +} + +} diff --git a/src/input/Edi.h b/src/input/Edi.h new file mode 100644 index 0000000..ca465bd --- /dev/null +++ b/src/input/Edi.h @@ -0,0 +1,126 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <string> +#include <vector> +#include <deque> +#include <thread> +#include <mutex> +#include "Socket.h" +#include "input/inputs.h" +#include "edi/STIDecoder.hpp" +#include "edi/STIWriter.hpp" +#include "ThreadsafeQueue.h" +#include "ManagementServer.h" + +namespace Inputs { + +struct dab_input_edi_config_t +{ + /* The size of the internal buffer, measured in number + * of elements. + * + * Each element corresponds to one frame, i.e. 24ms + */ + size_t buffer_size = 100; + + /* The amount of prebuffering to do before we start streaming + * + * Same units as buffer_size + */ + size_t prebuffering = 30; +}; + +/* + * Receives EDI from UDP or TCP in a separate thread and pushes that data + * into the STIDecoder. Complete frames are then put into a queue for the consumer. + * + * This way, the EDI decoding happens in a separate thread. + */ +class Edi : public InputBase, public RemoteControllable { + public: + Edi(const std::string& name, const dab_input_edi_config_t& config); + Edi(const Edi&) = delete; + Edi& operator=(const Edi&) = delete; + ~Edi(); + + virtual void open(const std::string& name); + virtual size_t readFrame(uint8_t *buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta); + virtual int setBitrate(int bitrate); + virtual void close(); + + /* Remote control */ + virtual void set_parameter(const std::string& parameter, const std::string& value); + virtual const std::string get_parameter(const std::string& parameter) const; + + protected: + void m_run(); + + void m_new_sti_frame_callback(EdiDecoder::sti_frame_t&& frame); + + std::mutex m_mutex; + + enum class InputUsed { Invalid, UDP, TCP }; + InputUsed m_input_used = InputUsed::Invalid; + Socket::UDPSocket m_udp_sock; + Socket::TCPReceiveServer m_tcp_receive_server; + + EdiDecoder::STIWriter m_sti_writer; + EdiDecoder::STIDecoder m_sti_decoder; + std::thread m_thread; + std::atomic<bool> m_running = ATOMIC_VAR_INIT(false); + ThreadsafeQueue<EdiDecoder::sti_frame_t> m_frames; + + // InputBase defines bufferManagement and tist delay + + // Used in timestamp-based buffer management + EdiDecoder::sti_frame_t m_pending_sti_frame; + + // State variable used in prebuffering-based buffer management + bool m_is_prebuffering = true; + + /* When using prebuffering, consider the buffer to be full on the + * receive side if it's above the overrun threshold. + * + * When using timestamping, start discarding the front of the queue once the queue + * is this full. Must be smaller than m_max_frames_queued. + * + * Parameter 'buffer' inside RC. */ + std::atomic<size_t> m_max_frames_overrun = ATOMIC_VAR_INIT(1000); + + /* When not using timestamping, how many frames to prebuffer. + * Parameter 'prebuffering' inside RC. */ + std::atomic<size_t> m_num_frames_prebuffering = ATOMIC_VAR_INIT(10); + + std::string m_name; + InputStat m_stats; +}; + +}; + diff --git a/src/input/File.cpp b/src/input/File.cpp index 20036ae..46bfb59 100644 --- a/src/input/File.cpp +++ b/src/input/File.cpp @@ -35,6 +35,8 @@ #include "mpeg.h" #include "ReedSolomon.h" +using namespace std; + namespace Inputs { #ifdef _WIN32 @@ -58,7 +60,7 @@ __attribute((packed)) ; -int FileBase::open(const std::string& name) +void FileBase::open(const std::string& name) { int flags = O_RDONLY | O_BINARY; if (m_nonblock) { @@ -67,30 +69,35 @@ int FileBase::open(const std::string& name) m_fd = ::open(name.c_str(), flags); if (m_fd == -1) { - throw std::runtime_error("Could not open input file " + name + ": " + + throw runtime_error("Could not open input file " + name + ": " + strerror(errno)); } +} + +size_t FileBase::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta) +{ + // Will not be implemented, as there is no obvious way to carry timestamps + // in files. + memset(buffer, 0, size); return 0; } int FileBase::setBitrate(int bitrate) { if (bitrate <= 0) { - etiLog.log(error, "Invalid bitrate (%i)", bitrate); - return -1; + throw invalid_argument("Invalid bitrate " + to_string(bitrate)); } return bitrate; } -int FileBase::close() +void FileBase::close() { if (m_fd != -1) { ::close(m_fd); m_fd = -1; } - return 0; } void FileBase::setNonblocking(bool nonblock) @@ -182,7 +189,7 @@ ssize_t FileBase::readFromFile(uint8_t* buffer, size_t size) return size; } -int MPEGFile::readFrame(uint8_t* buffer, size_t size) +size_t MPEGFile::readFrame(uint8_t *buffer, size_t size) { int result; bool do_rewind = false; @@ -275,12 +282,18 @@ MUTE_SUBCHANNEL: } } } + + // TODO this is probably wrong, because it should return + // the number of bytes written. return result; } int MPEGFile::setBitrate(int bitrate) { - if (bitrate == 0) { + if (bitrate < 0) { + throw invalid_argument("Invalid bitrate " + to_string(bitrate)); + } + else if (bitrate == 0) { uint8_t buffer[4]; if (readFrame(buffer, 4) == 0) { @@ -294,7 +307,7 @@ int MPEGFile::setBitrate(int bitrate) return bitrate; } -int RawFile::readFrame(uint8_t* buffer, size_t size) +size_t RawFile::readFrame(uint8_t *buffer, size_t size) { return readFromFile(buffer, size); } @@ -304,7 +317,7 @@ PacketFile::PacketFile(bool enhancedPacketMode) m_enhancedPacketEnabled = enhancedPacketMode; } -int PacketFile::readFrame(uint8_t* buffer, size_t size) +size_t PacketFile::readFrame(uint8_t *buffer, size_t size) { size_t written = 0; int length; @@ -357,7 +370,7 @@ int PacketFile::readFrame(uint8_t* buffer, size_t size) length = 24; } else { - std::copy(m_packetData.begin(), + copy(m_packetData.begin(), m_packetData.begin() + m_packetLength, buffer); length = m_packetLength; @@ -365,7 +378,7 @@ int PacketFile::readFrame(uint8_t* buffer, size_t size) } } else { - std::copy(m_packetData.begin(), + copy(m_packetData.begin(), m_packetData.begin() + m_packetLength, buffer); length = m_packetLength; diff --git a/src/input/File.h b/src/input/File.h index b574c39..39ce7fd 100644 --- a/src/input/File.h +++ b/src/input/File.h @@ -36,10 +36,11 @@ namespace Inputs { class FileBase : public InputBase { public: - virtual int open(const std::string& name); - virtual int readFrame(uint8_t* buffer, size_t size) = 0; + virtual void open(const std::string& name); + virtual size_t readFrame(uint8_t *buffer, size_t size) = 0; + virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta); virtual int setBitrate(int bitrate); - virtual int close(); + virtual void close(); virtual void setNonblocking(bool nonblock); @@ -63,7 +64,7 @@ class FileBase : public InputBase { class MPEGFile : public FileBase { public: - virtual int readFrame(uint8_t* buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size); virtual int setBitrate(int bitrate); private: @@ -72,13 +73,13 @@ class MPEGFile : public FileBase { class RawFile : public FileBase { public: - virtual int readFrame(uint8_t* buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size); }; class PacketFile : public FileBase { public: PacketFile(bool enhancedPacketMode); - virtual int readFrame(uint8_t* buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size); protected: std::array<uint8_t, 96> m_packetData; diff --git a/src/input/Prbs.cpp b/src/input/Prbs.cpp index 7856a46..155e625 100644 --- a/src/input/Prbs.cpp +++ b/src/input/Prbs.cpp @@ -44,7 +44,7 @@ namespace Inputs { // Preferred polynomial is G(x) = x^20 + x^17 + 1 const uint32_t PRBS_DEFAULT_POLY = (1 << 20) | (1 << 17) | (1 << 0); -int Prbs::open(const string& name) +void Prbs::open(const string& name) { if (name.substr(0, 7) != "prbs://") { throw logic_error("Invalid PRBS name"); @@ -73,11 +73,9 @@ int Prbs::open(const string& name) m_prbs.setup(polynomial); } rewind(); - - return 0; } -int Prbs::readFrame(uint8_t* buffer, size_t size) +size_t Prbs::readFrame(uint8_t *buffer, size_t size) { for (size_t i = 0; i < size; ++i) { buffer[i] = m_prbs.step(); @@ -86,14 +84,22 @@ int Prbs::readFrame(uint8_t* buffer, size_t size) return size; } +size_t Prbs::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta) +{ + memset(buffer, 0, size); + return 0; +} + int Prbs::setBitrate(int bitrate) { + if (bitrate <= 0) { + throw invalid_argument("Invalid bitrate " + to_string(bitrate)); + } return bitrate; } -int Prbs::close() +void Prbs::close() { - return 0; } int Prbs::rewind() diff --git a/src/input/Prbs.h b/src/input/Prbs.h index 51b7756..e2b94ec 100644 --- a/src/input/Prbs.h +++ b/src/input/Prbs.h @@ -37,10 +37,11 @@ namespace Inputs { class Prbs : public InputBase { public: - virtual int open(const std::string& name); - virtual int readFrame(uint8_t* buffer, size_t size); + virtual void open(const std::string& name); + virtual size_t readFrame(uint8_t *buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta); virtual int setBitrate(int bitrate); - virtual int close(); + virtual void close(); private: virtual int rewind(); diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp index 2cb49e7..a37ee21 100644 --- a/src/input/Udp.cpp +++ b/src/input/Udp.cpp @@ -38,7 +38,7 @@ using namespace std; namespace Inputs { -int Udp::open(const std::string& name) +void Udp::open(const std::string& name) { // Skip the udp:// part if it is present const string endpoint = (name.substr(0, 6) == "udp://") ? @@ -57,8 +57,6 @@ int Udp::open(const std::string& name) m_name = name; openUdpSocket(endpoint); - - return 0; } void Udp::openUdpSocket(const std::string& endpoint) @@ -82,61 +80,50 @@ void Udp::openUdpSocket(const std::string& endpoint) throw out_of_range("can't use port number 0 in udp address"); } - if (m_sock.reinit(port, address) == -1) { - stringstream ss; - ss << "Could not init UDP socket: " << inetErrMsg; - throw runtime_error(ss.str()); - } - - if (m_sock.setBlocking(false) == -1) { - stringstream ss; - ss << "Could not set non-blocking UDP socket: " << inetErrMsg; - throw runtime_error(ss.str()); - } + m_sock.reinit(port, address); + m_sock.setBlocking(false); etiLog.level(info) << "Opened UDP port " << address << ":" << port; } -int Udp::readFrame(uint8_t* buffer, size_t size) +size_t Udp::readFrame(uint8_t *buffer, size_t size) { // Regardless of buffer contents, try receiving data. - UdpPacket packet(32768); - int ret = m_sock.receive(packet); - - if (ret == -1) { - stringstream ss; - ss << "Could not read from UDP socket: " << inetErrMsg; - throw runtime_error(ss.str()); - } + auto packet = m_sock.receive(32768); - std::copy(packet.getData(), packet.getData() + packet.getSize(), - back_inserter(m_buffer)); + std::copy(packet.buffer.cbegin(), packet.buffer.cend(), back_inserter(m_buffer)); // Take data from the buffer if it contains enough data, // in any case write the buffer if (m_buffer.size() >= (size_t)size) { std::copy(m_buffer.begin(), m_buffer.begin() + size, buffer); + return size; } else { memset(buffer, 0x0, size); + return 0; } +} - return size; +size_t Udp::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta) +{ + // Maybe there's a way to carry timestamps, but we don't need it. + memset(buffer, 0x0, size); + return 0; } int Udp::setBitrate(int bitrate) { if (bitrate <= 0) { - etiLog.log(error, "Invalid bitrate (%i)\n", bitrate); - return -1; + throw invalid_argument("Invalid bitrate " + to_string(bitrate) + " for " + m_name); } return bitrate; } -int Udp::close() +void Udp::close() { - return m_sock.close(); + m_sock.close(); } @@ -165,10 +152,10 @@ static uint16_t unpack2(const uint8_t *buf) return (((uint16_t)buf[0]) << 8) | buf[1]; } -int Sti_d_Rtp::open(const std::string& name) +void Sti_d_Rtp::open(const std::string& name) { - // Skip the sti-rtp:// part if it is present - const string endpoint = (name.substr(0, 10) == "sti-rtp://") ? + // Skip the rtp:// part if it is present + const string endpoint = (name.substr(0, 10) == "rtp://") ? name.substr(10) : name; // The endpoint should be address:port @@ -176,43 +163,34 @@ int Sti_d_Rtp::open(const std::string& name) if (colon_pos == string::npos) { stringstream ss; ss << "'" << name << - " is an invalid format for sti-rtp address: " - "expected [sti-rtp://]address:port"; + " is an invalid format for rtp address: " + "expected [rtp://]address:port"; throw invalid_argument(ss.str()); } m_name = name; openUdpSocket(endpoint); - - return 0; } void Sti_d_Rtp::receive_packet() { - UdpPacket packet(32768); - int ret = m_sock.receive(packet); + auto packet = m_sock.receive(32768); - if (ret == -1) { - stringstream ss; - ss << "Could not read from UDP socket: " << inetErrMsg; - throw runtime_error(ss.str()); - } - - if (packet.getSize() == 0) { + if (packet.buffer.empty()) { // No packet was received return; } const size_t STI_FC_LEN = 8; - if (packet.getSize() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) { + if (packet.buffer.size() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) { etiLog.level(info) << "Received too small RTP packet for " << m_name; return; } - if (not rtpHeaderValid(packet.getData())) { + if (not rtpHeaderValid(packet.buffer.data())) { etiLog.level(info) << "Received invalid RTP header for " << m_name; return; @@ -220,7 +198,7 @@ void Sti_d_Rtp::receive_packet() // STI(PI, X) size_t index = RTP_HEADER_LEN; - const uint8_t *buf = packet.getData(); + const uint8_t *buf = packet.buffer.data(); // SYNC index++; // Advance over STAT @@ -242,7 +220,7 @@ void Sti_d_Rtp::receive_packet() m_name; return; } - if (packet.getSize() < index + DFS) { + if (packet.buffer.size() < index + DFS) { etiLog.level(info) << "Received STI too small for given DFS for " << m_name; return; @@ -270,9 +248,9 @@ void Sti_d_Rtp::receive_packet() uint16_t NST = unpack2(buf+index) & 0x7FF; // 11 bits index += 2; - if (packet.getSize() < index + 4*NST) { + if (packet.buffer.size() < index + 4*NST) { etiLog.level(info) << "Received STI too small to contain NST for " << - m_name << " packet: " << packet.getSize() << " need " << + m_name << " packet: " << packet.buffer.size() << " need " << index + 4*NST; return; } @@ -307,7 +285,7 @@ void Sti_d_Rtp::receive_packet() } } -int Sti_d_Rtp::readFrame(uint8_t* buffer, size_t size) +size_t Sti_d_Rtp::readFrame(uint8_t *buffer, size_t size) { // Make sure we fill faster than we consume in case there // are pending packets. @@ -316,19 +294,20 @@ int Sti_d_Rtp::readFrame(uint8_t* buffer, size_t size) if (m_queue.empty()) { memset(buffer, 0x0, size); + return 0; } else if (m_queue.front().size() != size) { etiLog.level(warn) << "Invalid input data size for STI " << m_name << " : RX " << m_queue.front().size() << " expected " << size; memset(buffer, 0x0, size); m_queue.pop_front(); + return 0; } else { copy(m_queue.front().begin(), m_queue.front().end(), buffer); m_queue.pop_front(); + return size; } - - return 0; } } diff --git a/src/input/Udp.h b/src/input/Udp.h index dc01486..e5961c7 100644 --- a/src/input/Udp.h +++ b/src/input/Udp.h @@ -31,7 +31,7 @@ #include <deque> #include <boost/thread.hpp> #include "input/inputs.h" -#include "UdpSocket.h" +#include "Socket.h" namespace Inputs { @@ -40,13 +40,14 @@ namespace Inputs { */ class Udp : public InputBase { public: - virtual int open(const std::string& name); - virtual int readFrame(uint8_t* buffer, size_t size); + virtual void open(const std::string& name); + virtual size_t readFrame(uint8_t *buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta); virtual int setBitrate(int bitrate); - virtual int close(); + virtual void close(); protected: - UdpSocket m_sock; + Socket::UDPSocket m_sock; std::string m_name; void openUdpSocket(const std::string& endpoint); @@ -67,8 +68,8 @@ class Sti_d_Rtp : public Udp { using vec_u8 = std::vector<uint8_t>; public: - virtual int open(const std::string& name); - virtual int readFrame(uint8_t* buffer, size_t size); + virtual void open(const std::string& name); + virtual size_t readFrame(uint8_t *buffer, size_t size); private: void receive_packet(void); diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp index 2e35907..0a9d59d 100644 --- a/src/input/Zmq.cpp +++ b/src/input/Zmq.cpp @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 Matthias P. Braendli + Copyright (C) 2019 Matthias P. Braendli http://www.opendigitalradio.org ZeroMQ input. see www.zeromq.org for more info @@ -220,7 +220,7 @@ void ZmqBase::rebind() } } -int ZmqBase::open(const std::string& inputUri) +void ZmqBase::open(const std::string& inputUri) { m_inputUri = inputUri; @@ -229,33 +229,32 @@ int ZmqBase::open(const std::string& inputUri) // We want to appear in the statistics ! m_stats.registerAtServer(); - - return 0; } -int ZmqBase::close() +void ZmqBase::close() { m_zmq_sock.close(); - return 0; } int ZmqBase::setBitrate(int bitrate) { + if (bitrate <= 0) { + throw invalid_argument("Invalid bitrate " + to_string(bitrate) + " for " + m_name); + } + m_bitrate = bitrate; - return bitrate; // TODO do a nice check here + return bitrate; } // size corresponds to a frame size. It is constant for a given bitrate -int ZmqBase::readFrame(uint8_t* buffer, size_t size) +size_t ZmqBase::readFrame(uint8_t* buffer, size_t size) { - int rc; - /* We must *always* read data from the ZMQ socket, * to make sure that ZMQ internal buffers are emptied * quickly. It's the only way to control the buffers * of the whole path from encoder to our frame_buffer. */ - rc = readFromSocket(size); + const auto readsize = readFromSocket(size); /* Notify of a buffer overrun, and drop some frames */ if (m_frame_buffer.size() >= m_config.buffer_size) { @@ -296,10 +295,10 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size) } if (m_prebuf_current > 0) { - if (rc > 0) + if (readsize > 0) m_prebuf_current--; if (m_prebuf_current == 0) - etiLog.log(info, "inputZMQ %s input pre-buffering complete\n", + etiLog.log(info, "inputZMQ %s input pre-buffering complete", m_rc_name.c_str()); /* During prebuffering, give a zeroed frame to the mux */ @@ -312,7 +311,7 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size) m_stats.notifyBuffer(m_frame_buffer.size() * size); if (m_frame_buffer.empty()) { - etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n", + etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering", m_rc_name.c_str()); // reset prebuffering m_prebuf_current = m_config.prebuffering; @@ -332,6 +331,13 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size) } } +size_t ZmqBase::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta) +{ + // TODO add timestamps into the metadata and implement this + memset(buffer, 0, size); + return 0; +} + /******** MPEG input *******/ diff --git a/src/input/Zmq.h b/src/input/Zmq.h index eb67fe5..2e37b5f 100644 --- a/src/input/Zmq.h +++ b/src/input/Zmq.h @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2017 Matthias P. Braendli + Copyright (C) 2019 Matthias P. Braendli http://www.opendigitalradio.org ZeroMQ input. see www.zeromq.org for more info @@ -45,7 +45,7 @@ #include <list> #include <string> -#include <stdint.h> +#include <cstdint> #include "zmq.hpp" #include "input/inputs.h" #include "ManagementServer.h" @@ -156,6 +156,7 @@ class ZmqBase : public InputBase, public RemoteControllable { m_bitrate(0), m_enable_input(true), m_config(config), + m_name(name), m_stats(name), m_prebuf_current(config.prebuffering) { RC_ADD_PARAMETER(enable, @@ -180,10 +181,11 @@ class ZmqBase : public InputBase, public RemoteControllable { INVALIDATE_KEY(m_curve_encoder_key); } - virtual int open(const std::string& inputUri); - virtual int readFrame(uint8_t* buffer, size_t size); + virtual void open(const std::string& inputUri); + virtual size_t readFrame(uint8_t *buffer, size_t size); + virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta); virtual int setBitrate(int bitrate); - virtual int close(); + virtual void close(); /* Remote control */ virtual void set_parameter(const std::string& parameter, @@ -220,6 +222,7 @@ class ZmqBase : public InputBase, public RemoteControllable { char m_curve_encoder_key[CURVE_KEYLEN+1]; std::string m_inputUri; + std::string m_name; InputStat m_stats; diff --git a/src/input/inputs.h b/src/input/inputs.h index bfb1fb6..83cdbf2 100644 --- a/src/input/inputs.h +++ b/src/input/inputs.h @@ -2,7 +2,7 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -35,17 +35,63 @@ namespace Inputs { +enum class BufferManagement { + // Use a buffer in the input that doesn't consider timestamps + Prebuffering, + + // Buffer incoming data until a given timestamp is reached + Timestamped, +}; + + /* New input object base */ class InputBase { public: - virtual int open(const std::string& name) = 0; - virtual int readFrame(uint8_t* buffer, size_t size) = 0; + /* Throws runtime_error or invalid_argument on failure */ + virtual void open(const std::string& name) = 0; + + /* read a frame from the input. Buffer management is either not necessary + * (e.g. File input) or done with pre-buffering (network-based inputs). + * + * This ignores timestamps. All inputs support this. + * + * Returns number of data bytes written to the buffer. May clear the buffer + * if no data bytes available, in which case it will return 0. + * + * Returns negative on error. + */ + virtual size_t readFrame(uint8_t *buffer, size_t size) = 0; + + /* read a frame from the input, taking into account timestamp. The timestamp of the data + * returned is not more recent than the timestamp specified in seconds and tsta. + * + * seconds is in UNIX epoch, utco is the TAI-UTC offset, tsta is in the format used by ETI. + * + * Returns number of data bytes written to the buffer. May clear the buffer + * if no data bytes available, in which case it will return 0. + * + * Returns negative on error. + * + * Calling this function on inputs that do not support timestamps returns 0. This allows + * changing the buffer management at runtime without risking an crash due to an exception. + */ + virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta) = 0; + + /* Returns the effectively used bitrate, or throws invalid_argument on invalid bitrate */ virtual int setBitrate(int bitrate) = 0; - virtual int close() = 0; + virtual void close() = 0; virtual ~InputBase() {} + + void setTistDelay(const std::chrono::milliseconds& ms) { m_tist_delay = ms; } + void setBufferManagement(BufferManagement bm) { m_bufferManagement = bm; } + BufferManagement getBufferManagement() const { return m_bufferManagement; } + protected: InputBase() {} + + std::atomic<BufferManagement> m_bufferManagement = ATOMIC_VAR_INIT(BufferManagement::Prebuffering); + std::chrono::milliseconds m_tist_delay; }; }; |