aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2024-01-29 14:31:55 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2024-01-29 14:31:55 +0100
commit78b4f04de2975da7b6240983fe1c6a496289a067 (patch)
tree1a88031d66d04ad4f87d99d331436b33c0c99cdf
parentc0f12dce1b8486660962c5d4e9d017078aac2263 (diff)
downloaddabmux-78b4f04de2975da7b6240983fe1c6a496289a067.tar.gz
dabmux-78b4f04de2975da7b6240983fe1c6a496289a067.tar.bz2
dabmux-78b4f04de2975da7b6240983fe1c6a496289a067.zip
Add ZMQ output to odr-zmq2edi
-rw-r--r--Makefile.am4
-rw-r--r--src/zmq2edi/Sender.cpp (renamed from src/zmq2edi/EDISender.cpp)47
-rw-r--r--src/zmq2edi/Sender.h (renamed from src/zmq2edi/EDISender.h)25
-rw-r--r--src/zmq2edi/zmq2edi.cpp106
4 files changed, 127 insertions, 55 deletions
diff --git a/Makefile.am b/Makefile.am
index d512a16..0c730dd 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -207,8 +207,8 @@ odr_zmq2farsync_CFLAGS = -Wall $(ZMQ_CPPFLAGS) $(PTHREAD_CFLAGS) $(GITVERSION_
odr_zmq2farsync_CXXFLAGS = -Wall $(PTHREAD_CFLAGS) $(PTHREAD_LIBS) $(ZMQ_CPPFLAGS) $(GITVERSION_FLAGS) $(INCLUDE)
odr_zmq2edi_SOURCES = src/zmq2edi/zmq2edi.cpp \
- src/zmq2edi/EDISender.h \
- src/zmq2edi/EDISender.cpp \
+ src/zmq2edi/Sender.h \
+ src/zmq2edi/Sender.cpp \
src/dabOutput/dabOutput.h \
src/dabOutput/metadata.h \
src/dabOutput/metadata.cpp \
diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/Sender.cpp
index 4a70105..fe46846 100644
--- a/src/zmq2edi/EDISender.cpp
+++ b/src/zmq2edi/Sender.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2020
+ Copyright (C) 2024
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -25,7 +25,7 @@
along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "EDISender.h"
+#include "Sender.h"
#include "Log.h"
#include <cmath>
#include <numeric>
@@ -35,20 +35,27 @@
using namespace std;
-EDISender::~EDISender()
+Sender::Sender() :
+ zmq_ctx(2)
+{
+}
+
+Sender::~Sender()
{
if (running.load()) {
running.store(false);
// Unblock thread
frame_t emptyframe;
- frames.push(emptyframe);
+ frames.push(std::move(emptyframe));
process_thread.join();
}
}
-void EDISender::start(const edi::configuration_t& conf, int delay_ms, bool drop_late_packets)
+void Sender::start(const edi::configuration_t& conf,
+ const zmq_send_config_t& zmq_conf,
+ int delay_ms, bool drop_late_packets)
{
edi_conf = conf;
tist_delay_ms = delay_ms;
@@ -56,16 +63,22 @@ void EDISender::start(const edi::configuration_t& conf, int delay_ms, bool drop_
edi_sender = make_shared<edi::Sender>(edi_conf);
+ for (const auto& url : zmq_conf.urls) {
+ zmq::socket_t zmq_sock(zmq_ctx, ZMQ_PUB);
+ zmq_sock.bind(url.c_str());
+ zmq_sockets.emplace_back(std::move(zmq_sock));
+ }
+
running.store(true);
- process_thread = thread(&EDISender::process, this);
+ process_thread = thread(&Sender::process, this);
}
-void EDISender::push_frame(frame_t&& frame)
+void Sender::push_frame(frame_t&& frame)
{
- frames.push(move(frame));
+ frames.push(std::move(frame));
}
-void EDISender::print_configuration()
+void Sender::print_configuration()
{
if (edi_conf.enabled()) {
edi_conf.print();
@@ -75,7 +88,7 @@ void EDISender::print_configuration()
}
}
-void EDISender::send_eti_frame(frame_t& frame)
+void Sender::send_eti_frame(frame_t& frame)
{
uint8_t *p = frame.data.data();
@@ -221,9 +234,18 @@ void EDISender::send_eti_frame(frame_t& frame)
edi_sender->write(edi_tagpacket);
}
+
+ if (not frame.original_zmq_message.empty()) {
+ for (auto& sock : zmq_sockets) {
+ const auto send_result = sock.send(frame.original_zmq_message, zmq::send_flags::dontwait);
+ if (not send_result.has_value()) {
+ num_zmq_send_errors++;
+ }
+ }
+ }
}
-void EDISender::process()
+void Sender::process()
{
while (running.load()) {
frame_t frame;
@@ -289,7 +311,8 @@ void EDISender::process()
" stdev: " << stdev <<
" late: " <<
num_late << " of " << buffering_stats.size() << " (" <<
- num_late * 100.0 / n << "%)";
+ num_late * 100.0 / n << "%) " <<
+ "Num ZMQ send errors: " << num_zmq_send_errors;
buffering_stats.clear();
}
diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/Sender.h
index 9c37e75..6dfd615 100644
--- a/src/zmq2edi/EDISender.h
+++ b/src/zmq2edi/Sender.h
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2020
+ Copyright (C) 2024
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -46,18 +46,27 @@ struct metadata_t {
};
struct frame_t {
+ // Since a zmq message actually contains 4 frames, the
+ // original_zmq_msg is only non-empty for the first of the
+ // four calls to Sender::send_edi_frame().
+ zmq::message_t original_zmq_message;
std::vector<uint8_t> data;
metadata_t metadata;
std::chrono::steady_clock::time_point received_at;
};
-class EDISender {
+struct zmq_send_config_t {
+ std::vector<std::string> urls;
+};
+
+class Sender {
public:
- EDISender() = default;
- EDISender(const EDISender& other) = delete;
- EDISender& operator=(const EDISender& other) = delete;
- ~EDISender();
+ Sender();
+ Sender(const Sender& other) = delete;
+ Sender& operator=(const Sender& other) = delete;
+ ~Sender();
void start(const edi::configuration_t& conf,
+ const zmq_send_config_t& zmq_conf,
int delay_ms, bool drop_late_packets);
void push_frame(frame_t&& frame);
void print_configuration(void);
@@ -75,11 +84,15 @@ class EDISender {
std::shared_ptr<edi::Sender> edi_sender;
+ zmq::context_t zmq_ctx;
+ std::vector<zmq::socket_t> zmq_sockets;
+
struct buffering_stat_t {
// Time between when we received the packets and when we transmit packets, in microseconds
double buffering_time_us = 0.0;
bool late = false;
};
std::vector<buffering_stat_t> buffering_stats;
+ size_t num_zmq_send_errors = 0;
};
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
index cff16c7..41d92b5 100644
--- a/src/zmq2edi/zmq2edi.cpp
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2020
+ Copyright (C) 2024
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -36,7 +36,7 @@
#include <thread>
#include <vector>
-#include "EDISender.h"
+#include "Sender.h"
#include "dabOutput/dabOutput.h"
constexpr size_t MAX_ERROR_COUNT = 10;
@@ -45,7 +45,7 @@ constexpr long DEFAULT_BACKOFF = 5000;
static edi::configuration_t edi_conf;
-static EDISender edisender;
+static Sender edisender;
static void usage()
{
@@ -54,6 +54,8 @@ static void usage()
cerr << "Usage:" << endl;
cerr << "odr-zmq2edi [options] <source>" << endl << endl;
+ cerr << "ODR-ZMQ2EDI can output to both EDI and ZMQ. It buffers and releases frames according to their timestamp." << endl;
+
cerr << "Options:" << endl;
cerr << "The following options can be given only once:" << endl;
cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl;
@@ -62,15 +64,20 @@ static void usage()
cerr << " -C <path to script> Before starting, run the given script, and only start if it returns 0." << endl;
cerr << " This is useful for checking that NTP is properly synchronised" << endl;
cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl;
+ cerr << " -b <backoff> Number of milliseconds to backoff after an input reset (default " << DEFAULT_BACKOFF << ")." << endl << endl;
+
+ cerr << " ZMQ Output options:" << endl;
+ cerr << " -Z <url> Add a zmq output to URL, e.g. tcp:*:9876 to listen for connections on port 9876 " << endl << endl;
+
+ cerr << " EDI Output options:" << endl;
+ cerr << " -v Enables verbose mode." << endl;
cerr << " -P Disable PFT and send AFPackets." << endl;
cerr << " -f <fec> Set the FEC." << endl;
cerr << " -i <spread> Configure the UDP packet spread/interleaver with given percentage: 0% send all fragments at once, 100% spread over 24ms, >100% spread and interleave. Default 95%\n";
cerr << " -D Dump the EDI to edi.debug file." << endl;
- cerr << " -v Enables verbose mode." << endl;
cerr << " -a <alignement> Set the alignment of the TAG Packet (default 8)." << endl;
- cerr << " -b <backoff> Number of milliseconds to backoff after an input reset (default " << DEFAULT_BACKOFF << ")." << endl << endl;
- cerr << "The following options can be given several times, when more than UDP destination is desired:" << endl;
+ cerr << "The following options can be given several times, when more than EDI/UDP destination is desired:" << endl;
cerr << " -d <destination ip> Set the destination ip." << endl;
cerr << " -p <destination port> Set the destination port." << endl;
cerr << " -s <source port> Set the source port." << endl;
@@ -176,7 +183,7 @@ static void add_edi_destination(void)
std::to_string(edi_conf.destinations.size() + 1));
}
- edi_conf.destinations.push_back(move(edi_destination));
+ edi_conf.destinations.push_back(std::move(edi_destination));
edi_destination = std::make_shared<edi::udp_destination_t>();
dest_port_set = false;
@@ -249,9 +256,11 @@ int start(int argc, char **argv)
uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF;
std::string startupcheck;
+ zmq_send_config_t zmq_conf;
+
int ch = 0;
while (ch != -1) {
- ch = getopt(argc, argv, "C:d:p:s:S:t:Pf:i:Dva:b:w:xh");
+ ch = getopt(argc, argv, "C:d:p:s:S:t:Pf:i:Dva:b:w:xhZ:");
switch (ch) {
case -1:
break;
@@ -302,6 +311,9 @@ int start(int argc, char **argv)
case 'x':
drop_late_packets = true;
break;
+ case 'Z':
+ zmq_conf.urls.push_back(optarg);
+ break;
case 'h':
default:
usage();
@@ -309,6 +321,32 @@ int start(int argc, char **argv)
}
}
+ if (dest_addr_set) {
+ add_edi_destination();
+ }
+
+ if (optind >= argc) {
+ etiLog.level(error) << "source option is missing";
+ return 1;
+ }
+
+ if (edi_conf.destinations.empty() and zmq_conf.urls.empty()) {
+ etiLog.level(error) << "No destinations set";
+ return 1;
+ }
+
+ if (not edi_conf.destinations.empty()) {
+ edisender.print_configuration();
+ }
+
+ if (not zmq_conf.urls.empty()) {
+ etiLog.level(info) << "Setting up ZMQ to:";
+ for (const auto& url : zmq_conf.urls) {
+ etiLog.level(info) << " " << url;
+ }
+ }
+
+
if (not startupcheck.empty()) {
etiLog.level(info) << "Running startup check '" << startupcheck << "'";
int wstatus = system(startupcheck.c_str());
@@ -328,22 +366,9 @@ int start(int argc, char **argv)
}
}
- add_edi_destination();
-
- if (optind >= argc) {
- etiLog.level(error) << "source option is missing";
- return 1;
- }
-
- if (edi_conf.destinations.empty()) {
- etiLog.level(error) << "No EDI destinations set";
- return 1;
- }
-
- etiLog.level(info) << "Setting up EDI Sender with delay " << delay_ms << " ms. " <<
+ etiLog.level(info) << "Setting up Sender with delay " << delay_ms << " ms. " <<
(drop_late_packets ? "Will" : "Will not") << " drop late packets";
- edisender.start(edi_conf, delay_ms, drop_late_packets);
- edisender.print_configuration();
+ edisender.start(edi_conf, zmq_conf, delay_ms, drop_late_packets);
const char* source_url = argv[optind];
@@ -370,26 +395,33 @@ int start(int argc, char **argv)
}
else {
// Event received: recv will not block
- zmq_sock.recv(incoming, zmq::recv_flags::none);
+ const auto recv_result = zmq_sock.recv(incoming, zmq::recv_flags::none);
+ if (not recv_result.has_value()) {
+ continue;
+ }
+
const auto received_at = std::chrono::steady_clock::now();
- zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
+ // Casting incoming.data() to zmq_dab_message_t* is not allowed, because
+ // it might be misaligned
+ zmq_dab_message_t dab_msg;
+ memcpy(&dab_msg, incoming.data(), ZMQ_DAB_MESSAGE_HEAD_LENGTH);
- if (dab_msg->version != 1) {
- etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
+ if (dab_msg.version != 1) {
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg.version;
error_count++;
}
- int offset = sizeof(dab_msg->version) +
- NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
+ int offset = sizeof(dab_msg.version) +
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg.buflen);
std::vector<frame_t> all_frames;
all_frames.reserve(NUM_FRAMES_PER_ZMQ_MESSAGE);
for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
- if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
+ if (dab_msg.buflen[i] <= 0 or dab_msg.buflen[i] > 6144) {
etiLog.level(error) << "ZeroMQ buffer " << i <<
- " has invalid length " << dab_msg->buflen[i];
+ " has invalid length " << dab_msg.buflen[i];
error_count++;
}
else {
@@ -397,7 +429,7 @@ int start(int argc, char **argv)
frame.data.resize(6144, 0x55);
frame.received_at = received_at;
- const int framesize = dab_msg->buflen[i];
+ const int framesize = dab_msg.buflen[i];
memcpy(frame.data.data(),
((uint8_t*)incoming.data()) + offset,
@@ -429,6 +461,10 @@ int start(int argc, char **argv)
offset += consumed_bytes;
}
+ if (not all_frames.empty()) {
+ all_frames[0].original_zmq_message = std::move(incoming);
+ }
+
for (auto &f : all_frames) {
edisender.push_frame(std::move(f));
}
@@ -476,9 +512,6 @@ int main(int argc, char **argv)
try {
ret = start(argc, argv);
-
- // To make sure things get printed to stderr
- std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
catch (const std::runtime_error &e) {
etiLog.level(error) << "Runtime error: " << e.what();
@@ -487,5 +520,8 @@ int main(int argc, char **argv)
etiLog.level(error) << "Logic error! " << e.what();
}
+ // To make sure things get printed to stderr
+ std::this_thread::sleep_for(std::chrono::milliseconds(300));
+
return ret;
}