aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2025-09-10 09:29:08 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2025-09-10 09:29:08 +0200
commitd30cf3e7c31f9299062b9ef85a0949e60611509f (patch)
tree5240c7127791c2226212722c02d95ea21652a943
parent91531ce72ea4d90ea541f072da54bc46ef555db8 (diff)
parentb59516179a3e1842de95f8aef549a75dac1102c6 (diff)
downloaddabmux-d30cf3e7c31f9299062b9ef85a0949e60611509f.tar.gz
dabmux-d30cf3e7c31f9299062b9ef85a0949e60611509f.tar.bz2
dabmux-d30cf3e7c31f9299062b9ef85a0949e60611509f.zip
Merge commit 'b595161' into HEAD
-rw-r--r--doc/advanced.mux4
-rw-r--r--doc/example.mux6
-rw-r--r--lib/edioutput/EDIConfig.h3
-rw-r--r--lib/edioutput/Transport.cpp32
-rw-r--r--lib/edioutput/Transport.h7
-rw-r--r--lib/fec/decode_rs.h12
-rw-r--r--src/DabMultiplexer.cpp2
-rw-r--r--src/DabMux.cpp13
-rw-r--r--src/input/Edi.cpp7
-rw-r--r--src/zmq2edi/EDISender.cpp390
-rw-r--r--src/zmq2edi/EDISender.h91
-rw-r--r--src/zmq2edi/zmq2edi.cpp419
12 files changed, 58 insertions, 928 deletions
diff --git a/doc/advanced.mux b/doc/advanced.mux
index 0fc1b53..d2cc0fd 100644
--- a/doc/advanced.mux
+++ b/doc/advanced.mux
@@ -36,8 +36,8 @@ general {
; number in seconds. Granularity: 24ms
; tist_offset 0.480
- ; Specify the TIST value for the frame with FCT==0, in microseconds
- ; tist_at_fct0 768000
+ ; Specify the TIST value for the frame with FCT==0, in milliseconds
+ ; tist_at_fct0 768
; The management server is a simple TCP server that can present
; statistics data (buffers, overruns, underruns, etc)
diff --git a/doc/example.mux b/doc/example.mux
index 34cd2ee..ae12fb2 100644
--- a/doc/example.mux
+++ b/doc/example.mux
@@ -211,11 +211,13 @@ subchannels {
inputuri "tcp://0.0.0.0:9001"
; For UDP, PFT should be enabled at the sender.
- ; Unicast UDP input:
+ ; Unicast UDP input, bound to all interfaces:
;inputuri "udp://:9001"
+ ; Unicast UDP input, bound to interface with IP 192.168.0.10:
+ ;inputuri "udp://192.168.0.10:9001"
; Multicast UDP input:
;inputuri "udp://@239.10.0.1:9001"
- ; Multicast UDP input with local interface (192.168.0.10) specification
+ ; Multicast UDP input with local interface 192.168.0.10 specification
;inputuri "udp://192.168.0.10@239.10.0.1:9001"
; Two buffer-management types are available: prebuffering and timestamped.
diff --git a/lib/edioutput/EDIConfig.h b/lib/edioutput/EDIConfig.h
index 7016e87..de4217f 100644
--- a/lib/edioutput/EDIConfig.h
+++ b/lib/edioutput/EDIConfig.h
@@ -27,6 +27,7 @@
#pragma once
+#include <optional>
#include <vector>
#include <string>
#include <memory>
@@ -60,7 +61,7 @@ struct udp_destination_t : public destination_t {
uint16_t dest_port = 0;
std::string source_addr;
uint16_t source_port = 0;
- uint8_t ttl = 10;
+ std::optional<uint8_t> ttl = std::nullopt;
};
// TCP server that can accept multiple connections
diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp
index e9559b5..3898213 100644
--- a/lib/edioutput/Transport.cpp
+++ b/lib/edioutput/Transport.cpp
@@ -41,10 +41,15 @@ void configuration_t::print() const
if (auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) {
etiLog.level(info) << " UDP to " << udp_dest->dest_addr << ":" << udp_dest->dest_port;
if (not udp_dest->source_addr.empty()) {
- etiLog.level(info) << " source " << udp_dest->source_addr;
- etiLog.level(info) << " ttl " << udp_dest->ttl;
+ etiLog.level(info) << " source address=" << udp_dest->source_addr;
}
- etiLog.level(info) << " source port " << udp_dest->source_port;
+ if (udp_dest->ttl) {
+ etiLog.level(info) << " ttl=" << (int)(*udp_dest->ttl);
+ }
+ else {
+ etiLog.level(info) << " ttl=(default)";
+ }
+ etiLog.level(info) << " source port=" << udp_dest->source_port;
}
else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {
etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port;
@@ -80,7 +85,10 @@ Sender::Sender(const configuration_t& conf) :
if (not udp_dest->source_addr.empty()) {
udp_socket.setMulticastSource(udp_dest->source_addr.c_str());
- udp_socket.setMulticastTTL(udp_dest->ttl);
+ }
+
+ if (udp_dest->ttl) {
+ udp_socket.setMulticastTTL(*udp_dest->ttl);
}
auto sender = make_shared<udp_sender_t>(
@@ -99,7 +107,7 @@ Sender::Sender(const configuration_t& conf) :
make_shared<PFTSpreader>(tcp_dest->pft_settings, sender));
}
else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) {
- auto sender = make_shared<tcp_send_client_t>(tcp_dest->dest_addr, tcp_dest->dest_port);
+ auto sender = make_shared<tcp_send_client_t>(tcp_dest->dest_addr, tcp_dest->dest_port, m_conf.verbose);
m_pft_spreaders.emplace_back(
make_shared<PFTSpreader>(tcp_dest->pft_settings, sender));
}
@@ -199,7 +207,13 @@ void Sender::tcp_dispatcher_t::send_packet(const std::vector<uint8_t> &frame)
void Sender::tcp_send_client_t::send_packet(const std::vector<uint8_t> &frame)
{
- sock.sendall(frame);
+ const auto error_stats = sock.sendall(frame);
+
+ if (verbose and error_stats.has_seen_new_errors) {
+ etiLog.level(warn) << "TCP output " << dest_addr << ":" << dest_port
+ << " has " << error_stats.num_reconnects
+ << " reconnects: most recent error: " << error_stats.last_error;
+ }
}
Sender::udp_sender_t::udp_sender_t(std::string dest_addr,
@@ -221,7 +235,11 @@ Sender::tcp_dispatcher_t::tcp_dispatcher_t(uint16_t listen_port,
}
Sender::tcp_send_client_t::tcp_send_client_t(const std::string &dest_addr,
- uint16_t dest_port) :
+ uint16_t dest_port,
+ bool verbose) :
+ dest_addr(dest_addr),
+ dest_port(dest_port),
+ verbose(verbose),
sock(dest_addr, dest_port)
{
}
diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h
index b8a9008..96784c0 100644
--- a/lib/edioutput/Transport.h
+++ b/lib/edioutput/Transport.h
@@ -118,8 +118,13 @@ class Sender {
struct tcp_send_client_t : public i_sender {
tcp_send_client_t(
const std::string& dest_addr,
- uint16_t dest_port);
+ uint16_t dest_port,
+ bool verbose);
+ std::string dest_addr;
+ uint16_t dest_port;
+ bool verbose;
+ size_t m_num_reconnects_prev = 0;
Socket::TCPSendClient sock;
virtual void send_packet(const std::vector<uint8_t> &frame) override;
};
diff --git a/lib/fec/decode_rs.h b/lib/fec/decode_rs.h
index c165cf3..647b885 100644
--- a/lib/fec/decode_rs.h
+++ b/lib/fec/decode_rs.h
@@ -145,15 +145,15 @@
count++;
}
if (count != no_eras) {
- printf("count = %d no_eras = %d\n lambda(x) is WRONG\n",count,no_eras);
+ fprintf(stderr, "count = %d no_eras = %d\n lambda(x) is WRONG\n",count,no_eras);
count = -1;
goto finish;
}
#if DEBUG >= 2
- printf("\n Erasure positions as determined by roots of Eras Loc Poly:\n");
+ fprintf(stderr, "\n Erasure positions as determined by roots of Eras Loc Poly:\n");
for (i = 0; i < count; i++)
- printf("%d ", loc[i]);
- printf("\n");
+ fprintf(stderr, "%d ", loc[i]);
+ fprintf(stderr, "\n");
#endif
#endif
}
@@ -227,7 +227,7 @@
continue; /* Not a root */
/* store root (index-form) and error location number */
#if DEBUG>=2
- printf("count %d root %d loc %d\n",count,i,k);
+ fprintf(stderr, "count %d root %d loc %d\n",count,i,k);
#endif
root[count] = i;
loc[count] = k;
@@ -279,7 +279,7 @@
}
#if DEBUG >= 1
if (den == 0) {
- printf("\n ERROR: denominator = 0\n");
+ fprintf(stderr, "\n ERROR: denominator = 0\n");
count = -1;
goto finish;
}
diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp
index 52f053a..bea82c2 100644
--- a/src/DabMultiplexer.cpp
+++ b/src/DabMultiplexer.cpp
@@ -77,8 +77,6 @@ uint64_t MuxTime::init(uint32_t tist_at_fct0_ms, double tist_offset)
const auto counter_offset = tist_at_fct0_ms / 24;
const auto offset_as_count = m_pps_offset_ms / 24;
- etiLog.level(debug) << "Init " << counter_offset << " " << offset_as_count;
-
return (250 - counter_offset + offset_as_count) % 250;
}
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index bf525c1..4b9352f 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -352,10 +352,9 @@ int main(int argc, char *argv[])
pft_settings.enable_pft = pt.get<bool>("enable_pft", default_enable_pft);
pft_settings.fec = pt.get<unsigned int>("fec", default_fec);
pft_settings.fragment_spreading_factor = default_spreading_factor;
- auto override_spread_percent = pt.get_optional<int>("packet_spread");
- if (override_spread_percent) {
+ if (auto override_spread_percent = pt.get_optional<int>("packet_spread"))
pft_settings.fragment_spreading_factor = check_spreading_factor(*override_spread_percent);
- }
+
pft_settings.verbose = pt.get<bool>("verbose", edi_conf.verbose);
};
@@ -364,12 +363,12 @@ int main(int argc, char *argv[])
if (proto == "udp") {
auto dest = make_shared<edi::udp_destination_t>();
dest->dest_addr = pt_edi_dest.second.get<string>("destination");
- dest->ttl = pt_edi_dest.second.get<unsigned int>("ttl", 1);
+ if (auto ttl = pt_edi_dest.second.get_optional<unsigned int>("ttl"))
+ dest->ttl = *ttl;
dest->source_addr = pt_edi_dest.second.get<string>("source", "");
- dest->source_port = pt_edi_dest.second.get<unsigned int>("sourceport");
-
- dest->dest_port = pt_edi_dest.second.get<unsigned int>("port", 0);
+ dest->source_port = pt_edi_dest.second.get<unsigned int>("sourceport", 0);
+ dest->dest_port = pt_edi_dest.second.get<unsigned int>("port", 0);
if (dest->dest_port == 0) {
// Compatiblity: we have removed the transport and addressing in the
// PFT layer, which removed the requirement that all outputs must share
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
index 141641f..b100f32 100644
--- a/src/input/Edi.cpp
+++ b/src/input/Edi.cpp
@@ -80,6 +80,7 @@ Edi::~Edi() {
void Edi::open(const std::string& name)
{
const std::regex re_udp("udp://:([0-9]+)");
+ const std::regex re_udp_bindto("udp://([^:]+):([0-9]+)");
const std::regex re_udp_multicast("udp://@([0-9.]+):([0-9]+)");
const std::regex re_udp_multicast_bindto("udp://([0-9.])+@([0-9.]+):([0-9]+)");
const std::regex re_tcp("tcp://(.*):([0-9]+)");
@@ -98,6 +99,12 @@ void Edi::open(const std::string& name)
m_udp_sock.reinit(udp_port);
m_udp_sock.setBlocking(false);
}
+ else if (std::regex_match(name, m, re_udp_bindto)) {
+ const int udp_port = std::stoi(m[2].str());
+ m_input_used = InputUsed::UDP;
+ m_udp_sock.reinit(udp_port, m[1].str());
+ m_udp_sock.setBlocking(false);
+ }
else if (std::regex_match(name, m, re_udp_multicast_bindto)) {
const string bind_to = m[1].str();
const string multicast_address = m[2].str();
diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp
deleted file mode 100644
index 06b7420..0000000
--- a/src/zmq2edi/EDISender.cpp
+++ /dev/null
@@ -1,390 +0,0 @@
-/*
- Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
- 2011, 2012 Her Majesty the Queen in Right of Canada (Communications
- Research Center Canada)
-
- Copyright (C) 2018
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "EDISender.h"
-#include "Log.h"
-#include <cmath>
-#include <numeric>
-#include <map>
-#include <algorithm>
-
-using namespace std;
-
-EDISender::~EDISender()
-{
- if (running.load()) {
- running.store(false);
-
- // Unblock thread
- frame_t emptyframe;
- frames.push(emptyframe);
-
- process_thread.join();
- }
-}
-
-void EDISender::start(const edi_configuration_t& conf,
- int delay_ms, int max_delay_ms)
-{
- edi_conf = conf;
- tist_delay_ms = delay_ms;
- tist_max_delay_ms = max_delay_ms;
-
- if (edi_conf.verbose) {
- etiLog.log(info, "Setup EDI");
- }
-
- if (edi_conf.dump) {
- edi_debug_file.open("./edi.debug");
- }
-
- if (edi_conf.enabled()) {
- for (auto& edi_destination : edi_conf.destinations) {
- auto edi_output = make_shared<UdpSocket>(edi_destination.source_port);
-
- if (not edi_destination.source_addr.empty()) {
- int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str());
- if (err) {
- throw runtime_error("EDI socket set source failed!");
- }
- err = edi_output->setMulticastTTL(edi_destination.ttl);
- if (err) {
- throw runtime_error("EDI socket set TTL failed!");
- }
- }
-
- edi_destination.socket = edi_output;
- }
- }
-
- if (edi_conf.verbose) {
- etiLog.log(info, "EDI set up");
- }
-
- // The AF Packet will be protected with reed-solomon and split in fragments
- edi::PFT pft(edi_conf);
- edi_pft = pft;
-
- if (edi_conf.interleaver_enabled()) {
- edi_interleaver.SetLatency(edi_conf.latency_frames);
- }
-
- startTime = std::chrono::steady_clock::now();
- running.store(true);
- process_thread = thread(&EDISender::process, this);
-}
-
-void EDISender::push_frame(const frame_t& frame)
-{
- frames.push(frame);
-}
-
-void EDISender::print_configuration()
-{
- if (edi_conf.enabled()) {
- etiLog.level(info) << "EDI";
- etiLog.level(info) << " verbose " << edi_conf.verbose;
- for (auto& edi_dest : edi_conf.destinations) {
- etiLog.level(info) << " to " << edi_dest.dest_addr << ":" << edi_conf.dest_port;
- if (not edi_dest.source_addr.empty()) {
- etiLog.level(info) << " source " << edi_dest.source_addr;
- etiLog.level(info) << " ttl " << edi_dest.ttl;
- }
- etiLog.level(info) << " source port " << edi_dest.source_port;
- }
- if (edi_conf.interleaver_enabled()) {
- etiLog.level(info) << " interleave " << edi_conf.latency_frames * 24 << " ms";
- }
- }
- else {
- etiLog.level(info) << "EDI disabled";
- }
-}
-
-void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
-{
- edi::TagDETI edi_tagDETI;
- edi::TagStarPTR edi_tagStarPtr;
- map<int, edi::TagESTn> edi_subchannelToTag;
- // The above Tag Items will be assembled into a TAG Packet
- edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment);
-
- // SYNC
- edi_tagDETI.stat = p[0];
-
- // LIDATA FCT
- edi_tagDETI.dlfc = metadata.dlfc;
-
- const int fct = p[4];
- if (metadata.dlfc % 250 != fct) {
- etiLog.level(warn) << "Frame FCT=" << fct <<
- " does not correspond to DLFC=" << metadata.dlfc;
- }
-
- bool ficf = (p[5] & 0x80) >> 7;
- edi_tagDETI.ficf = ficf;
-
- const int nst = p[5] & 0x7F;
-
- edi_tagDETI.fp = (p[6] & 0xE0) >> 5;
- const int mid = (p[6] & 0x18) >> 3;
- edi_tagDETI.mid = mid;
- //const int fl = (p[6] & 0x07) * 256 + p[7];
-
- int ficl = 0;
- if (ficf == 0) {
- etiLog.level(warn) << "Not FIC in data stream!";
- return;
- }
- else if (mid == 3) {
- ficl = 32;
- }
- else {
- ficl = 24;
- }
-
- vector<uint32_t> sad(nst);
- vector<uint32_t> stl(nst);
- // Loop over STC subchannels:
- for (int i=0; i < nst; i++) {
- // EDI stream index is 1-indexed
- const int edi_stream_id = i + 1;
-
- uint32_t scid = (p[8 + 4*i] & 0xFC) >> 2;
- sad[i] = (p[8+4*i] & 0x03) * 256 + p[9+4*i];
- uint32_t tpl = (p[10+4*i] & 0xFC) >> 2;
- stl[i] = (p[10+4*i] & 0x03) * 256 + \
- p[11+4*i];
-
- edi::TagESTn tag_ESTn;
- tag_ESTn.id = edi_stream_id;
- tag_ESTn.scid = scid;
- tag_ESTn.sad = sad[i];
- tag_ESTn.tpl = tpl;
- tag_ESTn.rfa = 0; // two bits
- tag_ESTn.mst_length = stl[i];
- tag_ESTn.mst_data = nullptr;
-
- edi_subchannelToTag[i] = tag_ESTn;
- }
-
- const uint16_t mnsc = p[8 + 4*nst] * 256 + \
- p[8 + 4*nst + 1];
- edi_tagDETI.mnsc = mnsc;
-
- /*const uint16_t crc1 = p[8 + 4*nst + 2]*256 + \
- p[8 + 4*nst + 3]; */
-
- edi_tagDETI.fic_data = p + 12 + 4*nst;
- edi_tagDETI.fic_length = ficl * 4;
-
- // loop over MSC subchannels
- int offset = 0;
- for (int i=0; i < nst; i++) {
- edi::TagESTn& tag = edi_subchannelToTag[i];
- tag.mst_data = (p + 12 + 4*nst + ficf*ficl*4 + offset);
-
- offset += stl[i] * 8;
- }
-
- /*
- const uint16_t crc2 = p[12 + 4*nst + ficf*ficl*4 + offset] * 256 + \
- p[12 + 4*nst + ficf*ficl*4 + offset + 1]; */
-
- // TIST
- const size_t tist_ix = 12 + 4*nst + ficf*ficl*4 + offset + 4;
- uint32_t tist = (uint32_t)(p[tist_ix]) << 24 |
- (uint32_t)(p[tist_ix+1]) << 16 |
- (uint32_t)(p[tist_ix+2]) << 8 |
- (uint32_t)(p[tist_ix+3]);
-
- std::time_t posix_timestamp_1_jan_2000 = 946684800;
-
- // Wait until our time is tist_delay after the TIST before
- // we release that frame
-
- using namespace std::chrono;
-
- const auto seconds = metadata.edi_time;
- const auto pps_offset = milliseconds(std::lrint((tist & 0xFFFFFF) / 16384.0));
- const auto t_frame = system_clock::from_time_t(
- seconds + posix_timestamp_1_jan_2000) + pps_offset;
-
- const auto t_release = t_frame + milliseconds(tist_delay_ms);
- const auto t_now = system_clock::now();
-
- /*
- etiLog.level(debug) << "seconds " << seconds + posix_timestamp_1_jan_2000;
- etiLog.level(debug) << "now " << system_clock::to_time_t(t_now);
- etiLog.level(debug) << "wait " << wait_time.count();
- */
-
- const auto wait_time = t_release - t_now;
- wait_times.push_back(duration_cast<microseconds>(wait_time).count());
-
- if (tist_max_delay_ms > 0) {
- const auto t_latest_release = t_frame + milliseconds(tist_max_delay_ms);
-
- if (t_now > t_latest_release) {
- // drop frame
- num_dropped.fetch_add(1);
- return;
- }
- }
-
- if (t_release > t_now) {
- std::this_thread::sleep_for(wait_time);
- }
-
- edi_tagDETI.tsta = tist;
- edi_tagDETI.atstf = 1;
- edi_tagDETI.utco = metadata.utc_offset;
- edi_tagDETI.seconds = metadata.edi_time;
-
- if (edi_conf.enabled()) {
- // put tags *ptr, DETI and all subchannels into one TagPacket
- edi_tagpacket.tag_items.push_back(&edi_tagStarPtr);
- edi_tagpacket.tag_items.push_back(&edi_tagDETI);
-
- for (auto& tag : edi_subchannelToTag) {
- edi_tagpacket.tag_items.push_back(&tag.second);
- }
-
- // Assemble into one AF Packet
- edi::AFPacket edi_afpacket = edi_afPacketiser.Assemble(edi_tagpacket);
-
- if (edi_conf.enable_pft) {
- // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
- vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(edi_afpacket);
-
- if (edi_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n",
- edi_fragments.size());
- }
-
- if (edi_conf.interleaver_enabled()) {
- edi_fragments = edi_interleaver.Interleave(edi_fragments);
- }
-
- // Send over ethernet
- for (const auto& edi_frag : edi_fragments) {
- for (auto& dest : edi_conf.destinations) {
- InetAddress addr;
- addr.setAddress(dest.dest_addr.c_str());
- addr.setPort(edi_conf.dest_port);
-
- dest.socket->send(edi_frag, addr);
- }
-
- if (edi_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
- }
- }
-
- if (edi_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragments %zu\n",
- edi_fragments.size());
- }
- }
- else {
- // Send over ethernet
- for (auto& dest : edi_conf.destinations) {
- InetAddress addr;
- addr.setAddress(dest.dest_addr.c_str());
- addr.setPort(edi_conf.dest_port);
-
- dest.socket->send(edi_afpacket, addr);
- }
-
- if (edi_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(edi_afpacket.begin(), edi_afpacket.end(), debug_iterator);
- }
- }
- }
-}
-
-void EDISender::process()
-{
- while (running.load()) {
- frame_t frame;
- frames.wait_and_pop(frame);
-
- if (not running.load() or frame.first.empty()) {
- break;
- }
-
- if (frame.first.size() == 6144) {
- send_eti_frame(frame.first.data(), frame.second);
- }
- else {
- etiLog.level(warn) << "Ignoring short ETI frame, "
- "DFLC=" << frame.second.dlfc << ", len=" <<
- frame.first.size();
- }
-
- if (wait_times.size() == 250) { // every six seconds
- const double n = wait_times.size();
-
- double sum = accumulate(wait_times.begin(), wait_times.end(), 0);
- size_t num_late = std::count_if(wait_times.begin(), wait_times.end(),
- [](double v){ return v < 0; });
- double mean = sum / n;
-
- double sq_sum = 0;
- for (const auto t : wait_times) {
- sq_sum += (t-mean) * (t-mean);
- }
- double stdev = sqrt(sq_sum / n);
- auto min_max = minmax_element(wait_times.begin(), wait_times.end());
-
- /* Debug code
- stringstream ss;
- ss << "times:";
- for (const auto t : wait_times) {
- ss << " " << t;
- }
- etiLog.level(debug) << ss.str();
- */
-
- const size_t dropped = num_dropped.exchange(0);
-
- etiLog.level(info) << "Wait time statistics [microseconds]:"
- " min: " << *min_max.first <<
- " max: " << *min_max.second <<
- " mean: " << mean <<
- " stdev: " << stdev <<
- " late: " <<
- num_late << " of " << wait_times.size() << " (" <<
- num_late * 100.0 / n << "%)" <<
- " dropped: " << dropped;
-
- wait_times.clear();
- }
- }
-}
diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h
deleted file mode 100644
index 44502c1..0000000
--- a/src/zmq2edi/EDISender.h
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
- 2011, 2012 Her Majesty the Queen in Right of Canada (Communications
- Research Center Canada)
-
- Copyright (C) 2018
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#pragma once
-#include <iostream>
-#include <iterator>
-#include <thread>
-#include <vector>
-#include <chrono>
-#include <atomic>
-#include "ThreadsafeQueue.h"
-#include "dabOutput/dabOutput.h"
-#include "dabOutput/edi/TagItems.h"
-#include "dabOutput/edi/TagPacket.h"
-#include "dabOutput/edi/AFPacket.h"
-#include "dabOutput/edi/PFT.h"
-#include "dabOutput/edi/Interleaver.h"
-
-// This metadata gets transmitted in the zmq stream
-struct metadata_t {
- uint32_t edi_time = 0;
- int16_t utc_offset = 0;
- uint16_t dlfc = 0;
-};
-
-using frame_t = std::pair<std::vector<uint8_t>, metadata_t>;
-
-class EDISender {
- public:
- EDISender() = default;
- EDISender(const EDISender& other) = delete;
- EDISender& operator=(const EDISender& other) = delete;
- ~EDISender();
- void start(const edi_configuration_t& conf, int delay_ms, int max_delay_ms);
- void push_frame(const frame_t& frame);
- void print_configuration(void);
-
- private:
- void send_eti_frame(uint8_t* p, metadata_t metadata);
- void process(void);
-
- int tist_delay_ms = 0;
- int tist_max_delay_ms = 0;
- std::atomic<bool> running = ATOMIC_VAR_INIT(false);
- std::thread process_thread;
- edi_configuration_t edi_conf;
- std::chrono::steady_clock::time_point startTime;
- ThreadsafeQueue<frame_t> frames;
- std::ofstream edi_debug_file;
-
- // The TagPacket will then be placed into an AFPacket
- edi::AFPacketiser edi_afPacketiser;
-
- // The AF Packet will be protected with reed-solomon and split in fragments
- edi::PFT edi_pft;
-
- // To mitigate for burst packet loss, PFT fragments can be sent out-of-order
- edi::Interleaver edi_interleaver;
-
- // For statistics about wait time before we transmit packets,
- // in microseconds
- std::vector<double> wait_times;
-
- // Number of frames dropped because their TIST was larger than max_delay
- std::atomic<size_t> num_dropped = ATOMIC_VAR_INIT(0);
-
-};
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
deleted file mode 100644
index 63c3228..0000000
--- a/src/zmq2edi/zmq2edi.cpp
+++ /dev/null
@@ -1,419 +0,0 @@
-/*
- Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
- 2011, 2012 Her Majesty the Queen in Right of Canada (Communications
- Research Center Canada)
-
- Copyright (C) 2018
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "Log.h"
-#include "zmq.hpp"
-#include <math.h>
-#include <getopt.h>
-#include <string.h>
-#include <iostream>
-#include <iterator>
-#include <vector>
-
-#include "EDISender.h"
-#include "dabOutput/dabOutput.h"
-
-constexpr size_t MAX_ERROR_COUNT = 10;
-constexpr long ZMQ_TIMEOUT_MS = 1000;
-
-static edi_configuration_t edi_conf;
-
-static EDISender edisender;
-
-void usage(void)
-{
- using namespace std;
-
- cerr << "Usage:" << endl;
- cerr << "odr-zmq2edi [options] <source>" << endl << endl;
-
- cerr << "Options:" << endl;
- cerr << "The following options can be given only once:" << endl;
- cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl;
- cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl;
- cerr << " -W <max_delay> Drop ETI frames if TIST is <max_delay> later than current system time." << endl;
- cerr << " -p <destination port> sets the destination port." << endl;
- cerr << " -P Disable PFT and send AFPackets." << endl;
- cerr << " -f <fec> sets the FEC." << endl;
- cerr << " -i <interleave> enables the interleaved with this latency." << endl;
- cerr << " -D dumps the EDI to edi.debug file." << endl;
- cerr << " -v Enables verbose mode." << endl;
- cerr << " -a <tagpacket alignement> sets the alignment of the TAG Packet (default 8)." << endl << endl;
-
- cerr << "The following options can be given several times, when more than once destination is addressed:" << endl;
- cerr << " -d <destination ip> sets the destination ip." << endl;
- cerr << " -s <source port> sets the source port." << endl;
- cerr << " -S <source ip> select the source IP in case we want to use multicast." << endl;
- cerr << " -t <ttl> set the packet's TTL." << endl << endl;
-
- cerr << "odr-zmq2edi will quit if it does not receive data for " <<
- (int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl;
- cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl;
-}
-
-static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_bytes)
-{
- size_t remaining = size;
- if (remaining < 3) {
- etiLog.level(warn) << "Insufficient data to parse metadata";
- throw std::runtime_error("Insufficient data");
- }
-
- metadata_t md;
- bool utc_offset_received = false;
- bool edi_time_received = false;
- bool dlfc_received = false;
-
- while (remaining) {
- uint8_t id = buf[0];
- uint16_t len = (((uint16_t)buf[1]) << 8) + buf[2];
-
- if (id == static_cast<uint8_t>(output_metadata_id_e::separation_marker)) {
- if (len != 0) {
- etiLog.level(warn) << "Invalid length " << len << " for metadata: separation_marker";
- }
-
- if (not utc_offset_received or not edi_time_received or not dlfc_received) {
- throw std::runtime_error("Incomplete metadata received");
- }
-
- remaining -= 3;
- *consumed_bytes = size - remaining;
- return md;
- }
- else if (id == static_cast<uint8_t>(output_metadata_id_e::utc_offset)) {
- if (len != 2) {
- etiLog.level(warn) << "Invalid length " << len << " for metadata: utc_offset";
- }
- if (remaining < 2) {
- throw std::runtime_error("Insufficient data for utc_offset");
- }
- uint16_t utco;
- std::memcpy(&utco, buf + 3, sizeof(utco));
- md.utc_offset = ntohs(utco);
- utc_offset_received = true;
- remaining -= 5;
- buf += 5;
- }
- else if (id == static_cast<uint8_t>(output_metadata_id_e::edi_time)) {
- if (len != 4) {
- etiLog.level(warn) << "Invalid length " << len << " for metadata: edi_time";
- }
- if (remaining < 4) {
- throw std::runtime_error("Insufficient data for edi_time");
- }
- uint32_t edi_time;
- std::memcpy(&edi_time, buf + 3, sizeof(edi_time));
- md.edi_time = ntohl(edi_time);
- edi_time_received = true;
- remaining -= 7;
- buf += 7;
- }
- else if (id == static_cast<uint8_t>(output_metadata_id_e::dlfc)) {
- if (len != 2) {
- etiLog.level(warn) << "Invalid length " << len << " for metadata: dlfc";
- }
- if (remaining < 2) {
- throw std::runtime_error("Insufficient data for dlfc");
- }
- uint16_t dlfc;
- std::memcpy(&dlfc, buf + 3, sizeof(dlfc));
- md.dlfc = ntohs(dlfc);
- dlfc_received = true;
- remaining -= 5;
- buf += 5;
- }
- }
-
- throw std::runtime_error("Insufficient data");
-}
-
-/* There is some state inside the parsing of destination arguments,
- * because several destinations can be given. */
-
-static edi_destination_t edi_destination;
-static bool source_port_set = false;
-static bool source_addr_set = false;
-static bool ttl_set = false;
-static bool dest_addr_set = false;
-
-static void add_edi_destination(void)
-{
- if (not dest_addr_set) {
- throw std::runtime_error("Destination address not specified for destination number " +
- std::to_string(edi_conf.destinations.size() + 1));
- }
-
- edi_conf.destinations.push_back(edi_destination);
- edi_destination_t newdest;
- edi_destination = newdest;
-
- source_port_set = false;
- source_addr_set = false;
- ttl_set = false;
- dest_addr_set = false;
-}
-
-static void parse_destination_args(char option)
-{
- switch (option) {
- case 's':
- if (source_port_set) {
- add_edi_destination();
- }
- edi_destination.source_port = std::stoi(optarg);
- source_port_set = true;
- break;
- case 'S':
- if (source_addr_set) {
- add_edi_destination();
- }
- edi_destination.source_addr = optarg;
- source_addr_set = true;
- break;
- case 't':
- if (ttl_set) {
- add_edi_destination();
- }
- edi_destination.ttl = std::stoi(optarg);
- ttl_set = true;
- break;
- case 'd':
- if (dest_addr_set) {
- add_edi_destination();
- }
- edi_destination.dest_addr = optarg;
- dest_addr_set = true;
- break;
- default:
- throw std::logic_error("parse_destination_args invalid");
- }
-}
-
-int start(int argc, char **argv)
-{
- edi_conf.enable_pft = true;
-
- if (argc == 0) {
- usage();
- return 1;
- }
-
- int delay_ms = 500;
- int max_delay_ms = 0; // no max delay
-
- int ch = 0;
- while (ch != -1) {
- ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:w:W:");
- switch (ch) {
- case -1:
- break;
- case 'd':
- case 's':
- case 'S':
- case 't':
- parse_destination_args(ch);
- break;
- case 'p':
- edi_conf.dest_port = std::stoi(optarg);
- break;
- case 'P':
- edi_conf.enable_pft = false;
- break;
- case 'f':
- edi_conf.fec = std::stoi(optarg);
- break;
- case 'i':
- {
- double interleave_ms = std::stod(optarg);
- if (interleave_ms != 0.0) {
- if (interleave_ms < 0) {
- throw std::runtime_error("EDI output: negative interleave value is invalid.");
- }
-
- auto latency_rounded = lround(interleave_ms / 24.0);
- if (latency_rounded * 24 > 30000) {
- throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!");
- }
-
- edi_conf.latency_frames = latency_rounded;
- }
- }
- break;
- case 'D':
- edi_conf.dump = true;
- break;
- case 'v':
- edi_conf.verbose = true;
- break;
- case 'a':
- edi_conf.tagpacket_alignment = std::stoi(optarg);
- break;
- case 'w':
- delay_ms = std::stoi(optarg);
- break;
- case 'W':
- max_delay_ms = std::stoi(optarg);
- break;
- case 'h':
- default:
- usage();
- return 1;
- }
- }
-
- add_edi_destination();
-
- if (optind >= argc) {
- etiLog.level(error) << "source option is missing";
- return 1;
- }
-
- if (edi_conf.dest_port == 0) {
- etiLog.level(error) << "No EDI destination port defined";
- return 1;
- }
-
- if (edi_conf.destinations.empty()) {
- etiLog.level(error) << "No EDI destinations set";
- return 1;
- }
-
- if (max_delay_ms > 0) {
- etiLog.level(info) << "Setting up EDI Sender with delay " << delay_ms << " ms and max delay " << max_delay_ms << " ms";
- }
- else {
- etiLog.level(info) << "Setting up EDI Sender with delay " << delay_ms << " ms";
- }
- edisender.start(edi_conf, delay_ms, max_delay_ms);
- edisender.print_configuration();
-
- const char* source_url = argv[optind];
-
-
- size_t frame_count = 0;
- size_t error_count = 0;
-
- etiLog.level(info) << "Opening ZMQ input: " << source_url;
-
- zmq::context_t zmq_ctx(1);
- zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
- zmq_sock.connect(source_url);
- zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
-
- while (error_count < MAX_ERROR_COUNT) {
- zmq::message_t incoming;
- zmq::pollitem_t items[1];
- items[0].socket = zmq_sock;
- items[0].events = ZMQ_POLLIN;
- const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
- if (num_events == 0) { // timeout
- error_count++;
- }
- else {
- // Event received: recv will not block
- zmq_sock.recv(&incoming);
-
- zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
-
- if (dab_msg->version != 1) {
- etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
- error_count++;
- }
-
- int offset = sizeof(dab_msg->version) +
- NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
-
- std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames;
-
- for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
- if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
- etiLog.level(error) << "ZeroMQ buffer " << i <<
- " has invalid length " << dab_msg->buflen[i];
- error_count++;
- }
- else {
- std::vector<uint8_t> buf(6144, 0x55);
-
- const int framesize = dab_msg->buflen[i];
-
- memcpy(&buf.front(),
- ((uint8_t*)incoming.data()) + offset,
- framesize);
-
- all_frames.emplace_back(
- std::piecewise_construct,
- std::make_tuple(std::move(buf)),
- std::make_tuple());
-
- offset += framesize;
- }
- }
-
- for (auto &f : all_frames) {
- size_t consumed_bytes = 0;
-
- f.second = get_md_one_frame(
- static_cast<uint8_t*>(incoming.data()) + offset,
- incoming.size() - offset,
- &consumed_bytes);
-
- offset += consumed_bytes;
- }
-
- for (auto &f : all_frames) {
- edisender.push_frame(f);
- frame_count++;
- }
- }
- }
-
- etiLog.level(info) << "Quitting after " << frame_count << " frames transferred";
-
- return 0;
-}
-
-int main(int argc, char **argv)
-{
- etiLog.level(info) << "ZMQ2EDI converter from " <<
- PACKAGE_NAME << " " <<
-#if defined(GITVERSION)
- GITVERSION <<
-#else
- PACKAGE_VERSION <<
-#endif
- " starting up";
-
- try {
- return start(argc, argv);
- }
- catch (std::runtime_error &e) {
- etiLog.level(error) << "Error: " << e.what();
- }
-
- return 1;
-}