/*
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
*/
/*
This file is part of ODR-DabMux.
ODR-DabMux is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
ODR-DabMux is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
*/
#include "input/Edi.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "utils.h"
using namespace std;
namespace Inputs {
/* Passed to the STI decoder on construction; presumably toggles verbose
 * decoder output (confirm against the decoder's constructor). */
constexpr bool VERBOSE = false;

/* Passed to the TCP receive server on construction; presumably the block
 * size in bytes used for TCP reads (confirm against the server class). */
constexpr size_t TCP_BLOCKSIZE = 2048;

/* Absolute max number of frames to be queued, both with and without timestamping */
constexpr size_t MAX_FRAMES_QUEUED = 1000;

/* When using timestamping, start discarding the front of the queue once the queue
 * is this full. Must be smaller than MAX_FRAMES_QUEUED. */
constexpr size_t MAX_FRAMES_QUEUED_PREBUFFERING = 500;

// Enforce the documented relation so a later edit of either constant cannot
// silently make the pre-buffering discard threshold unreachable.
static_assert(MAX_FRAMES_QUEUED_PREBUFFERING < MAX_FRAMES_QUEUED,
        "MAX_FRAMES_QUEUED_PREBUFFERING must be smaller than MAX_FRAMES_QUEUED");

/* When not using timestamping, how many frames to prebuffer.
 * TODO: should be configurable, like ZMQ.
 */
constexpr size_t NUM_FRAMES_PREBUFFERING = 10;
/* Construct the EDI input. Only the members are initialised here: the
 * constructor body is empty, and the receive thread is started by open(). */
Edi::Edi() :
m_tcp_receive_server(TCP_BLOCKSIZE),
m_sti_writer(),
// The decoder is handed the writer member.
// NOTE(review): presumably it keeps a reference to it, in which case
// m_sti_writer must be declared before m_sti_decoder — confirm in the header.
m_sti_decoder(m_sti_writer, VERBOSE)
{ }
/* Stop the receive thread; the destructor body runs before any member the
 * thread uses is destroyed. */
Edi::~Edi()
{
    // Ask m_run() to leave its loop, then wait until it has done so.
    m_running = false;

    if (not m_thread.joinable()) {
        return;
    }
    m_thread.join();
}
void Edi::open(const std::string& name)
{
const std::regex re_udp("udp://:([0-9]+)");
const std::regex re_tcp("tcp://(.*):([0-9]+)");
lock_guard lock(m_mutex);
m_running = false;
if (m_thread.joinable()) {
m_thread.join();
}
std::smatch m;
if (std::regex_match(name, m, re_udp)) {
const int udp_port = std::stoi(m[1].str());
m_input_used = InputUsed::UDP;
m_udp_sock.reinit(udp_port);
m_udp_sock.setBlocking(false);
// TODO multicast
}
else if (std::regex_match(name, m, re_tcp)) {
m_input_used = InputUsed::TCP;
const string addr = m[1].str();
const int tcp_port = std::stoi(m[2].str());
m_tcp_receive_server.start(tcp_port, addr);
}
else {
throw runtime_error("Cannot parse EDI input URI");
}
m_name = name;
m_running = true;
m_thread = std::thread(&Edi::m_run, this);
}
/* Read one frame without looking at timestamps.
 *
 * buffer: destination, must have room for size bytes.
 * size:   number of bytes the caller expects for one frame.
 *
 * Returns size when a frame of exactly that length was copied into buffer.
 * In every other case the buffer is filled with zeros and 0 is returned:
 *  - while pre-buffering (fewer than NUM_FRAMES_PREBUFFERING frames queued
 *    when the call is made),
 *  - when the frame available has a different length or is empty (that
 *    frame is dropped),
 *  - when the queue is empty, which also re-enables pre-buffering.
 */
size_t Edi::readFrame(uint8_t *buffer, size_t size)
{
    if (m_is_prebuffering) {
        m_is_prebuffering = m_frames.size() < NUM_FRAMES_PREBUFFERING;
        if (not m_is_prebuffering) {
            etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete.";
        }
        // Data is only delivered starting from the next call
        memset(buffer, 0, size * sizeof(*buffer));
        return 0;
    }

    // In this file only the timestamped readFrame() overload parks a frame
    // in m_pending_sti_frame; consume it before touching the queue so that
    // frames stay in order.
    if (not m_pending_sti_frame.frame.empty()) {
        if (m_pending_sti_frame.frame.size() != size) {
            etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
                m_pending_sti_frame.frame.size() << " received, " << size << " requested";
            // Drop the frame: keeping it would make every following call
            // hit the same mismatch and the queue would never be read again.
            m_pending_sti_frame.frame.clear();
            memset(buffer, 0, size * sizeof(*buffer));
            return 0;
        }

        copy(m_pending_sti_frame.frame.begin(),
                m_pending_sti_frame.frame.end(),
                buffer);
        m_pending_sti_frame.frame.clear();
        return size;
    }

    EdiDecoder::sti_frame_t sti;
    if (not m_frames.try_pop(sti)) {
        memset(buffer, 0, size * sizeof(*buffer));
        m_is_prebuffering = true;
        etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering";
        return 0;
    }

    if (sti.frame.size() == 0) {
        etiLog.level(debug) << "EDI input " << m_name << " empty frame";
        // Zero the buffer like every other path that delivers no data
        memset(buffer, 0, size * sizeof(*buffer));
        return 0;
    }

    if (sti.frame.size() != size) {
        etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
            sti.frame.size() << " received, " << size << " requested";
        memset(buffer, 0, size * sizeof(*buffer));
        return 0;
    }

    copy(sti.frame.cbegin(), sti.frame.cend(), buffer);
    return size;
}
/* Read one frame, using the frame timestamps to decide what to deliver.
 *
 * buffer:              destination, must have room for size bytes.
 * size:                number of bytes the caller expects for one frame.
 * seconds, utco, tsta: the requested timestamp, converted with
 *                      frame_timestamp_t::from_unix_epoch().
 *
 * The next frame is taken from the queue into m_pending_sti_frame and stays
 * there until it is delivered or dropped, so no frame is skipped.
 *
 * Returns size when the pending frame was copied into buffer. Otherwise the
 * buffer is filled with zeros and 0 is returned.
 *
 * While pre-buffering, a frame is delivered (and pre-buffering ends) once
 * its timestamp offset to the requested timestamp is below the limit.
 * Otherwise the frame is kept for the next call, unless the queue has
 * reached MAX_FRAMES_QUEUED_PREBUFFERING, in which case it is dropped.
 *
 * Outside pre-buffering, a missing frame, an invalid timestamp or an offset
 * above the limit switches back to pre-buffering.
 *
 * Frames of the wrong size, and frames without a valid timestamp, are
 * dropped.
 */
size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta)
{
    // readFrame gets called every 24ms, so we allow max 24ms
    // difference between the input frame timestamp and the requested
    // timestamp.
    // NOTE(review): the result of diff_ms() is compared against 24e-3, which
    // equals 24ms only if diff_ms() actually returns seconds — confirm the
    // unit in frame_timestamp_t. The value is kept as it was.
    constexpr double MAX_TIMESTAMP_OFFSET = 24e-3;

    // Every path that delivers no data hands zeroed bytes to the caller.
    const auto output_silence = [&]() -> size_t {
        memset(buffer, 0, size);
        return 0;
    };

    if (m_pending_sti_frame.frame.empty()) {
        m_frames.try_pop(m_pending_sti_frame);
    }

    if (m_is_prebuffering) {
        if (m_pending_sti_frame.frame.empty()) {
            return output_silence();
        }
        else if (m_pending_sti_frame.frame.size() == size) {
            if (m_pending_sti_frame.timestamp.valid()) {
                const auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta);
                const double offset = m_pending_sti_frame.timestamp.diff_ms(ts_req);

                if (offset < MAX_TIMESTAMP_OFFSET) {
                    m_is_prebuffering = false;
                    etiLog.level(warn) << "EDI input " << m_name <<
                        " valid timestamp, pre-buffering complete";
                    copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
                    m_pending_sti_frame.frame.clear();
                    return size;
                }
                else {
                    // Wait more, but erase the front of the frame queue to avoid
                    // stalling on one frame with incorrect timestamp
                    if (m_frames.size() >= MAX_FRAMES_QUEUED_PREBUFFERING) {
                        m_pending_sti_frame.frame.clear();
                    }
                    return output_silence();
                }
            }
            else {
                etiLog.level(debug) << "EDI input " << m_name <<
                    " skipping frame without timestamp";
                m_pending_sti_frame.frame.clear();
                return output_silence();
            }
        }
        else {
            etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
                m_pending_sti_frame.frame.size() << " received, " << size << " requested";
            m_pending_sti_frame.frame.clear();
            return output_silence();
        }
    }
    else {
        // The frame to deliver is the pending one fetched above. Popping a
        // second frame here would skip the pending frame and validate one
        // frame's timestamp while delivering another frame's data.
        if (m_pending_sti_frame.frame.empty()) {
            etiLog.level(warn) << "EDI input " << m_name <<
                " empty, re-enabling pre-buffering";
            m_is_prebuffering = true;
            return output_silence();
        }
        else if (not m_pending_sti_frame.timestamp.valid()) {
            etiLog.level(warn) << "EDI input " << m_name <<
                " invalid timestamp, re-enabling pre-buffering";
            m_is_prebuffering = true;
            m_pending_sti_frame.frame.clear();
            return output_silence();
        }
        else if (m_pending_sti_frame.frame.size() != size) {
            // Never copy more than the caller's buffer can hold
            etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
                m_pending_sti_frame.frame.size() << " received, " << size << " requested";
            m_pending_sti_frame.frame.clear();
            return output_silence();
        }
        else {
            const auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta);
            const double offset = m_pending_sti_frame.timestamp.diff_ms(ts_req);

            if (offset > MAX_TIMESTAMP_OFFSET) {
                // The frame stays pending so that the pre-buffering branch
                // re-evaluates it on the following calls.
                m_is_prebuffering = true;
                etiLog.level(warn) << "EDI input " << m_name <<
                    " timestamp out of bounds, re-enabling pre-buffering";
                return output_silence();
            }
            else {
                copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
                m_pending_sti_frame.frame.clear();
                return size;
            }
        }
    }
}
/* Body of the receive thread started by open().
 *
 * Until m_running is cleared, each iteration
 *  1. reads whatever the selected input (UDP or TCP) has available and
 *     feeds it to the STI decoder,
 *  2. asks the STI writer for a frame and, if it returned a non-empty one,
 *     appends it to m_frames,
 *  3. sleeps for 12ms if neither step did anything.
 *
 * Throws std::logic_error if m_input_used is neither UDP nor TCP.
 */
