/*
Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
EDI output,
UDP and TCP transports and their configuration
*/
/*
This file is part of the ODR-mmbTools.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see https://www.gnu.org/licenses/.
*/
#include "Transport.h"
#include "Log.h"
#include
#include
using namespace std;
namespace edi {
/* Log the EDI output configuration at info level.
 *
 * Prints the verbose flag, then one group of lines per entry in
 * `destinations`, followed by that entry's PFT settings.
 *
 * Throws std::logic_error if a destination matches none of the casts below.
 *
 * NOTE(review): the destination type tested by each dynamic_pointer_cast is
 * not visible in this view of the file; the branch descriptions below are
 * taken from the log messages.
 */
void configuration_t::print() const
{
etiLog.level(info) << "EDI Output";
etiLog.level(info) << " verbose " << verbose;
for (auto edi_dest : destinations) {
// UDP destination
if (auto udp_dest = dynamic_pointer_cast(edi_dest)) {
etiLog.level(info) << " UDP to " << udp_dest->dest_addr << ":" << udp_dest->dest_port;
// Source address and TTL are only printed when a source address is set
if (not udp_dest->source_addr.empty()) {
etiLog.level(info) << " source " << udp_dest->source_addr;
etiLog.level(info) << " ttl " << udp_dest->ttl;
}
etiLog.level(info) << " source port " << udp_dest->source_port;
}
// TCP destination identified by a listening port
else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) {
etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port;
etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued;
}
// TCP destination identified by a remote address and port
else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) {
etiLog.level(info) << " TCP client connecting to " << tcp_dest->dest_addr << ":" << tcp_dest->dest_port;
etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued;
}
else {
throw logic_error("EDI destination not implemented");
}
// PFT settings are printed for every destination; the details only when PFT is enabled
etiLog.level(info) << " PFT=" << edi_dest->pft_settings.enable_pft;
if (edi_dest->pft_settings.enable_pft) {
etiLog.level(info) << " FEC=" << edi_dest->pft_settings.fec;
etiLog.level(info) << " Chunk Len=" << edi_dest->pft_settings.chunk_len;
etiLog.level(info) << " Fragment spreading factor=" << edi_dest->pft_settings.fragment_spreading_factor;
}
}
}
/* Set up one sender per configured destination and start the worker thread.
 *
 * m_conf is initialised from `conf`. For every entry in m_conf.destinations
 * a transport-specific sender is created and wrapped, together with the
 * destination's pft_settings, into an element appended to m_pft_spreaders.
 * Finally m_running is set and a thread executing Sender::run is started.
 *
 * Throws std::logic_error if a destination matches none of the casts below.
 *
 * NOTE(review): the types given to dynamic_pointer_cast and make_shared are
 * not visible in this view of the file.
 */
