diff options
-rw-r--r-- | ChangeLog | 10 | ||||
-rw-r--r-- | configure.ac | 4 | ||||
-rw-r--r-- | doc/advanced.mux | 9 | ||||
-rw-r--r-- | lib/Json.cpp | 4 | ||||
-rw-r--r-- | lib/Json.h | 4 | ||||
-rw-r--r-- | lib/edioutput/EDIConfig.h | 31 | ||||
-rw-r--r-- | lib/edioutput/PFT.cpp | 12 | ||||
-rw-r--r-- | lib/edioutput/PFT.h | 15 | ||||
-rw-r--r-- | lib/edioutput/Transport.cpp | 288 | ||||
-rw-r--r-- | lib/edioutput/Transport.h | 93 | ||||
-rw-r--r-- | man/odr-dabmux.1 | 2 | ||||
-rw-r--r-- | src/DabMultiplexer.cpp | 35 | ||||
-rw-r--r-- | src/DabMultiplexer.h | 7 | ||||
-rw-r--r-- | src/DabMux.cpp | 52 | ||||
-rw-r--r-- | src/fig/FIG.h | 10 | ||||
-rw-r--r-- | src/fig/FIG0_10.cpp | 11 | ||||
-rw-r--r-- | src/fig/FIGCarousel.cpp | 7 | ||||
-rw-r--r-- | src/fig/FIGCarousel.h | 4 | ||||
-rw-r--r-- | src/utils.cpp | 24 | ||||
-rw-r--r-- | src/utils.h | 6 |
20 files changed, 365 insertions, 263 deletions
@@ -1,6 +1,16 @@ This file contains information about the changes done to ODR-DabMux in this repository +2025-05-19: Matthias P. Braendli <matthias@mpb.li> + (v5.2.0): + Rework FIG0/10 DAB time indication to match EDI time. + Make PFT per-output configurable. + +2025-03-18: Matthias P. Braendli <matthias@mpb.li> + (v5.1.0): + Fix startup value of DLFC and FCT. + Add statistics for EDI/TCP outputs. + 2024-10-03: Matthias P. Braendli <matthias@mpb.li> (v5.0.0): Remove odr-zmq2edi. diff --git a/configure.ac b/configure.ac index 2d3231e..b706c05 100644 --- a/configure.ac +++ b/configure.ac @@ -1,7 +1,7 @@ # Copyright (C) 2008, 2009 Her Majesty the Queen in Right of Canada # (Communications Research Center Canada) # -# Copyright (C) 2024 Matthias P. Braendli, http://opendigitalradio.org +# Copyright (C) 2025 Matthias P. Braendli, http://opendigitalradio.org # This file is part of ODR-DabMux. # @@ -19,7 +19,7 @@ # along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. AC_PREREQ([2.69]) -AC_INIT([ODR-DabMux],[5.0.0],[matthias.braendli@mpb.li]) +AC_INIT([ODR-DabMux],[5.2.0],[matthias.braendli@mpb.li]) AC_CONFIG_AUX_DIR([build-aux]) AC_CONFIG_MACRO_DIR([m4]) AC_CANONICAL_TARGET diff --git a/doc/advanced.mux b/doc/advanced.mux index c07a2b2..0fc1b53 100644 --- a/doc/advanced.mux +++ b/doc/advanced.mux @@ -438,6 +438,10 @@ outputs { destination "192.168.23.23" port 12000 + enable_pft true + fec 1 + verbose true + ; For compatibility: if port is not specified in the destination itself, ; it is taken from the parent 'destinations' block. } @@ -452,6 +456,8 @@ outputs { ; The multicast TTL has to be adapted according to your network ttl 1 + enable_pft true + fec 1 } example_tcp { ; example for EDI TCP server. TCP is reliable, so it is counterproductive to @@ -469,7 +475,8 @@ outputs { } } - ; The settings below apply to all destinations + ; The settings below apply to all destinations, unless they are overridden + ; inside a destination ; Enable the PFT subsystem. If false, AFPackets are sent. ; PFT is not necessary when using TCP. diff --git a/lib/Json.cpp b/lib/Json.cpp index 361a149..ee33671 100644 --- a/lib/Json.cpp +++ b/lib/Json.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2023 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -27,7 +27,7 @@ #include <sstream> #include <iomanip> #include <string> -#include <algorithm> +#include <stdexcept> #include "Json.h" @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2023 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -34,10 +34,10 @@ #include <vector> #include <memory> #include <optional> -#include <stdexcept> #include <string> #include <unordered_map> #include <variant> +#include <cstdint> namespace json { diff --git a/lib/edioutput/EDIConfig.h b/lib/edioutput/EDIConfig.h index 1997210..7016e87 100644 --- a/lib/edioutput/EDIConfig.h +++ b/lib/edioutput/EDIConfig.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2019 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -36,17 +36,31 @@ namespace edi { /** Configuration for EDI output */ +struct pft_settings_t { + // protection and fragmentation settings + bool verbose = false; + bool enable_pft = false; + unsigned chunk_len = 207; // RSk, data length of each chunk + unsigned fec = 0; // number of fragments that can be recovered + double fragment_spreading_factor = 0.95; + // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms) + // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets. +}; + struct destination_t { virtual ~destination_t() {}; + + pft_settings_t pft_settings = {}; }; + // Can represent both unicast and multicast destinations struct udp_destination_t : public destination_t { std::string dest_addr; - unsigned int dest_port = 0; + uint16_t dest_port = 0; std::string source_addr; - unsigned int source_port = 0; - unsigned int ttl = 10; + uint16_t source_port = 0; + uint8_t ttl = 10; }; // TCP server that can accept multiple connections @@ -66,16 +80,9 @@ struct tcp_client_t : public destination_t { }; struct configuration_t { - unsigned chunk_len = 207; // RSk, data length of each chunk - unsigned fec = 0; // number of fragments that can be recovered - bool dump = false; // dump a file with the EDI packets - bool verbose = false; - bool enable_pft = false; // Enable protection and fragmentation + bool verbose = false; unsigned int tagpacket_alignment = 0; std::vector<std::shared_ptr<destination_t> > destinations; - double fragment_spreading_factor = 0.95; - // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms) - // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets. bool enabled() const { return destinations.size() > 0; } diff --git a/lib/edioutput/PFT.cpp b/lib/edioutput/PFT.cpp index 7e0e8e9..f65fd67 100644 --- a/lib/edioutput/PFT.cpp +++ b/lib/edioutput/PFT.cpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2021 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -31,7 +31,6 @@ */ #include <vector> -#include <list> #include <cstdio> #include <cstring> #include <cstdint> @@ -41,6 +40,7 @@ #include "PFT.h" #include "crc.h" #include "ReedSolomon.h" +#include "Log.h" namespace edi { @@ -51,11 +51,10 @@ using namespace std; PFT::PFT() { } -PFT::PFT(const configuration_t &conf) : +PFT::PFT(const pft_settings_t& conf) : + m_enabled(conf.enable_pft), m_k(conf.chunk_len), m_m(conf.fec), - m_pseq(0), - m_num_chunks(0), m_verbose(conf.verbose) { if (m_k > 207) { @@ -324,5 +323,4 @@ void PFT::OverridePSeq(uint16_t pseq) m_pseq = pseq; } -} - +} // namespace edi diff --git a/lib/edioutput/PFT.h b/lib/edioutput/PFT.h index 42569a0..52e9f46 100644 --- a/lib/edioutput/PFT.h +++ b/lib/edioutput/PFT.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2021 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -33,12 +33,8 @@ #pragma once #include <vector> -#include <list> -#include <stdexcept> #include <cstdint> #include "AFPacket.h" -#include "Log.h" -#include "ReedSolomon.h" #include "EDIConfig.h" namespace edi { @@ -52,21 +48,24 @@ class PFT static constexpr int PARITYBYTES = 48; PFT(); - PFT(const configuration_t& conf); + PFT(const pft_settings_t& conf); + + bool is_enabled() const { return m_enabled and m_k > 0; } // return a list of PFT fragments with the correct // PFT headers - std::vector< PFTFragment > Assemble(AFPacket af_packet); + std::vector<PFTFragment> Assemble(AFPacket af_packet); // Apply Reed-Solomon FEC to the AF Packet RSBlock Protect(AFPacket af_packet); // Cut a RSBlock into several fragments that can be transmitted - std::vector< std::vector<uint8_t> > ProtectAndFragment(AFPacket af_packet); + std::vector<std::vector<uint8_t>> ProtectAndFragment(AFPacket af_packet); void OverridePSeq(uint16_t pseq); private: + bool m_enabled = false; unsigned int m_k = 207; // length of RS data word unsigned int m_m = 3; // number of fragments that can be recovered if lost uint16_t m_pseq = 0; diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp index a5e0bc3..e9559b5 100644 --- a/lib/edioutput/Transport.cpp +++ b/lib/edioutput/Transport.cpp @@ -25,7 +25,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ #include "Transport.h" -#include <iterator> +#include "Log.h" #include <cmath> #include <thread> @@ -57,13 +57,18 @@ void configuration_t::print() const else { throw logic_error("EDI destination not implemented"); } + etiLog.level(info) << " PFT=" << edi_dest->pft_settings.enable_pft; + if (edi_dest->pft_settings.enable_pft) { + etiLog.level(info) << " FEC=" << edi_dest->pft_settings.fec; + etiLog.level(info) << " Chunk Len=" << edi_dest->pft_settings.chunk_len; + etiLog.level(info) << " Fragment spreading factor=" << edi_dest->pft_settings.fragment_spreading_factor; + } } } Sender::Sender(const configuration_t& conf) : - m_conf(conf), - edi_pft(m_conf) + m_conf(conf) { if (m_conf.verbose) { etiLog.level(info) << "Setup EDI Output"; @@ -71,37 +76,39 @@ Sender::Sender(const configuration_t& conf) : for (const auto& edi_dest : m_conf.destinations) { if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) { - auto udp_socket = std::make_shared<Socket::UDPSocket>(udp_dest->source_port); + Socket::UDPSocket udp_socket(udp_dest->source_port); if (not udp_dest->source_addr.empty()) { - udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); - udp_socket->setMulticastTTL(udp_dest->ttl); + udp_socket.setMulticastSource(udp_dest->source_addr.c_str()); + udp_socket.setMulticastTTL(udp_dest->ttl); } - udp_sockets.emplace(udp_dest.get(), udp_socket); + auto sender = make_shared<udp_sender_t>( + udp_dest->dest_addr, + udp_dest->dest_port, + std::move(udp_socket)); + m_pft_spreaders.emplace_back( + make_shared<PFTSpreader>(udp_dest->pft_settings, sender)); } else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) { - auto dispatcher = make_shared<Socket::TCPDataDispatcher>( - tcp_dest->max_frames_queued, tcp_dest->tcp_server_preroll_buffers); - - dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); - tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); + auto sender = make_shared<tcp_dispatcher_t>( + tcp_dest->listen_port, + tcp_dest->max_frames_queued, + tcp_dest->tcp_server_preroll_buffers); + m_pft_spreaders.emplace_back( + make_shared<PFTSpreader>(tcp_dest->pft_settings, sender)); } else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { - auto tcp_send_client = make_shared<Socket::TCPSendClient>(tcp_dest->dest_addr, tcp_dest->dest_port); - tcp_senders.emplace(tcp_dest.get(), tcp_send_client); + auto sender = make_shared<tcp_send_client_t>(tcp_dest->dest_addr, tcp_dest->dest_port); + m_pft_spreaders.emplace_back( + make_shared<PFTSpreader>(tcp_dest->pft_settings, sender)); } else { throw logic_error("EDI destination not implemented"); } } - if (m_conf.dump) { - edi_debug_file.open("./edi.debug"); - } - - if (m_conf.enable_pft) { - unique_lock<mutex> lock(m_mutex); + { m_running = true; m_thread = thread(&Sender::run, this); } @@ -111,10 +118,52 @@ Sender::Sender(const configuration_t& conf) : } } +void Sender::write(const TagPacket& tagpacket) +{ + // Assemble into one AF Packet + edi::AFPacket af_packet = edi_af_packetiser.Assemble(tagpacket); + + write(af_packet); +} + +void Sender::write(const AFPacket& af_packet) +{ + for (auto& sender : m_pft_spreaders) { + sender->send_af_packet(af_packet); + } +} + +void Sender::override_af_sequence(uint16_t seq) +{ + edi_af_packetiser.OverrideSeq(seq); +} + +void Sender::override_pft_sequence(uint16_t pseq) +{ + for (auto& spreader : m_pft_spreaders) { + spreader->edi_pft.OverridePSeq(pseq); + } +} + +std::vector<Sender::stats_t> Sender::get_tcp_server_stats() const +{ + std::vector<Sender::stats_t> stats; + + for (auto& spreader : m_pft_spreaders) { + if (auto sender = std::dynamic_pointer_cast<tcp_dispatcher_t>(spreader->sender)) { + Sender::stats_t s; + s.listen_port = sender->listen_port; + s.stats = sender->sock.get_stats(); + stats.push_back(s); + } + } + + return stats; +} + Sender::~Sender() { { - unique_lock<mutex> lock(m_mutex); m_running = false; } @@ -123,36 +172,89 @@ Sender::~Sender() } } -void Sender::write(const TagPacket& tagpacket) +void Sender::run() { - // Assemble into one AF Packet - edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket); + while (m_running) { + const auto now = chrono::steady_clock::now(); + for (auto& spreader : m_pft_spreaders) { + spreader->tick(now); + } - write(af_packet); + this_thread::sleep_for(chrono::microseconds(500)); + } } -void Sender::write(const AFPacket& af_packet) + +void Sender::udp_sender_t::send_packet(const std::vector<uint8_t> &frame) +{ + Socket::InetAddress addr; + addr.resolveUdpDestination(dest_addr, dest_port); + sock.send(frame, addr); +} + +void Sender::tcp_dispatcher_t::send_packet(const std::vector<uint8_t> &frame) +{ + sock.write(frame); +} + +void Sender::tcp_send_client_t::send_packet(const std::vector<uint8_t> &frame) +{ + sock.sendall(frame); +} + +Sender::udp_sender_t::udp_sender_t(std::string dest_addr, + uint16_t dest_port, + Socket::UDPSocket&& sock) : + dest_addr(dest_addr), + dest_port(dest_port), + sock(std::move(sock)) +{ +} + +Sender::tcp_dispatcher_t::tcp_dispatcher_t(uint16_t listen_port, + size_t max_frames_queued, + size_t tcp_server_preroll_buffers) : + listen_port(listen_port), + sock(max_frames_queued, tcp_server_preroll_buffers) { - if (m_conf.enable_pft) { + sock.start(listen_port, "0.0.0.0"); +} + +Sender::tcp_send_client_t::tcp_send_client_t(const std::string &dest_addr, + uint16_t dest_port) : + sock(dest_addr, dest_port) +{ +} + +Sender::PFTSpreader::PFTSpreader(const pft_settings_t& conf, sender_sp sender) : + sender(sender), + edi_pft(conf) +{ +} + +void Sender::PFTSpreader::send_af_packet(const AFPacket& af_packet) +{ + using namespace std::chrono; + if (edi_pft.is_enabled()) { // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet); - if (m_conf.verbose and m_last_num_pft_fragments != edi_fragments.size()) { + if (settings.verbose and last_num_pft_fragments != edi_fragments.size()) { etiLog.log(debug, "EDI Output: Number of PFT fragments %zu\n", edi_fragments.size()); - m_last_num_pft_fragments = edi_fragments.size(); + last_num_pft_fragments = edi_fragments.size(); } /* Spread out the transmission of all fragments over part of the 24ms AF packet duration * to reduce the risk of losing a burst of fragments because of congestion. */ - using namespace std::chrono; auto inter_fragment_wait_time = microseconds(1); if (edi_fragments.size() > 1) { - if (m_conf.fragment_spreading_factor > 0) { + if (settings.fragment_spreading_factor > 0) { inter_fragment_wait_time = - microseconds( - llrint(m_conf.fragment_spreading_factor * 24000.0 / edi_fragments.size()) - ); + microseconds(llrint( + settings.fragment_spreading_factor * 24000.0 / + edi_fragments.size() + )); } } @@ -162,121 +264,35 @@ void Sender::write(const AFPacket& af_packet) auto tp = now; unique_lock<mutex> lock(m_mutex); for (auto& edi_frag : edi_fragments) { - m_pending_frames[tp] = move(edi_frag); + m_pending_frames[tp] = std::move(edi_frag); tp += inter_fragment_wait_time; } } - - // Transmission done in run() function } else /* PFT disabled */ { - // Send over ethernet - if (m_conf.dump) { - ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - copy(af_packet.begin(), af_packet.end(), debug_iterator); - } - - for (auto& dest : m_conf.destinations) { - if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { - Socket::InetAddress addr; - addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); - - if (af_packet.size() > 1400 and not m_udp_fragmentation_warning_printed) { - fprintf(stderr, "EDI Output: AF packet larger than 1400," - " consider using PFT to avoid UP fragmentation.\n"); - m_udp_fragmentation_warning_printed = true; - } - - udp_sockets.at(udp_dest.get())->send(af_packet, addr); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { - tcp_dispatchers.at(tcp_dest.get())->write(af_packet); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { - const auto error_stats = tcp_senders.at(tcp_dest.get())->sendall(af_packet); - - if (m_conf.verbose and error_stats.has_seen_new_errors) { - fprintf(stderr, "TCP output %s:%d has %zu reconnects: most recent error: %s\n", - tcp_dest->dest_addr.c_str(), - tcp_dest->dest_port, - error_stats.num_reconnects, - error_stats.last_error.c_str()); - } - } - else { - throw logic_error("EDI destination not implemented"); - } - } + const auto now = steady_clock::now(); + unique_lock<mutex> lock(m_mutex); + m_pending_frames[now] = std::move(af_packet); } -} -void Sender::override_af_sequence(uint16_t seq) -{ - edi_afPacketiser.OverrideSeq(seq); -} - -void Sender::override_pft_sequence(uint16_t pseq) -{ - edi_pft.OverridePSeq(pseq); + // Actual transmission done in tick() function } -std::vector<Sender::stats_t> Sender::get_tcp_server_stats() const +void Sender::PFTSpreader::tick(const std::chrono::steady_clock::time_point& now) { - std::vector<Sender::stats_t> stats; + unique_lock<mutex> lock(m_mutex); - for (auto& el : tcp_dispatchers) { - Sender::stats_t s; - s.listen_port = el.first->listen_port; - s.stats = el.second->get_stats(); - stats.push_back(s); - } - - return stats; -} + for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) { + const auto& edi_frag = it->second; -void Sender::run() -{ - while (m_running) { - unique_lock<mutex> lock(m_mutex); - const auto now = chrono::steady_clock::now(); - - // Send over ethernet - for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) { - const auto& edi_frag = it->second; - - if (it->first <= now) { - if (m_conf.dump) { - ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - copy(edi_frag.begin(), edi_frag.end(), debug_iterator); - } - - for (auto& dest : m_conf.destinations) { - if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { - Socket::InetAddress addr; - addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); - - udp_sockets.at(udp_dest.get())->send(edi_frag, addr); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { - tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { - tcp_senders.at(tcp_dest.get())->sendall(edi_frag); - } - else { - throw logic_error("EDI destination not implemented"); - } - } - it = m_pending_frames.erase(it); - } - else { - ++it; - } + if (it->first <= now) { + sender->send_packet(edi_frag); + it = m_pending_frames.erase(it); + } + else { + ++it; } - - lock.unlock(); - this_thread::sleep_for(chrono::microseconds(500)); } } -} +} // namespace edi diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h index 2ca638e..b8a9008 100644 --- a/lib/edioutput/Transport.h +++ b/lib/edioutput/Transport.h @@ -33,8 +33,6 @@ #include "Socket.h" #include <chrono> #include <map> -#include <unordered_map> -#include <fstream> #include <cstdint> #include <thread> #include <mutex> @@ -43,12 +41,13 @@ namespace edi { /** ETI/STI sender for EDI output */ - class Sender { public: Sender(const configuration_t& conf); Sender(const Sender&) = delete; - Sender operator=(const Sender&) = delete; + Sender& operator=(const Sender&) = delete; + Sender(Sender&&) = delete; + Sender& operator=(Sender&&) = delete; ~Sender(); // Assemble the tagpacket into an AF packet, and if needed, @@ -72,32 +71,78 @@ class Sender { std::vector<stats_t> get_tcp_server_stats() const; private: - void run(); - - bool m_udp_fragmentation_warning_printed = false; - configuration_t m_conf; - std::ofstream edi_debug_file; // The TagPacket will then be placed into an AFPacket - edi::AFPacketiser edi_afPacketiser; + edi::AFPacketiser edi_af_packetiser; - // The AF Packet will be protected with reed-solomon and split in fragments - edi::PFT edi_pft; + // PFT spreading requires sending UDP packets at specific time, + // independently of time when write() gets called + bool m_running = false; + std::thread m_thread; + virtual void run(); - std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets; - std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers; - std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders; - // PFT spreading requires sending UDP packets at specific time, independently of - // time when write() gets called - std::thread m_thread; - std::mutex m_mutex; - bool m_running = false; - std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames; - size_t m_last_num_pft_fragments = 0; -}; -} + struct i_sender { + virtual void send_packet(const std::vector<uint8_t> &frame) = 0; + virtual ~i_sender() { } + }; + + struct udp_sender_t : public i_sender { + udp_sender_t( + std::string dest_addr, + uint16_t dest_port, + Socket::UDPSocket&& sock); + + std::string dest_addr; + uint16_t dest_port; + Socket::UDPSocket sock; + + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + struct tcp_dispatcher_t : public i_sender { + tcp_dispatcher_t( + uint16_t listen_port, + size_t max_frames_queued, + size_t tcp_server_preroll_buffers); + + uint16_t listen_port; + Socket::TCPDataDispatcher sock; + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + struct tcp_send_client_t : public i_sender { + tcp_send_client_t( + const std::string& dest_addr, + uint16_t dest_port); + + Socket::TCPSendClient sock; + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + class PFTSpreader { + public: + using sender_sp = std::shared_ptr<i_sender>; + PFTSpreader(const pft_settings_t &conf, sender_sp sender); + sender_sp sender; + edi::PFT edi_pft; + + void send_af_packet(const AFPacket &af_packet); + void tick(const std::chrono::steady_clock::time_point& now); + + private: + // send_af_packet() and tick() are called from different threads, both + // are accessing m_pending_frames + std::mutex m_mutex; + std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames; + pft_settings_t settings; + size_t last_num_pft_fragments = 0; + }; + + std::vector<std::shared_ptr<PFTSpreader>> m_pft_spreaders; +}; +} diff --git a/man/odr-dabmux.1 b/man/odr-dabmux.1 index 03b8df1..fcf11c5 100644 --- a/man/odr-dabmux.1 +++ b/man/odr-dabmux.1 @@ -1,4 +1,4 @@ -.TH ODR-DABMUX "1" "October 2024" "odr-dabmux 5.0.0" "User Commands" +.TH ODR-DABMUX "1" "May 2025" "odr-dabmux 5.2.0" "User Commands" .SH NAME \fBodr\-dabmux\fR \- A software DAB multiplexer .SH SYNOPSIS diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp index b9575fc..e6e6782 100644 --- a/src/DabMultiplexer.cpp +++ b/src/DabMultiplexer.cpp @@ -49,6 +49,8 @@ static vector<string> split_pipe_separated_string(const std::string& s) uint64_t MuxTime::init(uint32_t tist_at_fct0_us) { + m_tist_at_fct0_us = tist_at_fct0_us; + /* At startup, derive edi_time, TIST and CIF count such that there is * a consistency across mux restarts. Ensure edi_time and TIST represent * current time. @@ -98,9 +100,11 @@ uint64_t MuxTime::init(uint32_t tist_at_fct0_us) return currentFrame; } +constexpr int TIMESTAMP_LEVEL_2_SHIFT = 14; + void MuxTime::increment_timestamp() { - m_timestamp += 24 << 14; // Shift 24ms by 14 to Timestamp level 2 + m_timestamp += 24 << TIMESTAMP_LEVEL_2_SHIFT; // Shift 24ms by 14 to Timestamp level 2 if (m_timestamp > 0xf9FFff) { m_timestamp -= 0xfa0000; // Subtract 16384000, corresponding to one second m_edi_time += 1; @@ -110,16 +114,21 @@ void MuxTime::increment_timestamp() } } -std::pair<uint32_t, std::time_t> MuxTime::get_time() +std::pair<uint32_t, std::time_t> MuxTime::get_tist_seconds() { + // The user-visible configuration tist_offset is the effective + // offset, but since we implicitly add the tist_at_fct0 to it, + // we must compensate. + double corrected_tist_offset = tist_offset - (m_tist_at_fct0_us / 1e6); + // negative tist_offset not supported, because the calculation is annoying - if (tist_offset < 0) return {m_timestamp, m_edi_time}; + if (corrected_tist_offset < 0) return {m_timestamp, m_edi_time}; - double fractional_part = tist_offset - std::floor(tist_offset); + double fractional_part = corrected_tist_offset - std::floor(corrected_tist_offset); const size_t steps = std::lround(std::floor(fractional_part / 24e-3)); - uint32_t timestamp = m_timestamp + (24 << 14) * steps; + uint32_t timestamp = m_timestamp + (24 << TIMESTAMP_LEVEL_2_SHIFT) * steps; - std::time_t edi_time = m_edi_time + std::lround(std::floor(tist_offset)); + std::time_t edi_time = m_edi_time + std::lround(std::floor(corrected_tist_offset)); if (timestamp > 0xf9FFff) { edi_time += 1; @@ -128,13 +137,20 @@ std::pair<uint32_t, std::time_t> MuxTime::get_time() return {timestamp % 0xfa0000, edi_time}; } +std::pair<uint32_t, std::time_t> MuxTime::get_milliseconds_seconds() +{ + auto tist_seconds = get_tist_seconds(); + return {tist_seconds.first >> TIMESTAMP_LEVEL_2_SHIFT, tist_seconds.second}; +} + DabMultiplexer::DabMultiplexer(boost::property_tree::ptree pt) : RemoteControllable("mux"), m_pt(pt), + m_time(), ensemble(std::make_shared<dabEnsemble>()), m_clock_tai(split_pipe_separated_string(pt.get("general.tai_clock_bulletins", ""))), - fig_carousel(ensemble) + fig_carousel(ensemble, [&]() { return m_time.get_milliseconds_seconds(); }) { RC_ADD_PARAMETER(frames, "Show number of frames generated [read-only]"); RC_ADD_PARAMETER(tist_offset, "Timestamp offset in fractional number of seconds"); @@ -190,7 +206,7 @@ void DabMultiplexer::prepare(bool require_tai_clock) bool tist_enabled = m_pt.get("general.tist", false); m_time.tist_offset = m_pt.get<double>("general.tist_offset", 0.0); - auto tist_edi_time = m_time.get_time(); + auto tist_edi_time = m_time.get_tist_seconds(); const auto timestamp = tist_edi_time.first; const auto edi_time = tist_edi_time.second; m_time.mnsc_time = edi_time; @@ -467,9 +483,8 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs etiLog.level(error) << "Could not get UTC-TAI offset for EDI timestamp"; } } - update_dab_time(); - auto tist_edi_time = m_time.get_time(); + auto tist_edi_time = m_time.get_tist_seconds(); const auto timestamp = tist_edi_time.first; const auto edi_time = tist_edi_time.second; diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h index aa6adfb..5a0d906 100644 --- a/src/DabMultiplexer.h +++ b/src/DabMultiplexer.h @@ -47,9 +47,11 @@ class MuxTime { private: uint32_t m_timestamp = 0; std::time_t m_edi_time = 0; + uint32_t m_tist_at_fct0_us = 0; public: - std::pair<uint32_t, std::time_t> get_time(); + std::pair<uint32_t, std::time_t> get_tist_seconds(); + std::pair<uint32_t, std::time_t> get_milliseconds_seconds(); double tist_offset = 0; @@ -80,8 +82,6 @@ class DabMultiplexer : public RemoteControllable { void prepare(bool require_tai_clock); - uint64_t getCurrentFrame() const { return currentFrame; } - void mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs); void print_info(void); @@ -115,6 +115,5 @@ class DabMultiplexer : public RemoteControllable { bool m_tai_clock_required = false; ClockTAI m_clock_tai; - /* New FIG Carousel */ FIC::FIGCarousel fig_carousel; }; diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 1a367da..bf525c1 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -327,6 +327,38 @@ int main(int argc, char *argv[]) if (outputuid == "edi") { ptree pt_edi = pt_outputs.get_child("edi"); + bool default_enable_pft = pt_edi.get<bool>("enable_pft", false); + edi_conf.verbose = pt_edi.get<bool>("verbose", false); + + unsigned int default_fec = pt_edi.get<unsigned int>("fec", 3); + unsigned int default_chunk_len = pt_edi.get<unsigned int>("chunk_len", 207); + + auto check_spreading_factor = [](int percent) { + if (percent < 0) { + throw std::runtime_error("EDI output: negative packet_spread value is invalid."); + } + double factor = (double)percent / 100.0; + if (factor > 30000) { + throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!"); + } + return factor; + }; + + double default_spreading_factor = check_spreading_factor(pt_edi.get<int>("packet_spread", 95)); + + using pt_t = boost::property_tree::basic_ptree<std::basic_string<char>, std::basic_string<char>>; + auto handle_overrides = [&](edi::pft_settings_t& pft_settings, pt_t pt) { + pft_settings.chunk_len = pt.get<unsigned int>("chunk_len", default_chunk_len); + pft_settings.enable_pft = pt.get<bool>("enable_pft", default_enable_pft); + pft_settings.fec = pt.get<unsigned int>("fec", default_fec); + pft_settings.fragment_spreading_factor = default_spreading_factor; + auto override_spread_percent = pt.get_optional<int>("packet_spread"); + if (override_spread_percent) { + pft_settings.fragment_spreading_factor = check_spreading_factor(*override_spread_percent); + } + pft_settings.verbose = pt.get<bool>("verbose", edi_conf.verbose); + }; + for (auto pt_edi_dest : pt_edi.get_child("destinations")) { const auto proto = pt_edi_dest.second.get<string>("protocol", "udp"); if (proto == "udp") { @@ -346,6 +378,8 @@ int main(int argc, char *argv[]) dest->dest_port = pt_edi.get<unsigned int>("port"); } + handle_overrides(dest->pft_settings, pt_edi_dest.second); + edi_conf.destinations.push_back(dest); } else if (proto == "tcp") { @@ -355,6 +389,8 @@ int main(int argc, char *argv[]) double preroll = pt_edi_dest.second.get<double>("preroll-burst", 0.0); dest->tcp_server_preroll_buffers = ceil(preroll / 24e-3); + handle_overrides(dest->pft_settings, pt_edi_dest.second); + edi_conf.destinations.push_back(dest); } else { @@ -362,22 +398,6 @@ int main(int argc, char *argv[]) } } - edi_conf.dump = pt_edi.get<bool>("dump", false); - edi_conf.enable_pft = pt_edi.get<bool>("enable_pft", false); - edi_conf.verbose = pt_edi.get<bool>("verbose", false); - - edi_conf.fec = pt_edi.get<unsigned int>("fec", 3); - edi_conf.chunk_len = pt_edi.get<unsigned int>("chunk_len", 207); - - int spread_percent = pt_edi.get<int>("packet_spread", 95); - if (spread_percent < 0) { - throw std::runtime_error("EDI output: negative packet_spread value is invalid."); - } - edi_conf.fragment_spreading_factor = (double)spread_percent / 100.0; - if (edi_conf.fragment_spreading_factor > 30000) { - throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!"); - } - edi_conf.tagpacket_alignment = pt_edi.get<unsigned int>("tagpacket_alignment", 8); mux.set_edi_config(edi_conf); diff --git a/src/fig/FIG.h b/src/fig/FIG.h index 9752245..eda4671 100644 --- a/src/fig/FIG.h +++ b/src/fig/FIG.h @@ -35,11 +35,19 @@ namespace FIC { class FIGRuntimeInformation { public: - FIGRuntimeInformation(std::shared_ptr<dabEnsemble>& e) : + + using dab_time_t = std::pair<uint32_t /* milliseconds */, time_t>; + using get_time_func_t = std::function<dab_time_t()>; + + FIGRuntimeInformation( + std::shared_ptr<dabEnsemble>& e, + get_time_func_t getTimeFunc) : + getTimeFunc(getTimeFunc), currentFrame(0), ensemble(e), factumAnalyzer(false) {} + get_time_func_t getTimeFunc; unsigned long currentFrame; std::shared_ptr<dabEnsemble> ensemble; bool factumAnalyzer; diff --git a/src/fig/FIG0_10.cpp b/src/fig/FIG0_10.cpp index 56ce9fb..240aa19 100644 --- a/src/fig/FIG0_10.cpp +++ b/src/fig/FIG0_10.cpp @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li */ /* @@ -23,7 +23,6 @@ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. */ -#include "fig/FIG0structs.h" #include "fig/FIG0_10.h" #include "utils.h" @@ -89,7 +88,7 @@ FillStatus FIG0_10::fill(uint8_t *buf, size_t max_size) return fs; } - //Time and country identifier + // Time and country identifier auto fig0_10 = (FIGtype0_10_LongForm*)buf; fig0_10->FIGtypeNumber = 0; @@ -102,9 +101,9 @@ FillStatus FIG0_10::fill(uint8_t *buf, size_t max_size) remaining -= 2; struct tm timeData; - time_t dab_time_seconds = 0; - uint32_t dab_time_millis = 0; - get_dab_time(&dab_time_seconds, &dab_time_millis); + const auto dab_time = m_rti->getTimeFunc(); + time_t dab_time_seconds = dab_time.second; + uint32_t dab_time_millis = dab_time.first; gmtime_r(&dab_time_seconds, &timeData); fig0_10->RFU = 0; diff --git a/src/fig/FIGCarousel.cpp b/src/fig/FIGCarousel.cpp index 9748dbf..ceda275 100644 --- a/src/fig/FIGCarousel.cpp +++ b/src/fig/FIGCarousel.cpp @@ -68,8 +68,11 @@ bool FIGCarouselElement::check_deadline() /**************** FIGCarousel *****************/ -FIGCarousel::FIGCarousel(std::shared_ptr<dabEnsemble> ensemble) : - m_rti(ensemble), +FIGCarousel::FIGCarousel( + std::shared_ptr<dabEnsemble> ensemble, + FIGRuntimeInformation::get_time_func_t getTimeFunc + ) : + m_rti(ensemble, getTimeFunc), m_fig0_0(&m_rti), m_fig0_1(&m_rti), m_fig0_2(&m_rti), diff --git a/src/fig/FIGCarousel.h b/src/fig/FIGCarousel.h index 1e33577..a2a8022 100644 --- a/src/fig/FIGCarousel.h +++ b/src/fig/FIGCarousel.h @@ -67,7 +67,9 @@ enum class FIBAllocation { class FIGCarousel { public: - FIGCarousel(std::shared_ptr<dabEnsemble> ensemble); + FIGCarousel( + std::shared_ptr<dabEnsemble> ensemble, + FIGRuntimeInformation::get_time_func_t getTimeFunc); /* Write all FIBs to the buffer, including correct padding and crc. * Returns number of bytes written. diff --git a/src/utils.cpp b/src/utils.cpp index 1a13caf..7ea6293 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2021 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -34,30 +34,8 @@ using namespace std; -static time_t dab_time_seconds = 0; -static int dab_time_millis = 0; - static void printServices(const vector<shared_ptr<DabService> >& services); -void update_dab_time() -{ - if (dab_time_seconds == 0) { - dab_time_seconds = time(nullptr); - } else { - dab_time_millis+= 24; - if (dab_time_millis >= 1000) { - dab_time_millis -= 1000; - ++dab_time_seconds; - } - } -} - -void get_dab_time(time_t *time, uint32_t *millis) -{ - *time = dab_time_seconds; - *millis = dab_time_millis; -} - uint32_t gregorian2mjd(int year, int month, int day) { diff --git a/src/utils.h b/src/utils.h index 331a0b2..d037bb3 100644 --- a/src/utils.h +++ b/src/utils.h @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2020 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li This file contains a set of utility functions that are used to show @@ -34,10 +34,6 @@ #include <memory> #include "MuxElements.h" -/* Must be called once per ETI frame to update the time */ -void update_dab_time(void); -void get_dab_time(time_t *time, uint32_t *millis); - /* Convert a date and time into the modified Julian date * used in FIG 0/10 * |