void Edi::m_run()
{
    while (m_running) {
        bool work_done = false;

        switch (m_input_used) {
            case InputUsed::UDP:
                {
                    constexpr size_t packsize = 2048;
                    const auto packet = m_udp_sock.receive(packsize);
                    // A datagram that fills the whole receive size may have
                    // been cut off
                    if (packet.buffer.size() == packsize) {
                        fprintf(stderr, "Warning, possible UDP truncation\n");
                    }
                    if (not packet.buffer.empty()) {
                        m_sti_decoder.push_packet(packet.buffer);
                        work_done = true;
                    }
                }
                break;
            case InputUsed::TCP:
                {
                    auto packet = m_tcp_receive_server.receive();
                    if (not packet.empty()) {
                        m_sti_decoder.push_bytes(packet);
                        work_done = true;
                    }
                }
                break;
            default:
                throw logic_error("unimplemented input");
        }

        // Not const: a const object cannot be moved from, so std::move
        // would silently copy the frame.
        auto sti = m_sti_writer.getFrame();
        if (not sti.frame.empty()) {
            // NOTE(review): presumably blocks while MAX_FRAMES_QUEUED frames
            // are queued — confirm in the queue implementation.
            m_frames.push_wait_if_full(std::move(sti), MAX_FRAMES_QUEUED);
            work_done = true;
        }

        if (not work_done) {
            // Avoid fast loop
            this_thread::sleep_for(chrono::milliseconds(12));
        }
    }
}
/* Validate the bitrate requested for this input.
 *
 * Returns bitrate unchanged when it is strictly positive.
 * Throws invalid_argument, naming the input, for zero or negative values.
 */
int Edi::setBitrate(int bitrate)
{
    if (bitrate > 0) {
        return bitrate;
    }

    throw invalid_argument("Invalid bitrate " + to_string(bitrate) + " for " + m_name);
}
void Edi::close()
{
m_udp_sock.close();
}
}