Sender::Sender(const configuration_t& conf) :
m_conf(conf)
{
if (m_conf.verbose) {
etiLog.level(info) << "Setup EDI Output";
}
for (const auto& edi_dest : m_conf.destinations) {
// UDP destination: the socket is constructed with the configured source port
if (const auto udp_dest = dynamic_pointer_cast(edi_dest)) {
Socket::UDPSocket udp_socket(udp_dest->source_port);
// Multicast source and TTL are only applied when a source address is set
if (not udp_dest->source_addr.empty()) {
udp_socket.setMulticastSource(udp_dest->source_addr.c_str());
udp_socket.setMulticastTTL(udp_dest->ttl);
}
// The socket is moved into the sender
auto sender = make_shared(
udp_dest->dest_addr,
udp_dest->dest_port,
std::move(udp_socket));
m_pft_spreaders.emplace_back(
make_shared(udp_dest->pft_settings, sender));
}
// TCP destination configured with a listen port
else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) {
auto sender = make_shared(
tcp_dest->listen_port,
tcp_dest->max_frames_queued,
tcp_dest->tcp_server_preroll_buffers);
m_pft_spreaders.emplace_back(
make_shared(tcp_dest->pft_settings, sender));
}
// TCP destination configured with a remote address and port
else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) {
auto sender = make_shared(tcp_dest->dest_addr, tcp_dest->dest_port);
m_pft_spreaders.emplace_back(
make_shared(tcp_dest->pft_settings, sender));
}
else {
throw logic_error("EDI destination not implemented");
}
}
{
// Set the flag before starting the thread: run() loops while m_running is true
m_running = true;
m_thread = thread(&Sender::run, this);
}
if (m_conf.verbose) {
etiLog.log(info, "EDI output set up");
}
}
void Sender::write(const TagPacket& tagpacket)
{
// Assemble into one AF Packet
edi::AFPacket af_packet = edi_af_packetiser.Assemble(tagpacket);
write(af_packet);
}
/* Hand the AF packet to every element of m_pft_spreaders. */
void Sender::write(const AFPacket& af_packet)
{
    for (const auto& spreader : m_pft_spreaders) {
        spreader->send_af_packet(af_packet);
    }
}
/* Forward the given sequence value to the AF packetiser's OverrideSeq(). */
void Sender::override_af_sequence(uint16_t seq)
{
    edi_af_packetiser.OverrideSeq(seq);
}
void Sender::override_pft_sequence(uint16_t pseq)
{
for (auto& spreader : m_pft_spreaders) {
spreader->edi_pft.OverridePSeq(pseq);
}
}
/* Collect statistics from the senders that match the cast below.
 *
 * For each element of m_pft_spreaders whose sender casts successfully, one
 * stats_t entry is appended, holding the sender's listen_port and the value
 * returned by its socket's get_stats(). Other senders are skipped.
 *
 * NOTE(review): the cast target type and the vector element type are not
 * visible in this view of the file.
 */
std::vector Sender::get_tcp_server_stats() const
{
std::vector stats;
for (auto& spreader : m_pft_spreaders) {
if (auto sender = std::dynamic_pointer_cast(spreader->sender)) {
Sender::stats_t s;
s.listen_port = sender->listen_port;
s.stats = sender->sock.get_stats();
stats.push_back(s);
}
}
return stats;
}
/* Clear m_running, which is the loop condition of run(), then join the
 * worker thread if it is joinable. */
Sender::~Sender()
{
    m_running = false;

    if (m_thread.joinable()) {
        m_thread.join();
    }
}
void Sender::run()
{
while (m_running) {
const auto now = chrono::steady_clock::now();
for (auto& spreader : m_pft_spreaders) {
spreader->tick(now);
}
this_thread::sleep_for(chrono::microseconds(500));
}
}
/* Send one frame over the UDP socket.
 * The destination address is resolved from dest_addr and dest_port on
 * every call before the frame is sent. */
void Sender::udp_sender_t::send_packet(const std::vector &frame)
{
Socket::InetAddress addr;
addr.resolveUdpDestination(dest_addr, dest_port);
sock.send(frame, addr);
}
/* Pass one frame to the dispatcher socket's write(). */
void Sender::tcp_dispatcher_t::send_packet(const std::vector &frame)
{
sock.write(frame);
}
/* Pass one frame to the client socket's sendall(). */
void Sender::tcp_send_client_t::send_packet(const std::vector &frame)
{
sock.sendall(frame);
}
/* Construct a UDP sender.
 *
 * dest_addr  destination address, stored in the member of the same name
 * dest_port  destination port, stored in the member of the same name
 * sock       UDP socket; ownership is moved into this object
 *
 * dest_addr is taken by value, so it is moved into the member rather than
 * copied a second time.
 */
Sender::udp_sender_t::udp_sender_t(std::string dest_addr,
        uint16_t dest_port,
        Socket::UDPSocket&& sock) :
    dest_addr(std::move(dest_addr)),
    dest_port(dest_port),
    sock(std::move(sock))
{
}
/* Construct a TCP dispatcher sender.
 *
 * listen_port                 stored in the member of the same name and
 *                             passed to sock.start()
 * max_frames_queued           forwarded to the socket constructor
 * tcp_server_preroll_buffers  forwarded to the socket constructor
 *
 * The socket is started with the address "0.0.0.0" (presumably meaning all
 * IPv4 interfaces; confirm against the socket implementation).
 */
