aboutsummaryrefslogtreecommitdiffstats
path: root/src/zmq2edi/zmq2edi.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zmq2edi/zmq2edi.cpp')
-rw-r--r--src/zmq2edi/zmq2edi.cpp360
1 files changed, 126 insertions, 234 deletions
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
index 41d92b5..63c3228 100644
--- a/src/zmq2edi/zmq2edi.cpp
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2024
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -27,64 +27,50 @@
#include "Log.h"
#include "zmq.hpp"
+#include <math.h>
#include <getopt.h>
-#include <cmath>
-#include <cstring>
-#include <chrono>
+#include <string.h>
#include <iostream>
#include <iterator>
-#include <thread>
#include <vector>
-#include "Sender.h"
+#include "EDISender.h"
#include "dabOutput/dabOutput.h"
constexpr size_t MAX_ERROR_COUNT = 10;
constexpr long ZMQ_TIMEOUT_MS = 1000;
-constexpr long DEFAULT_BACKOFF = 5000;
-static edi::configuration_t edi_conf;
+static edi_configuration_t edi_conf;
-static Sender edisender;
+static EDISender edisender;
-static void usage()
+void usage(void)
{
using namespace std;
cerr << "Usage:" << endl;
cerr << "odr-zmq2edi [options] <source>" << endl << endl;
- cerr << "ODR-ZMQ2EDI can output to both EDI and ZMQ. It buffers and releases frames according to their timestamp." << endl;
-
cerr << "Options:" << endl;
cerr << "The following options can be given only once:" << endl;
cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl;
- cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl;
- cerr << " Negative delay values are also allowed." << endl;
- cerr << " -C <path to script> Before starting, run the given script, and only start if it returns 0." << endl;
- cerr << " This is useful for checking that NTP is properly synchronised" << endl;
- cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl;
- cerr << " -b <backoff> Number of milliseconds to backoff after an input reset (default " << DEFAULT_BACKOFF << ")." << endl << endl;
-
- cerr << " ZMQ Output options:" << endl;
- cerr << " -Z <url> Add a zmq output to URL, e.g. tcp:*:9876 to listen for connections on port 9876 " << endl << endl;
-
- cerr << " EDI Output options:" << endl;
- cerr << " -v Enables verbose mode." << endl;
- cerr << " -P Disable PFT and send AFPackets." << endl;
- cerr << " -f <fec> Set the FEC." << endl;
- cerr << " -i <spread> Configure the UDP packet spread/interleaver with given percentage: 0% send all fragments at once, 100% spread over 24ms, >100% spread and interleave. Default 95%\n";
- cerr << " -D Dump the EDI to edi.debug file." << endl;
- cerr << " -a <alignement> Set the alignment of the TAG Packet (default 8)." << endl;
-
- cerr << "The following options can be given several times, when more than EDI/UDP destination is desired:" << endl;
- cerr << " -d <destination ip> Set the destination ip." << endl;
- cerr << " -p <destination port> Set the destination port." << endl;
- cerr << " -s <source port> Set the source port." << endl;
- cerr << " -S <source ip> Select the source IP in case we want to use multicast." << endl;
- cerr << " -t <ttl> Set the packet's TTL." << endl << endl;
-
- cerr << "The input socket will be reset if no data is received for " <<
+ cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl;
+ cerr << " -W <max_delay> Drop ETI frames if TIST is <max_delay> later than current system time." << endl;
+ cerr << " -p <destination port> sets the destination port." << endl;
+ cerr << " -P Disable PFT and send AFPackets." << endl;
+ cerr << " -f <fec> sets the FEC." << endl;
+ cerr << " -i <interleave> enables the interleaved with this latency." << endl;
+ cerr << " -D dumps the EDI to edi.debug file." << endl;
+ cerr << " -v Enables verbose mode." << endl;
+ cerr << " -a <tagpacket alignement> sets the alignment of the TAG Packet (default 8)." << endl << endl;
+
+ cerr << "The following options can be given several times, when more than once destination is addressed:" << endl;
+ cerr << " -d <destination ip> sets the destination ip." << endl;
+ cerr << " -s <source port> sets the source port." << endl;
+ cerr << " -S <source ip> select the source IP in case we want to use multicast." << endl;
+ cerr << " -t <ttl> set the packet's TTL." << endl << endl;
+
+ cerr << "odr-zmq2edi will quit if it does not receive data for " <<
(int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl;
cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl;
}
@@ -169,8 +155,7 @@ static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_b
/* There is some state inside the parsing of destination arguments,
* because several destinations can be given. */
-static std::shared_ptr<edi::udp_destination_t> edi_destination;
-static bool dest_port_set = false;
+static edi_destination_t edi_destination;
static bool source_port_set = false;
static bool source_addr_set = false;
static bool ttl_set = false;
@@ -183,10 +168,10 @@ static void add_edi_destination(void)
std::to_string(edi_conf.destinations.size() + 1));
}
- edi_conf.destinations.push_back(std::move(edi_destination));
- edi_destination = std::make_shared<edi::udp_destination_t>();
+ edi_conf.destinations.push_back(edi_destination);
+ edi_destination_t newdest;
+ edi_destination = newdest;
- dest_port_set = false;
source_port_set = false;
source_addr_set = false;
ttl_set = false;
@@ -195,44 +180,33 @@ static void add_edi_destination(void)
static void parse_destination_args(char option)
{
- if (not edi_destination) {
- edi_destination = std::make_shared<edi::udp_destination_t>();
- }
-
switch (option) {
- case 'p':
- if (dest_port_set) {
- add_edi_destination();
- }
- edi_destination->dest_port = std::stoi(optarg);
- dest_port_set = true;
- break;
case 's':
if (source_port_set) {
add_edi_destination();
}
- edi_destination->source_port = std::stoi(optarg);
+ edi_destination.source_port = std::stoi(optarg);
source_port_set = true;
break;
case 'S':
if (source_addr_set) {
add_edi_destination();
}
- edi_destination->source_addr = optarg;
+ edi_destination.source_addr = optarg;
source_addr_set = true;
break;
case 't':
if (ttl_set) {
add_edi_destination();
}
- edi_destination->ttl = std::stoi(optarg);
+ edi_destination.ttl = std::stoi(optarg);
ttl_set = true;
break;
case 'd':
if (dest_addr_set) {
add_edi_destination();
}
- edi_destination->dest_addr = optarg;
+ edi_destination.dest_addr = optarg;
dest_addr_set = true;
break;
default:
@@ -240,8 +214,6 @@ static void parse_destination_args(char option)
}
}
-class FCTDiscontinuity { };
-
int start(int argc, char **argv)
{
edi_conf.enable_pft = true;
@@ -252,28 +224,23 @@ int start(int argc, char **argv)
}
int delay_ms = 500;
- bool drop_late_packets = false;
- uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF;
- std::string startupcheck;
-
- zmq_send_config_t zmq_conf;
+ int max_delay_ms = 0; // no max delay
int ch = 0;
while (ch != -1) {
- ch = getopt(argc, argv, "C:d:p:s:S:t:Pf:i:Dva:b:w:xhZ:");
+ ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:w:W:");
switch (ch) {
case -1:
break;
- case 'C':
- startupcheck = optarg;
- break;
case 'd':
case 's':
case 'S':
case 't':
- case 'p':
parse_destination_args(ch);
break;
+ case 'p':
+ edi_conf.dest_port = std::stoi(optarg);
+ break;
case 'P':
edi_conf.enable_pft = false;
break;
@@ -282,14 +249,18 @@ int start(int argc, char **argv)
break;
case 'i':
{
- int spread_percent = std::stoi(optarg);
- if (spread_percent < 0) {
- throw std::runtime_error("EDI output: negative spread value is invalid.");
- }
+ double interleave_ms = std::stod(optarg);
+ if (interleave_ms != 0.0) {
+ if (interleave_ms < 0) {
+ throw std::runtime_error("EDI output: negative interleave value is invalid.");
+ }
- edi_conf.fragment_spreading_factor = (double)spread_percent / 100.0;
- if (edi_conf.fragment_spreading_factor > 30000) {
- throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!");
+ auto latency_rounded = lround(interleave_ms / 24.0);
+ if (latency_rounded * 24 > 30000) {
+ throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!");
+ }
+
+ edi_conf.latency_frames = latency_rounded;
}
}
break;
@@ -302,17 +273,11 @@ int start(int argc, char **argv)
case 'a':
edi_conf.tagpacket_alignment = std::stoi(optarg);
break;
- case 'b':
- backoff_after_reset_ms = std::stoi(optarg);
- break;
case 'w':
delay_ms = std::stoi(optarg);
break;
- case 'x':
- drop_late_packets = true;
- break;
- case 'Z':
- zmq_conf.urls.push_back(optarg);
+ case 'W':
+ max_delay_ms = std::stoi(optarg);
break;
case 'h':
default:
@@ -321,184 +286,119 @@ int start(int argc, char **argv)
}
}
- if (dest_addr_set) {
- add_edi_destination();
- }
+ add_edi_destination();
if (optind >= argc) {
etiLog.level(error) << "source option is missing";
return 1;
}
- if (edi_conf.destinations.empty() and zmq_conf.urls.empty()) {
- etiLog.level(error) << "No destinations set";
+ if (edi_conf.dest_port == 0) {
+ etiLog.level(error) << "No EDI destination port defined";
return 1;
}
- if (not edi_conf.destinations.empty()) {
- edisender.print_configuration();
+ if (edi_conf.destinations.empty()) {
+ etiLog.level(error) << "No EDI destinations set";
+ return 1;
}
- if (not zmq_conf.urls.empty()) {
- etiLog.level(info) << "Setting up ZMQ to:";
- for (const auto& url : zmq_conf.urls) {
- etiLog.level(info) << " " << url;
- }
+ if (max_delay_ms > 0) {
+ etiLog.level(info) << "Setting up EDI Sender with delay " << delay_ms << " ms and max delay " << max_delay_ms << " ms";
}
-
-
- if (not startupcheck.empty()) {
- etiLog.level(info) << "Running startup check '" << startupcheck << "'";
- int wstatus = system(startupcheck.c_str());
-
- if (WIFEXITED(wstatus)) {
- if (WEXITSTATUS(wstatus) == 0) {
- etiLog.level(info) << "Startup check ok";
- }
- else {
- etiLog.level(error) << "Startup check failed, returned " << WEXITSTATUS(wstatus);
- return 1;
- }
- }
- else {
- etiLog.level(error) << "Startup check failed, child didn't terminate normally";
- return 1;
- }
+ else {
+ etiLog.level(info) << "Setting up EDI Sender with delay " << delay_ms << " ms";
}
-
- etiLog.level(info) << "Setting up Sender with delay " << delay_ms << " ms. " <<
- (drop_late_packets ? "Will" : "Will not") << " drop late packets";
- edisender.start(edi_conf, zmq_conf, delay_ms, drop_late_packets);
+ edisender.start(edi_conf, delay_ms, max_delay_ms);
+ edisender.print_configuration();
const char* source_url = argv[optind];
- zmq::context_t zmq_ctx(1);
- etiLog.level(info) << "Opening ZMQ input: " << source_url;
-
- while (true) {
- zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
- zmq_sock.connect(source_url);
- zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
-
- size_t error_count = 0;
- int previous_fct = -1;
-
- try {
- while (error_count < MAX_ERROR_COUNT) {
- zmq::message_t incoming;
- zmq::pollitem_t items[1];
- items[0].socket = zmq_sock;
- items[0].events = ZMQ_POLLIN;
- const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
- if (num_events == 0) { // timeout
- error_count++;
- }
- else {
- // Event received: recv will not block
- const auto recv_result = zmq_sock.recv(incoming, zmq::recv_flags::none);
- if (not recv_result.has_value()) {
- continue;
- }
-
- const auto received_at = std::chrono::steady_clock::now();
- // Casting incoming.data() to zmq_dab_message_t* is not allowed, because
- // it might be misaligned
- zmq_dab_message_t dab_msg;
- memcpy(&dab_msg, incoming.data(), ZMQ_DAB_MESSAGE_HEAD_LENGTH);
+ size_t frame_count = 0;
+ size_t error_count = 0;
- if (dab_msg.version != 1) {
- etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg.version;
- error_count++;
- }
-
- int offset = sizeof(dab_msg.version) +
- NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg.buflen);
+ etiLog.level(info) << "Opening ZMQ input: " << source_url;
- std::vector<frame_t> all_frames;
- all_frames.reserve(NUM_FRAMES_PER_ZMQ_MESSAGE);
+ zmq::context_t zmq_ctx(1);
+ zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
+ zmq_sock.connect(source_url);
+ zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
+
+ while (error_count < MAX_ERROR_COUNT) {
+ zmq::message_t incoming;
+ zmq::pollitem_t items[1];
+ items[0].socket = zmq_sock;
+ items[0].events = ZMQ_POLLIN;
+ const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
+ if (num_events == 0) { // timeout
+ error_count++;
+ }
+ else {
+ // Event received: recv will not block
+ zmq_sock.recv(&incoming);
- for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
- if (dab_msg.buflen[i] <= 0 or dab_msg.buflen[i] > 6144) {
- etiLog.level(error) << "ZeroMQ buffer " << i <<
- " has invalid length " << dab_msg.buflen[i];
- error_count++;
- }
- else {
- frame_t frame;
- frame.data.resize(6144, 0x55);
- frame.received_at = received_at;
+ zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
- const int framesize = dab_msg.buflen[i];
+ if (dab_msg->version != 1) {
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
+ error_count++;
+ }
- memcpy(frame.data.data(),
- ((uint8_t*)incoming.data()) + offset,
- framesize);
+ int offset = sizeof(dab_msg->version) +
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
- const int fct = frame.data[4];
+ std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames;
- const int expected_fct = (previous_fct + 1) % 250;
- if (previous_fct != -1 and expected_fct != fct) {
- etiLog.level(error) << "ETI wrong frame counter FCT=" << fct << " expected " << expected_fct;
- throw FCTDiscontinuity();
- }
- previous_fct = fct;
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
+ etiLog.level(error) << "ZeroMQ buffer " << i <<
+ " has invalid length " << dab_msg->buflen[i];
+ error_count++;
+ }
+ else {
+ std::vector<uint8_t> buf(6144, 0x55);
- all_frames.push_back(std::move(frame));
+ const int framesize = dab_msg->buflen[i];
- offset += framesize;
- }
- }
+ memcpy(&buf.front(),
+ ((uint8_t*)incoming.data()) + offset,
+ framesize);
- for (auto &f : all_frames) {
- size_t consumed_bytes = 0;
+ all_frames.emplace_back(
+ std::piecewise_construct,
+ std::make_tuple(std::move(buf)),
+ std::make_tuple());
- f.metadata = get_md_one_frame(
- static_cast<uint8_t*>(incoming.data()) + offset,
- incoming.size() - offset,
- &consumed_bytes);
+ offset += framesize;
+ }
+ }
- offset += consumed_bytes;
- }
+ for (auto &f : all_frames) {
+ size_t consumed_bytes = 0;
- if (not all_frames.empty()) {
- all_frames[0].original_zmq_message = std::move(incoming);
- }
+ f.second = get_md_one_frame(
+ static_cast<uint8_t*>(incoming.data()) + offset,
+ incoming.size() - offset,
+ &consumed_bytes);
- for (auto &f : all_frames) {
- edisender.push_frame(std::move(f));
- }
- }
+ offset += consumed_bytes;
}
- etiLog.level(info) << "Backoff " << backoff_after_reset_ms <<
- "ms due to ZMQ input (" << source_url << ") timeout";
- }
- catch (const FCTDiscontinuity&) {
- etiLog.level(info) << "Backoff " << backoff_after_reset_ms << "ms FCT discontinuity";
+ for (auto &f : all_frames) {
+ edisender.push_frame(f);
+ frame_count++;
+ }
}
-
- zmq_sock.close();
- std::this_thread::sleep_for(std::chrono::milliseconds(backoff_after_reset_ms));
}
+ etiLog.level(info) << "Quitting after " << frame_count << " frames transferred";
+
return 0;
}
int main(int argc, char **argv)
{
- // Version handling is done very early to ensure nothing else but the version gets printed out
- if (argc == 2 and strcmp(argv[1], "--version") == 0) {
- fprintf(stdout, "%s\n",
-#if defined(GITVERSION)
- GITVERSION
-#else
- PACKAGE_VERSION
-#endif
- );
- return 0;
- }
-
etiLog.level(info) << "ZMQ2EDI converter from " <<
PACKAGE_NAME << " " <<
#if defined(GITVERSION)
@@ -508,20 +408,12 @@ int main(int argc, char **argv)
#endif
" starting up";
- int ret = 1;
-
try {
- ret = start(argc, argv);
+ return start(argc, argv);
}
- catch (const std::runtime_error &e) {
- etiLog.level(error) << "Runtime error: " << e.what();
+ catch (std::runtime_error &e) {
+ etiLog.level(error) << "Error: " << e.what();
}
- catch (const std::logic_error &e) {
- etiLog.level(error) << "Logic error! " << e.what();
- }
-
- // To make sure things get printed to stderr
- std::this_thread::sleep_for(std::chrono::milliseconds(300));
- return ret;
+ return 1;
}