diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2025-03-23 23:05:14 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2025-03-23 23:05:14 +0100 |
commit | ef6ea5ab6b927fcf9e5152fbf44f72646848d2c9 (patch) | |
tree | bc438ec3094b00eeee973766074e55e56f47cfd2 /lib | |
parent | d57e0e9635a18f226394b9f41feef1658a2e051c (diff) | |
download | dabmux-next.tar.gz dabmux-next.tar.bz2 dabmux-next.zip |
Common b23da85: make PFT per-output configurablenext
Diffstat (limited to 'lib')
-rw-r--r-- | lib/edioutput/EDIConfig.h | 31 | ||||
-rw-r--r-- | lib/edioutput/PFT.cpp | 12 | ||||
-rw-r--r-- | lib/edioutput/PFT.h | 15 | ||||
-rw-r--r-- | lib/edioutput/Transport.cpp | 288 | ||||
-rw-r--r-- | lib/edioutput/Transport.h | 93 |
5 files changed, 252 insertions, 187 deletions
diff --git a/lib/edioutput/EDIConfig.h b/lib/edioutput/EDIConfig.h index 1997210..7016e87 100644 --- a/lib/edioutput/EDIConfig.h +++ b/lib/edioutput/EDIConfig.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2019 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -36,17 +36,31 @@ namespace edi { /** Configuration for EDI output */ +struct pft_settings_t { + // protection and fragmentation settings + bool verbose = false; + bool enable_pft = false; + unsigned chunk_len = 207; // RSk, data length of each chunk + unsigned fec = 0; // number of fragments that can be recovered + double fragment_spreading_factor = 0.95; + // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms) + // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets. +}; + struct destination_t { virtual ~destination_t() {}; + + pft_settings_t pft_settings = {}; }; + // Can represent both unicast and multicast destinations struct udp_destination_t : public destination_t { std::string dest_addr; - unsigned int dest_port = 0; + uint16_t dest_port = 0; std::string source_addr; - unsigned int source_port = 0; - unsigned int ttl = 10; + uint16_t source_port = 0; + uint8_t ttl = 10; }; // TCP server that can accept multiple connections @@ -66,16 +80,9 @@ struct tcp_client_t : public destination_t { }; struct configuration_t { - unsigned chunk_len = 207; // RSk, data length of each chunk - unsigned fec = 0; // number of fragments that can be recovered - bool dump = false; // dump a file with the EDI packets - bool verbose = false; - bool enable_pft = false; // Enable protection and fragmentation + bool verbose = false; unsigned int tagpacket_alignment = 0; std::vector<std::shared_ptr<destination_t> > destinations; - double fragment_spreading_factor = 0.95; - // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms) - // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets. bool enabled() const { return destinations.size() > 0; } diff --git a/lib/edioutput/PFT.cpp b/lib/edioutput/PFT.cpp index 7e0e8e9..f65fd67 100644 --- a/lib/edioutput/PFT.cpp +++ b/lib/edioutput/PFT.cpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2021 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -31,7 +31,6 @@ */ #include <vector> -#include <list> #include <cstdio> #include <cstring> #include <cstdint> @@ -41,6 +40,7 @@ #include "PFT.h" #include "crc.h" #include "ReedSolomon.h" +#include "Log.h" namespace edi { @@ -51,11 +51,10 @@ using namespace std; PFT::PFT() { } -PFT::PFT(const configuration_t &conf) : +PFT::PFT(const pft_settings_t& conf) : + m_enabled(conf.enable_pft), m_k(conf.chunk_len), m_m(conf.fec), - m_pseq(0), - m_num_chunks(0), m_verbose(conf.verbose) { if (m_k > 207) { @@ -324,5 +323,4 @@ void PFT::OverridePSeq(uint16_t pseq) m_pseq = pseq; } -} - +} // namespace edi diff --git a/lib/edioutput/PFT.h b/lib/edioutput/PFT.h index 42569a0..52e9f46 100644 --- a/lib/edioutput/PFT.h +++ b/lib/edioutput/PFT.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2021 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -33,12 +33,8 @@ #pragma once #include <vector> -#include <list> -#include <stdexcept> #include <cstdint> #include "AFPacket.h" -#include "Log.h" -#include "ReedSolomon.h" #include "EDIConfig.h" namespace edi { @@ -52,21 +48,24 @@ class PFT static constexpr int PARITYBYTES = 48; PFT(); - PFT(const configuration_t& conf); + PFT(const pft_settings_t& conf); + + bool is_enabled() const { return m_enabled and m_k > 0; } // return a list of PFT fragments with the correct // PFT headers - std::vector< PFTFragment > Assemble(AFPacket af_packet); + std::vector<PFTFragment> Assemble(AFPacket af_packet); // Apply Reed-Solomon FEC to the AF Packet RSBlock Protect(AFPacket af_packet); // Cut a RSBlock into several fragments that can be transmitted - std::vector< std::vector<uint8_t> > ProtectAndFragment(AFPacket af_packet); + std::vector<std::vector<uint8_t>> ProtectAndFragment(AFPacket af_packet); void OverridePSeq(uint16_t pseq); private: + bool m_enabled = false; unsigned int m_k = 207; // length of RS data word unsigned int m_m = 3; // number of fragments that can be recovered if lost uint16_t m_pseq = 0; diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp index a5e0bc3..e9559b5 100644 --- a/lib/edioutput/Transport.cpp +++ b/lib/edioutput/Transport.cpp @@ -25,7 +25,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ #include "Transport.h" -#include <iterator> +#include "Log.h" #include <cmath> #include <thread> @@ -57,13 +57,18 @@ void configuration_t::print() const else { throw logic_error("EDI destination not implemented"); } + etiLog.level(info) << " PFT=" << edi_dest->pft_settings.enable_pft; + if (edi_dest->pft_settings.enable_pft) { + etiLog.level(info) << " FEC=" << edi_dest->pft_settings.fec; + etiLog.level(info) << " Chunk Len=" << edi_dest->pft_settings.chunk_len; + etiLog.level(info) << " Fragment spreading factor=" << edi_dest->pft_settings.fragment_spreading_factor; + } } } Sender::Sender(const configuration_t& conf) : - m_conf(conf), - edi_pft(m_conf) + m_conf(conf) { if (m_conf.verbose) { etiLog.level(info) << "Setup EDI Output"; @@ -71,37 +76,39 @@ Sender::Sender(const configuration_t& conf) : for (const auto& edi_dest : m_conf.destinations) { if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) { - auto udp_socket = std::make_shared<Socket::UDPSocket>(udp_dest->source_port); + Socket::UDPSocket udp_socket(udp_dest->source_port); if (not udp_dest->source_addr.empty()) { - udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); - udp_socket->setMulticastTTL(udp_dest->ttl); + udp_socket.setMulticastSource(udp_dest->source_addr.c_str()); + udp_socket.setMulticastTTL(udp_dest->ttl); } - udp_sockets.emplace(udp_dest.get(), udp_socket); + auto sender = make_shared<udp_sender_t>( + udp_dest->dest_addr, + udp_dest->dest_port, + std::move(udp_socket)); + m_pft_spreaders.emplace_back( + make_shared<PFTSpreader>(udp_dest->pft_settings, sender)); } else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) { - auto dispatcher = make_shared<Socket::TCPDataDispatcher>( - tcp_dest->max_frames_queued, tcp_dest->tcp_server_preroll_buffers); - - dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); - tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); + auto sender = make_shared<tcp_dispatcher_t>( + tcp_dest->listen_port, + tcp_dest->max_frames_queued, + tcp_dest->tcp_server_preroll_buffers); + m_pft_spreaders.emplace_back( + make_shared<PFTSpreader>(tcp_dest->pft_settings, sender)); } else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { - auto tcp_send_client = make_shared<Socket::TCPSendClient>(tcp_dest->dest_addr, tcp_dest->dest_port); - tcp_senders.emplace(tcp_dest.get(), tcp_send_client); + auto sender = make_shared<tcp_send_client_t>(tcp_dest->dest_addr, tcp_dest->dest_port); + m_pft_spreaders.emplace_back( + make_shared<PFTSpreader>(tcp_dest->pft_settings, sender)); } else { throw logic_error("EDI destination not implemented"); } } - if (m_conf.dump) { - edi_debug_file.open("./edi.debug"); - } - - if (m_conf.enable_pft) { - unique_lock<mutex> lock(m_mutex); + { m_running = true; m_thread = thread(&Sender::run, this); } @@ -111,10 +118,52 @@ Sender::Sender(const configuration_t& conf) : } } +void Sender::write(const TagPacket& tagpacket) +{ + // Assemble into one AF Packet + edi::AFPacket af_packet = edi_af_packetiser.Assemble(tagpacket); + + write(af_packet); +} + +void Sender::write(const AFPacket& af_packet) +{ + for (auto& sender : m_pft_spreaders) { + sender->send_af_packet(af_packet); + } +} + +void Sender::override_af_sequence(uint16_t seq) +{ + edi_af_packetiser.OverrideSeq(seq); +} + +void Sender::override_pft_sequence(uint16_t pseq) +{ + for (auto& spreader : m_pft_spreaders) { + spreader->edi_pft.OverridePSeq(pseq); + } +} + +std::vector<Sender::stats_t> Sender::get_tcp_server_stats() const +{ + std::vector<Sender::stats_t> stats; + + for (auto& spreader : m_pft_spreaders) { + if (auto sender = std::dynamic_pointer_cast<tcp_dispatcher_t>(spreader->sender)) { + Sender::stats_t s; + s.listen_port = sender->listen_port; + s.stats = sender->sock.get_stats(); + stats.push_back(s); + } + } + + return stats; +} + Sender::~Sender() { { - unique_lock<mutex> lock(m_mutex); m_running = false; } @@ -123,36 +172,89 @@ Sender::~Sender() } } -void Sender::write(const TagPacket& tagpacket) +void Sender::run() { - // Assemble into one AF Packet - edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket); + while (m_running) { + const auto now = chrono::steady_clock::now(); + for (auto& spreader : m_pft_spreaders) { + spreader->tick(now); + } - write(af_packet); + this_thread::sleep_for(chrono::microseconds(500)); + } } -void Sender::write(const AFPacket& af_packet) + +void Sender::udp_sender_t::send_packet(const std::vector<uint8_t> &frame) +{ + Socket::InetAddress addr; + addr.resolveUdpDestination(dest_addr, dest_port); + sock.send(frame, addr); +} + +void Sender::tcp_dispatcher_t::send_packet(const std::vector<uint8_t> &frame) +{ + sock.write(frame); +} + +void Sender::tcp_send_client_t::send_packet(const std::vector<uint8_t> &frame) +{ + sock.sendall(frame); +} + +Sender::udp_sender_t::udp_sender_t(std::string dest_addr, + uint16_t dest_port, + Socket::UDPSocket&& sock) : + dest_addr(dest_addr), + dest_port(dest_port), + sock(std::move(sock)) +{ +} + +Sender::tcp_dispatcher_t::tcp_dispatcher_t(uint16_t listen_port, + size_t max_frames_queued, + size_t tcp_server_preroll_buffers) : + listen_port(listen_port), + sock(max_frames_queued, tcp_server_preroll_buffers) { - if (m_conf.enable_pft) { + sock.start(listen_port, "0.0.0.0"); +} + +Sender::tcp_send_client_t::tcp_send_client_t(const std::string &dest_addr, + uint16_t dest_port) : + sock(dest_addr, dest_port) +{ +} + +Sender::PFTSpreader::PFTSpreader(const pft_settings_t& conf, sender_sp sender) : + sender(sender), + edi_pft(conf) +{ +} + +void Sender::PFTSpreader::send_af_packet(const AFPacket& af_packet) +{ + using namespace std::chrono; + if (edi_pft.is_enabled()) { // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet); - if (m_conf.verbose and m_last_num_pft_fragments != edi_fragments.size()) { + if (settings.verbose and last_num_pft_fragments != edi_fragments.size()) { etiLog.log(debug, "EDI Output: Number of PFT fragments %zu\n", edi_fragments.size()); - m_last_num_pft_fragments = edi_fragments.size(); + last_num_pft_fragments = edi_fragments.size(); } /* Spread out the transmission of all fragments over part of the 24ms AF packet duration * to reduce the risk of losing a burst of fragments because of congestion. */ - using namespace std::chrono; auto inter_fragment_wait_time = microseconds(1); if (edi_fragments.size() > 1) { - if (m_conf.fragment_spreading_factor > 0) { + if (settings.fragment_spreading_factor > 0) { inter_fragment_wait_time = - microseconds( - llrint(m_conf.fragment_spreading_factor * 24000.0 / edi_fragments.size()) - ); + microseconds(llrint( + settings.fragment_spreading_factor * 24000.0 / + edi_fragments.size() + )); } } @@ -162,121 +264,35 @@ void Sender::write(const AFPacket& af_packet) auto tp = now; unique_lock<mutex> lock(m_mutex); for (auto& edi_frag : edi_fragments) { - m_pending_frames[tp] = move(edi_frag); + m_pending_frames[tp] = std::move(edi_frag); tp += inter_fragment_wait_time; } } - - // Transmission done in run() function } else /* PFT disabled */ { - // Send over ethernet - if (m_conf.dump) { - ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - copy(af_packet.begin(), af_packet.end(), debug_iterator); - } - - for (auto& dest : m_conf.destinations) { - if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { - Socket::InetAddress addr; - addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); - - if (af_packet.size() > 1400 and not m_udp_fragmentation_warning_printed) { - fprintf(stderr, "EDI Output: AF packet larger than 1400," - " consider using PFT to avoid UP fragmentation.\n"); - m_udp_fragmentation_warning_printed = true; - } - - udp_sockets.at(udp_dest.get())->send(af_packet, addr); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { - tcp_dispatchers.at(tcp_dest.get())->write(af_packet); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { - const auto error_stats = tcp_senders.at(tcp_dest.get())->sendall(af_packet); - - if (m_conf.verbose and error_stats.has_seen_new_errors) { - fprintf(stderr, "TCP output %s:%d has %zu reconnects: most recent error: %s\n", - tcp_dest->dest_addr.c_str(), - tcp_dest->dest_port, - error_stats.num_reconnects, - error_stats.last_error.c_str()); - } - } - else { - throw logic_error("EDI destination not implemented"); - } - } + const auto now = steady_clock::now(); + unique_lock<mutex> lock(m_mutex); + m_pending_frames[now] = std::move(af_packet); } -} -void Sender::override_af_sequence(uint16_t seq) -{ - edi_afPacketiser.OverrideSeq(seq); -} - -void Sender::override_pft_sequence(uint16_t pseq) -{ - edi_pft.OverridePSeq(pseq); + // Actual transmission done in tick() function } -std::vector<Sender::stats_t> Sender::get_tcp_server_stats() const +void Sender::PFTSpreader::tick(const std::chrono::steady_clock::time_point& now) { - std::vector<Sender::stats_t> stats; + unique_lock<mutex> lock(m_mutex); - for (auto& el : tcp_dispatchers) { - Sender::stats_t s; - s.listen_port = el.first->listen_port; - s.stats = el.second->get_stats(); - stats.push_back(s); - } - - return stats; -} + for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) { + const auto& edi_frag = it->second; -void Sender::run() -{ - while (m_running) { - unique_lock<mutex> lock(m_mutex); - const auto now = chrono::steady_clock::now(); - - // Send over ethernet - for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) { - const auto& edi_frag = it->second; - - if (it->first <= now) { - if (m_conf.dump) { - ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - copy(edi_frag.begin(), edi_frag.end(), debug_iterator); - } - - for (auto& dest : m_conf.destinations) { - if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { - Socket::InetAddress addr; - addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); - - udp_sockets.at(udp_dest.get())->send(edi_frag, addr); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { - tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { - tcp_senders.at(tcp_dest.get())->sendall(edi_frag); - } - else { - throw logic_error("EDI destination not implemented"); - } - } - it = m_pending_frames.erase(it); - } - else { - ++it; - } + if (it->first <= now) { + sender->send_packet(edi_frag); + it = m_pending_frames.erase(it); + } + else { + ++it; } - - lock.unlock(); - this_thread::sleep_for(chrono::microseconds(500)); } } -} +} // namespace edi diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h index 2ca638e..b8a9008 100644 --- a/lib/edioutput/Transport.h +++ b/lib/edioutput/Transport.h @@ -33,8 +33,6 @@ #include "Socket.h" #include <chrono> #include <map> -#include <unordered_map> -#include <fstream> #include <cstdint> #include <thread> #include <mutex> @@ -43,12 +41,13 @@ namespace edi { /** ETI/STI sender for EDI output */ - class Sender { public: Sender(const configuration_t& conf); Sender(const Sender&) = delete; - Sender operator=(const Sender&) = delete; + Sender& operator=(const Sender&) = delete; + Sender(Sender&&) = delete; + Sender& operator=(Sender&&) = delete; ~Sender(); // Assemble the tagpacket into an AF packet, and if needed, @@ -72,32 +71,78 @@ class Sender { std::vector<stats_t> get_tcp_server_stats() const; private: - void run(); - - bool m_udp_fragmentation_warning_printed = false; - configuration_t m_conf; - std::ofstream edi_debug_file; // The TagPacket will then be placed into an AFPacket - edi::AFPacketiser edi_afPacketiser; + edi::AFPacketiser edi_af_packetiser; - // The AF Packet will be protected with reed-solomon and split in fragments - edi::PFT edi_pft; + // PFT spreading requires sending UDP packets at specific time, + // independently of time when write() gets called + bool m_running = false; + std::thread m_thread; + virtual void run(); - std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets; - std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers; - std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders; - // PFT spreading requires sending UDP packets at specific time, independently of - // time when write() gets called - std::thread m_thread; - std::mutex m_mutex; - bool m_running = false; - std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames; - size_t m_last_num_pft_fragments = 0; -}; -} + struct i_sender { + virtual void send_packet(const std::vector<uint8_t> &frame) = 0; + virtual ~i_sender() { } + }; + + struct udp_sender_t : public i_sender { + udp_sender_t( + std::string dest_addr, + uint16_t dest_port, + Socket::UDPSocket&& sock); + + std::string dest_addr; + uint16_t dest_port; + Socket::UDPSocket sock; + + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + struct tcp_dispatcher_t : public i_sender { + tcp_dispatcher_t( + uint16_t listen_port, + size_t max_frames_queued, + size_t tcp_server_preroll_buffers); + + uint16_t listen_port; + Socket::TCPDataDispatcher sock; + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + struct tcp_send_client_t : public i_sender { + tcp_send_client_t( + const std::string& dest_addr, + uint16_t dest_port); + + Socket::TCPSendClient sock; + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + class PFTSpreader { + public: + using sender_sp = std::shared_ptr<i_sender>; + PFTSpreader(const pft_settings_t &conf, sender_sp sender); + sender_sp sender; + edi::PFT edi_pft; + + void send_af_packet(const AFPacket &af_packet); + void tick(const std::chrono::steady_clock::time_point& now); + + private: + // send_af_packet() and tick() are called from different threads, both + // are accessing m_pending_frames + std::mutex m_mutex; + std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames; + pft_settings_t settings; + size_t last_num_pft_fragments = 0; + }; + + std::vector<std::shared_ptr<PFTSpreader>> m_pft_spreaders; +}; +} |