Sender::tcp_dispatcher_t::tcp_dispatcher_t(
        uint16_t listen_port,
        size_t max_frames_queued,
        size_t tcp_server_preroll_buffers) :
    listen_port(listen_port),
    sock(max_frames_queued, tcp_server_preroll_buffers)
{
    sock.start(listen_port, "0.0.0.0");
}
/* Construct a TCP client sender; the socket member is constructed from the
 * destination address and port. */
Sender::tcp_send_client_t::tcp_send_client_t(
        const std::string &dest_addr,
        uint16_t dest_port) :
    sock(dest_addr, dest_port)
{
}
/* Construct a spreader for one destination.
 *
 * conf    PFT settings used to construct the edi_pft member
 * sender  transport used to send frames; taken by value, so it is moved
 *         into the member instead of being copied again
 *
 * NOTE(review): send_af_packet() reads a member named `settings`, which is
 * not initialised here. Confirm that it is set elsewhere (e.g. by a default
 * member initialiser in the header).
 */
Sender::PFTSpreader::PFTSpreader(const pft_settings_t& conf, sender_sp sender) :
    sender(std::move(sender)),
    edi_pft(conf)
{
}
/* Queue one AF packet for transmission; the frames are sent later by tick().
 *
 * With PFT enabled, the packet is split into PFT fragments and each fragment
 * is inserted into m_pending_frames under its own due time, starting now and
 * spaced by inter_fragment_wait_time. With PFT disabled, the AF packet itself
 * is inserted with a due time of now.
 *
 * m_pending_frames is modified under m_mutex.
 *
 * NOTE(review): `settings` is read here but is not initialised by the
 * PFTSpreader constructor in this file. Confirm that it is set elsewhere.
 */
void Sender::PFTSpreader::send_af_packet(const AFPacket& af_packet)
{
    using namespace std::chrono;

    if (edi_pft.is_enabled()) {
        // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
        vector edi_fragments = edi_pft.Assemble(af_packet);

        // Only log when the fragment count changes
        if (settings.verbose and last_num_pft_fragments != edi_fragments.size()) {
            etiLog.log(debug, "EDI Output: Number of PFT fragments %zu\n",
                    edi_fragments.size());
            last_num_pft_fragments = edi_fragments.size();
        }

        /* Spread out the transmission of all fragments over part of the 24ms AF packet duration
         * to reduce the risk of losing a burst of fragments because of congestion. */
        auto inter_fragment_wait_time = microseconds(1);
        if (edi_fragments.size() > 1 and settings.fragment_spreading_factor > 0) {
            const auto spread_us = llrint(
                    settings.fragment_spreading_factor * 24000.0 /
                    edi_fragments.size());

            /* A small spreading factor can round to zero. With a zero step every
             * fragment would get the same key in m_pending_frames and overwrite
             * the previous one, so keep the 1us minimum in that case. */
            if (spread_us > 0) {
                inter_fragment_wait_time = microseconds(spread_us);
            }
        }

        /* Separate insertion into map and transmission so as to make spreading possible */
        const auto now = steady_clock::now();
        {
            auto tp = now;
            unique_lock lock(m_mutex);
            for (auto& edi_frag : edi_fragments) {
                m_pending_frames[tp] = std::move(edi_frag);
                tp += inter_fragment_wait_time;
            }
        }
    }
    else /* PFT disabled */ {
        const auto now = steady_clock::now();
        unique_lock lock(m_mutex);
        // af_packet is a const reference, so it is copied (a std::move would have no effect)
        m_pending_frames[now] = af_packet;
    }

    // Actual transmission done in tick() function
}
/* Send every pending frame whose due time is not later than `now` and remove
 * it from m_pending_frames. Frames that are not yet due stay queued.
 * The whole pass runs with m_mutex held. */
void Sender::PFTSpreader::tick(const std::chrono::steady_clock::time_point& now)
{
    unique_lock lock(m_mutex);

    auto pending = m_pending_frames.begin();
    while (pending != m_pending_frames.end()) {
        if (pending->first > now) {
            // Not due yet, keep it for a later tick
            ++pending;
            continue;
        }

        sender->send_packet(pending->second);
        // erase() returns the iterator following the removed element
        pending = m_pending_frames.erase(pending);
    }
}
} // namespace edi