aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ConfigParser.cpp11
-rw-r--r--src/DabMultiplexer.cpp232
-rw-r--r--src/DabMultiplexer.h77
-rw-r--r--src/DabMux.cpp52
-rw-r--r--src/ManagementServer.cpp75
-rw-r--r--src/ManagementServer.h20
-rw-r--r--src/fig/FIG.h10
-rw-r--r--src/fig/FIG0_10.cpp11
-rw-r--r--src/fig/FIG0structs.h18
-rw-r--r--src/fig/FIGCarousel.cpp7
-rw-r--r--src/fig/FIGCarousel.h4
-rw-r--r--src/input/Edi.cpp27
-rw-r--r--src/utils.cpp31
-rw-r--r--src/utils.h6
-rw-r--r--src/zmq2edi/EDISender.cpp390
-rw-r--r--src/zmq2edi/EDISender.h91
-rw-r--r--src/zmq2edi/README.md8
-rw-r--r--src/zmq2edi/Sender.cpp320
-rw-r--r--src/zmq2edi/Sender.h98
-rw-r--r--src/zmq2edi/zmq2edi.cpp360
20 files changed, 953 insertions, 895 deletions
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index 74e627b..7d166b6 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -36,16 +36,13 @@
# include "config.h"
#endif
-#include "dabOutput/dabOutput.h"
#include "utils.h"
-#include "DabMux.h"
-#include "ManagementServer.h"
#include "input/Edi.h"
#include "input/Prbs.h"
#include "input/Zmq.h"
#include "input/File.h"
#include "input/Udp.h"
-#include "Eti.h"
+#include "fig/FIG0structs.h"
#include <boost/property_tree/ptree.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
@@ -63,6 +60,12 @@ using namespace std;
using boost::property_tree::ptree;
using boost::property_tree::ptree_error;
+constexpr uint16_t DEFAULT_DATA_BITRATE = 384;
+constexpr uint16_t DEFAULT_PACKET_BITRATE = 32;
+
+constexpr uint32_t DEFAULT_SERVICE_ID = 50;
+
+
static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,
const ptree &pt,
std::shared_ptr<dabEnsemble> ensemble,
diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp
index b1f2c75..52f053a 100644
--- a/src/DabMultiplexer.cpp
+++ b/src/DabMultiplexer.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2024
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
*/
/*
@@ -23,11 +23,14 @@
along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <cmath>
#include <set>
#include <memory>
#include "DabMultiplexer.h"
#include "ConfigParser.h"
-#include "fig/FIG.h"
+#include "ManagementServer.h"
+#include "crc.h"
+#include "utils.h"
using namespace std;
@@ -44,16 +47,101 @@ static vector<string> split_pipe_separated_string(const std::string& s)
return components;
}
-DabMultiplexer::DabMultiplexer(
- boost::property_tree::ptree pt) :
+uint64_t MuxTime::init(uint32_t tist_at_fct0_ms, double tist_offset)
+{
+ // Things we must guarantee, up to granularity of 24ms:
+ // Difference between current time and EDI time = tist_offset
+ // TIST of frame 0 = tist_at_fct0_ms
+ // In order to achieve the second, we calculate the initial
+ // counter value so that FCT0 corresponds to the desired TIST.
+ //
+ // Changing the tist_offset at runtime will throw off the TIST@FCT0 value
+ m_tist_offset_ms = std::lround(tist_offset * 1000);
+
+ using Sec = chrono::seconds;
+ const auto now = chrono::system_clock::now() +
+ chrono::milliseconds(std::lround(tist_offset * 1000.0));
+
+ const auto offset = now - chrono::time_point_cast<Sec>(now);
+ if (offset >= chrono::seconds(1)) {
+ throw std::logic_error("Invalid startup offset calculation for TIST! " +
+ to_string(chrono::duration_cast<chrono::milliseconds>(offset).count()) +
+ " ms");
+ }
+ const time_t t_now = chrono::system_clock::to_time_t(chrono::time_point_cast<Sec>(now));
+ const auto offset_ms = chrono::duration_cast<chrono::milliseconds>(offset).count();
+
+ m_edi_time = t_now;
+ m_pps_offset_ms = std::lround(offset_ms / 24.0) * 24;
+
+ const auto counter_offset = tist_at_fct0_ms / 24;
+ const auto offset_as_count = m_pps_offset_ms / 24;
+
+ etiLog.level(debug) << "Init " << counter_offset << " " << offset_as_count;
+
+ return (250 - counter_offset + offset_as_count) % 250;
+}
+
+constexpr int TIMESTAMP_LEVEL_2_SHIFT = 14;
+
+void MuxTime::increment_timestamp()
+{
+ m_pps_offset_ms += 24;
+ if (m_pps_offset_ms >= 1000) {
+ m_pps_offset_ms -= 1000;
+ m_edi_time += 1;
+
+ // Also update MNSC time for next time FP==0
+ mnsc_increment_time = true;
+ }
+}
+
+void MuxTime::set_tist_offset(double new_tist_offset)
+{
+ const int32_t new_tist_offset_ms = std::lround(new_tist_offset * 1000.0);
+ int32_t delta = m_tist_offset_ms - new_tist_offset_ms;
+ if (delta > 0) {
+ while (delta > 0) {
+ increment_timestamp();
+ delta -= 24;
+ }
+ }
+ else if (delta < 0) {
+ while (delta < 0) {
+ m_edi_time -= 1;
+ delta += 1000;
+ }
+ // compensate the we subtracted too much
+ while (delta > 0) {
+ increment_timestamp();
+ delta -= 24;
+ }
+ }
+ m_tist_offset_ms = new_tist_offset_ms;
+}
+
+std::pair<uint32_t, std::time_t> MuxTime::get_tist_seconds()
+{
+ auto timestamp = m_pps_offset_ms * 16384;
+ return {timestamp % 0xfa0000, m_edi_time};
+}
+
+std::pair<uint32_t, std::time_t> MuxTime::get_milliseconds_seconds()
+{
+ auto tist_seconds = get_tist_seconds();
+ return {tist_seconds.first >> TIMESTAMP_LEVEL_2_SHIFT, tist_seconds.second};
+}
+
+
+DabMultiplexer::DabMultiplexer(boost::property_tree::ptree pt) :
RemoteControllable("mux"),
m_pt(pt),
+ m_time(),
ensemble(std::make_shared<dabEnsemble>()),
m_clock_tai(split_pipe_separated_string(pt.get("general.tai_clock_bulletins", ""))),
- fig_carousel(ensemble)
+ fig_carousel(ensemble, [&]() { return m_time.get_milliseconds_seconds(); })
{
RC_ADD_PARAMETER(frames, "Show number of frames generated [read-only]");
- RC_ADD_PARAMETER(tist_offset, "Timestamp offset in integral number of seconds");
rcs.enrol(&m_clock_tai);
}
@@ -99,58 +187,23 @@ void DabMultiplexer::prepare(bool require_tai_clock)
throw MuxInitException();
}
- /* At startup, derive edi_time, TIST and CIF count such that there is
- * a consistency across mux restarts. Ensure edi_time and TIST represent
- * current time.
- *
- * FCT and DLFC are directly derived from m_currentFrame.
- * Every 6s, FCT overflows. DLFC overflows at 5000 every 120s.
- *
- * Keep a granularity of 24ms, which corresponds to the duration of an ETI
- * frame, to get nicer timestamps.
- */
- using Sec = chrono::seconds;
- const auto now = chrono::system_clock::now();
- const time_t t_now = chrono::system_clock::to_time_t(chrono::time_point_cast<Sec>(now));
-
- m_edi_time = t_now - (t_now % 6);
- m_currentFrame = 0;
- time_t edi_time_at_cif0 = t_now - (t_now % 120);
- while (edi_time_at_cif0 < m_edi_time) {
- edi_time_at_cif0 += 6;
- m_currentFrame += 250;
- }
-
- if (edi_time_at_cif0 != m_edi_time) {
- throw std::logic_error("Invalid startup offset calculation for CIF!");
- }
-
- const auto offset = now - chrono::time_point_cast<Sec>(now);
- if (offset >= chrono::seconds(1)) {
- throw std::logic_error("Invalid startup offset calculation for TIST! " +
- to_string(chrono::duration_cast<chrono::milliseconds>(offset).count()) +
- " ms");
- }
+ const uint32_t tist_at_fct0_ms = m_pt.get<double>("general.tist_at_fct0", 0);
+ currentFrame = m_time.init(tist_at_fct0_ms, m_pt.get<double>("general.tist_offset", 0.0));
+ m_time.mnsc_increment_time = false;
- int64_t offset_ms = chrono::duration_cast<chrono::milliseconds>(offset).count();
- offset_ms += 1000 * (t_now - m_edi_time);
+ bool tist_enabled = m_pt.get("general.tist", false);
- m_timestamp = 0;
- while (offset_ms >= 24) {
- increment_timestamp();
- m_currentFrame++;
- offset_ms -= 24;
- }
+ auto tist_edi_time = m_time.get_tist_seconds();
+ const auto timestamp = tist_edi_time.first;
+ const auto edi_time = tist_edi_time.second;
+ m_time.mnsc_time = edi_time;
etiLog.log(info, "Startup CIF Count %i with timestamp: %d + %f",
- m_currentFrame, m_edi_time,
- (m_timestamp & 0xFFFFFF) / 16384000.0);
+ currentFrame, edi_time,
+ (timestamp & 0xFFFFFF) / 16384000.0);
// Try to load offset once
- bool tist_enabled = m_pt.get("general.tist", false);
- m_tist_offset = m_pt.get<int>("general.tist_offset", 0);
-
m_tai_clock_required = (tist_enabled and edi_conf.enabled()) or require_tai_clock;
if (m_tai_clock_required) {
@@ -387,14 +440,6 @@ void DabMultiplexer::prepare_data_inputs()
}
}
-void DabMultiplexer::increment_timestamp()
-{
- m_timestamp += 24 << 14; // Shift 24ms by 14 to Timestamp level 2
- if (m_timestamp > 0xf9FFff) {
- m_timestamp -= 0xfa0000; // Substract 16384000, corresponding to one second
- m_edi_time += 1;
- }
-}
/* Each call creates one ETI frame */
void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs)
@@ -425,9 +470,14 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
etiLog.level(error) << "Could not get UTC-TAI offset for EDI timestamp";
}
}
- update_dab_time();
- const auto edi_time = m_edi_time + m_tist_offset;
+ auto tist_edi_time = m_time.get_tist_seconds();
+ const auto timestamp = tist_edi_time.first;
+ const auto edi_time = tist_edi_time.second;
+ /*
+ etiLog.level(debug) << "Frame " << currentFrame << " " << edi_time <<
+ " + " << (timestamp >> TIMESTAMP_LEVEL_2_SHIFT);
+ */
// Initialise the ETI frame
memset(etiFrame, 0, 6144);
@@ -443,7 +493,7 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
//****** Field FSYNC *****//
// See ETS 300 799, 6.2.1.2
- if ((m_currentFrame & 1) == 0) {
+ if ((currentFrame & 1) == 0) {
etiSync->FSYNC = ETI_FSYNC1;
}
else {
@@ -461,9 +511,8 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
eti_FC *fc = (eti_FC *) &etiFrame[4];
//****** FCT ******//
- // Incremente for each frame, overflows at 249
- fc->FCT = m_currentFrame % 250;
- edi_tagDETI.dlfc = m_currentFrame % 5000;
+ fc->FCT = currentFrame % 250;
+ edi_tagDETI.dlfc = currentFrame % 5000;
//****** FICF ******//
// Fast Information Channel Flag, 1 bit, =1 if FIC present
@@ -480,7 +529,7 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
/* Frame Phase, 3 bit counter, tells the COFDM generator
* when to insert the TII. Is also used by the MNSC.
*/
- fc->FP = edi_tagDETI.fp = m_currentFrame & 0x7;
+ fc->FP = edi_tagDETI.fp = currentFrame & 0x7;
//****** MID ******//
//Mode Identity, 2 bits, 01 ModeI, 10 modeII, 11 ModeIII, 00 ModeIV
@@ -554,14 +603,9 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
eoh->MNSC = 0;
- if (fc->FP == 0) {
- // update the latched time only when FP==0 to ensure MNSC encodes
- // a consistent time
- m_edi_time_latched_for_mnsc = edi_time;
- }
-
struct tm time_tm;
- gmtime_r(&m_edi_time_latched_for_mnsc, &time_tm);
+ gmtime_r(&m_time.mnsc_time, &time_tm);
+
switch (fc->FP & 0x3) {
case 0:
{
@@ -571,6 +615,12 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
mnsc->identifier = 0;
mnsc->rfa = 0;
}
+
+ if (m_time.mnsc_increment_time)
+ {
+ m_time.mnsc_increment_time = false;
+ m_time.mnsc_time += 1;
+ }
break;
case 1:
{
@@ -613,7 +663,7 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
// Insert all FIBs
const bool fib3_present = (ensemble->transmission_mode == TransmissionMode_e::TM_III);
- index += fig_carousel.write_fibs(&etiFrame[index], m_currentFrame, fib3_present);
+ index += fig_carousel.write_fibs(&etiFrame[index], currentFrame, fib3_present);
/**********************************************************************
****** Input Data Reading *******************************************
@@ -625,12 +675,13 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
int sizeSubchannel = subchannel->getSizeByte();
// no need to check enableTist because we always increment the timestamp
int result = subchannel->readFrame(&etiFrame[index],
- sizeSubchannel, edi_time, tai_utc_offset, m_timestamp);
+ sizeSubchannel,
+ edi_time, tai_utc_offset, timestamp);
if (result < 0) {
etiLog.log(info,
"Subchannel %d read failed at ETI frame number: %d",
- subchannel->id, m_currentFrame);
+ subchannel->id, currentFrame);
}
// save pointer to Audio or Data Stream into correct TagESTn for EDI
@@ -670,8 +721,8 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
bool enableTist = m_pt.get("general.tist", false);
if (enableTist) {
- tist->TIST = htonl(m_timestamp) | 0xff;
- edi_tagDETI.tsta = m_timestamp & 0xffffff;
+ tist->TIST = htonl(timestamp) | 0xff;
+ edi_tagDETI.tsta = timestamp & 0xffffff;
}
else {
tist->TIST = htonl(0xffffff) | 0xff;
@@ -692,7 +743,7 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
output->setMetadata(md_edi_time);
shared_ptr<OutputMetadata> md_dlfc =
- make_shared<OutputMetadataDLFC>(m_currentFrame % 5000);
+ make_shared<OutputMetadataDLFC>(currentFrame % 5000);
output->setMetadata(md_dlfc);
}
}
@@ -708,8 +759,7 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
Approximate 8 ms 1 ms 3,91 us 488 ns 61 ns
time resolution
*/
-
- increment_timestamp();
+ m_time.increment_timestamp();
/**********************************************************************
*********** Section FRPD *****************************************
@@ -751,6 +801,12 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
}
edi_sender->write(edi_tagpacket);
+
+ for (const auto& stat : edi_sender->get_tcp_server_stats()) {
+ get_mgmt_server().update_edi_tcp_output_stat(
+ stat.listen_port,
+ stat.stats.size());
+ }
}
#if _DEBUG
@@ -761,7 +817,7 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
if (enableTist) {
etiLog.log(info, "ETI frame number %i Timestamp: %d + %f",
m_currentFrame, edi_time,
- (m_timestamp & 0xFFFFFF) / 16384000.0);
+ (timestamp & 0xFFFFFF) / 16384000.0);
}
else {
etiLog.log(info, "ETI frame number %i Time: %d, no TIST",
@@ -770,7 +826,7 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
}
#endif
- m_currentFrame++;
+ currentFrame++;
}
void DabMultiplexer::print_info()
@@ -791,7 +847,7 @@ void DabMultiplexer::set_parameter(const std::string& parameter,
throw ParameterError(ss.str());
}
else if (parameter == "tist_offset") {
- m_tist_offset = std::stoi(value);
+ m_time.set_tist_offset(std::stod(value));
}
else {
stringstream ss;
@@ -806,10 +862,10 @@ const std::string DabMultiplexer::get_parameter(const std::string& parameter) co
{
stringstream ss;
if (parameter == "frames") {
- ss << m_currentFrame;
+ ss << currentFrame;
}
else if (parameter == "tist_offset") {
- ss << m_tist_offset;
+ ss << m_time.tist_offset();
}
else {
ss << "Parameter '" << parameter <<
@@ -823,8 +879,8 @@ const std::string DabMultiplexer::get_parameter(const std::string& parameter) co
const json::map_t DabMultiplexer::get_all_values() const
{
json::map_t map;
- map["frames"].v = m_currentFrame;
- map["tist_offset"].v = m_tist_offset;
+ map["frames"].v = currentFrame;
+ map["tist_offset"].v = m_time.tist_offset();
return map;
}
diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h
index 44155dc..9306eed 100644
--- a/src/DabMultiplexer.h
+++ b/src/DabMultiplexer.h
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2024
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
*/
/*
@@ -30,21 +30,12 @@
#endif
#include "dabOutput/dabOutput.h"
-#include "edioutput/TagItems.h"
-#include "edioutput/TagPacket.h"
-#include "edioutput/AFPacket.h"
#include "edioutput/Transport.h"
#include "fig/FIGCarousel.h"
-#include "crc.h"
-#include "utils.h"
-#include "Socket.h"
-#include "PcDebug.h"
#include "MuxElements.h"
#include "RemoteControl.h"
-#include "Eti.h"
#include "ClockTAI.h"
#include <vector>
-#include <chrono>
#include <memory>
#include <string>
#include <memory>
@@ -52,6 +43,37 @@
constexpr uint32_t ETI_FSYNC1 = 0x49C5F8;
+class MuxTime {
+ private:
+ std::time_t m_edi_time = 0;
+ uint32_t m_pps_offset_ms = 0;
+ int64_t m_tist_offset_ms = 0;
+
+ public:
+ std::pair<uint32_t, std::time_t> get_tist_seconds();
+ std::pair<uint32_t, std::time_t> get_milliseconds_seconds();
+
+
+ /* Pre v3 odr-dabmux did the MNSC calculation differently,
+ * which works with the easydabv2. The rework in odr-dabmux,
+ * deriving MNSC time from EDI time broke this.
+ *
+ * That's why we're now tracking MNSC time in separate variables,
+ * to get the same behaviour back.
+ *
+ * I'm not aware of any devices using MNSC time besides the
+ * easydab. ODR-DabMod now considers EDI seconds or ZMQ metadata.
+ */
+ bool mnsc_increment_time = false;
+ std::time_t mnsc_time = 0;
+
+ /* Setup the time and return the initial currentFrame counter value */
+ uint64_t init(uint32_t tist_at_fct0_ms, double tist_offset);
+ void increment_timestamp();
+ double tist_offset() const { return m_tist_offset_ms / 1000.0; }
+ void set_tist_offset(double new_tist_offset);
+};
+
class DabMultiplexer : public RemoteControllable {
public:
DabMultiplexer(boost::property_tree::ptree pt);
@@ -61,8 +83,6 @@ class DabMultiplexer : public RemoteControllable {
void prepare(bool require_tai_clock);
- uint64_t getCurrentFrame() const { return m_currentFrame; }
-
void mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs);
void print_info(void);
@@ -82,48 +102,19 @@ class DabMultiplexer : public RemoteControllable {
void prepare_subchannels(void);
void prepare_services_components(void);
void prepare_data_inputs(void);
- void increment_timestamp(void);
boost::property_tree::ptree m_pt;
- uint32_t m_timestamp = 0;
- std::time_t m_edi_time = 0;
- std::time_t m_edi_time_latched_for_mnsc = 0;
+ MuxTime m_time;
+ uint64_t currentFrame = 0;
edi::configuration_t edi_conf;
std::shared_ptr<edi::Sender> edi_sender;
- uint64_t m_currentFrame = 0;
-
std::shared_ptr<dabEnsemble> ensemble;
- int m_tist_offset = 0;
bool m_tai_clock_required = false;
ClockTAI m_clock_tai;
- /* New FIG Carousel */
FIC::FIGCarousel fig_carousel;
};
-
-// DAB Mode
-#define DEFAULT_DAB_MODE 1
-
-// Taille de la trame de donnee, sous-canal 3, nb de paquets de 64bits,
-// STL3 * 8 = x kbytes par trame ETI
-
-// Data bitrate in kbits/s. Must be 64 kb/s multiple.
-#define DEFAULT_DATA_BITRATE 384
-#define DEFAULT_PACKET_BITRATE 32
-
-/* default ensemble parameters. Label must be max 16 chars, short label
- * a subset of the label, max 8 chars
- */
-#define DEFAULT_ENSEMBLE_LABEL "ODR Dab Mux"
-#define DEFAULT_ENSEMBLE_SHORT_LABEL "ODRMux"
-#define DEFAULT_ENSEMBLE_ID 0xc000
-#define DEFAULT_ENSEMBLE_ECC 0xa1
-
-// start value for default service IDs (if not overridden by configuration)
-#define DEFAULT_SERVICE_ID 50
-#define DEFAULT_PACKET_ADDRESS 0
-
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index 1a367da..bf525c1 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -327,6 +327,38 @@ int main(int argc, char *argv[])
if (outputuid == "edi") {
ptree pt_edi = pt_outputs.get_child("edi");
+ bool default_enable_pft = pt_edi.get<bool>("enable_pft", false);
+ edi_conf.verbose = pt_edi.get<bool>("verbose", false);
+
+ unsigned int default_fec = pt_edi.get<unsigned int>("fec", 3);
+ unsigned int default_chunk_len = pt_edi.get<unsigned int>("chunk_len", 207);
+
+ auto check_spreading_factor = [](int percent) {
+ if (percent < 0) {
+ throw std::runtime_error("EDI output: negative packet_spread value is invalid.");
+ }
+ double factor = (double)percent / 100.0;
+ if (factor > 30000) {
+ throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!");
+ }
+ return factor;
+ };
+
+ double default_spreading_factor = check_spreading_factor(pt_edi.get<int>("packet_spread", 95));
+
+ using pt_t = boost::property_tree::basic_ptree<std::basic_string<char>, std::basic_string<char>>;
+ auto handle_overrides = [&](edi::pft_settings_t& pft_settings, pt_t pt) {
+ pft_settings.chunk_len = pt.get<unsigned int>("chunk_len", default_chunk_len);
+ pft_settings.enable_pft = pt.get<bool>("enable_pft", default_enable_pft);
+ pft_settings.fec = pt.get<unsigned int>("fec", default_fec);
+ pft_settings.fragment_spreading_factor = default_spreading_factor;
+ auto override_spread_percent = pt.get_optional<int>("packet_spread");
+ if (override_spread_percent) {
+ pft_settings.fragment_spreading_factor = check_spreading_factor(*override_spread_percent);
+ }
+ pft_settings.verbose = pt.get<bool>("verbose", edi_conf.verbose);
+ };
+
for (auto pt_edi_dest : pt_edi.get_child("destinations")) {
const auto proto = pt_edi_dest.second.get<string>("protocol", "udp");
if (proto == "udp") {
@@ -346,6 +378,8 @@ int main(int argc, char *argv[])
dest->dest_port = pt_edi.get<unsigned int>("port");
}
+ handle_overrides(dest->pft_settings, pt_edi_dest.second);
+
edi_conf.destinations.push_back(dest);
}
else if (proto == "tcp") {
@@ -355,6 +389,8 @@ int main(int argc, char *argv[])
double preroll = pt_edi_dest.second.get<double>("preroll-burst", 0.0);
dest->tcp_server_preroll_buffers = ceil(preroll / 24e-3);
+ handle_overrides(dest->pft_settings, pt_edi_dest.second);
+
edi_conf.destinations.push_back(dest);
}
else {
@@ -362,22 +398,6 @@ int main(int argc, char *argv[])
}
}
- edi_conf.dump = pt_edi.get<bool>("dump", false);
- edi_conf.enable_pft = pt_edi.get<bool>("enable_pft", false);
- edi_conf.verbose = pt_edi.get<bool>("verbose", false);
-
- edi_conf.fec = pt_edi.get<unsigned int>("fec", 3);
- edi_conf.chunk_len = pt_edi.get<unsigned int>("chunk_len", 207);
-
- int spread_percent = pt_edi.get<int>("packet_spread", 95);
- if (spread_percent < 0) {
- throw std::runtime_error("EDI output: negative packet_spread value is invalid.");
- }
- edi_conf.fragment_spreading_factor = (double)spread_percent / 100.0;
- if (edi_conf.fragment_spreading_factor > 30000) {
- throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!");
- }
-
edi_conf.tagpacket_alignment = pt_edi.get<unsigned int>("tagpacket_alignment", 8);
mux.set_edi_config(edi_conf);
diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp
index 568e80e..dff093a 100644
--- a/src/ManagementServer.cpp
+++ b/src/ManagementServer.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2018
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -28,13 +28,12 @@
along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
*/
-#include <errno.h>
-#include <string.h>
-#include <math.h>
-#include <stdint.h>
-#include <limits>
#include <sstream>
#include <algorithm>
+#include <cstring>
+#include <cmath>
+#include <cstdint>
+#include <limits>
#include <boost/version.hpp>
#include "ManagementServer.h"
#include "Log.h"
@@ -127,37 +126,42 @@ ManagementServer& get_mgmt_server()
*/
}
-void ManagementServer::registerInput(InputStat* is)
+void ManagementServer::register_input(InputStat* is)
{
unique_lock<mutex> lock(m_statsmutex);
std::string id(is->get_name());
- if (m_inputStats.count(id) == 1) {
+ if (m_input_stats.count(id) == 1) {
etiLog.level(error) <<
"Double registration in MGMT Server with id '" <<
id << "'";
return;
}
- m_inputStats[id] = is;
+ m_input_stats[id] = is;
}
-void ManagementServer::unregisterInput(std::string id)
+void ManagementServer::unregister_input(std::string id)
{
unique_lock<mutex> lock(m_statsmutex);
- if (m_inputStats.count(id) == 1) {
- m_inputStats.erase(id);
+ if (m_input_stats.count(id) == 1) {
+ m_input_stats.erase(id);
}
}
+// outputs will never disappear, no need to have a "remove" logic
+void ManagementServer::update_edi_tcp_output_stat(uint16_t listen_port, size_t num_connections)
+{
+ m_output_stats[listen_port] = num_connections;
+}
bool ManagementServer::isInputRegistered(std::string& id)
{
unique_lock<mutex> lock(m_statsmutex);
- if (m_inputStats.count(id) == 0) {
+ if (m_input_stats.count(id) == 0) {
etiLog.level(error) <<
"Management Server: id '" <<
id << "' does was not registered";
@@ -166,7 +170,7 @@ bool ManagementServer::isInputRegistered(std::string& id)
return true;
}
-std::string ManagementServer::getStatConfigJSON()
+std::string ManagementServer::get_input_config_json()
{
unique_lock<mutex> lock(m_statsmutex);
@@ -175,7 +179,7 @@ std::string ManagementServer::getStatConfigJSON()
std::map<std::string,InputStat*>::iterator iter;
int i = 0;
- for(iter = m_inputStats.begin(); iter != m_inputStats.end();
+ for (iter = m_input_stats.begin(); iter != m_input_stats.end();
++iter, i++)
{
std::string id = iter->first;
@@ -192,16 +196,15 @@ std::string ManagementServer::getStatConfigJSON()
return ss.str();
}
-std::string ManagementServer::getValuesJSON()
+std::string ManagementServer::get_input_values_json()
{
unique_lock<mutex> lock(m_statsmutex);
std::ostringstream ss;
ss << "{ \"values\" : {\n";
- std::map<std::string,InputStat*>::iterator iter;
int i = 0;
- for(iter = m_inputStats.begin(); iter != m_inputStats.end();
+ for (auto iter = m_input_stats.begin(); iter != m_input_stats.end();
++iter, i++)
{
const std::string& id = iter->first;
@@ -220,6 +223,31 @@ std::string ManagementServer::getValuesJSON()
return ss.str();
}
+std::string ManagementServer::get_output_values_json()
+{
+ unique_lock<mutex> lock(m_statsmutex);
+
+ std::ostringstream ss;
+ ss << "{ \"output_values\" : {\n";
+
+ int i = 0;
+ for (auto iter = m_output_stats.begin(); iter != m_output_stats.end();
+ ++iter, i++)
+ {
+ auto listen_port = iter->first;
+ auto num_connections = iter->second;
+ if (i > 0) {
+ ss << " ,\n";
+ }
+ ss << " \"edi_tcp_" << listen_port << "\" : { \"num_connections\": " <<
+ num_connections << "} ";
+ }
+
+ ss << "}\n}\n";
+
+ return ss.str();
+}
+
ManagementServer::ManagementServer() :
m_zmq_context(),
m_zmq_sock(m_zmq_context, ZMQ_REP),
@@ -323,10 +351,13 @@ void ManagementServer::handle_message(zmq::message_t& zmq_message)
<< "}\n";
}
else if (data == "config") {
- answer << getStatConfigJSON();
+ answer << get_input_config_json();
}
else if (data == "values") {
- answer << getValuesJSON();
+ answer << get_input_values_json();
+ }
+ else if (data == "output_values") {
+ answer << get_output_values_json();
}
else if (data == "getptree") {
unique_lock<mutex> lock(m_configmutex);
@@ -366,12 +397,12 @@ InputStat::InputStat(const std::string& name) :
InputStat::~InputStat()
{
- get_mgmt_server().unregisterInput(m_name);
+ get_mgmt_server().unregister_input(m_name);
}
void InputStat::registerAtServer()
{
- get_mgmt_server().registerInput(this);
+ get_mgmt_server().register_input(this);
}
void InputStat::notifyBuffer(long bufsize)
diff --git a/src/ManagementServer.h b/src/ManagementServer.h
index 6e39922..c7a4222 100644
--- a/src/ManagementServer.h
+++ b/src/ManagementServer.h
@@ -2,7 +2,7 @@
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2018
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -50,6 +50,7 @@
# include "config.h"
#endif
+#include "Socket.h"
#include "zmq.hpp"
#include <string>
#include <map>
@@ -167,8 +168,10 @@ class ManagementServer
void open(int listenport);
/* Un-/Register a statistics data source */
- void registerInput(InputStat* is);
- void unregisterInput(std::string id);
+ void register_input(InputStat* is);
+ void unregister_input(std::string id);
+
+ void update_edi_tcp_output_stat(uint16_t listen_port, size_t num_connections);
/* Load a ptree given by the management server.
*
@@ -205,20 +208,25 @@ class ManagementServer
std::thread m_restarter_thread;
/******* Statistics Data ********/
- std::map<std::string, InputStat*> m_inputStats;
+ std::map<std::string, InputStat*> m_input_stats;
+
+ // Holds information about EDI/TCP outputs
+ std::map<uint16_t /* port */, size_t /* num_connections */> m_output_stats;
/* Return a description of the configuration that will
* allow to define what graphs to be created
*
* returns: a JSON encoded configuration
*/
- std::string getStatConfigJSON();
+ std::string get_input_config_json();
/* Return the values for the statistics as defined in the configuration
*
* returns: JSON encoded statistics
*/
- std::string getValuesJSON();
+ std::string get_input_values_json();
+
+ std::string get_output_values_json();
// mutex for accessing the map
std::mutex m_statsmutex;
diff --git a/src/fig/FIG.h b/src/fig/FIG.h
index 9752245..eda4671 100644
--- a/src/fig/FIG.h
+++ b/src/fig/FIG.h
@@ -35,11 +35,19 @@ namespace FIC {
class FIGRuntimeInformation {
public:
- FIGRuntimeInformation(std::shared_ptr<dabEnsemble>& e) :
+
+ using dab_time_t = std::pair<uint32_t /* milliseconds */, time_t>;
+ using get_time_func_t = std::function<dab_time_t()>;
+
+ FIGRuntimeInformation(
+ std::shared_ptr<dabEnsemble>& e,
+ get_time_func_t getTimeFunc) :
+ getTimeFunc(getTimeFunc),
currentFrame(0),
ensemble(e),
factumAnalyzer(false) {}
+ get_time_func_t getTimeFunc;
unsigned long currentFrame;
std::shared_ptr<dabEnsemble> ensemble;
bool factumAnalyzer;
diff --git a/src/fig/FIG0_10.cpp b/src/fig/FIG0_10.cpp
index 56ce9fb..240aa19 100644
--- a/src/fig/FIG0_10.cpp
+++ b/src/fig/FIG0_10.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
*/
/*
@@ -23,7 +23,6 @@
along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "fig/FIG0structs.h"
#include "fig/FIG0_10.h"
#include "utils.h"
@@ -89,7 +88,7 @@ FillStatus FIG0_10::fill(uint8_t *buf, size_t max_size)
return fs;
}
- //Time and country identifier
+ // Time and country identifier
auto fig0_10 = (FIGtype0_10_LongForm*)buf;
fig0_10->FIGtypeNumber = 0;
@@ -102,9 +101,9 @@ FillStatus FIG0_10::fill(uint8_t *buf, size_t max_size)
remaining -= 2;
struct tm timeData;
- time_t dab_time_seconds = 0;
- uint32_t dab_time_millis = 0;
- get_dab_time(&dab_time_seconds, &dab_time_millis);
+ const auto dab_time = m_rti->getTimeFunc();
+ time_t dab_time_seconds = dab_time.second;
+ uint32_t dab_time_millis = dab_time.first;
gmtime_r(&dab_time_seconds, &timeData);
fig0_10->RFU = 0;
diff --git a/src/fig/FIG0structs.h b/src/fig/FIG0structs.h
index 5f514b3..2e107e8 100644
--- a/src/fig/FIG0structs.h
+++ b/src/fig/FIG0structs.h
@@ -24,19 +24,17 @@
*/
#pragma once
-
#include <cstdint>
-
#include "fig/FIG.h"
-#define FIG0_13_APPTYPE_SLIDESHOW 0x2
-#define FIG0_13_APPTYPE_WEBSITE 0x3
-#define FIG0_13_APPTYPE_TPEG 0x4
-#define FIG0_13_APPTYPE_DGPS 0x5
-#define FIG0_13_APPTYPE_TMC 0x6
-#define FIG0_13_APPTYPE_SPI 0x7
-#define FIG0_13_APPTYPE_DABJAVA 0x8
-#define FIG0_13_APPTYPE_JOURNALINE 0x44a
+constexpr uint16_t FIG0_13_APPTYPE_SLIDESHOW = 0x2;
+constexpr uint16_t FIG0_13_APPTYPE_WEBSITE = 0x3;
+constexpr uint16_t FIG0_13_APPTYPE_TPEG = 0x4;
+constexpr uint16_t FIG0_13_APPTYPE_DGPS = 0x5;
+constexpr uint16_t FIG0_13_APPTYPE_TMC = 0x6;
+constexpr uint16_t FIG0_13_APPTYPE_SPI = 0x7;
+constexpr uint16_t FIG0_13_APPTYPE_DABJAVA = 0x8;
+constexpr uint16_t FIG0_13_APPTYPE_JOURNALINE = 0x44a;
struct FIGtype0 {
diff --git a/src/fig/FIGCarousel.cpp b/src/fig/FIGCarousel.cpp
index 9748dbf..ceda275 100644
--- a/src/fig/FIGCarousel.cpp
+++ b/src/fig/FIGCarousel.cpp
@@ -68,8 +68,11 @@ bool FIGCarouselElement::check_deadline()
/**************** FIGCarousel *****************/
-FIGCarousel::FIGCarousel(std::shared_ptr<dabEnsemble> ensemble) :
- m_rti(ensemble),
+FIGCarousel::FIGCarousel(
+ std::shared_ptr<dabEnsemble> ensemble,
+ FIGRuntimeInformation::get_time_func_t getTimeFunc
+ ) :
+ m_rti(ensemble, getTimeFunc),
m_fig0_0(&m_rti),
m_fig0_1(&m_rti),
m_fig0_2(&m_rti),
diff --git a/src/fig/FIGCarousel.h b/src/fig/FIGCarousel.h
index 1e33577..a2a8022 100644
--- a/src/fig/FIGCarousel.h
+++ b/src/fig/FIGCarousel.h
@@ -67,7 +67,9 @@ enum class FIBAllocation {
class FIGCarousel {
public:
- FIGCarousel(std::shared_ptr<dabEnsemble> ensemble);
+ FIGCarousel(
+ std::shared_ptr<dabEnsemble> ensemble,
+ FIGRuntimeInformation::get_time_func_t getTimeFunc);
/* Write all FIBs to the buffer, including correct padding and crc.
* Returns number of bytes written.
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
index 3838541..141641f 100644
--- a/src/input/Edi.cpp
+++ b/src/input/Edi.cpp
@@ -81,6 +81,7 @@ void Edi::open(const std::string& name)
{
const std::regex re_udp("udp://:([0-9]+)");
const std::regex re_udp_multicast("udp://@([0-9.]+):([0-9]+)");
+ const std::regex re_udp_multicast_bindto("udp://([0-9.])+@([0-9.]+):([0-9]+)");
const std::regex re_tcp("tcp://(.*):([0-9]+)");
lock_guard<mutex> lock(m_mutex);
@@ -97,13 +98,31 @@ void Edi::open(const std::string& name)
m_udp_sock.reinit(udp_port);
m_udp_sock.setBlocking(false);
}
+ else if (std::regex_match(name, m, re_udp_multicast_bindto)) {
+ const string bind_to = m[1].str();
+ const string multicast_address = m[2].str();
+ const int udp_port = std::stoi(m[3].str());
+
+ m_input_used = InputUsed::UDP;
+ if (IN_MULTICAST(ntohl(inet_addr(multicast_address.c_str())))) {
+ m_udp_sock.init_receive_multicast(udp_port, bind_to, multicast_address);
+ }
+ else {
+ throw runtime_error(string("Address ") + multicast_address + " is not a multicast address");
+ }
+ m_udp_sock.setBlocking(false);
+ }
else if (std::regex_match(name, m, re_udp_multicast)) {
const string multicast_address = m[1].str();
const int udp_port = std::stoi(m[2].str());
m_input_used = InputUsed::UDP;
- m_udp_sock.reinit(udp_port);
+ if (IN_MULTICAST(ntohl(inet_addr(multicast_address.c_str())))) {
+ m_udp_sock.init_receive_multicast(udp_port, "0.0.0.0", multicast_address);
+ }
+ else {
+ throw runtime_error(string("Address ") + multicast_address + " is not a multicast address");
+ }
m_udp_sock.setBlocking(false);
- m_udp_sock.joinGroup(multicast_address.c_str());
}
else if (std::regex_match(name, m, re_tcp)) {
m_input_used = InputUsed::TCP;
@@ -235,7 +254,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc
while (not m_pending_sti_frame.frame.empty()) {
if (m_pending_sti_frame.frame.size() == size) {
- if (m_pending_sti_frame.timestamp.valid()) {
+ if (m_pending_sti_frame.timestamp.is_valid()) {
auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta);
ts_req += m_tist_delay;
const double offset = ts_req.diff_s(m_pending_sti_frame.timestamp);
@@ -305,7 +324,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc
m_is_prebuffering = true;
return 0;
}
- else if (not m_pending_sti_frame.timestamp.valid()) {
+ else if (not m_pending_sti_frame.timestamp.is_valid()) {
etiLog.level(warn) << "EDI input " << m_name <<
" invalid timestamp, ignoring";
memset(buffer, 0, size);
diff --git a/src/utils.cpp b/src/utils.cpp
index 1e006f7..7ea6293 100644
--- a/src/utils.cpp
+++ b/src/utils.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2021
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -29,36 +29,13 @@
#include <iostream>
#include <memory>
#include <boost/algorithm/string/join.hpp>
-#include "DabMux.h"
#include "utils.h"
#include "fig/FIG0structs.h"
using namespace std;
-static time_t dab_time_seconds = 0;
-static int dab_time_millis = 0;
-
static void printServices(const vector<shared_ptr<DabService> >& services);
-void update_dab_time()
-{
- if (dab_time_seconds == 0) {
- dab_time_seconds = time(nullptr);
- } else {
- dab_time_millis+= 24;
- if (dab_time_millis >= 1000) {
- dab_time_millis -= 1000;
- ++dab_time_seconds;
- }
- }
-}
-
-void get_dab_time(time_t *time, uint32_t *millis)
-{
- *time = dab_time_seconds;
- *millis = dab_time_millis;
-}
-
uint32_t gregorian2mjd(int year, int month, int day)
{
@@ -99,7 +76,7 @@ void header_message()
fprintf(stderr, "Her Majesty the Queen in Right of Canada\n");
fprintf(stderr, "(Communications Research Centre Canada)\n\n");
- fprintf(stderr, "Copyright (C) 2021 Matthias P. Braendli\n");
+ fprintf(stderr, "Copyright (C) 2024 Matthias P. Braendli\n");
fprintf(stderr, "LICENCE: GPLv3+\n\n");
fprintf(stderr, "http://opendigitalradio.org\n\n");
@@ -107,7 +84,7 @@ void header_message()
fprintf(stderr, "Input URLs supported: prbs udp file zmq\n");
fprintf(stderr, "Inputs format supported: raw mpeg packet epm\n");
- std::cerr << "Output URLs supported:\n" <<
+ std::cerr << "Outputs supported: " <<
#if defined(HAVE_OUTPUT_FILE)
" file" <<
#endif
@@ -126,7 +103,7 @@ void header_message()
#if defined(HAVE_OUTPUT_SIMUL)
" simul" <<
#endif
- "\n\n";
+ " edi zmq\n\n";
}
diff --git a/src/utils.h b/src/utils.h
index 331a0b2..d037bb3 100644
--- a/src/utils.h
+++ b/src/utils.h
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2020
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
This file contains a set of utility functions that are used to show
@@ -34,10 +34,6 @@
#include <memory>
#include "MuxElements.h"
-/* Must be called once per ETI frame to update the time */
-void update_dab_time(void);
-void get_dab_time(time_t *time, uint32_t *millis);
-
/* Convert a date and time into the modified Julian date
* used in FIG 0/10
*
diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp
new file mode 100644
index 0000000..06b7420
--- /dev/null
+++ b/src/zmq2edi/EDISender.cpp
@@ -0,0 +1,390 @@
+/*
+ Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
+ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "EDISender.h"
+#include "Log.h"
+#include <cmath>
+#include <numeric>
+#include <map>
+#include <algorithm>
+
+using namespace std;
+
+EDISender::~EDISender()
+{
+ if (running.load()) {
+ running.store(false);
+
+ // Unblock thread
+ frame_t emptyframe;
+ frames.push(emptyframe);
+
+ process_thread.join();
+ }
+}
+
+void EDISender::start(const edi_configuration_t& conf,
+ int delay_ms, int max_delay_ms)
+{
+ edi_conf = conf;
+ tist_delay_ms = delay_ms;
+ tist_max_delay_ms = max_delay_ms;
+
+ if (edi_conf.verbose) {
+ etiLog.log(info, "Setup EDI");
+ }
+
+ if (edi_conf.dump) {
+ edi_debug_file.open("./edi.debug");
+ }
+
+ if (edi_conf.enabled()) {
+ for (auto& edi_destination : edi_conf.destinations) {
+ auto edi_output = make_shared<UdpSocket>(edi_destination.source_port);
+
+ if (not edi_destination.source_addr.empty()) {
+ int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str());
+ if (err) {
+ throw runtime_error("EDI socket set source failed!");
+ }
+ err = edi_output->setMulticastTTL(edi_destination.ttl);
+ if (err) {
+ throw runtime_error("EDI socket set TTL failed!");
+ }
+ }
+
+ edi_destination.socket = edi_output;
+ }
+ }
+
+ if (edi_conf.verbose) {
+ etiLog.log(info, "EDI set up");
+ }
+
+ // The AF Packet will be protected with reed-solomon and split in fragments
+ edi::PFT pft(edi_conf);
+ edi_pft = pft;
+
+ if (edi_conf.interleaver_enabled()) {
+ edi_interleaver.SetLatency(edi_conf.latency_frames);
+ }
+
+ startTime = std::chrono::steady_clock::now();
+ running.store(true);
+ process_thread = thread(&EDISender::process, this);
+}
+
+void EDISender::push_frame(const frame_t& frame)
+{
+ frames.push(frame);
+}
+
+void EDISender::print_configuration()
+{
+ if (edi_conf.enabled()) {
+ etiLog.level(info) << "EDI";
+ etiLog.level(info) << " verbose " << edi_conf.verbose;
+ for (auto& edi_dest : edi_conf.destinations) {
+ etiLog.level(info) << " to " << edi_dest.dest_addr << ":" << edi_conf.dest_port;
+ if (not edi_dest.source_addr.empty()) {
+ etiLog.level(info) << " source " << edi_dest.source_addr;
+ etiLog.level(info) << " ttl " << edi_dest.ttl;
+ }
+ etiLog.level(info) << " source port " << edi_dest.source_port;
+ }
+ if (edi_conf.interleaver_enabled()) {
+ etiLog.level(info) << " interleave " << edi_conf.latency_frames * 24 << " ms";
+ }
+ }
+ else {
+ etiLog.level(info) << "EDI disabled";
+ }
+}
+
+void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
+{
+ edi::TagDETI edi_tagDETI;
+ edi::TagStarPTR edi_tagStarPtr;
+ map<int, edi::TagESTn> edi_subchannelToTag;
+ // The above Tag Items will be assembled into a TAG Packet
+ edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment);
+
+ // SYNC
+ edi_tagDETI.stat = p[0];
+
+ // LIDATA FCT
+ edi_tagDETI.dlfc = metadata.dlfc;
+
+ const int fct = p[4];
+ if (metadata.dlfc % 250 != fct) {
+ etiLog.level(warn) << "Frame FCT=" << fct <<
+ " does not correspond to DLFC=" << metadata.dlfc;
+ }
+
+ bool ficf = (p[5] & 0x80) >> 7;
+ edi_tagDETI.ficf = ficf;
+
+ const int nst = p[5] & 0x7F;
+
+ edi_tagDETI.fp = (p[6] & 0xE0) >> 5;
+ const int mid = (p[6] & 0x18) >> 3;
+ edi_tagDETI.mid = mid;
+ //const int fl = (p[6] & 0x07) * 256 + p[7];
+
+ int ficl = 0;
+ if (ficf == 0) {
+ etiLog.level(warn) << "Not FIC in data stream!";
+ return;
+ }
+ else if (mid == 3) {
+ ficl = 32;
+ }
+ else {
+ ficl = 24;
+ }
+
+ vector<uint32_t> sad(nst);
+ vector<uint32_t> stl(nst);
+ // Loop over STC subchannels:
+ for (int i=0; i < nst; i++) {
+ // EDI stream index is 1-indexed
+ const int edi_stream_id = i + 1;
+
+ uint32_t scid = (p[8 + 4*i] & 0xFC) >> 2;
+ sad[i] = (p[8+4*i] & 0x03) * 256 + p[9+4*i];
+ uint32_t tpl = (p[10+4*i] & 0xFC) >> 2;
+ stl[i] = (p[10+4*i] & 0x03) * 256 + \
+ p[11+4*i];
+
+ edi::TagESTn tag_ESTn;
+ tag_ESTn.id = edi_stream_id;
+ tag_ESTn.scid = scid;
+ tag_ESTn.sad = sad[i];
+ tag_ESTn.tpl = tpl;
+ tag_ESTn.rfa = 0; // two bits
+ tag_ESTn.mst_length = stl[i];
+ tag_ESTn.mst_data = nullptr;
+
+ edi_subchannelToTag[i] = tag_ESTn;
+ }
+
+ const uint16_t mnsc = p[8 + 4*nst] * 256 + \
+ p[8 + 4*nst + 1];
+ edi_tagDETI.mnsc = mnsc;
+
+ /*const uint16_t crc1 = p[8 + 4*nst + 2]*256 + \
+ p[8 + 4*nst + 3]; */
+
+ edi_tagDETI.fic_data = p + 12 + 4*nst;
+ edi_tagDETI.fic_length = ficl * 4;
+
+ // loop over MSC subchannels
+ int offset = 0;
+ for (int i=0; i < nst; i++) {
+ edi::TagESTn& tag = edi_subchannelToTag[i];
+ tag.mst_data = (p + 12 + 4*nst + ficf*ficl*4 + offset);
+
+ offset += stl[i] * 8;
+ }
+
+ /*
+ const uint16_t crc2 = p[12 + 4*nst + ficf*ficl*4 + offset] * 256 + \
+ p[12 + 4*nst + ficf*ficl*4 + offset + 1]; */
+
+ // TIST
+ const size_t tist_ix = 12 + 4*nst + ficf*ficl*4 + offset + 4;
+ uint32_t tist = (uint32_t)(p[tist_ix]) << 24 |
+ (uint32_t)(p[tist_ix+1]) << 16 |
+ (uint32_t)(p[tist_ix+2]) << 8 |
+ (uint32_t)(p[tist_ix+3]);
+
+ std::time_t posix_timestamp_1_jan_2000 = 946684800;
+
+ // Wait until our time is tist_delay after the TIST before
+ // we release that frame
+
+ using namespace std::chrono;
+
+ const auto seconds = metadata.edi_time;
+ const auto pps_offset = milliseconds(std::lrint((tist & 0xFFFFFF) / 16384.0));
+ const auto t_frame = system_clock::from_time_t(
+ seconds + posix_timestamp_1_jan_2000) + pps_offset;
+
+ const auto t_release = t_frame + milliseconds(tist_delay_ms);
+ const auto t_now = system_clock::now();
+
+ /*
+ etiLog.level(debug) << "seconds " << seconds + posix_timestamp_1_jan_2000;
+ etiLog.level(debug) << "now " << system_clock::to_time_t(t_now);
+ etiLog.level(debug) << "wait " << wait_time.count();
+ */
+
+ const auto wait_time = t_release - t_now;
+ wait_times.push_back(duration_cast<microseconds>(wait_time).count());
+
+ if (tist_max_delay_ms > 0) {
+ const auto t_latest_release = t_frame + milliseconds(tist_max_delay_ms);
+
+ if (t_now > t_latest_release) {
+ // drop frame
+ num_dropped.fetch_add(1);
+ return;
+ }
+ }
+
+ if (t_release > t_now) {
+ std::this_thread::sleep_for(wait_time);
+ }
+
+ edi_tagDETI.tsta = tist;
+ edi_tagDETI.atstf = 1;
+ edi_tagDETI.utco = metadata.utc_offset;
+ edi_tagDETI.seconds = metadata.edi_time;
+
+ if (edi_conf.enabled()) {
+ // put tags *ptr, DETI and all subchannels into one TagPacket
+ edi_tagpacket.tag_items.push_back(&edi_tagStarPtr);
+ edi_tagpacket.tag_items.push_back(&edi_tagDETI);
+
+ for (auto& tag : edi_subchannelToTag) {
+ edi_tagpacket.tag_items.push_back(&tag.second);
+ }
+
+ // Assemble into one AF Packet
+ edi::AFPacket edi_afpacket = edi_afPacketiser.Assemble(edi_tagpacket);
+
+ if (edi_conf.enable_pft) {
+ // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
+ vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(edi_afpacket);
+
+ if (edi_conf.verbose) {
+ fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n",
+ edi_fragments.size());
+ }
+
+ if (edi_conf.interleaver_enabled()) {
+ edi_fragments = edi_interleaver.Interleave(edi_fragments);
+ }
+
+ // Send over ethernet
+ for (const auto& edi_frag : edi_fragments) {
+ for (auto& dest : edi_conf.destinations) {
+ InetAddress addr;
+ addr.setAddress(dest.dest_addr.c_str());
+ addr.setPort(edi_conf.dest_port);
+
+ dest.socket->send(edi_frag, addr);
+ }
+
+ if (edi_conf.dump) {
+ std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
+ }
+ }
+
+ if (edi_conf.verbose) {
+ fprintf(stderr, "EDI number of PFT fragments %zu\n",
+ edi_fragments.size());
+ }
+ }
+ else {
+ // Send over ethernet
+ for (auto& dest : edi_conf.destinations) {
+ InetAddress addr;
+ addr.setAddress(dest.dest_addr.c_str());
+ addr.setPort(edi_conf.dest_port);
+
+ dest.socket->send(edi_afpacket, addr);
+ }
+
+ if (edi_conf.dump) {
+ std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ std::copy(edi_afpacket.begin(), edi_afpacket.end(), debug_iterator);
+ }
+ }
+ }
+}
+
+void EDISender::process()
+{
+ while (running.load()) {
+ frame_t frame;
+ frames.wait_and_pop(frame);
+
+ if (not running.load() or frame.first.empty()) {
+ break;
+ }
+
+ if (frame.first.size() == 6144) {
+ send_eti_frame(frame.first.data(), frame.second);
+ }
+ else {
+ etiLog.level(warn) << "Ignoring short ETI frame, "
+ "DFLC=" << frame.second.dlfc << ", len=" <<
+ frame.first.size();
+ }
+
+ if (wait_times.size() == 250) { // every six seconds
+ const double n = wait_times.size();
+
+ double sum = accumulate(wait_times.begin(), wait_times.end(), 0);
+ size_t num_late = std::count_if(wait_times.begin(), wait_times.end(),
+ [](double v){ return v < 0; });
+ double mean = sum / n;
+
+ double sq_sum = 0;
+ for (const auto t : wait_times) {
+ sq_sum += (t-mean) * (t-mean);
+ }
+ double stdev = sqrt(sq_sum / n);
+ auto min_max = minmax_element(wait_times.begin(), wait_times.end());
+
+ /* Debug code
+ stringstream ss;
+ ss << "times:";
+ for (const auto t : wait_times) {
+ ss << " " << t;
+ }
+ etiLog.level(debug) << ss.str();
+ */
+
+ const size_t dropped = num_dropped.exchange(0);
+
+ etiLog.level(info) << "Wait time statistics [microseconds]:"
+ " min: " << *min_max.first <<
+ " max: " << *min_max.second <<
+ " mean: " << mean <<
+ " stdev: " << stdev <<
+ " late: " <<
+ num_late << " of " << wait_times.size() << " (" <<
+ num_late * 100.0 / n << "%)" <<
+ " dropped: " << dropped;
+
+ wait_times.clear();
+ }
+ }
+}
diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h
new file mode 100644
index 0000000..44502c1
--- /dev/null
+++ b/src/zmq2edi/EDISender.h
@@ -0,0 +1,91 @@
+/*
+ Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
+ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+#include <iostream>
+#include <iterator>
+#include <thread>
+#include <vector>
+#include <chrono>
+#include <atomic>
+#include "ThreadsafeQueue.h"
+#include "dabOutput/dabOutput.h"
+#include "dabOutput/edi/TagItems.h"
+#include "dabOutput/edi/TagPacket.h"
+#include "dabOutput/edi/AFPacket.h"
+#include "dabOutput/edi/PFT.h"
+#include "dabOutput/edi/Interleaver.h"
+
+// This metadata gets transmitted in the zmq stream
+struct metadata_t {
+ uint32_t edi_time = 0;
+ int16_t utc_offset = 0;
+ uint16_t dlfc = 0;
+};
+
+using frame_t = std::pair<std::vector<uint8_t>, metadata_t>;
+
+class EDISender {
+ public:
+ EDISender() = default;
+ EDISender(const EDISender& other) = delete;
+ EDISender& operator=(const EDISender& other) = delete;
+ ~EDISender();
+ void start(const edi_configuration_t& conf, int delay_ms, int max_delay_ms);
+ void push_frame(const frame_t& frame);
+ void print_configuration(void);
+
+ private:
+ void send_eti_frame(uint8_t* p, metadata_t metadata);
+ void process(void);
+
+ int tist_delay_ms = 0;
+ int tist_max_delay_ms = 0;
+ std::atomic<bool> running = ATOMIC_VAR_INIT(false);
+ std::thread process_thread;
+ edi_configuration_t edi_conf;
+ std::chrono::steady_clock::time_point startTime;
+ ThreadsafeQueue<frame_t> frames;
+ std::ofstream edi_debug_file;
+
+ // The TagPacket will then be placed into an AFPacket
+ edi::AFPacketiser edi_afPacketiser;
+
+ // The AF Packet will be protected with reed-solomon and split in fragments
+ edi::PFT edi_pft;
+
+ // To mitigate for burst packet loss, PFT fragments can be sent out-of-order
+ edi::Interleaver edi_interleaver;
+
+ // For statistics about wait time before we transmit packets,
+ // in microseconds
+ std::vector<double> wait_times;
+
+ // Number of frames dropped because their TIST was larger than max_delay
+ std::atomic<size_t> num_dropped = ATOMIC_VAR_INIT(0);
+
+};
diff --git a/src/zmq2edi/README.md b/src/zmq2edi/README.md
deleted file mode 100644
index e509479..0000000
--- a/src/zmq2edi/README.md
+++ /dev/null
@@ -1,8 +0,0 @@
-Convert an ZeroMQ stream to EDI
-===============================
-
-This *zmq2edi* tool can receive a ZMQ ETI stream from
-ODR-DabMux and generate and EDI stream.
-
-Quite useful if your modulator wants EDI input, and your network is not good
-enough making you want to use something based on TCP.
diff --git a/src/zmq2edi/Sender.cpp b/src/zmq2edi/Sender.cpp
deleted file mode 100644
index fe46846..0000000
--- a/src/zmq2edi/Sender.cpp
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
- 2011, 2012 Her Majesty the Queen in Right of Canada (Communications
- Research Center Canada)
-
- Copyright (C) 2024
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "Sender.h"
-#include "Log.h"
-#include <cmath>
-#include <numeric>
-#include <map>
-#include <algorithm>
-#include <limits>
-
-using namespace std;
-
-Sender::Sender() :
- zmq_ctx(2)
-{
-}
-
-Sender::~Sender()
-{
- if (running.load()) {
- running.store(false);
-
- // Unblock thread
- frame_t emptyframe;
- frames.push(std::move(emptyframe));
-
- process_thread.join();
- }
-}
-
-void Sender::start(const edi::configuration_t& conf,
- const zmq_send_config_t& zmq_conf,
- int delay_ms, bool drop_late_packets)
-{
- edi_conf = conf;
- tist_delay_ms = delay_ms;
- drop_late = drop_late_packets;
-
- edi_sender = make_shared<edi::Sender>(edi_conf);
-
- for (const auto& url : zmq_conf.urls) {
- zmq::socket_t zmq_sock(zmq_ctx, ZMQ_PUB);
- zmq_sock.bind(url.c_str());
- zmq_sockets.emplace_back(std::move(zmq_sock));
- }
-
- running.store(true);
- process_thread = thread(&Sender::process, this);
-}
-
-void Sender::push_frame(frame_t&& frame)
-{
- frames.push(std::move(frame));
-}
-
-void Sender::print_configuration()
-{
- if (edi_conf.enabled()) {
- edi_conf.print();
- }
- else {
- etiLog.level(info) << "EDI disabled";
- }
-}
-
-void Sender::send_eti_frame(frame_t& frame)
-{
- uint8_t *p = frame.data.data();
-
- edi::TagDETI edi_tagDETI;
- edi::TagStarPTR edi_tagStarPtr("DETI");
- map<int, edi::TagESTn> edi_subchannelToTag;
- // The above Tag Items will be assembled into a TAG Packet
- edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment);
-
- // SYNC
- edi_tagDETI.stat = p[0];
-
- // LIDATA FCT
- edi_tagDETI.dlfc = frame.metadata.dlfc;
-
- const int fct = p[4];
- if (frame.metadata.dlfc % 250 != fct) {
- etiLog.level(warn) << "Frame FCT=" << fct <<
- " does not correspond to DLFC=" << frame.metadata.dlfc;
- }
-
- bool ficf = (p[5] & 0x80) >> 7;
- edi_tagDETI.ficf = ficf;
-
- const int nst = p[5] & 0x7F;
-
- edi_tagDETI.fp = (p[6] & 0xE0) >> 5;
- const int mid = (p[6] & 0x18) >> 3;
- edi_tagDETI.mid = mid;
- //const int fl = (p[6] & 0x07) * 256 + p[7];
-
- int ficl = 0;
- if (ficf == 0) {
- etiLog.level(warn) << "Not FIC in data stream!";
- return;
- }
- else if (mid == 3) {
- ficl = 32;
- }
- else {
- ficl = 24;
- }
-
- vector<uint32_t> sad(nst);
- vector<uint32_t> stl(nst);
- // Loop over STC subchannels:
- for (int i=0; i < nst; i++) {
- // EDI stream index is 1-indexed
- const int edi_stream_id = i + 1;
-
- uint32_t scid = (p[8 + 4*i] & 0xFC) >> 2;
- sad[i] = (p[8+4*i] & 0x03) * 256 + p[9+4*i];
- uint32_t tpl = (p[10+4*i] & 0xFC) >> 2;
- stl[i] = (p[10+4*i] & 0x03) * 256 + \
- p[11+4*i];
-
- edi::TagESTn tag_ESTn;
- tag_ESTn.id = edi_stream_id;
- tag_ESTn.scid = scid;
- tag_ESTn.sad = sad[i];
- tag_ESTn.tpl = tpl;
- tag_ESTn.rfa = 0; // two bits
- tag_ESTn.mst_length = stl[i];
- tag_ESTn.mst_data = nullptr;
-
- edi_subchannelToTag[i] = tag_ESTn;
- }
-
- uint16_t mnsc = 0;
- std::memcpy(&mnsc, p + 8 + 4*nst, sizeof(uint16_t));
- edi_tagDETI.mnsc = mnsc;
-
- /*const uint16_t crc1 = p[8 + 4*nst + 2]*256 + \
- p[8 + 4*nst + 3]; */
-
- edi_tagDETI.fic_data = p + 12 + 4*nst;
- edi_tagDETI.fic_length = ficl * 4;
-
- // loop over MSC subchannels
- int offset = 0;
- for (int i=0; i < nst; i++) {
- edi::TagESTn& tag = edi_subchannelToTag[i];
- tag.mst_data = (p + 12 + 4*nst + ficf*ficl*4 + offset);
-
- offset += stl[i] * 8;
- }
-
- /*
- const uint16_t crc2 = p[12 + 4*nst + ficf*ficl*4 + offset] * 256 + \
- p[12 + 4*nst + ficf*ficl*4 + offset + 1]; */
-
- // TIST
- const size_t tist_ix = 12 + 4*nst + ficf*ficl*4 + offset + 4;
- uint32_t tist = (uint32_t)(p[tist_ix]) << 24 |
- (uint32_t)(p[tist_ix+1]) << 16 |
- (uint32_t)(p[tist_ix+2]) << 8 |
- (uint32_t)(p[tist_ix+3]);
-
- std::time_t posix_timestamp_1_jan_2000 = 946684800;
-
- // Wait until our time is tist_delay after the TIST before
- // we release that frame
-
- using namespace std::chrono;
-
- const auto pps_offset = milliseconds(std::lrint((tist & 0xFFFFFF) / 16384.0));
- const auto t_frame = system_clock::from_time_t(
- frame.metadata.edi_time + posix_timestamp_1_jan_2000 - frame.metadata.utc_offset) + pps_offset;
-
- const auto t_release = t_frame + milliseconds(tist_delay_ms);
- const auto t_now = system_clock::now();
-
- const bool late = t_release < t_now;
-
- buffering_stat_t stat;
- stat.late = late;
-
- if (not late) {
- const auto wait_time = t_release - t_now;
- std::this_thread::sleep_for(wait_time);
- }
-
- stat.buffering_time_us = duration_cast<microseconds>(steady_clock::now() - frame.received_at).count();
- buffering_stats.push_back(std::move(stat));
-
- if (late and drop_late) {
- return;
- }
-
- edi_tagDETI.tsta = tist;
- edi_tagDETI.atstf = 1;
- edi_tagDETI.utco = frame.metadata.utc_offset;
- edi_tagDETI.seconds = frame.metadata.edi_time;
-
- if (edi_sender and edi_conf.enabled()) {
- // put tags *ptr, DETI and all subchannels into one TagPacket
- edi_tagpacket.tag_items.push_back(&edi_tagStarPtr);
- edi_tagpacket.tag_items.push_back(&edi_tagDETI);
-
- for (auto& tag : edi_subchannelToTag) {
- edi_tagpacket.tag_items.push_back(&tag.second);
- }
-
- edi_sender->write(edi_tagpacket);
- }
-
- if (not frame.original_zmq_message.empty()) {
- for (auto& sock : zmq_sockets) {
- const auto send_result = sock.send(frame.original_zmq_message, zmq::send_flags::dontwait);
- if (not send_result.has_value()) {
- num_zmq_send_errors++;
- }
- }
- }
-}
-
-void Sender::process()
-{
- while (running.load()) {
- frame_t frame;
- frames.wait_and_pop(frame);
-
- if (not running.load() or frame.data.empty()) {
- break;
- }
-
- if (frame.data.size() == 6144) {
- send_eti_frame(frame);
- }
- else {
- etiLog.level(warn) << "Ignoring short ETI frame, "
- "DFLC=" << frame.metadata.dlfc << ", len=" <<
- frame.data.size();
- }
-
- if (buffering_stats.size() == 250) { // every six seconds
- const double n = buffering_stats.size();
-
- size_t num_late = std::count_if(buffering_stats.begin(), buffering_stats.end(),
- [](const buffering_stat_t& s){ return s.late; });
-
- double sum = 0.0;
- double min = std::numeric_limits<double>::max();
- double max = -std::numeric_limits<double>::max();
- for (const auto& s : buffering_stats) {
- // convert to milliseconds
- const double t = s.buffering_time_us / 1000.0;
- sum += t;
-
- if (t < min) {
- min = t;
- }
-
- if (t > max) {
- max = t;
- }
- }
- double mean = sum / n;
-
- double sq_sum = 0;
- for (const auto& s : buffering_stats) {
- const double t = s.buffering_time_us / 1000.0;
- sq_sum += (t-mean) * (t-mean);
- }
- double stdev = sqrt(sq_sum / n);
-
- /* Debug code
- stringstream ss;
- ss << "times:";
- for (const auto t : buffering_stats) {
- ss << " " << lrint(t.buffering_time_us / 1000.0);
- }
- etiLog.level(debug) << ss.str();
- // */
-
- etiLog.level(info) << "Buffering time statistics [milliseconds]:"
- " min: " << min <<
- " max: " << max <<
- " mean: " << mean <<
- " stdev: " << stdev <<
- " late: " <<
- num_late << " of " << buffering_stats.size() << " (" <<
- num_late * 100.0 / n << "%) " <<
- "Num ZMQ send errors: " << num_zmq_send_errors;
-
- buffering_stats.clear();
- }
- }
-}
diff --git a/src/zmq2edi/Sender.h b/src/zmq2edi/Sender.h
deleted file mode 100644
index 6dfd615..0000000
--- a/src/zmq2edi/Sender.h
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
- 2011, 2012 Her Majesty the Queen in Right of Canada (Communications
- Research Center Canada)
-
- Copyright (C) 2024
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#pragma once
-#include <iostream>
-#include <iterator>
-#include <thread>
-#include <vector>
-#include <chrono>
-#include <atomic>
-#include "ThreadsafeQueue.h"
-#include "dabOutput/dabOutput.h"
-#include "edioutput/TagItems.h"
-#include "edioutput/TagPacket.h"
-#include "edioutput/Transport.h"
-
-// This metadata gets transmitted in the zmq stream
-struct metadata_t {
- uint32_t edi_time = 0;
- int16_t utc_offset = 0;
- uint16_t dlfc = 0;
-};
-
-struct frame_t {
- // Since a zmq message actually contains 4 frames, the
- // original_zmq_msg is only non-empty for the first of the
- // four calls to Sender::send_edi_frame().
- zmq::message_t original_zmq_message;
- std::vector<uint8_t> data;
- metadata_t metadata;
- std::chrono::steady_clock::time_point received_at;
-};
-
-struct zmq_send_config_t {
- std::vector<std::string> urls;
-};
-
-class Sender {
- public:
- Sender();
- Sender(const Sender& other) = delete;
- Sender& operator=(const Sender& other) = delete;
- ~Sender();
- void start(const edi::configuration_t& conf,
- const zmq_send_config_t& zmq_conf,
- int delay_ms, bool drop_late_packets);
- void push_frame(frame_t&& frame);
- void print_configuration(void);
-
- private:
- void send_eti_frame(frame_t& frame);
- void process(void);
-
- int tist_delay_ms;
- bool drop_late;
- std::atomic<bool> running;
- std::thread process_thread;
- edi::configuration_t edi_conf;
- ThreadsafeQueue<frame_t> frames;
-
- std::shared_ptr<edi::Sender> edi_sender;
-
- zmq::context_t zmq_ctx;
- std::vector<zmq::socket_t> zmq_sockets;
-
- struct buffering_stat_t {
- // Time between when we received the packets and when we transmit packets, in microseconds
- double buffering_time_us = 0.0;
- bool late = false;
- };
- std::vector<buffering_stat_t> buffering_stats;
- size_t num_zmq_send_errors = 0;
-
-};
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
index 41d92b5..63c3228 100644
--- a/src/zmq2edi/zmq2edi.cpp
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2024
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -27,64 +27,50 @@
#include "Log.h"
#include "zmq.hpp"
+#include <math.h>
#include <getopt.h>
-#include <cmath>
-#include <cstring>
-#include <chrono>
+#include <string.h>
#include <iostream>
#include <iterator>
-#include <thread>
#include <vector>
-#include "Sender.h"
+#include "EDISender.h"
#include "dabOutput/dabOutput.h"
constexpr size_t MAX_ERROR_COUNT = 10;
constexpr long ZMQ_TIMEOUT_MS = 1000;
-constexpr long DEFAULT_BACKOFF = 5000;
-static edi::configuration_t edi_conf;
+static edi_configuration_t edi_conf;
-static Sender edisender;
+static EDISender edisender;
-static void usage()
+void usage(void)
{
using namespace std;
cerr << "Usage:" << endl;
cerr << "odr-zmq2edi [options] <source>" << endl << endl;
- cerr << "ODR-ZMQ2EDI can output to both EDI and ZMQ. It buffers and releases frames according to their timestamp." << endl;
-
cerr << "Options:" << endl;
cerr << "The following options can be given only once:" << endl;
cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl;
- cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl;
- cerr << " Negative delay values are also allowed." << endl;
- cerr << " -C <path to script> Before starting, run the given script, and only start if it returns 0." << endl;
- cerr << " This is useful for checking that NTP is properly synchronised" << endl;
- cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl;
- cerr << " -b <backoff> Number of milliseconds to backoff after an input reset (default " << DEFAULT_BACKOFF << ")." << endl << endl;
-
- cerr << " ZMQ Output options:" << endl;
- cerr << " -Z <url> Add a zmq output to URL, e.g. tcp:*:9876 to listen for connections on port 9876 " << endl << endl;
-
- cerr << " EDI Output options:" << endl;
- cerr << " -v Enables verbose mode." << endl;
- cerr << " -P Disable PFT and send AFPackets." << endl;
- cerr << " -f <fec> Set the FEC." << endl;
- cerr << " -i <spread> Configure the UDP packet spread/interleaver with given percentage: 0% send all fragments at once, 100% spread over 24ms, >100% spread and interleave. Default 95%\n";
- cerr << " -D Dump the EDI to edi.debug file." << endl;
- cerr << " -a <alignement> Set the alignment of the TAG Packet (default 8)." << endl;
-
- cerr << "The following options can be given several times, when more than EDI/UDP destination is desired:" << endl;
- cerr << " -d <destination ip> Set the destination ip." << endl;
- cerr << " -p <destination port> Set the destination port." << endl;
- cerr << " -s <source port> Set the source port." << endl;
- cerr << " -S <source ip> Select the source IP in case we want to use multicast." << endl;
- cerr << " -t <ttl> Set the packet's TTL." << endl << endl;
-
- cerr << "The input socket will be reset if no data is received for " <<
+ cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl;
+ cerr << " -W <max_delay> Drop ETI frames if TIST is <max_delay> later than current system time." << endl;
+ cerr << " -p <destination port> sets the destination port." << endl;
+ cerr << " -P Disable PFT and send AFPackets." << endl;
+ cerr << " -f <fec> sets the FEC." << endl;
+ cerr << " -i <interleave> enables the interleaved with this latency." << endl;
+ cerr << " -D dumps the EDI to edi.debug file." << endl;
+ cerr << " -v Enables verbose mode." << endl;
+ cerr << " -a <tagpacket alignement> sets the alignment of the TAG Packet (default 8)." << endl << endl;
+
+ cerr << "The following options can be given several times, when more than once destination is addressed:" << endl;
+ cerr << " -d <destination ip> sets the destination ip." << endl;
+ cerr << " -s <source port> sets the source port." << endl;
+ cerr << " -S <source ip> select the source IP in case we want to use multicast." << endl;
+ cerr << " -t <ttl> set the packet's TTL." << endl << endl;
+
+ cerr << "odr-zmq2edi will quit if it does not receive data for " <<
(int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl;
cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl;
}
@@ -169,8 +155,7 @@ static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_b
/* There is some state inside the parsing of destination arguments,
* because several destinations can be given. */
-static std::shared_ptr<edi::udp_destination_t> edi_destination;
-static bool dest_port_set = false;
+static edi_destination_t edi_destination;
static bool source_port_set = false;
static bool source_addr_set = false;
static bool ttl_set = false;
@@ -183,10 +168,10 @@ static void add_edi_destination(void)
std::to_string(edi_conf.destinations.size() + 1));
}
- edi_conf.destinations.push_back(std::move(edi_destination));
- edi_destination = std::make_shared<edi::udp_destination_t>();
+ edi_conf.destinations.push_back(edi_destination);
+ edi_destination_t newdest;
+ edi_destination = newdest;
- dest_port_set = false;
source_port_set = false;
source_addr_set = false;
ttl_set = false;
@@ -195,44 +180,33 @@ static void add_edi_destination(void)
static void parse_destination_args(char option)
{
- if (not edi_destination) {
- edi_destination = std::make_shared<edi::udp_destination_t>();
- }
-
switch (option) {
- case 'p':
- if (dest_port_set) {
- add_edi_destination();
- }
- edi_destination->dest_port = std::stoi(optarg);
- dest_port_set = true;
- break;
case 's':
if (source_port_set) {
add_edi_destination();
}
- edi_destination->source_port = std::stoi(optarg);
+ edi_destination.source_port = std::stoi(optarg);
source_port_set = true;
break;
case 'S':
if (source_addr_set) {
add_edi_destination();
}
- edi_destination->source_addr = optarg;
+ edi_destination.source_addr = optarg;
source_addr_set = true;
break;
case 't':
if (ttl_set) {
add_edi_destination();
}
- edi_destination->ttl = std::stoi(optarg);
+ edi_destination.ttl = std::stoi(optarg);
ttl_set = true;
break;
case 'd':
if (dest_addr_set) {
add_edi_destination();
}
- edi_destination->dest_addr = optarg;
+ edi_destination.dest_addr = optarg;
dest_addr_set = true;
break;
default:
@@ -240,8 +214,6 @@ static void parse_destination_args(char option)
}
}
-class FCTDiscontinuity { };
-
int start(int argc, char **argv)
{
edi_conf.enable_pft = true;
@@ -252,28 +224,23 @@ int start(int argc, char **argv)
}
int delay_ms = 500;
- bool drop_late_packets = false;
- uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF;
- std::string startupcheck;
-
- zmq_send_config_t zmq_conf;
+ int max_delay_ms = 0; // no max delay
int ch = 0;
while (ch != -1) {
- ch = getopt(argc, argv, "C:d:p:s:S:t:Pf:i:Dva:b:w:xhZ:");
+ ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:w:W:");
switch (ch) {
case -1:
break;
- case 'C':
- startupcheck = optarg;
- break;
case 'd':
case 's':
case 'S':
case 't':
- case 'p':
parse_destination_args(ch);
break;
+ case 'p':
+ edi_conf.dest_port = std::stoi(optarg);
+ break;
case 'P':
edi_conf.enable_pft = false;
break;
@@ -282,14 +249,18 @@ int start(int argc, char **argv)
break;
case 'i':
{
- int spread_percent = std::stoi(optarg);
- if (spread_percent < 0) {
- throw std::runtime_error("EDI output: negative spread value is invalid.");
- }
+ double interleave_ms = std::stod(optarg);
+ if (interleave_ms != 0.0) {
+ if (interleave_ms < 0) {
+ throw std::runtime_error("EDI output: negative interleave value is invalid.");
+ }
- edi_conf.fragment_spreading_factor = (double)spread_percent / 100.0;
- if (edi_conf.fragment_spreading_factor > 30000) {
- throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!");
+ auto latency_rounded = lround(interleave_ms / 24.0);
+ if (latency_rounded * 24 > 30000) {
+ throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!");
+ }
+
+ edi_conf.latency_frames = latency_rounded;
}
}
break;
@@ -302,17 +273,11 @@ int start(int argc, char **argv)
case 'a':
edi_conf.tagpacket_alignment = std::stoi(optarg);
break;
- case 'b':
- backoff_after_reset_ms = std::stoi(optarg);
- break;
case 'w':
delay_ms = std::stoi(optarg);
break;
- case 'x':
- drop_late_packets = true;
- break;
- case 'Z':
- zmq_conf.urls.push_back(optarg);
+ case 'W':
+ max_delay_ms = std::stoi(optarg);
break;
case 'h':
default:
@@ -321,184 +286,119 @@ int start(int argc, char **argv)
}
}
- if (dest_addr_set) {
- add_edi_destination();
- }
+ add_edi_destination();
if (optind >= argc) {
etiLog.level(error) << "source option is missing";
return 1;
}
- if (edi_conf.destinations.empty() and zmq_conf.urls.empty()) {
- etiLog.level(error) << "No destinations set";
+ if (edi_conf.dest_port == 0) {
+ etiLog.level(error) << "No EDI destination port defined";
return 1;
}
- if (not edi_conf.destinations.empty()) {
- edisender.print_configuration();
+ if (edi_conf.destinations.empty()) {
+ etiLog.level(error) << "No EDI destinations set";
+ return 1;
}
- if (not zmq_conf.urls.empty()) {
- etiLog.level(info) << "Setting up ZMQ to:";
- for (const auto& url : zmq_conf.urls) {
- etiLog.level(info) << " " << url;
- }
+ if (max_delay_ms > 0) {
+ etiLog.level(info) << "Setting up EDI Sender with delay " << delay_ms << " ms and max delay " << max_delay_ms << " ms";
}
-
-
- if (not startupcheck.empty()) {
- etiLog.level(info) << "Running startup check '" << startupcheck << "'";
- int wstatus = system(startupcheck.c_str());
-
- if (WIFEXITED(wstatus)) {
- if (WEXITSTATUS(wstatus) == 0) {
- etiLog.level(info) << "Startup check ok";
- }
- else {
- etiLog.level(error) << "Startup check failed, returned " << WEXITSTATUS(wstatus);
- return 1;
- }
- }
- else {
- etiLog.level(error) << "Startup check failed, child didn't terminate normally";
- return 1;
- }
+ else {
+ etiLog.level(info) << "Setting up EDI Sender with delay " << delay_ms << " ms";
}
-
- etiLog.level(info) << "Setting up Sender with delay " << delay_ms << " ms. " <<
- (drop_late_packets ? "Will" : "Will not") << " drop late packets";
- edisender.start(edi_conf, zmq_conf, delay_ms, drop_late_packets);
+ edisender.start(edi_conf, delay_ms, max_delay_ms);
+ edisender.print_configuration();
const char* source_url = argv[optind];
- zmq::context_t zmq_ctx(1);
- etiLog.level(info) << "Opening ZMQ input: " << source_url;
-
- while (true) {
- zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
- zmq_sock.connect(source_url);
- zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
-
- size_t error_count = 0;
- int previous_fct = -1;
-
- try {
- while (error_count < MAX_ERROR_COUNT) {
- zmq::message_t incoming;
- zmq::pollitem_t items[1];
- items[0].socket = zmq_sock;
- items[0].events = ZMQ_POLLIN;
- const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
- if (num_events == 0) { // timeout
- error_count++;
- }
- else {
- // Event received: recv will not block
- const auto recv_result = zmq_sock.recv(incoming, zmq::recv_flags::none);
- if (not recv_result.has_value()) {
- continue;
- }
-
- const auto received_at = std::chrono::steady_clock::now();
- // Casting incoming.data() to zmq_dab_message_t* is not allowed, because
- // it might be misaligned
- zmq_dab_message_t dab_msg;
- memcpy(&dab_msg, incoming.data(), ZMQ_DAB_MESSAGE_HEAD_LENGTH);
+ size_t frame_count = 0;
+ size_t error_count = 0;
- if (dab_msg.version != 1) {
- etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg.version;
- error_count++;
- }
-
- int offset = sizeof(dab_msg.version) +
- NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg.buflen);
+ etiLog.level(info) << "Opening ZMQ input: " << source_url;
- std::vector<frame_t> all_frames;
- all_frames.reserve(NUM_FRAMES_PER_ZMQ_MESSAGE);
+ zmq::context_t zmq_ctx(1);
+ zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
+ zmq_sock.connect(source_url);
+ zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
+
+ while (error_count < MAX_ERROR_COUNT) {
+ zmq::message_t incoming;
+ zmq::pollitem_t items[1];
+ items[0].socket = zmq_sock;
+ items[0].events = ZMQ_POLLIN;
+ const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
+ if (num_events == 0) { // timeout
+ error_count++;
+ }
+ else {
+ // Event received: recv will not block
+ zmq_sock.recv(&incoming);
- for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
- if (dab_msg.buflen[i] <= 0 or dab_msg.buflen[i] > 6144) {
- etiLog.level(error) << "ZeroMQ buffer " << i <<
- " has invalid length " << dab_msg.buflen[i];
- error_count++;
- }
- else {
- frame_t frame;
- frame.data.resize(6144, 0x55);
- frame.received_at = received_at;
+ zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
- const int framesize = dab_msg.buflen[i];
+ if (dab_msg->version != 1) {
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
+ error_count++;
+ }
- memcpy(frame.data.data(),
- ((uint8_t*)incoming.data()) + offset,
- framesize);
+ int offset = sizeof(dab_msg->version) +
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
- const int fct = frame.data[4];
+ std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames;
- const int expected_fct = (previous_fct + 1) % 250;
- if (previous_fct != -1 and expected_fct != fct) {
- etiLog.level(error) << "ETI wrong frame counter FCT=" << fct << " expected " << expected_fct;
- throw FCTDiscontinuity();
- }
- previous_fct = fct;
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
+ etiLog.level(error) << "ZeroMQ buffer " << i <<
+ " has invalid length " << dab_msg->buflen[i];
+ error_count++;
+ }
+ else {
+ std::vector<uint8_t> buf(6144, 0x55);
- all_frames.push_back(std::move(frame));
+ const int framesize = dab_msg->buflen[i];
- offset += framesize;
- }
- }
+ memcpy(&buf.front(),
+ ((uint8_t*)incoming.data()) + offset,
+ framesize);
- for (auto &f : all_frames) {
- size_t consumed_bytes = 0;
+ all_frames.emplace_back(
+ std::piecewise_construct,
+ std::make_tuple(std::move(buf)),
+ std::make_tuple());
- f.metadata = get_md_one_frame(
- static_cast<uint8_t*>(incoming.data()) + offset,
- incoming.size() - offset,
- &consumed_bytes);
+ offset += framesize;
+ }
+ }
- offset += consumed_bytes;
- }
+ for (auto &f : all_frames) {
+ size_t consumed_bytes = 0;
- if (not all_frames.empty()) {
- all_frames[0].original_zmq_message = std::move(incoming);
- }
+ f.second = get_md_one_frame(
+ static_cast<uint8_t*>(incoming.data()) + offset,
+ incoming.size() - offset,
+ &consumed_bytes);
- for (auto &f : all_frames) {
- edisender.push_frame(std::move(f));
- }
- }
+ offset += consumed_bytes;
}
- etiLog.level(info) << "Backoff " << backoff_after_reset_ms <<
- "ms due to ZMQ input (" << source_url << ") timeout";
- }
- catch (const FCTDiscontinuity&) {
- etiLog.level(info) << "Backoff " << backoff_after_reset_ms << "ms FCT discontinuity";
+ for (auto &f : all_frames) {
+ edisender.push_frame(f);
+ frame_count++;
+ }
}
-
- zmq_sock.close();
- std::this_thread::sleep_for(std::chrono::milliseconds(backoff_after_reset_ms));
}
+ etiLog.level(info) << "Quitting after " << frame_count << " frames transferred";
+
return 0;
}
int main(int argc, char **argv)
{
- // Version handling is done very early to ensure nothing else but the version gets printed out
- if (argc == 2 and strcmp(argv[1], "--version") == 0) {
- fprintf(stdout, "%s\n",
-#if defined(GITVERSION)
- GITVERSION
-#else
- PACKAGE_VERSION
-#endif
- );
- return 0;
- }
-
etiLog.level(info) << "ZMQ2EDI converter from " <<
PACKAGE_NAME << " " <<
#if defined(GITVERSION)
@@ -508,20 +408,12 @@ int main(int argc, char **argv)
#endif
" starting up";
- int ret = 1;
-
try {
- ret = start(argc, argv);
+ return start(argc, argv);
}
- catch (const std::runtime_error &e) {
- etiLog.level(error) << "Runtime error: " << e.what();
+ catch (std::runtime_error &e) {
+ etiLog.level(error) << "Error: " << e.what();
}
- catch (const std::logic_error &e) {
- etiLog.level(error) << "Logic error! " << e.what();
- }
-
- // To make sure things get printed to stderr
- std::this_thread::sleep_for(std::chrono::milliseconds(300));
- return ret;
+ return 1;
}