summaryrefslogtreecommitdiffstats
path: root/src/input/Edi.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/input/Edi.cpp')
-rw-r--r--src/input/Edi.cpp427
1 files changed, 427 insertions, 0 deletions
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
new file mode 100644
index 0000000..b5301d2
--- /dev/null
+++ b/src/input/Edi.cpp
@@ -0,0 +1,427 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "input/Edi.h"
+
+#include <regex>
+#include <chrono>
+#include <stdexcept>
+#include <sstream>
+#include <cstring>
+#include <cmath>
+#include <cstdlib>
+#include <cerrno>
+#include <climits>
+#include "utils.h"
+
+using namespace std;
+
+namespace Inputs {
+
+constexpr bool VERBOSE = false;
+constexpr size_t TCP_BLOCKSIZE = 2048;
+
+Edi::Edi(const std::string& name, const dab_input_edi_config_t& config) :
+ RemoteControllable(name),
+ m_tcp_receive_server(TCP_BLOCKSIZE),
+ m_sti_writer(bind(&Edi::m_new_sti_frame_callback, this, placeholders::_1)),
+ m_sti_decoder(m_sti_writer, VERBOSE),
+ m_max_frames_overrun(config.buffer_size),
+ m_num_frames_prebuffering(config.prebuffering),
+ m_name(name),
+ m_stats(name)
+{
+ RC_ADD_PARAMETER(buffermanagement,
+ "Set type of buffer management to use [prebuffering, timestamped]");
+
+ RC_ADD_PARAMETER(buffer,
+ "Size of the input buffer [24ms frames]");
+
+ RC_ADD_PARAMETER(prebuffering,
+ "Min buffer level before streaming starts [24ms frames]");
+
+ RC_ADD_PARAMETER(tistdelay, "TIST delay to add [ms]");
+}
+
+Edi::~Edi() {
+ m_running = false;
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+void Edi::open(const std::string& name)
+{
+ const std::regex re_udp("udp://:([0-9]+)");
+ const std::regex re_tcp("tcp://(.*):([0-9]+)");
+
+ lock_guard<mutex> lock(m_mutex);
+
+ m_running = false;
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+
+ std::smatch m;
+ if (std::regex_match(name, m, re_udp)) {
+ const int udp_port = std::stoi(m[1].str());
+ m_input_used = InputUsed::UDP;
+ m_udp_sock.reinit(udp_port);
+ m_udp_sock.setBlocking(false);
+ // TODO multicast
+ }
+ else if (std::regex_match(name, m, re_tcp)) {
+ m_input_used = InputUsed::TCP;
+ const string addr = m[1].str();
+ const int tcp_port = std::stoi(m[2].str());
+ m_tcp_receive_server.start(tcp_port, addr);
+ }
+ else {
+ throw runtime_error("Cannot parse EDI input URI");
+ }
+
+ m_stats.registerAtServer();
+
+ m_running = true;
+ m_thread = std::thread(&Edi::m_run, this);
+}
+
+size_t Edi::readFrame(uint8_t *buffer, size_t size)
+{
+ // Save stats data in bytes, not in frames
+ m_stats.notifyBuffer(m_frames.size() * size);
+
+ EdiDecoder::sti_frame_t sti;
+ if (m_is_prebuffering) {
+ m_is_prebuffering = m_frames.size() < m_num_frames_prebuffering;
+ if (not m_is_prebuffering) {
+ etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete.";
+ }
+ memset(buffer, 0, size * sizeof(*buffer));
+ return 0;
+ }
+ else if (not m_pending_sti_frame.frame.empty()) {
+ // Can only happen when switching from timestamp-based buffer management!
+ if (m_pending_sti_frame.frame.size() != size) {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
+ m_pending_sti_frame.frame.size() << " received, " << size << " requested";
+ memset(buffer, 0, size * sizeof(*buffer));
+ return 0;
+ }
+ else {
+ if (not m_pending_sti_frame.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ m_pending_sti_frame.version_data.version,
+ m_pending_sti_frame.version_data.uptime_s);
+ }
+ m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, m_pending_sti_frame.audio_levels.right);
+
+ copy(m_pending_sti_frame.frame.begin(),
+ m_pending_sti_frame.frame.end(),
+ buffer);
+ m_pending_sti_frame.frame.clear();
+ return size;
+ }
+ }
+ else if (m_frames.try_pop(sti)) {
+ if (sti.frame.size() == 0) {
+ etiLog.level(debug) << "EDI input " << m_name << " empty frame";
+ return 0;
+ }
+ else if (sti.frame.size() == size) {
+ // Steady-state when everything works well
+ if (m_frames.size() > m_max_frames_overrun) {
+ m_stats.notifyOverrun();
+
+ /* If the buffer is too full, we drop as many frames as needed
+ * to get down to the prebuffering size. We would like to have our buffer
+ * filled to the prebuffering length. */
+ size_t over_max = m_frames.size() - m_num_frames_prebuffering;
+
+ while (over_max--) {
+ EdiDecoder::sti_frame_t discard;
+ m_frames.try_pop(discard);
+ }
+ }
+
+ if (not sti.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ sti.version_data.version,
+ sti.version_data.uptime_s);
+ }
+ m_stats.notifyPeakLevels(sti.audio_levels.left, sti.audio_levels.right);
+
+ copy(sti.frame.cbegin(), sti.frame.cend(), buffer);
+ return size;
+ }
+ else {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
+ sti.frame.size() << " received, " << size << " requested";
+ memset(buffer, 0, size * sizeof(*buffer));
+ return 0;
+ }
+ }
+ else {
+ memset(buffer, 0, size * sizeof(*buffer));
+ m_is_prebuffering = true;
+ etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering";
+ m_stats.notifyUnderrun();
+ return 0;
+ }
+}
+
+size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta)
+{
+ if (m_pending_sti_frame.frame.empty()) {
+ m_frames.try_pop(m_pending_sti_frame);
+ }
+
+ m_stats.notifyBuffer(m_frames.size() * size);
+
+ if (m_is_prebuffering) {
+ if (m_pending_sti_frame.frame.empty()) {
+ memset(buffer, 0, size);
+ return 0;
+ }
+ else if (m_pending_sti_frame.frame.size() == size) {
+ // readFrame gets called every 24ms, so we allow max 24ms
+ // difference between the input frame timestamp and the requested
+ // timestamp.
+ if (m_pending_sti_frame.timestamp.valid()) {
+ auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta);
+ ts_req += m_tist_delay;
+ const double offset = m_pending_sti_frame.timestamp.diff_ms(ts_req);
+
+ if (offset < 24e-3) {
+ m_is_prebuffering = false;
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " valid timestamp, pre-buffering complete";
+
+ if (not m_pending_sti_frame.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ m_pending_sti_frame.version_data.version,
+ m_pending_sti_frame.version_data.uptime_s);
+ }
+
+ m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left,
+ m_pending_sti_frame.audio_levels.right);
+ copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
+ m_pending_sti_frame.frame.clear();
+ return size;
+ }
+ else {
+ // Wait more, but erase the front of the frame queue to avoid
+ // stalling on one frame with incorrect timestamp
+ if (m_frames.size() >= m_max_frames_overrun) {
+ m_pending_sti_frame.frame.clear();
+ }
+ m_stats.notifyUnderrun();
+ memset(buffer, 0, size);
+ return 0;
+ }
+ }
+ else {
+ etiLog.level(debug) << "EDI input " << m_name <<
+ " skipping frame without timestamp";
+ m_pending_sti_frame.frame.clear();
+ memset(buffer, 0, size);
+ return 0;
+ }
+ }
+ else {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
+ m_pending_sti_frame.frame.size() << " received, " << size << " requested";
+ m_pending_sti_frame.frame.clear();
+ memset(buffer, 0, size);
+ return 0;
+ }
+ }
+ else {
+ if (m_pending_sti_frame.frame.empty()) {
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " empty, re-enabling pre-buffering";
+ memset(buffer, 0, size);
+ m_stats.notifyUnderrun();
+ m_is_prebuffering = true;
+ return 0;
+ }
+ else if (not m_pending_sti_frame.timestamp.valid()) {
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " invalid timestamp, ignoring";
+ memset(buffer, 0, size);
+ m_pending_sti_frame.frame.clear();
+ m_stats.notifyUnderrun();
+ return 0;
+ }
+ else {
+ auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta);
+ ts_req += m_tist_delay;
+ const double offset = m_pending_sti_frame.timestamp.diff_ms(ts_req);
+
+ if (offset > 24e-3) {
+ m_stats.notifyUnderrun();
+ m_is_prebuffering = true;
+ m_pending_sti_frame.frame.clear();
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " timestamp out of bounds, re-enabling pre-buffering";
+ memset(buffer, 0, size);
+ return 0;
+ }
+ else {
+ if (not m_pending_sti_frame.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ m_pending_sti_frame.version_data.version,
+ m_pending_sti_frame.version_data.uptime_s);
+ }
+
+ m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left,
+ m_pending_sti_frame.audio_levels.right);
+ copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
+ m_pending_sti_frame.frame.clear();
+ return size;
+ }
+ }
+ }
+}
+
+void Edi::m_run()
+{
+ while (m_running) {
+ switch (m_input_used) {
+ case InputUsed::UDP:
+ {
+ constexpr size_t packsize = 2048;
+ const auto packet = m_udp_sock.receive(packsize);
+ if (packet.buffer.size() == packsize) {
+ fprintf(stderr, "Warning, possible UDP truncation\n");
+ }
+ if (not packet.buffer.empty()) {
+ m_sti_decoder.push_packet(packet.buffer);
+ }
+ else {
+ this_thread::sleep_for(chrono::milliseconds(12));
+ }
+ }
+ break;
+ case InputUsed::TCP:
+ {
+ auto packet = m_tcp_receive_server.receive();
+ if (not packet.empty()) {
+ m_sti_decoder.push_bytes(packet);
+ }
+ else {
+ this_thread::sleep_for(chrono::milliseconds(12));
+ }
+ }
+ break;
+ default:
+ throw logic_error("unimplemented input");
+ }
+ }
+}
+
+void Edi::m_new_sti_frame_callback(EdiDecoder::sti_frame_t&& sti) {
+ if (not sti.frame.empty()) {
+ // We should not wait here, because we want the complete input buffering
+ // happening inside m_frames. Using the blocking function is only a protection
+ // against runaway memory usage if something goes wrong in the consumer.
+ m_frames.push_wait_if_full(move(sti), m_max_frames_overrun * 2);
+ }
+}
+
+int Edi::setBitrate(int bitrate)
+{
+ if (bitrate <= 0) {
+ throw invalid_argument("Invalid bitrate " + to_string(bitrate) + " for " + m_name);
+ }
+
+ return bitrate;
+}
+
+void Edi::close()
+{
+ m_udp_sock.close();
+}
+
+
+void Edi::set_parameter(const std::string& parameter, const std::string& value)
+{
+ if (parameter == "buffer") {
+ size_t new_limit = atol(value.c_str());
+ m_max_frames_overrun = new_limit;
+ }
+ else if (parameter == "prebuffering") {
+ size_t new_limit = atol(value.c_str());
+ m_num_frames_prebuffering = new_limit;
+ }
+ else if (parameter == "buffermanagement") {
+ if (value == "prebuffering") {
+ setBufferManagement(Inputs::BufferManagement::Prebuffering);
+ }
+ else if (value == "timestamped") {
+ setBufferManagement(Inputs::BufferManagement::Timestamped);
+ }
+ else {
+ throw ParameterError("Invalid value for '" + parameter + "' in controllable " + get_rc_name());
+ }
+ }
+ else if (parameter == "tistdelay") {
+ m_tist_delay = chrono::milliseconds(stoi(value));
+ }
+ else {
+ throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name());
+ }
+}
+
+const std::string Edi::get_parameter(const std::string& parameter) const
+{
+ stringstream ss;
+ if (parameter == "buffer") {
+ ss << m_max_frames_overrun;
+ }
+ else if (parameter == "prebuffering") {
+ ss << m_num_frames_prebuffering;
+ }
+ else if (parameter == "buffermanagement") {
+ switch (getBufferManagement()) {
+ case Inputs::BufferManagement::Prebuffering:
+ ss << "prebuffering";
+ break;
+ case Inputs::BufferManagement::Timestamped:
+ ss << "Timestamped";
+ break;
+ }
+ }
+ else if (parameter == "tistdelay") {
+ ss << m_tist_delay.count();
+ }
+ else {
+ throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name());
+ }
+ return ss.str();
+}
+
+}