aboutsummaryrefslogtreecommitdiffstats
path: root/src/zmq2edi/zmq2edi.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zmq2edi/zmq2edi.cpp')
-rw-r--r--src/zmq2edi/zmq2edi.cpp167
1 files changed, 107 insertions, 60 deletions
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
index f7d733c..20ddcbe 100644
--- a/src/zmq2edi/zmq2edi.cpp
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2019
+ Copyright (C) 2020
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -59,6 +59,8 @@ static void usage()
cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl;
cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl;
cerr << " Negative delay values are also allowed." << endl;
+ cerr << " -C <path to script> Before starting, run the given script, and only start if it returns 0." << endl;
+ cerr << " This is useful for checking that NTP is properly synchronised" << endl;
cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl;
cerr << " -p <destination port> Set the destination port." << endl;
cerr << " -P Disable PFT and send AFPackets." << endl;
@@ -222,6 +224,8 @@ static void parse_destination_args(char option)
}
}
+class FCTDiscontinuity { };
+
int start(int argc, char **argv)
{
edi_conf.enable_pft = true;
@@ -234,13 +238,17 @@ int start(int argc, char **argv)
int delay_ms = 500;
bool drop_late_packets = false;
uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF;
+ std::string startupcheck;
int ch = 0;
while (ch != -1) {
- ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:b:w:xh");
+ ch = getopt(argc, argv, "C:d:p:s:S:t:Pf:i:Dva:b:w:xh");
switch (ch) {
case -1:
break;
+ case 'C':
+ startupcheck = optarg;
+ break;
case 'd':
case 's':
case 'S':
@@ -298,6 +306,25 @@ int start(int argc, char **argv)
}
}
+ if (not startupcheck.empty()) {
+ etiLog.level(info) << "Running startup check '" << startupcheck << "'";
+ int wstatus = system(startupcheck.c_str());
+
+ if (WIFEXITED(wstatus)) {
+ if (WEXITSTATUS(wstatus) == 0) {
+ etiLog.level(info) << "Startup check ok";
+ }
+ else {
+ etiLog.level(error) << "Startup check failed, returned " << WEXITSTATUS(wstatus);
+ return 1;
+ }
+ }
+ else {
+ etiLog.level(error) << "Startup check failed, child didn't terminate normally";
+ return 1;
+ }
+ }
+
add_edi_destination();
if (optind >= argc) {
@@ -325,88 +352,100 @@ int start(int argc, char **argv)
zmq::context_t zmq_ctx(1);
etiLog.level(info) << "Opening ZMQ input: " << source_url;
- size_t num_consecutive_resets = 0;
while (true) {
zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
zmq_sock.connect(source_url);
zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
size_t error_count = 0;
+ int previous_fct = -1;
+
+ try {
+ while (error_count < MAX_ERROR_COUNT) {
+ zmq::message_t incoming;
+ zmq::pollitem_t items[1];
+ items[0].socket = zmq_sock;
+ items[0].events = ZMQ_POLLIN;
+ const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
+ if (num_events == 0) { // timeout
+ error_count++;
+ }
+ else {
+ // Event received: recv will not block
+ zmq_sock.recv(&incoming);
+ const auto received_at = std::chrono::steady_clock::now();
- while (error_count < MAX_ERROR_COUNT) {
- zmq::message_t incoming;
- zmq::pollitem_t items[1];
- items[0].socket = zmq_sock;
- items[0].events = ZMQ_POLLIN;
- const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
- if (num_events == 0) { // timeout
- error_count++;
- }
- else {
- num_consecutive_resets = 0;
+ zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
- // Event received: recv will not block
- zmq_sock.recv(&incoming);
+ if (dab_msg->version != 1) {
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
+ error_count++;
+ }
- zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
+ int offset = sizeof(dab_msg->version) +
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
- if (dab_msg->version != 1) {
- etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
- error_count++;
- }
+ std::vector<frame_t> all_frames;
+ all_frames.reserve(NUM_FRAMES_PER_ZMQ_MESSAGE);
- int offset = sizeof(dab_msg->version) +
- NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
+ etiLog.level(error) << "ZeroMQ buffer " << i <<
+ " has invalid length " << dab_msg->buflen[i];
+ error_count++;
+ }
+ else {
+ frame_t frame;
+ frame.data.resize(6144, 0x55);
+ frame.received_at = received_at;
- std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames;
+ const int framesize = dab_msg->buflen[i];
- for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
- if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
- etiLog.level(error) << "ZeroMQ buffer " << i <<
- " has invalid length " << dab_msg->buflen[i];
- error_count++;
- }
- else {
- std::vector<uint8_t> buf(6144, 0x55);
+ memcpy(frame.data.data(),
+ ((uint8_t*)incoming.data()) + offset,
+ framesize);
- const int framesize = dab_msg->buflen[i];
+ const int fct = frame.data[4];
- memcpy(&buf.front(),
- ((uint8_t*)incoming.data()) + offset,
- framesize);
+ const int expected_fct = (previous_fct + 1) % 250;
+ if (previous_fct != -1 and expected_fct != fct) {
+ etiLog.level(error) << "ETI wrong frame counter FCT=" << fct << " expected " << expected_fct;
+ throw FCTDiscontinuity();
+ }
+ previous_fct = fct;
- all_frames.emplace_back(
- std::piecewise_construct,
- std::make_tuple(std::move(buf)),
- std::make_tuple());
+ all_frames.push_back(std::move(frame));
- offset += framesize;
+ offset += framesize;
+ }
}
- }
- for (auto &f : all_frames) {
- size_t consumed_bytes = 0;
+ for (auto &f : all_frames) {
+ size_t consumed_bytes = 0;
- f.second = get_md_one_frame(
- static_cast<uint8_t*>(incoming.data()) + offset,
- incoming.size() - offset,
- &consumed_bytes);
+ f.metadata = get_md_one_frame(
+ static_cast<uint8_t*>(incoming.data()) + offset,
+ incoming.size() - offset,
+ &consumed_bytes);
- offset += consumed_bytes;
- }
+ offset += consumed_bytes;
+ }
- for (auto &f : all_frames) {
- edisender.push_frame(f);
+ for (auto &f : all_frames) {
+ edisender.push_frame(std::move(f));
+ }
}
}
- }
- num_consecutive_resets++;
+ etiLog.level(info) << "Backoff " << backoff_after_reset_ms <<
+ "ms due to ZMQ input (" << source_url << ") timeout";
+ }
+ catch (const FCTDiscontinuity&) {
+ etiLog.level(info) << "Backoff " << backoff_after_reset_ms << "ms FCT discontinuity";
+ }
zmq_sock.close();
std::this_thread::sleep_for(std::chrono::milliseconds(backoff_after_reset_ms));
- etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " <<
- num_consecutive_resets << " consecutive resets.";
}
return 0;
@@ -423,12 +462,20 @@ int main(int argc, char **argv)
#endif
" starting up";
+ int ret = 1;
+
try {
- return start(argc, argv);
+ ret = start(argc, argv);
+
+ // To make sure things get printed to stderr
+ std::this_thread::sleep_for(std::chrono::milliseconds(300));
+ }
+ catch (const std::runtime_error &e) {
+ etiLog.level(error) << "Runtime error: " << e.what();
}
- catch (std::runtime_error &e) {
- etiLog.level(error) << "Error: " << e.what();
+ catch (const std::logic_error &e) {
+ etiLog.level(error) << "Logic error! " << e.what();
}
- return 1;
+ return ret;
}