/*
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
Copyright (C) 2024
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
*/
/*
This file is part of ODR-DabMux.
ODR-DabMux is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
ODR-DabMux is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with ODR-DabMux. If not, see .
*/
#include "input/Edi.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "Socket.h"
#include "edi/common.hpp"
#include "utils.h"
using namespace std;
namespace Inputs {
constexpr size_t TCP_BLOCKSIZE = 2048;
Edi::Edi(const std::string& name, const dab_input_edi_config_t& config) :
RemoteControllable(name),
m_tcp_receive_server(TCP_BLOCKSIZE),
m_sti_writer(bind(&Edi::m_new_sti_frame_callback, this, placeholders::_1)),
m_sti_decoder(m_sti_writer),
m_max_frames_overrun(config.buffer_size),
m_num_frames_prebuffering(config.prebuffering),
m_name(name),
m_stats(name)
{
constexpr bool VERBOSE = false;
m_sti_decoder.set_verbose(VERBOSE);
RC_ADD_PARAMETER(buffermanagement,
"Set type of buffer management to use [prebuffering, timestamped]");
RC_ADD_PARAMETER(buffer,
"Size of the input buffer [24ms frames]");
RC_ADD_PARAMETER(prebuffering,
"Min buffer level before streaming starts [24ms frames]");
RC_ADD_PARAMETER(tistdelay, "TIST delay to add [ms]");
}
Edi::~Edi() {
m_running = false;
if (m_thread.joinable()) {
m_thread.join();
}
}
void Edi::open(const std::string& name)
{
const std::regex re_udp("udp://:([0-9]+)");
const std::regex re_udp_multicast("udp://@([0-9.]+):([0-9]+)");
const std::regex re_udp_multicast_bindto("udp://([0-9.])+@([0-9.]+):([0-9]+)");
const std::regex re_tcp("tcp://(.*):([0-9]+)");
lock_guard lock(m_mutex);
m_running = false;
if (m_thread.joinable()) {
m_thread.join();
}
std::smatch m;
if (std::regex_match(name, m, re_udp)) {
const int udp_port = std::stoi(m[1].str());
m_input_used = InputUsed::UDP;
m_udp_sock.reinit(udp_port);
m_udp_sock.setBlocking(false);
}
else if (std::regex_match(name, m, re_udp_multicast_bindto)) {
const string bind_to = m[1].str();
const string multicast_address = m[2].str();
const int udp_port = std::stoi(m[3].str());
m_input_used = InputUsed::UDP;
if (IN_MULTICAST(ntohl(inet_addr(multicast_address.c_str())))) {
m_udp_sock.init_receive_multicast(udp_port, bind_to, multicast_address);
}
else {
throw runtime_error(string("Address ") + multicast_address + " is not a multicast address");
}
m_udp_sock.setBlocking(false);
}
else if (std::regex_match(name, m, re_udp_multicast)) {
const string multicast_address = m[1].str();
const int udp_port = std::stoi(m[2].str());
m_input_used = InputUsed::UDP;
if (IN_MULTICAST(ntohl(inet_addr(multicast_address.c_str())))) {
m_udp_sock.init_receive_multicast(udp_port, "0.0.0.0", multicast_address);
}
else {
throw runtime_error(string("Address ") + multicast_address + " is not a multicast address");
}
m_udp_sock.setBlocking(false);
}
else if (std::regex_match(name, m, re_tcp)) {
m_input_used = InputUsed::TCP;
const string addr = m[1].str();
const int tcp_port = std::stoi(m[2].str());
m_tcp_receive_server.start(tcp_port, addr);
}
else {
throw runtime_error(string("Cannot parse EDI input URI '") + name + "'");
}
m_stats.registerAtServer();
m_running = true;
m_thread = std::thread(&Edi::m_run, this);
}
size_t Edi::readFrame(uint8_t *buffer, size_t size)
{
// Save stats data in bytes, not in frames
m_stats.notifyBuffer(m_frames.size() * size);
m_stats.notifyTimestampOffset(0);
EdiDecoder::sti_frame_t sti;
if (m_is_prebuffering) {
m_is_prebuffering = m_frames.size() < m_num_frames_prebuffering;
if (not m_is_prebuffering) {
etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete.";
}
memset(buffer, 0, size * sizeof(*buffer));
m_stats.notifyUnderrun();
return 0;
}
else if (not m_pending_sti_frame.frame.empty()) {
// Can only happen when switching from timestamp-based buffer management!
if (m_pending_sti_frame.frame.size() != size) {
if (not m_size_mismatch_printed) {
etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
m_pending_sti_frame.frame.size() << " received, " << size << " requested";
m_size_mismatch_printed = true;
}
memset(buffer, 0, size * sizeof(*buffer));
m_stats.notifyUnderrun();
return 0;
}
else {
if (not m_pending_sti_frame.version_data.version.empty()) {
m_stats.notifyVersion(
m_pending_sti_frame.version_data.version,
m_pending_sti_frame.version_data.uptime_s);
}
m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, m_pending_sti_frame.audio_levels.right);
copy(m_pending_sti_frame.frame.begin(),
m_pending_sti_frame.frame.end(),
buffer);
m_pending_sti_frame.frame.clear();
m_size_mismatch_printed = false;
return size;
}
}
else if (m_frames.try_pop(sti)) {
if (sti.frame.size() == 0) {
etiLog.level(debug) << "EDI input " << m_name << " empty frame";
memset(buffer, 0, size * sizeof(*buffer));
m_stats.notifyUnderrun();
return 0;
}
else if (sti.frame.size() == size) {
// Steady-state when everything works well
if (m_frames.size() > m_max_frames_overrun) {
m_stats.notifyOverrun();
/* If the buffer is too full, we drop as many frames as needed
* to get down to the prebuffering size. We would like to have our buffer
* filled to the prebuffering length. */
size_t over_max = m_frames.size() - m_num_frames_prebuffering;
while (over_max--) {
EdiDecoder::sti_frame_t discard;
m_frames.try_pop(discard);
}
}
if (not sti.version_data.version.empty()) {
m_stats.notifyVersion(
sti.version_data.version,
sti.version_data.uptime_s);
}
m_stats.notifyPeakLevels(sti.audio_levels.left, sti.audio_levels.right);
copy(sti.frame.cbegin(), sti.frame.cend(), buffer);
m_size_mismatch_printed = false;
return size;
}
else {
if (not m_size_mismatch_printed) {
etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
sti.frame.size() << " received, " << size << " requested";
m_size_mismatch_printed = true;
}
memset(buffer, 0, size * sizeof(*buffer));
m_stats.notifyUnderrun();
return 0;
}
}
else {
memset(buffer, 0, size * sizeof(*buffer));
m_is_prebuffering = true;
etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering";
m_size_mismatch_printed = false;
m_stats.notifyUnderrun();
return 0;
}
}
size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta)
{
if (m_pending_sti_frame.frame.empty()) {
m_frames.try_pop(m_pending_sti_frame);
}
m_stats.notifyBuffer(m_frames.size() * size);
if (m_is_prebuffering) {
size_t num_discarded_wrong_size = 0;
size_t num_discarded_invalid_ts = 0;
size_t num_discarded_late = 0;
while (not m_pending_sti_frame.frame.empty()) {
if (m_pending_sti_frame.frame.size() == size) {
if (m_pending_sti_frame.timestamp.is_valid()) {
auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta);
ts_req += m_tist_delay;
const double offset = ts_req.diff_s(m_pending_sti_frame.timestamp);
m_stats.notifyTimestampOffset(offset);
if (offset < 0) {
// Too far in the future
break;
}
else if (offset < 24e-3) {
// Just right
m_is_prebuffering = false;
etiLog.level(info) << "EDI input " << m_name << " valid timestamp, pre-buffering complete." <<
" Wrong size: " << num_discarded_wrong_size <<
" Invalid TS: " << num_discarded_invalid_ts <<
" Late: " << num_discarded_late;
if (not m_pending_sti_frame.version_data.version.empty()) {
m_stats.notifyVersion(
m_pending_sti_frame.version_data.version,
m_pending_sti_frame.version_data.uptime_s);
}
m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left,
m_pending_sti_frame.audio_levels.right);
copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
m_pending_sti_frame.frame.clear();
return size;
}
else {
// Too late
num_discarded_late++;
}
}
else {
num_discarded_invalid_ts++;
}
}
else {
num_discarded_wrong_size++;
}
m_pending_sti_frame.frame.clear();
m_frames.try_pop(m_pending_sti_frame);
}
if (num_discarded_wrong_size > 0) {
etiLog.level(warn) << "EDI input " << m_name << ": " <<
num_discarded_wrong_size << " packets with wrong size.";
}
if (num_discarded_invalid_ts > 0) {
etiLog.level(warn) << "EDI input " << m_name << ": " <<
num_discarded_invalid_ts << " packets with invalid timestamp.";
}
memset(buffer, 0, size);
m_stats.notifyUnderrun();
return 0;
}
else {
if (m_pending_sti_frame.frame.empty()) {
etiLog.level(warn) << "EDI input " << m_name <<
" empty, re-enabling pre-buffering";
memset(buffer, 0, size);
m_stats.notifyUnderrun();
m_is_prebuffering = true;
return 0;
}
else if (not m_pending_sti_frame.timestamp.is_valid()) {
etiLog.level(warn) << "EDI input " << m_name <<
" invalid timestamp, ignoring";
memset(buffer, 0, size);
m_pending_sti_frame.frame.clear();
m_stats.notifyUnderrun();
return 0;
}
else {
auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta);
ts_req += m_tist_delay;
const double offset = m_pending_sti_frame.timestamp.diff_s(ts_req);
m_stats.notifyTimestampOffset(offset);
if (-24e-3 < offset and offset <= 0) {
if (not m_pending_sti_frame.version_data.version.empty()) {
m_stats.notifyVersion(
m_pending_sti_frame.version_data.version,
m_pending_sti_frame.version_data.uptime_s);
}
m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left,
m_pending_sti_frame.audio_levels.right);
copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
m_pending_sti_frame.frame.clear();
return size;
}
else {
m_stats.notifyUnderrun();
m_is_prebuffering = true;
m_pending_sti_frame.frame.clear();
etiLog.level(warn) << "EDI input " << m_name <<
" timestamp out of bounds, re-enabling pre-buffering";
memset(buffer, 0, size);
return 0;
}
}
}
}
void Edi::m_run()
{
while (m_running) {
try {
switch (m_input_used) {
case InputUsed::UDP:
{
constexpr size_t packsize = 2048;
auto packet = m_udp_sock.receive(packsize);
if (packet.buffer.size() == packsize) {
fprintf(stderr, "Warning, possible UDP truncation\n");
}
if (not packet.buffer.empty()) {
EdiDecoder::Packet p(move(packet.buffer));
m_sti_decoder.push_packet(p);
}
else {
this_thread::sleep_for(chrono::milliseconds(12));
}
}
break;
case InputUsed::TCP:
{
auto message = m_tcp_receive_server.receive();
if (auto data = dynamic_pointer_cast(message)) {
m_sti_decoder.push_bytes(data->data);
}
else if (dynamic_pointer_cast(message)) {
etiLog.level(info) << "EDI input " << m_name << " disconnected";
m_sti_decoder.push_bytes({}); // Push an empty frame to clear the internal state
}
else if (dynamic_pointer_cast(message)) {
this_thread::sleep_for(chrono::milliseconds(12));
}
else {
throw logic_error("unimplemented TCPReceiveMessage type");
}
}
break;
default:
throw logic_error("unimplemented input");
}
}
catch (const invalid_argument& e) {
etiLog.level(warn) << "EDI input " << m_name << " exception: " << e.what();
m_sti_decoder.push_bytes({}); // Push an empty frame to clear the internal state
this_thread::sleep_for(chrono::milliseconds(8));
}
catch (const runtime_error& e) {
etiLog.level(warn) << "EDI input " << m_name << " exception: " << e.what();
m_sti_decoder.push_bytes({}); // Push an empty frame to clear the internal state
this_thread::sleep_for(chrono::milliseconds(8));
}
}
}
void Edi::m_new_sti_frame_callback(EdiDecoder::sti_frame_t&& sti) {
if (not sti.frame.empty()) {
// We should not wait here, because we want the complete input buffering
// happening inside m_frames. Using the blocking function is only a protection
// against runaway memory usage if something goes wrong in the consumer.
m_frames.push_wait_if_full(move(sti), m_max_frames_overrun * 2);
}
}
int Edi::setBitrate(int bitrate)
{
if (bitrate <= 0) {
throw invalid_argument("Invalid bitrate " + to_string(bitrate) + " for " + m_name);
}
return bitrate;
}
void Edi::close()
{
m_udp_sock.close();
}
void Edi::set_parameter(const std::string& parameter, const std::string& value)
{
if (parameter == "buffer") {
size_t new_limit = atol(value.c_str());
m_max_frames_overrun = new_limit;
}
else if (parameter == "prebuffering") {
size_t new_limit = atol(value.c_str());
m_num_frames_prebuffering = new_limit;
}
else if (parameter == "buffermanagement") {
if (value == "prebuffering") {
setBufferManagement(Inputs::BufferManagement::Prebuffering);
}
else if (value == "timestamped") {
setBufferManagement(Inputs::BufferManagement::Timestamped);
}
else {
throw ParameterError("Invalid value for '" + parameter + "' in controllable " + get_rc_name());
}
}
else if (parameter == "tistdelay") {
m_tist_delay = chrono::milliseconds(stoi(value));
}
else {
throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name());
}
}
const std::string Edi::get_parameter(const std::string& parameter) const
{
stringstream ss;
if (parameter == "buffer") {
ss << m_max_frames_overrun;
}
else if (parameter == "prebuffering") {
ss << m_num_frames_prebuffering;
}
else if (parameter == "buffermanagement") {
switch (getBufferManagement()) {
case Inputs::BufferManagement::Prebuffering:
ss << "prebuffering";
break;
case Inputs::BufferManagement::Timestamped:
ss << "timestamped";
break;
}
}
else if (parameter == "tistdelay") {
ss << m_tist_delay.count();
}
else {
throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name());
}
return ss.str();
}
const json::map_t Edi::get_all_values() const
{
json::map_t map;
map["buffer"].v = m_max_frames_overrun;
map["prebuffering"].v = m_num_frames_prebuffering;
switch (getBufferManagement()) {
case Inputs::BufferManagement::Prebuffering:
map["buffermanagement"].v = "prebuffering";
break;
case Inputs::BufferManagement::Timestamped:
map["buffermanagement"].v = "timestamped";
break;
}
map["tistdelay"].v = m_tist_delay.count();
return map;
}
}