summaryrefslogtreecommitdiffstats
path: root/src/zmq2edi
diff options
context:
space:
mode:
Diffstat (limited to 'src/zmq2edi')
-rw-r--r--src/zmq2edi/EDISender.cpp2
-rw-r--r--src/zmq2edi/EDISender.h6
-rw-r--r--src/zmq2edi/zmq2edi.cpp168
3 files changed, 96 insertions, 80 deletions
diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp
index 2128abf..2188f8a 100644
--- a/src/zmq2edi/EDISender.cpp
+++ b/src/zmq2edi/EDISender.cpp
@@ -79,7 +79,7 @@ void EDISender::print_configuration()
void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
{
edi::TagDETI edi_tagDETI;
- edi::TagStarPTR edi_tagStarPtr;
+ edi::TagStarPTR edi_tagStarPtr("DETI");
map<int, edi::TagESTn> edi_subchannelToTag;
// The above Tag Items will be assembled into a TAG Packet
edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment);
diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h
index bb9c8bc..3525b4b 100644
--- a/src/zmq2edi/EDISender.h
+++ b/src/zmq2edi/EDISender.h
@@ -34,9 +34,9 @@
#include <atomic>
#include "ThreadsafeQueue.h"
#include "dabOutput/dabOutput.h"
-#include "dabOutput/edi/TagItems.h"
-#include "dabOutput/edi/TagPacket.h"
-#include "dabOutput/edi/Transport.h"
+#include "edioutput/TagItems.h"
+#include "edioutput/TagPacket.h"
+#include "edioutput/Transport.h"
// This metadata gets transmitted in the zmq stream
struct metadata_t {
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
index 3888d8a..f7d733c 100644
--- a/src/zmq2edi/zmq2edi.cpp
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -27,11 +27,13 @@
#include "Log.h"
#include "zmq.hpp"
-#include <math.h>
#include <getopt.h>
-#include <string.h>
+#include <cmath>
+#include <cstring>
+#include <chrono>
#include <iostream>
#include <iterator>
+#include <thread>
#include <vector>
#include "EDISender.h"
@@ -39,12 +41,13 @@
constexpr size_t MAX_ERROR_COUNT = 10;
constexpr long ZMQ_TIMEOUT_MS = 1000;
+constexpr long DEFAULT_BACKOFF = 5000;
static edi::configuration_t edi_conf;
static EDISender edisender;
-void usage(void)
+static void usage()
{
using namespace std;
@@ -54,23 +57,25 @@ void usage(void)
cerr << "Options:" << endl;
cerr << "The following options can be given only once:" << endl;
cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl;
- cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl;
- cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl;
- cerr << " -p <destination port> sets the destination port." << endl;
- cerr << " -P Disable PFT and send AFPackets." << endl;
- cerr << " -f <fec> sets the FEC." << endl;
- cerr << " -i <interleave> enables the interleaved with this latency." << endl;
- cerr << " -D dumps the EDI to edi.debug file." << endl;
- cerr << " -v Enables verbose mode." << endl;
- cerr << " -a <tagpacket alignement> sets the alignment of the TAG Packet (default 8)." << endl << endl;
+ cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl;
+ cerr << " Negative delay values are also allowed." << endl;
+ cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl;
+ cerr << " -p <destination port> Set the destination port." << endl;
+ cerr << " -P Disable PFT and send AFPackets." << endl;
+ cerr << " -f <fec> Set the FEC." << endl;
+ cerr << " -i <interleave> Enable the interleaver with this latency." << endl;
+ cerr << " -D Dump the EDI to edi.debug file." << endl;
+ cerr << " -v Enables verbose mode." << endl;
+ cerr << " -a <alignement> Set the alignment of the TAG Packet (default 8)." << endl;
+ cerr << " -b <backoff> Number of milliseconds to backoff after an input reset (default " << DEFAULT_BACKOFF << ")." << endl << endl;
cerr << "The following options can be given several times, when more than UDP destination is desired:" << endl;
- cerr << " -d <destination ip> sets the destination ip." << endl;
- cerr << " -s <source port> sets the source port." << endl;
- cerr << " -S <source ip> select the source IP in case we want to use multicast." << endl;
- cerr << " -t <ttl> set the packet's TTL." << endl << endl;
+ cerr << " -d <destination ip> Set the destination ip." << endl;
+ cerr << " -s <source port> Set the source port." << endl;
+ cerr << " -S <source ip> Select the source IP in case we want to use multicast." << endl;
+ cerr << " -t <ttl> Set the packet's TTL." << endl << endl;
- cerr << "odr-zmq2edi will quit if it does not receive data for " <<
+ cerr << "The input socket will be reset if no data is received for " <<
(int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl;
cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl;
}
@@ -228,10 +233,11 @@ int start(int argc, char **argv)
int delay_ms = 500;
bool drop_late_packets = false;
+ uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF;
int ch = 0;
while (ch != -1) {
- ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:w:x");
+ ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:b:w:xh");
switch (ch) {
case -1:
break;
@@ -276,6 +282,9 @@ int start(int argc, char **argv)
case 'a':
edi_conf.tagpacket_alignment = std::stoi(optarg);
break;
+ case 'b':
+ backoff_after_reset_ms = std::stoi(optarg);
+ break;
case 'w':
delay_ms = std::stoi(optarg);
break;
@@ -313,85 +322,92 @@ int start(int argc, char **argv)
const char* source_url = argv[optind];
-
- size_t frame_count = 0;
- size_t error_count = 0;
-
- etiLog.level(info) << "Opening ZMQ input: " << source_url;
-
zmq::context_t zmq_ctx(1);
- zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
- zmq_sock.connect(source_url);
- zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
-
- while (error_count < MAX_ERROR_COUNT) {
- zmq::message_t incoming;
- zmq::pollitem_t items[1];
- items[0].socket = zmq_sock;
- items[0].events = ZMQ_POLLIN;
- const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
- if (num_events == 0) { // timeout
- error_count++;
- }
- else {
- // Event received: recv will not block
- zmq_sock.recv(&incoming);
-
- zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
+ etiLog.level(info) << "Opening ZMQ input: " << source_url;
- if (dab_msg->version != 1) {
- etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
+ size_t num_consecutive_resets = 0;
+ while (true) {
+ zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
+ zmq_sock.connect(source_url);
+ zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
+
+ size_t error_count = 0;
+
+ while (error_count < MAX_ERROR_COUNT) {
+ zmq::message_t incoming;
+ zmq::pollitem_t items[1];
+ items[0].socket = zmq_sock;
+ items[0].events = ZMQ_POLLIN;
+ const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
+ if (num_events == 0) { // timeout
error_count++;
}
+ else {
+ num_consecutive_resets = 0;
- int offset = sizeof(dab_msg->version) +
- NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
+ // Event received: recv will not block
+ zmq_sock.recv(&incoming);
- std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames;
+ zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
- for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
- if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
- etiLog.level(error) << "ZeroMQ buffer " << i <<
- " has invalid length " << dab_msg->buflen[i];
+ if (dab_msg->version != 1) {
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
error_count++;
}
- else {
- std::vector<uint8_t> buf(6144, 0x55);
- const int framesize = dab_msg->buflen[i];
+ int offset = sizeof(dab_msg->version) +
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
+
+ std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames;
+
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
+ etiLog.level(error) << "ZeroMQ buffer " << i <<
+ " has invalid length " << dab_msg->buflen[i];
+ error_count++;
+ }
+ else {
+ std::vector<uint8_t> buf(6144, 0x55);
+
+ const int framesize = dab_msg->buflen[i];
- memcpy(&buf.front(),
- ((uint8_t*)incoming.data()) + offset,
- framesize);
+ memcpy(&buf.front(),
+ ((uint8_t*)incoming.data()) + offset,
+ framesize);
- all_frames.emplace_back(
- std::piecewise_construct,
- std::make_tuple(std::move(buf)),
- std::make_tuple());
+ all_frames.emplace_back(
+ std::piecewise_construct,
+ std::make_tuple(std::move(buf)),
+ std::make_tuple());
- offset += framesize;
+ offset += framesize;
+ }
}
- }
- for (auto &f : all_frames) {
- size_t consumed_bytes = 0;
+ for (auto &f : all_frames) {
+ size_t consumed_bytes = 0;
- f.second = get_md_one_frame(
- static_cast<uint8_t*>(incoming.data()) + offset,
- incoming.size() - offset,
- &consumed_bytes);
+ f.second = get_md_one_frame(
+ static_cast<uint8_t*>(incoming.data()) + offset,
+ incoming.size() - offset,
+ &consumed_bytes);
- offset += consumed_bytes;
- }
+ offset += consumed_bytes;
+ }
- for (auto &f : all_frames) {
- edisender.push_frame(f);
- frame_count++;
+ for (auto &f : all_frames) {
+ edisender.push_frame(f);
+ }
}
}
- }
- etiLog.level(info) << "Quitting after " << frame_count << " frames transferred";
+ num_consecutive_resets++;
+
+ zmq_sock.close();
+ std::this_thread::sleep_for(std::chrono::milliseconds(backoff_after_reset_ms));
+ etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " <<
+ num_consecutive_resets << " consecutive resets.";
+ }
return 0;
}