aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog10
-rw-r--r--configure.ac4
-rw-r--r--doc/advanced.mux9
-rw-r--r--lib/Json.cpp4
-rw-r--r--lib/Json.h4
-rw-r--r--lib/edioutput/EDIConfig.h31
-rw-r--r--lib/edioutput/PFT.cpp12
-rw-r--r--lib/edioutput/PFT.h15
-rw-r--r--lib/edioutput/Transport.cpp288
-rw-r--r--lib/edioutput/Transport.h93
-rw-r--r--man/odr-dabmux.12
-rw-r--r--src/DabMultiplexer.cpp35
-rw-r--r--src/DabMultiplexer.h7
-rw-r--r--src/DabMux.cpp52
-rw-r--r--src/fig/FIG.h10
-rw-r--r--src/fig/FIG0_10.cpp11
-rw-r--r--src/fig/FIGCarousel.cpp7
-rw-r--r--src/fig/FIGCarousel.h4
-rw-r--r--src/utils.cpp24
-rw-r--r--src/utils.h6
20 files changed, 365 insertions, 263 deletions
diff --git a/ChangeLog b/ChangeLog
index c006e15..07624c3 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,6 +1,16 @@
This file contains information about the changes done to
ODR-DabMux in this repository
+2025-05-19: Matthias P. Braendli <matthias@mpb.li>
+ (v5.2.0):
+ Rework FIG0/10 DAB time indication to match EDI time.
+ Make PFT per-output configurable.
+
+2025-03-18: Matthias P. Braendli <matthias@mpb.li>
+ (v5.1.0):
+ Fix startup value of DLFC and FCT.
+ Add statistics for EDI/TCP outputs.
+
2024-10-03: Matthias P. Braendli <matthias@mpb.li>
(v5.0.0):
Remove odr-zmq2edi.
diff --git a/configure.ac b/configure.ac
index 2d3231e..b706c05 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1,7 +1,7 @@
# Copyright (C) 2008, 2009 Her Majesty the Queen in Right of Canada
# (Communications Research Center Canada)
#
-# Copyright (C) 2024 Matthias P. Braendli, http://opendigitalradio.org
+# Copyright (C) 2025 Matthias P. Braendli, http://opendigitalradio.org
# This file is part of ODR-DabMux.
#
@@ -19,7 +19,7 @@
# along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
AC_PREREQ([2.69])
-AC_INIT([ODR-DabMux],[5.0.0],[matthias.braendli@mpb.li])
+AC_INIT([ODR-DabMux],[5.2.0],[matthias.braendli@mpb.li])
AC_CONFIG_AUX_DIR([build-aux])
AC_CONFIG_MACRO_DIR([m4])
AC_CANONICAL_TARGET
diff --git a/doc/advanced.mux b/doc/advanced.mux
index c07a2b2..0fc1b53 100644
--- a/doc/advanced.mux
+++ b/doc/advanced.mux
@@ -438,6 +438,10 @@ outputs {
destination "192.168.23.23"
port 12000
+ enable_pft true
+ fec 1
+ verbose true
+
; For compatibility: if port is not specified in the destination itself,
; it is taken from the parent 'destinations' block.
}
@@ -452,6 +456,8 @@ outputs {
; The multicast TTL has to be adapted according to your network
ttl 1
+ enable_pft true
+ fec 1
}
example_tcp {
; example for EDI TCP server. TCP is reliable, so it is counterproductive to
@@ -469,7 +475,8 @@ outputs {
}
}
- ; The settings below apply to all destinations
+ ; The settings below apply to all destinations, unless they are overridden
+ ; inside a destination
; Enable the PFT subsystem. If false, AFPackets are sent.
; PFT is not necessary when using TCP.
diff --git a/lib/Json.cpp b/lib/Json.cpp
index 361a149..ee33671 100644
--- a/lib/Json.cpp
+++ b/lib/Json.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2023
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -27,7 +27,7 @@
#include <sstream>
#include <iomanip>
#include <string>
-#include <algorithm>
+#include <stdexcept>
#include "Json.h"
diff --git a/lib/Json.h b/lib/Json.h
index b082f92..0168583 100644
--- a/lib/Json.h
+++ b/lib/Json.h
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2023
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -34,10 +34,10 @@
#include <vector>
#include <memory>
#include <optional>
-#include <stdexcept>
#include <string>
#include <unordered_map>
#include <variant>
+#include <cstdint>
namespace json {
diff --git a/lib/edioutput/EDIConfig.h b/lib/edioutput/EDIConfig.h
index 1997210..7016e87 100644
--- a/lib/edioutput/EDIConfig.h
+++ b/lib/edioutput/EDIConfig.h
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2019
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -36,17 +36,31 @@ namespace edi {
/** Configuration for EDI output */
+struct pft_settings_t {
+ // protection and fragmentation settings
+ bool verbose = false;
+ bool enable_pft = false;
+ unsigned chunk_len = 207; // RSk, data length of each chunk
+ unsigned fec = 0; // number of fragments that can be recovered
+ double fragment_spreading_factor = 0.95;
+ // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms)
+ // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets.
+};
+
struct destination_t {
virtual ~destination_t() {};
+
+ pft_settings_t pft_settings = {};
};
+
// Can represent both unicast and multicast destinations
struct udp_destination_t : public destination_t {
std::string dest_addr;
- unsigned int dest_port = 0;
+ uint16_t dest_port = 0;
std::string source_addr;
- unsigned int source_port = 0;
- unsigned int ttl = 10;
+ uint16_t source_port = 0;
+ uint8_t ttl = 10;
};
// TCP server that can accept multiple connections
@@ -66,16 +80,9 @@ struct tcp_client_t : public destination_t {
};
struct configuration_t {
- unsigned chunk_len = 207; // RSk, data length of each chunk
- unsigned fec = 0; // number of fragments that can be recovered
- bool dump = false; // dump a file with the EDI packets
- bool verbose = false;
- bool enable_pft = false; // Enable protection and fragmentation
+ bool verbose = false;
unsigned int tagpacket_alignment = 0;
std::vector<std::shared_ptr<destination_t> > destinations;
- double fragment_spreading_factor = 0.95;
- // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms)
- // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets.
bool enabled() const { return destinations.size() > 0; }
diff --git a/lib/edioutput/PFT.cpp b/lib/edioutput/PFT.cpp
index 7e0e8e9..f65fd67 100644
--- a/lib/edioutput/PFT.cpp
+++ b/lib/edioutput/PFT.cpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2021
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -31,7 +31,6 @@
*/
#include <vector>
-#include <list>
#include <cstdio>
#include <cstring>
#include <cstdint>
@@ -41,6 +40,7 @@
#include "PFT.h"
#include "crc.h"
#include "ReedSolomon.h"
+#include "Log.h"
namespace edi {
@@ -51,11 +51,10 @@ using namespace std;
PFT::PFT() { }
-PFT::PFT(const configuration_t &conf) :
+PFT::PFT(const pft_settings_t& conf) :
+ m_enabled(conf.enable_pft),
m_k(conf.chunk_len),
m_m(conf.fec),
- m_pseq(0),
- m_num_chunks(0),
m_verbose(conf.verbose)
{
if (m_k > 207) {
@@ -324,5 +323,4 @@ void PFT::OverridePSeq(uint16_t pseq)
m_pseq = pseq;
}
-}
-
+} // namespace edi
diff --git a/lib/edioutput/PFT.h b/lib/edioutput/PFT.h
index 42569a0..52e9f46 100644
--- a/lib/edioutput/PFT.h
+++ b/lib/edioutput/PFT.h
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2021
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -33,12 +33,8 @@
#pragma once
#include <vector>
-#include <list>
-#include <stdexcept>
#include <cstdint>
#include "AFPacket.h"
-#include "Log.h"
-#include "ReedSolomon.h"
#include "EDIConfig.h"
namespace edi {
@@ -52,21 +48,24 @@ class PFT
static constexpr int PARITYBYTES = 48;
PFT();
- PFT(const configuration_t& conf);
+ PFT(const pft_settings_t& conf);
+
+ bool is_enabled() const { return m_enabled and m_k > 0; }
// return a list of PFT fragments with the correct
// PFT headers
- std::vector< PFTFragment > Assemble(AFPacket af_packet);
+ std::vector<PFTFragment> Assemble(AFPacket af_packet);
// Apply Reed-Solomon FEC to the AF Packet
RSBlock Protect(AFPacket af_packet);
// Cut a RSBlock into several fragments that can be transmitted
- std::vector< std::vector<uint8_t> > ProtectAndFragment(AFPacket af_packet);
+ std::vector<std::vector<uint8_t>> ProtectAndFragment(AFPacket af_packet);
void OverridePSeq(uint16_t pseq);
private:
+ bool m_enabled = false;
unsigned int m_k = 207; // length of RS data word
unsigned int m_m = 3; // number of fragments that can be recovered if lost
uint16_t m_pseq = 0;
diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp
index a5e0bc3..e9559b5 100644
--- a/lib/edioutput/Transport.cpp
+++ b/lib/edioutput/Transport.cpp
@@ -25,7 +25,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Transport.h"
-#include <iterator>
+#include "Log.h"
#include <cmath>
#include <thread>
@@ -57,13 +57,18 @@ void configuration_t::print() const
else {
throw logic_error("EDI destination not implemented");
}
+ etiLog.level(info) << " PFT=" << edi_dest->pft_settings.enable_pft;
+ if (edi_dest->pft_settings.enable_pft) {
+ etiLog.level(info) << " FEC=" << edi_dest->pft_settings.fec;
+ etiLog.level(info) << " Chunk Len=" << edi_dest->pft_settings.chunk_len;
+ etiLog.level(info) << " Fragment spreading factor=" << edi_dest->pft_settings.fragment_spreading_factor;
+ }
}
}
Sender::Sender(const configuration_t& conf) :
- m_conf(conf),
- edi_pft(m_conf)
+ m_conf(conf)
{
if (m_conf.verbose) {
etiLog.level(info) << "Setup EDI Output";
@@ -71,37 +76,39 @@ Sender::Sender(const configuration_t& conf) :
for (const auto& edi_dest : m_conf.destinations) {
if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) {
- auto udp_socket = std::make_shared<Socket::UDPSocket>(udp_dest->source_port);
+ Socket::UDPSocket udp_socket(udp_dest->source_port);
if (not udp_dest->source_addr.empty()) {
- udp_socket->setMulticastSource(udp_dest->source_addr.c_str());
- udp_socket->setMulticastTTL(udp_dest->ttl);
+ udp_socket.setMulticastSource(udp_dest->source_addr.c_str());
+ udp_socket.setMulticastTTL(udp_dest->ttl);
}
- udp_sockets.emplace(udp_dest.get(), udp_socket);
+ auto sender = make_shared<udp_sender_t>(
+ udp_dest->dest_addr,
+ udp_dest->dest_port,
+ std::move(udp_socket));
+ m_pft_spreaders.emplace_back(
+ make_shared<PFTSpreader>(udp_dest->pft_settings, sender));
}
else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {
- auto dispatcher = make_shared<Socket::TCPDataDispatcher>(
- tcp_dest->max_frames_queued, tcp_dest->tcp_server_preroll_buffers);
-
- dispatcher->start(tcp_dest->listen_port, "0.0.0.0");
- tcp_dispatchers.emplace(tcp_dest.get(), dispatcher);
+ auto sender = make_shared<tcp_dispatcher_t>(
+ tcp_dest->listen_port,
+ tcp_dest->max_frames_queued,
+ tcp_dest->tcp_server_preroll_buffers);
+ m_pft_spreaders.emplace_back(
+ make_shared<PFTSpreader>(tcp_dest->pft_settings, sender));
}
else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) {
- auto tcp_send_client = make_shared<Socket::TCPSendClient>(tcp_dest->dest_addr, tcp_dest->dest_port);
- tcp_senders.emplace(tcp_dest.get(), tcp_send_client);
+ auto sender = make_shared<tcp_send_client_t>(tcp_dest->dest_addr, tcp_dest->dest_port);
+ m_pft_spreaders.emplace_back(
+ make_shared<PFTSpreader>(tcp_dest->pft_settings, sender));
}
else {
throw logic_error("EDI destination not implemented");
}
}
- if (m_conf.dump) {
- edi_debug_file.open("./edi.debug");
- }
-
- if (m_conf.enable_pft) {
- unique_lock<mutex> lock(m_mutex);
+ {
m_running = true;
m_thread = thread(&Sender::run, this);
}
@@ -111,10 +118,52 @@ Sender::Sender(const configuration_t& conf) :
}
}
+void Sender::write(const TagPacket& tagpacket)
+{
+ // Assemble into one AF Packet
+ edi::AFPacket af_packet = edi_af_packetiser.Assemble(tagpacket);
+
+ write(af_packet);
+}
+
+void Sender::write(const AFPacket& af_packet)
+{
+ for (auto& sender : m_pft_spreaders) {
+ sender->send_af_packet(af_packet);
+ }
+}
+
+void Sender::override_af_sequence(uint16_t seq)
+{
+ edi_af_packetiser.OverrideSeq(seq);
+}
+
+void Sender::override_pft_sequence(uint16_t pseq)
+{
+ for (auto& spreader : m_pft_spreaders) {
+ spreader->edi_pft.OverridePSeq(pseq);
+ }
+}
+
+std::vector<Sender::stats_t> Sender::get_tcp_server_stats() const
+{
+ std::vector<Sender::stats_t> stats;
+
+ for (auto& spreader : m_pft_spreaders) {
+ if (auto sender = std::dynamic_pointer_cast<tcp_dispatcher_t>(spreader->sender)) {
+ Sender::stats_t s;
+ s.listen_port = sender->listen_port;
+ s.stats = sender->sock.get_stats();
+ stats.push_back(s);
+ }
+ }
+
+ return stats;
+}
+
Sender::~Sender()
{
{
- unique_lock<mutex> lock(m_mutex);
m_running = false;
}
@@ -123,36 +172,89 @@ Sender::~Sender()
}
}
-void Sender::write(const TagPacket& tagpacket)
+void Sender::run()
{
- // Assemble into one AF Packet
- edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket);
+ while (m_running) {
+ const auto now = chrono::steady_clock::now();
+ for (auto& spreader : m_pft_spreaders) {
+ spreader->tick(now);
+ }
- write(af_packet);
+ this_thread::sleep_for(chrono::microseconds(500));
+ }
}
-void Sender::write(const AFPacket& af_packet)
+
+void Sender::udp_sender_t::send_packet(const std::vector<uint8_t> &frame)
+{
+ Socket::InetAddress addr;
+ addr.resolveUdpDestination(dest_addr, dest_port);
+ sock.send(frame, addr);
+}
+
+void Sender::tcp_dispatcher_t::send_packet(const std::vector<uint8_t> &frame)
+{
+ sock.write(frame);
+}
+
+void Sender::tcp_send_client_t::send_packet(const std::vector<uint8_t> &frame)
+{
+ sock.sendall(frame);
+}
+
+Sender::udp_sender_t::udp_sender_t(std::string dest_addr,
+ uint16_t dest_port,
+ Socket::UDPSocket&& sock) :
+ dest_addr(dest_addr),
+ dest_port(dest_port),
+ sock(std::move(sock))
+{
+}
+
+Sender::tcp_dispatcher_t::tcp_dispatcher_t(uint16_t listen_port,
+ size_t max_frames_queued,
+ size_t tcp_server_preroll_buffers) :
+ listen_port(listen_port),
+ sock(max_frames_queued, tcp_server_preroll_buffers)
{
- if (m_conf.enable_pft) {
+ sock.start(listen_port, "0.0.0.0");
+}
+
+Sender::tcp_send_client_t::tcp_send_client_t(const std::string &dest_addr,
+ uint16_t dest_port) :
+ sock(dest_addr, dest_port)
+{
+}
+
+Sender::PFTSpreader::PFTSpreader(const pft_settings_t& conf, sender_sp sender) :
+ sender(sender),
+ edi_pft(conf)
+{
+}
+
+void Sender::PFTSpreader::send_af_packet(const AFPacket& af_packet)
+{
+ using namespace std::chrono;
+ if (edi_pft.is_enabled()) {
// Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet);
- if (m_conf.verbose and m_last_num_pft_fragments != edi_fragments.size()) {
+ if (settings.verbose and last_num_pft_fragments != edi_fragments.size()) {
etiLog.log(debug, "EDI Output: Number of PFT fragments %zu\n",
edi_fragments.size());
- m_last_num_pft_fragments = edi_fragments.size();
+ last_num_pft_fragments = edi_fragments.size();
}
/* Spread out the transmission of all fragments over part of the 24ms AF packet duration
* to reduce the risk of losing a burst of fragments because of congestion. */
- using namespace std::chrono;
auto inter_fragment_wait_time = microseconds(1);
if (edi_fragments.size() > 1) {
- if (m_conf.fragment_spreading_factor > 0) {
+ if (settings.fragment_spreading_factor > 0) {
inter_fragment_wait_time =
- microseconds(
- llrint(m_conf.fragment_spreading_factor * 24000.0 / edi_fragments.size())
- );
+ microseconds(llrint(
+ settings.fragment_spreading_factor * 24000.0 /
+ edi_fragments.size()
+ ));
}
}
@@ -162,121 +264,35 @@ void Sender::write(const AFPacket& af_packet)
auto tp = now;
unique_lock<mutex> lock(m_mutex);
for (auto& edi_frag : edi_fragments) {
- m_pending_frames[tp] = move(edi_frag);
+ m_pending_frames[tp] = std::move(edi_frag);
tp += inter_fragment_wait_time;
}
}
-
- // Transmission done in run() function
}
else /* PFT disabled */ {
- // Send over ethernet
- if (m_conf.dump) {
- ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- copy(af_packet.begin(), af_packet.end(), debug_iterator);
- }
-
- for (auto& dest : m_conf.destinations) {
- if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- Socket::InetAddress addr;
- addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port);
-
- if (af_packet.size() > 1400 and not m_udp_fragmentation_warning_printed) {
- fprintf(stderr, "EDI Output: AF packet larger than 1400,"
- " consider using PFT to avoid UP fragmentation.\n");
- m_udp_fragmentation_warning_printed = true;
- }
-
- udp_sockets.at(udp_dest.get())->send(af_packet, addr);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
- tcp_dispatchers.at(tcp_dest.get())->write(af_packet);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
- const auto error_stats = tcp_senders.at(tcp_dest.get())->sendall(af_packet);
-
- if (m_conf.verbose and error_stats.has_seen_new_errors) {
- fprintf(stderr, "TCP output %s:%d has %zu reconnects: most recent error: %s\n",
- tcp_dest->dest_addr.c_str(),
- tcp_dest->dest_port,
- error_stats.num_reconnects,
- error_stats.last_error.c_str());
- }
- }
- else {
- throw logic_error("EDI destination not implemented");
- }
- }
+ const auto now = steady_clock::now();
+ unique_lock<mutex> lock(m_mutex);
+ m_pending_frames[now] = std::move(af_packet);
}
-}
-void Sender::override_af_sequence(uint16_t seq)
-{
- edi_afPacketiser.OverrideSeq(seq);
-}
-
-void Sender::override_pft_sequence(uint16_t pseq)
-{
- edi_pft.OverridePSeq(pseq);
+ // Actual transmission done in tick() function
}
-std::vector<Sender::stats_t> Sender::get_tcp_server_stats() const
+void Sender::PFTSpreader::tick(const std::chrono::steady_clock::time_point& now)
{
- std::vector<Sender::stats_t> stats;
+ unique_lock<mutex> lock(m_mutex);
- for (auto& el : tcp_dispatchers) {
- Sender::stats_t s;
- s.listen_port = el.first->listen_port;
- s.stats = el.second->get_stats();
- stats.push_back(s);
- }
-
- return stats;
-}
+ for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) {
+ const auto& edi_frag = it->second;
-void Sender::run()
-{
- while (m_running) {
- unique_lock<mutex> lock(m_mutex);
- const auto now = chrono::steady_clock::now();
-
- // Send over ethernet
- for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) {
- const auto& edi_frag = it->second;
-
- if (it->first <= now) {
- if (m_conf.dump) {
- ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
- }
-
- for (auto& dest : m_conf.destinations) {
- if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- Socket::InetAddress addr;
- addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port);
-
- udp_sockets.at(udp_dest.get())->send(edi_frag, addr);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
- tcp_dispatchers.at(tcp_dest.get())->write(edi_frag);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
- tcp_senders.at(tcp_dest.get())->sendall(edi_frag);
- }
- else {
- throw logic_error("EDI destination not implemented");
- }
- }
- it = m_pending_frames.erase(it);
- }
- else {
- ++it;
- }
+ if (it->first <= now) {
+ sender->send_packet(edi_frag);
+ it = m_pending_frames.erase(it);
+ }
+ else {
+ ++it;
}
-
- lock.unlock();
- this_thread::sleep_for(chrono::microseconds(500));
}
}
-}
+} // namespace edi
diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h
index 2ca638e..b8a9008 100644
--- a/lib/edioutput/Transport.h
+++ b/lib/edioutput/Transport.h
@@ -33,8 +33,6 @@
#include "Socket.h"
#include <chrono>
#include <map>
-#include <unordered_map>
-#include <fstream>
#include <cstdint>
#include <thread>
#include <mutex>
@@ -43,12 +41,13 @@
namespace edi {
/** ETI/STI sender for EDI output */
-
class Sender {
public:
Sender(const configuration_t& conf);
Sender(const Sender&) = delete;
- Sender operator=(const Sender&) = delete;
+ Sender& operator=(const Sender&) = delete;
+ Sender(Sender&&) = delete;
+ Sender& operator=(Sender&&) = delete;
~Sender();
// Assemble the tagpacket into an AF packet, and if needed,
@@ -72,32 +71,78 @@ class Sender {
std::vector<stats_t> get_tcp_server_stats() const;
private:
- void run();
-
- bool m_udp_fragmentation_warning_printed = false;
-
configuration_t m_conf;
- std::ofstream edi_debug_file;
// The TagPacket will then be placed into an AFPacket
- edi::AFPacketiser edi_afPacketiser;
+ edi::AFPacketiser edi_af_packetiser;
- // The AF Packet will be protected with reed-solomon and split in fragments
- edi::PFT edi_pft;
+ // PFT spreading requires sending UDP packets at specific time,
+ // independently of time when write() gets called
+ bool m_running = false;
+ std::thread m_thread;
+ virtual void run();
- std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets;
- std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers;
- std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders;
- // PFT spreading requires sending UDP packets at specific time, independently of
- // time when write() gets called
- std::thread m_thread;
- std::mutex m_mutex;
- bool m_running = false;
- std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames;
- size_t m_last_num_pft_fragments = 0;
-};
-}
+ struct i_sender {
+ virtual void send_packet(const std::vector<uint8_t> &frame) = 0;
+ virtual ~i_sender() { }
+ };
+
+ struct udp_sender_t : public i_sender {
+ udp_sender_t(
+ std::string dest_addr,
+ uint16_t dest_port,
+ Socket::UDPSocket&& sock);
+
+ std::string dest_addr;
+ uint16_t dest_port;
+ Socket::UDPSocket sock;
+
+ virtual void send_packet(const std::vector<uint8_t> &frame) override;
+ };
+
+ struct tcp_dispatcher_t : public i_sender {
+ tcp_dispatcher_t(
+ uint16_t listen_port,
+ size_t max_frames_queued,
+ size_t tcp_server_preroll_buffers);
+
+ uint16_t listen_port;
+ Socket::TCPDataDispatcher sock;
+ virtual void send_packet(const std::vector<uint8_t> &frame) override;
+ };
+
+ struct tcp_send_client_t : public i_sender {
+ tcp_send_client_t(
+ const std::string& dest_addr,
+ uint16_t dest_port);
+
+ Socket::TCPSendClient sock;
+ virtual void send_packet(const std::vector<uint8_t> &frame) override;
+ };
+
+ class PFTSpreader {
+ public:
+ using sender_sp = std::shared_ptr<i_sender>;
+ PFTSpreader(const pft_settings_t &conf, sender_sp sender);
+ sender_sp sender;
+ edi::PFT edi_pft;
+
+ void send_af_packet(const AFPacket &af_packet);
+ void tick(const std::chrono::steady_clock::time_point& now);
+
+ private:
+ // send_af_packet() and tick() are called from different threads, both
+ // are accessing m_pending_frames
+ std::mutex m_mutex;
+ std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames;
+ pft_settings_t settings;
+ size_t last_num_pft_fragments = 0;
+ };
+
+ std::vector<std::shared_ptr<PFTSpreader>> m_pft_spreaders;
+};
+}
diff --git a/man/odr-dabmux.1 b/man/odr-dabmux.1
index 03b8df1..fcf11c5 100644
--- a/man/odr-dabmux.1
+++ b/man/odr-dabmux.1
@@ -1,4 +1,4 @@
-.TH ODR-DABMUX "1" "October 2024" "odr-dabmux 5.0.0" "User Commands"
+.TH ODR-DABMUX "1" "May 2025" "odr-dabmux 5.2.0" "User Commands"
.SH NAME
\fBodr\-dabmux\fR \- A software DAB multiplexer
.SH SYNOPSIS
diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp
index b9575fc..e6e6782 100644
--- a/src/DabMultiplexer.cpp
+++ b/src/DabMultiplexer.cpp
@@ -49,6 +49,8 @@ static vector<string> split_pipe_separated_string(const std::string& s)
uint64_t MuxTime::init(uint32_t tist_at_fct0_us)
{
+ m_tist_at_fct0_us = tist_at_fct0_us;
+
/* At startup, derive edi_time, TIST and CIF count such that there is
* a consistency across mux restarts. Ensure edi_time and TIST represent
* current time.
@@ -98,9 +100,11 @@ uint64_t MuxTime::init(uint32_t tist_at_fct0_us)
return currentFrame;
}
+constexpr int TIMESTAMP_LEVEL_2_SHIFT = 14;
+
void MuxTime::increment_timestamp()
{
- m_timestamp += 24 << 14; // Shift 24ms by 14 to Timestamp level 2
+ m_timestamp += 24 << TIMESTAMP_LEVEL_2_SHIFT; // Shift 24ms by 14 to Timestamp level 2
if (m_timestamp > 0xf9FFff) {
m_timestamp -= 0xfa0000; // Subtract 16384000, corresponding to one second
m_edi_time += 1;
@@ -110,16 +114,21 @@ void MuxTime::increment_timestamp()
}
}
-std::pair<uint32_t, std::time_t> MuxTime::get_time()
+std::pair<uint32_t, std::time_t> MuxTime::get_tist_seconds()
{
+ // The user-visible configuration tist_offset is the effective
+ // offset, but since we implicitly add the tist_at_fct0 to it,
+ // we must compensate.
+ double corrected_tist_offset = tist_offset - (m_tist_at_fct0_us / 1e6);
+
// negative tist_offset not supported, because the calculation is annoying
- if (tist_offset < 0) return {m_timestamp, m_edi_time};
+ if (corrected_tist_offset < 0) return {m_timestamp, m_edi_time};
- double fractional_part = tist_offset - std::floor(tist_offset);
+ double fractional_part = corrected_tist_offset - std::floor(corrected_tist_offset);
const size_t steps = std::lround(std::floor(fractional_part / 24e-3));
- uint32_t timestamp = m_timestamp + (24 << 14) * steps;
+ uint32_t timestamp = m_timestamp + (24 << TIMESTAMP_LEVEL_2_SHIFT) * steps;
- std::time_t edi_time = m_edi_time + std::lround(std::floor(tist_offset));
+ std::time_t edi_time = m_edi_time + std::lround(std::floor(corrected_tist_offset));
if (timestamp > 0xf9FFff) {
edi_time += 1;
@@ -128,13 +137,20 @@ std::pair<uint32_t, std::time_t> MuxTime::get_time()
return {timestamp % 0xfa0000, edi_time};
}
+std::pair<uint32_t, std::time_t> MuxTime::get_milliseconds_seconds()
+{
+ auto tist_seconds = get_tist_seconds();
+ return {tist_seconds.first >> TIMESTAMP_LEVEL_2_SHIFT, tist_seconds.second};
+}
+
DabMultiplexer::DabMultiplexer(boost::property_tree::ptree pt) :
RemoteControllable("mux"),
m_pt(pt),
+ m_time(),
ensemble(std::make_shared<dabEnsemble>()),
m_clock_tai(split_pipe_separated_string(pt.get("general.tai_clock_bulletins", ""))),
- fig_carousel(ensemble)
+ fig_carousel(ensemble, [&]() { return m_time.get_milliseconds_seconds(); })
{
RC_ADD_PARAMETER(frames, "Show number of frames generated [read-only]");
RC_ADD_PARAMETER(tist_offset, "Timestamp offset in fractional number of seconds");
@@ -190,7 +206,7 @@ void DabMultiplexer::prepare(bool require_tai_clock)
bool tist_enabled = m_pt.get("general.tist", false);
m_time.tist_offset = m_pt.get<double>("general.tist_offset", 0.0);
- auto tist_edi_time = m_time.get_time();
+ auto tist_edi_time = m_time.get_tist_seconds();
const auto timestamp = tist_edi_time.first;
const auto edi_time = tist_edi_time.second;
m_time.mnsc_time = edi_time;
@@ -467,9 +483,8 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
etiLog.level(error) << "Could not get UTC-TAI offset for EDI timestamp";
}
}
- update_dab_time();
- auto tist_edi_time = m_time.get_time();
+ auto tist_edi_time = m_time.get_tist_seconds();
const auto timestamp = tist_edi_time.first;
const auto edi_time = tist_edi_time.second;
diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h
index aa6adfb..5a0d906 100644
--- a/src/DabMultiplexer.h
+++ b/src/DabMultiplexer.h
@@ -47,9 +47,11 @@ class MuxTime {
private:
uint32_t m_timestamp = 0;
std::time_t m_edi_time = 0;
+ uint32_t m_tist_at_fct0_us = 0;
public:
- std::pair<uint32_t, std::time_t> get_time();
+ std::pair<uint32_t, std::time_t> get_tist_seconds();
+ std::pair<uint32_t, std::time_t> get_milliseconds_seconds();
double tist_offset = 0;
@@ -80,8 +82,6 @@ class DabMultiplexer : public RemoteControllable {
void prepare(bool require_tai_clock);
- uint64_t getCurrentFrame() const { return currentFrame; }
-
void mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs);
void print_info(void);
@@ -115,6 +115,5 @@ class DabMultiplexer : public RemoteControllable {
bool m_tai_clock_required = false;
ClockTAI m_clock_tai;
- /* New FIG Carousel */
FIC::FIGCarousel fig_carousel;
};
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index 1a367da..bf525c1 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -327,6 +327,38 @@ int main(int argc, char *argv[])
if (outputuid == "edi") {
ptree pt_edi = pt_outputs.get_child("edi");
+ bool default_enable_pft = pt_edi.get<bool>("enable_pft", false);
+ edi_conf.verbose = pt_edi.get<bool>("verbose", false);
+
+ unsigned int default_fec = pt_edi.get<unsigned int>("fec", 3);
+ unsigned int default_chunk_len = pt_edi.get<unsigned int>("chunk_len", 207);
+
+ auto check_spreading_factor = [](int percent) {
+ if (percent < 0) {
+ throw std::runtime_error("EDI output: negative packet_spread value is invalid.");
+ }
+ double factor = (double)percent / 100.0;
+ if (factor > 30000) {
+ throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!");
+ }
+ return factor;
+ };
+
+ double default_spreading_factor = check_spreading_factor(pt_edi.get<int>("packet_spread", 95));
+
+ using pt_t = boost::property_tree::basic_ptree<std::basic_string<char>, std::basic_string<char>>;
+ auto handle_overrides = [&](edi::pft_settings_t& pft_settings, pt_t pt) {
+ pft_settings.chunk_len = pt.get<unsigned int>("chunk_len", default_chunk_len);
+ pft_settings.enable_pft = pt.get<bool>("enable_pft", default_enable_pft);
+ pft_settings.fec = pt.get<unsigned int>("fec", default_fec);
+ pft_settings.fragment_spreading_factor = default_spreading_factor;
+ auto override_spread_percent = pt.get_optional<int>("packet_spread");
+ if (override_spread_percent) {
+ pft_settings.fragment_spreading_factor = check_spreading_factor(*override_spread_percent);
+ }
+ pft_settings.verbose = pt.get<bool>("verbose", edi_conf.verbose);
+ };
+
for (auto pt_edi_dest : pt_edi.get_child("destinations")) {
const auto proto = pt_edi_dest.second.get<string>("protocol", "udp");
if (proto == "udp") {
@@ -346,6 +378,8 @@ int main(int argc, char *argv[])
dest->dest_port = pt_edi.get<unsigned int>("port");
}
+ handle_overrides(dest->pft_settings, pt_edi_dest.second);
+
edi_conf.destinations.push_back(dest);
}
else if (proto == "tcp") {
@@ -355,6 +389,8 @@ int main(int argc, char *argv[])
double preroll = pt_edi_dest.second.get<double>("preroll-burst", 0.0);
dest->tcp_server_preroll_buffers = ceil(preroll / 24e-3);
+ handle_overrides(dest->pft_settings, pt_edi_dest.second);
+
edi_conf.destinations.push_back(dest);
}
else {
@@ -362,22 +398,6 @@ int main(int argc, char *argv[])
}
}
- edi_conf.dump = pt_edi.get<bool>("dump", false);
- edi_conf.enable_pft = pt_edi.get<bool>("enable_pft", false);
- edi_conf.verbose = pt_edi.get<bool>("verbose", false);
-
- edi_conf.fec = pt_edi.get<unsigned int>("fec", 3);
- edi_conf.chunk_len = pt_edi.get<unsigned int>("chunk_len", 207);
-
- int spread_percent = pt_edi.get<int>("packet_spread", 95);
- if (spread_percent < 0) {
- throw std::runtime_error("EDI output: negative packet_spread value is invalid.");
- }
- edi_conf.fragment_spreading_factor = (double)spread_percent / 100.0;
- if (edi_conf.fragment_spreading_factor > 30000) {
- throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!");
- }
-
edi_conf.tagpacket_alignment = pt_edi.get<unsigned int>("tagpacket_alignment", 8);
mux.set_edi_config(edi_conf);
diff --git a/src/fig/FIG.h b/src/fig/FIG.h
index 9752245..eda4671 100644
--- a/src/fig/FIG.h
+++ b/src/fig/FIG.h
@@ -35,11 +35,19 @@ namespace FIC {
class FIGRuntimeInformation {
public:
- FIGRuntimeInformation(std::shared_ptr<dabEnsemble>& e) :
+
+ using dab_time_t = std::pair<uint32_t /* milliseconds */, time_t>;
+ using get_time_func_t = std::function<dab_time_t()>;
+
+ FIGRuntimeInformation(
+ std::shared_ptr<dabEnsemble>& e,
+ get_time_func_t getTimeFunc) :
+ getTimeFunc(getTimeFunc),
currentFrame(0),
ensemble(e),
factumAnalyzer(false) {}
+ get_time_func_t getTimeFunc;
unsigned long currentFrame;
std::shared_ptr<dabEnsemble> ensemble;
bool factumAnalyzer;
diff --git a/src/fig/FIG0_10.cpp b/src/fig/FIG0_10.cpp
index 56ce9fb..240aa19 100644
--- a/src/fig/FIG0_10.cpp
+++ b/src/fig/FIG0_10.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
*/
/*
@@ -23,7 +23,6 @@
along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "fig/FIG0structs.h"
#include "fig/FIG0_10.h"
#include "utils.h"
@@ -89,7 +88,7 @@ FillStatus FIG0_10::fill(uint8_t *buf, size_t max_size)
return fs;
}
- //Time and country identifier
+ // Time and country identifier
auto fig0_10 = (FIGtype0_10_LongForm*)buf;
fig0_10->FIGtypeNumber = 0;
@@ -102,9 +101,9 @@ FillStatus FIG0_10::fill(uint8_t *buf, size_t max_size)
remaining -= 2;
struct tm timeData;
- time_t dab_time_seconds = 0;
- uint32_t dab_time_millis = 0;
- get_dab_time(&dab_time_seconds, &dab_time_millis);
+ const auto dab_time = m_rti->getTimeFunc();
+ time_t dab_time_seconds = dab_time.second;
+ uint32_t dab_time_millis = dab_time.first;
gmtime_r(&dab_time_seconds, &timeData);
fig0_10->RFU = 0;
diff --git a/src/fig/FIGCarousel.cpp b/src/fig/FIGCarousel.cpp
index 9748dbf..ceda275 100644
--- a/src/fig/FIGCarousel.cpp
+++ b/src/fig/FIGCarousel.cpp
@@ -68,8 +68,11 @@ bool FIGCarouselElement::check_deadline()
/**************** FIGCarousel *****************/
-FIGCarousel::FIGCarousel(std::shared_ptr<dabEnsemble> ensemble) :
- m_rti(ensemble),
+FIGCarousel::FIGCarousel(
+ std::shared_ptr<dabEnsemble> ensemble,
+ FIGRuntimeInformation::get_time_func_t getTimeFunc
+ ) :
+ m_rti(ensemble, getTimeFunc),
m_fig0_0(&m_rti),
m_fig0_1(&m_rti),
m_fig0_2(&m_rti),
diff --git a/src/fig/FIGCarousel.h b/src/fig/FIGCarousel.h
index 1e33577..a2a8022 100644
--- a/src/fig/FIGCarousel.h
+++ b/src/fig/FIGCarousel.h
@@ -67,7 +67,9 @@ enum class FIBAllocation {
class FIGCarousel {
public:
- FIGCarousel(std::shared_ptr<dabEnsemble> ensemble);
+ FIGCarousel(
+ std::shared_ptr<dabEnsemble> ensemble,
+ FIGRuntimeInformation::get_time_func_t getTimeFunc);
/* Write all FIBs to the buffer, including correct padding and crc.
* Returns number of bytes written.
diff --git a/src/utils.cpp b/src/utils.cpp
index 1a13caf..7ea6293 100644
--- a/src/utils.cpp
+++ b/src/utils.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2021
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -34,30 +34,8 @@
using namespace std;
-static time_t dab_time_seconds = 0;
-static int dab_time_millis = 0;
-
static void printServices(const vector<shared_ptr<DabService> >& services);
-void update_dab_time()
-{
- if (dab_time_seconds == 0) {
- dab_time_seconds = time(nullptr);
- } else {
- dab_time_millis+= 24;
- if (dab_time_millis >= 1000) {
- dab_time_millis -= 1000;
- ++dab_time_seconds;
- }
- }
-}
-
-void get_dab_time(time_t *time, uint32_t *millis)
-{
- *time = dab_time_seconds;
- *millis = dab_time_millis;
-}
-
uint32_t gregorian2mjd(int year, int month, int day)
{
diff --git a/src/utils.h b/src/utils.h
index 331a0b2..d037bb3 100644
--- a/src/utils.h
+++ b/src/utils.h
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2020
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
This file contains a set of utility functions that are used to show
@@ -34,10 +34,6 @@
#include <memory>
#include "MuxElements.h"
-/* Must be called once per ETI frame to update the time */
-void update_dab_time(void);
-void get_dab_time(time_t *time, uint32_t *millis);
-
/* Convert a date and time into the modified Julian date
* used in FIG 0/10
*