aboutsummaryrefslogtreecommitdiffstats
path: root/src/zmq2edi/zmq2edi.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2024-01-29 14:31:55 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2024-01-29 14:31:55 +0100
commit78b4f04de2975da7b6240983fe1c6a496289a067 (patch)
tree1a88031d66d04ad4f87d99d331436b33c0c99cdf /src/zmq2edi/zmq2edi.cpp
parentc0f12dce1b8486660962c5d4e9d017078aac2263 (diff)
downloaddabmux-78b4f04de2975da7b6240983fe1c6a496289a067.tar.gz
dabmux-78b4f04de2975da7b6240983fe1c6a496289a067.tar.bz2
dabmux-78b4f04de2975da7b6240983fe1c6a496289a067.zip
Add ZMQ output to odr-zmq2edi
Diffstat (limited to 'src/zmq2edi/zmq2edi.cpp')
-rw-r--r--src/zmq2edi/zmq2edi.cpp106
1 files changed, 71 insertions, 35 deletions
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
index cff16c7..41d92b5 100644
--- a/src/zmq2edi/zmq2edi.cpp
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2020
+ Copyright (C) 2024
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -36,7 +36,7 @@
#include <thread>
#include <vector>
-#include "EDISender.h"
+#include "Sender.h"
#include "dabOutput/dabOutput.h"
constexpr size_t MAX_ERROR_COUNT = 10;
@@ -45,7 +45,7 @@ constexpr long DEFAULT_BACKOFF = 5000;
static edi::configuration_t edi_conf;
-static EDISender edisender;
+static Sender edisender;
static void usage()
{
@@ -54,6 +54,8 @@ static void usage()
cerr << "Usage:" << endl;
cerr << "odr-zmq2edi [options] <source>" << endl << endl;
+ cerr << "ODR-ZMQ2EDI can output to both EDI and ZMQ. It buffers and releases frames according to their timestamp." << endl;
+
cerr << "Options:" << endl;
cerr << "The following options can be given only once:" << endl;
cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl;
@@ -62,15 +64,20 @@ static void usage()
cerr << " -C <path to script> Before starting, run the given script, and only start if it returns 0." << endl;
cerr << " This is useful for checking that NTP is properly synchronised" << endl;
cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl;
+ cerr << " -b <backoff> Number of milliseconds to backoff after an input reset (default " << DEFAULT_BACKOFF << ")." << endl << endl;
+
+ cerr << " ZMQ Output options:" << endl;
+ cerr << " -Z <url> Add a zmq output to URL, e.g. tcp:*:9876 to listen for connections on port 9876 " << endl << endl;
+
+ cerr << " EDI Output options:" << endl;
+ cerr << " -v Enables verbose mode." << endl;
cerr << " -P Disable PFT and send AFPackets." << endl;
cerr << " -f <fec> Set the FEC." << endl;
cerr << " -i <spread> Configure the UDP packet spread/interleaver with given percentage: 0% send all fragments at once, 100% spread over 24ms, >100% spread and interleave. Default 95%\n";
cerr << " -D Dump the EDI to edi.debug file." << endl;
- cerr << " -v Enables verbose mode." << endl;
cerr << " -a <alignement> Set the alignment of the TAG Packet (default 8)." << endl;
- cerr << " -b <backoff> Number of milliseconds to backoff after an input reset (default " << DEFAULT_BACKOFF << ")." << endl << endl;
- cerr << "The following options can be given several times, when more than UDP destination is desired:" << endl;
+ cerr << "The following options can be given several times, when more than EDI/UDP destination is desired:" << endl;
cerr << " -d <destination ip> Set the destination ip." << endl;
cerr << " -p <destination port> Set the destination port." << endl;
cerr << " -s <source port> Set the source port." << endl;
@@ -176,7 +183,7 @@ static void add_edi_destination(void)
std::to_string(edi_conf.destinations.size() + 1));
}
- edi_conf.destinations.push_back(move(edi_destination));
+ edi_conf.destinations.push_back(std::move(edi_destination));
edi_destination = std::make_shared<edi::udp_destination_t>();
dest_port_set = false;
@@ -249,9 +256,11 @@ int start(int argc, char **argv)
uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF;
std::string startupcheck;
+ zmq_send_config_t zmq_conf;
+
int ch = 0;
while (ch != -1) {
- ch = getopt(argc, argv, "C:d:p:s:S:t:Pf:i:Dva:b:w:xh");
+ ch = getopt(argc, argv, "C:d:p:s:S:t:Pf:i:Dva:b:w:xhZ:");
switch (ch) {
case -1:
break;
@@ -302,6 +311,9 @@ int start(int argc, char **argv)
case 'x':
drop_late_packets = true;
break;
+ case 'Z':
+ zmq_conf.urls.push_back(optarg);
+ break;
case 'h':
default:
usage();
@@ -309,6 +321,32 @@ int start(int argc, char **argv)
}
}
+ if (dest_addr_set) {
+ add_edi_destination();
+ }
+
+ if (optind >= argc) {
+ etiLog.level(error) << "source option is missing";
+ return 1;
+ }
+
+ if (edi_conf.destinations.empty() and zmq_conf.urls.empty()) {
+ etiLog.level(error) << "No destinations set";
+ return 1;
+ }
+
+ if (not edi_conf.destinations.empty()) {
+ edisender.print_configuration();
+ }
+
+ if (not zmq_conf.urls.empty()) {
+ etiLog.level(info) << "Setting up ZMQ to:";
+ for (const auto& url : zmq_conf.urls) {
+ etiLog.level(info) << " " << url;
+ }
+ }
+
+
if (not startupcheck.empty()) {
etiLog.level(info) << "Running startup check '" << startupcheck << "'";
int wstatus = system(startupcheck.c_str());
@@ -328,22 +366,9 @@ int start(int argc, char **argv)
}
}
- add_edi_destination();
-
- if (optind >= argc) {
- etiLog.level(error) << "source option is missing";
- return 1;
- }
-
- if (edi_conf.destinations.empty()) {
- etiLog.level(error) << "No EDI destinations set";
- return 1;
- }
-
- etiLog.level(info) << "Setting up EDI Sender with delay " << delay_ms << " ms. " <<
+ etiLog.level(info) << "Setting up Sender with delay " << delay_ms << " ms. " <<
(drop_late_packets ? "Will" : "Will not") << " drop late packets";
- edisender.start(edi_conf, delay_ms, drop_late_packets);
- edisender.print_configuration();
+ edisender.start(edi_conf, zmq_conf, delay_ms, drop_late_packets);
const char* source_url = argv[optind];
@@ -370,26 +395,33 @@ int start(int argc, char **argv)
}
else {
// Event received: recv will not block
- zmq_sock.recv(incoming, zmq::recv_flags::none);
+ const auto recv_result = zmq_sock.recv(incoming, zmq::recv_flags::none);
+ if (not recv_result.has_value()) {
+ continue;
+ }
+
const auto received_at = std::chrono::steady_clock::now();
- zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
+ // Casting incoming.data() to zmq_dab_message_t* is not allowed, because
+ // it might be misaligned
+ zmq_dab_message_t dab_msg;
+ memcpy(&dab_msg, incoming.data(), ZMQ_DAB_MESSAGE_HEAD_LENGTH);
- if (dab_msg->version != 1) {
- etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
+ if (dab_msg.version != 1) {
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg.version;
error_count++;
}
- int offset = sizeof(dab_msg->version) +
- NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
+ int offset = sizeof(dab_msg.version) +
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg.buflen);
std::vector<frame_t> all_frames;
all_frames.reserve(NUM_FRAMES_PER_ZMQ_MESSAGE);
for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
- if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
+ if (dab_msg.buflen[i] <= 0 or dab_msg.buflen[i] > 6144) {
etiLog.level(error) << "ZeroMQ buffer " << i <<
- " has invalid length " << dab_msg->buflen[i];
+ " has invalid length " << dab_msg.buflen[i];
error_count++;
}
else {
@@ -397,7 +429,7 @@ int start(int argc, char **argv)
frame.data.resize(6144, 0x55);
frame.received_at = received_at;
- const int framesize = dab_msg->buflen[i];
+ const int framesize = dab_msg.buflen[i];
memcpy(frame.data.data(),
((uint8_t*)incoming.data()) + offset,
@@ -429,6 +461,10 @@ int start(int argc, char **argv)
offset += consumed_bytes;
}
+ if (not all_frames.empty()) {
+ all_frames[0].original_zmq_message = std::move(incoming);
+ }
+
for (auto &f : all_frames) {
edisender.push_frame(std::move(f));
}
@@ -476,9 +512,6 @@ int main(int argc, char **argv)
try {
ret = start(argc, argv);
-
- // To make sure things get printed to stderr
- std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
catch (const std::runtime_error &e) {
etiLog.level(error) << "Runtime error: " << e.what();
@@ -487,5 +520,8 @@ int main(int argc, char **argv)
etiLog.level(error) << "Logic error! " << e.what();
}
+ // To make sure things get printed to stderr
+ std::this_thread::sleep_for(std::chrono::milliseconds(300));
+
return ret;
}