/*
Copyright (C) 2022
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
EDI output,
UDP and TCP transports and their configuration
*/
/*
This file is part of the ODR-mmbTools.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see .
*/
#include "Transport.h"
#include
#include
#include
using namespace std;
namespace edi {
void configuration_t::print() const
{
etiLog.level(info) << "EDI Output";
etiLog.level(info) << " verbose " << verbose;
for (auto edi_dest : destinations) {
if (auto udp_dest = dynamic_pointer_cast(edi_dest)) {
etiLog.level(info) << " UDP to " << udp_dest->dest_addr << ":" << udp_dest->dest_port;
if (not udp_dest->source_addr.empty()) {
etiLog.level(info) << " source " << udp_dest->source_addr;
etiLog.level(info) << " ttl " << udp_dest->ttl;
}
etiLog.level(info) << " source port " << udp_dest->source_port;
}
else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) {
etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port;
etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued;
}
else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) {
etiLog.level(info) << " TCP client connecting to " << tcp_dest->dest_addr << ":" << tcp_dest->dest_port;
etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued;
}
else {
throw logic_error("EDI destination not implemented");
}
}
}
Sender::Sender(const configuration_t& conf) :
m_conf(conf),
edi_pft(m_conf)
{
if (m_conf.verbose) {
etiLog.level(info) << "Setup EDI Output";
}
for (const auto& edi_dest : m_conf.destinations) {
if (const auto udp_dest = dynamic_pointer_cast(edi_dest)) {
auto udp_socket = std::make_shared(udp_dest->source_port);
if (not udp_dest->source_addr.empty()) {
udp_socket->setMulticastSource(udp_dest->source_addr.c_str());
udp_socket->setMulticastTTL(udp_dest->ttl);
}
udp_sockets.emplace(udp_dest.get(), udp_socket);
}
else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) {
auto dispatcher = make_shared(
tcp_dest->max_frames_queued, tcp_dest->tcp_server_preroll_buffers);
dispatcher->start(tcp_dest->listen_port, "0.0.0.0");
tcp_dispatchers.emplace(tcp_dest.get(), dispatcher);
}
else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) {
auto tcp_send_client = make_shared(tcp_dest->dest_addr, tcp_dest->dest_port);
tcp_senders.emplace(tcp_dest.get(), tcp_send_client);
}
else {
throw logic_error("EDI destination not implemented");
}
}
if (m_conf.dump) {
edi_debug_file.open("./edi.debug");
}
if (m_conf.enable_pft) {
unique_lock lock(m_mutex);
m_running = true;
m_thread = thread(&Sender::run, this);
}
if (m_conf.verbose) {
etiLog.log(info, "EDI output set up");
}
}
Sender::~Sender()
{
{
unique_lock lock(m_mutex);
m_running = false;
}
if (m_thread.joinable()) {
m_thread.join();
}
}
void Sender::write(const TagPacket& tagpacket)
{
// Assemble into one AF Packet
edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket);
write(af_packet);
}
void Sender::write(const AFPacket& af_packet)
{
if (m_conf.enable_pft) {
// Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
vector edi_fragments = edi_pft.Assemble(af_packet);
if (m_conf.verbose and m_last_num_pft_fragments != edi_fragments.size()) {
etiLog.log(debug, "EDI Output: Number of PFT fragments %zu\n",
edi_fragments.size());
m_last_num_pft_fragments = edi_fragments.size();
}
/* Spread out the transmission of all fragments over part of the 24ms AF packet duration
* to reduce the risk of losing a burst of fragments because of congestion. */
using namespace std::chrono;
auto inter_fragment_wait_time = microseconds(1);
if (edi_fragments.size() > 1) {
if (m_conf.fragment_spreading_factor > 0) {
inter_fragment_wait_time =
microseconds(
llrint(m_conf.fragment_spreading_factor * 24000.0 / edi_fragments.size())
);
}
}
/* Separate insertion into map and transmission so as to make spreading possible */
const auto now = steady_clock::now();
{
auto tp = now;
unique_lock lock(m_mutex);
for (auto& edi_frag : edi_fragments) {
m_pending_frames[tp] = move(edi_frag);
tp += inter_fragment_wait_time;
}
}
// Transmission done in run() function
}
else /* PFT disabled */ {
// Send over ethernet
if (m_conf.dump) {
ostream_iterator debug_iterator(edi_debug_file);
copy(af_packet.begin(), af_packet.end(), debug_iterator);
}
for (auto& dest : m_conf.destinations) {
if (const auto& udp_dest = dynamic_pointer_cast(dest)) {
Socket::InetAddress addr;
addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port);
if (af_packet.size() > 1400 and not m_udp_fragmentation_warning_printed) {
fprintf(stderr, "EDI Output: AF packet larger than 1400,"
" consider using PFT to avoid UP fragmentation.\n");
m_udp_fragmentation_warning_printed = true;
}
udp_sockets.at(udp_dest.get())->send(af_packet, addr);
}
else if (auto tcp_dest = dynamic_pointer_cast(dest)) {
tcp_dispatchers.at(tcp_dest.get())->write(af_packet);
}
else if (auto tcp_dest = dynamic_pointer_cast(dest)) {
const auto error_stats = tcp_senders.at(tcp_dest.get())->sendall(af_packet);
if (error_stats.has_seen_new_errors) {
fprintf(stderr, "TCP output %s:%d has %zu reconnects: most recent error: %s\n",
tcp_dest->dest_addr.c_str(),
tcp_dest->dest_port,
error_stats.num_reconnects,
error_stats.last_error.c_str());
}
}
else {
throw logic_error("EDI destination not implemented");
}
}
}
}
void Sender::override_af_sequence(uint16_t seq)
{
edi_afPacketiser.OverrideSeq(seq);
}
void Sender::override_pft_sequence(uint16_t pseq)
{
edi_pft.OverridePSeq(pseq);
}
void Sender::run()
{
while (m_running) {
unique_lock lock(m_mutex);
const auto now = chrono::steady_clock::now();
// Send over ethernet
for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) {
const auto& edi_frag = it->second;
if (it->first <= now) {
if (m_conf.dump) {
ostream_iterator debug_iterator(edi_debug_file);
copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
}
for (auto& dest : m_conf.destinations) {
if (const auto& udp_dest = dynamic_pointer_cast(dest)) {
Socket::InetAddress addr;
addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port);
udp_sockets.at(udp_dest.get())->send(edi_frag, addr);
}
else if (auto tcp_dest = dynamic_pointer_cast(dest)) {
tcp_dispatchers.at(tcp_dest.get())->write(edi_frag);
}
else if (auto tcp_dest = dynamic_pointer_cast(dest)) {
tcp_senders.at(tcp_dest.get())->sendall(edi_frag);
}
else {
throw logic_error("EDI destination not implemented");
}
}
it = m_pending_frames.erase(it);
}
else {
++it;
}
}
lock.unlock();
this_thread::sleep_for(chrono::microseconds(500));
}
}
}