diff options
Diffstat (limited to 'src/zmq2edi/zmq2edi.cpp')
-rw-r--r-- | src/zmq2edi/zmq2edi.cpp | 167 |
1 files changed, 107 insertions, 60 deletions
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index f7d733c..20ddcbe 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2019 + Copyright (C) 2020 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -59,6 +59,8 @@ static void usage() cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl; cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl; cerr << " Negative delay values are also allowed." << endl; + cerr << " -C <path to script> Before starting, run the given script, and only start if it returns 0." << endl; + cerr << " This is useful for checking that NTP is properly synchronised" << endl; cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl; cerr << " -p <destination port> Set the destination port." << endl; cerr << " -P Disable PFT and send AFPackets." << endl; @@ -222,6 +224,8 @@ static void parse_destination_args(char option) } } +class FCTDiscontinuity { }; + int start(int argc, char **argv) { edi_conf.enable_pft = true; @@ -234,13 +238,17 @@ int start(int argc, char **argv) int delay_ms = 500; bool drop_late_packets = false; uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF; + std::string startupcheck; int ch = 0; while (ch != -1) { - ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:b:w:xh"); + ch = getopt(argc, argv, "C:d:p:s:S:t:Pf:i:Dva:b:w:xh"); switch (ch) { case -1: break; + case 'C': + startupcheck = optarg; + break; case 'd': case 's': case 'S': @@ -298,6 +306,25 @@ int start(int argc, char **argv) } } + if (not startupcheck.empty()) { + etiLog.level(info) << "Running startup check '" << startupcheck << "'"; + int wstatus = system(startupcheck.c_str()); + + if (WIFEXITED(wstatus)) { + if (WEXITSTATUS(wstatus) == 0) { + etiLog.level(info) << "Startup check ok"; + } + else { + etiLog.level(error) << "Startup check failed, returned " << WEXITSTATUS(wstatus); + return 1; + } + } + else { + etiLog.level(error) << "Startup check failed, child didn't terminate normally"; + return 1; + } + } + add_edi_destination(); if (optind >= argc) { @@ -325,88 +352,100 @@ int start(int argc, char **argv) zmq::context_t zmq_ctx(1); etiLog.level(info) << "Opening ZMQ input: " << source_url; - size_t num_consecutive_resets = 0; while (true) { zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); zmq_sock.connect(source_url); zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages size_t error_count = 0; + int previous_fct = -1; + + try { + while (error_count < MAX_ERROR_COUNT) { + zmq::message_t incoming; + zmq::pollitem_t items[1]; + items[0].socket = zmq_sock; + items[0].events = ZMQ_POLLIN; + const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); + if (num_events == 0) { // timeout + error_count++; + } + else { + // Event received: recv will not block + zmq_sock.recv(&incoming); + const auto received_at = std::chrono::steady_clock::now(); - while (error_count < MAX_ERROR_COUNT) { - zmq::message_t incoming; - zmq::pollitem_t items[1]; - items[0].socket = zmq_sock; - items[0].events = ZMQ_POLLIN; - const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); - if (num_events == 0) { // timeout - error_count++; - } - else { - num_consecutive_resets = 0; + zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); - // Event received: recv will not block - zmq_sock.recv(&incoming); + if (dab_msg->version != 1) { + etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; + error_count++; + } - zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); + int offset = sizeof(dab_msg->version) + + NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); - if (dab_msg->version != 1) { - etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; - error_count++; - } + std::vector<frame_t> all_frames; + all_frames.reserve(NUM_FRAMES_PER_ZMQ_MESSAGE); - int offset = sizeof(dab_msg->version) + - NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); + for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { + if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { + etiLog.level(error) << "ZeroMQ buffer " << i << + " has invalid length " << dab_msg->buflen[i]; + error_count++; + } + else { + frame_t frame; + frame.data.resize(6144, 0x55); + frame.received_at = received_at; - std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames; + const int framesize = dab_msg->buflen[i]; - for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { - if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { - etiLog.level(error) << "ZeroMQ buffer " << i << - " has invalid length " << dab_msg->buflen[i]; - error_count++; - } - else { - std::vector<uint8_t> buf(6144, 0x55); + memcpy(frame.data.data(), + ((uint8_t*)incoming.data()) + offset, + framesize); - const int framesize = dab_msg->buflen[i]; + const int fct = frame.data[4]; - memcpy(&buf.front(), - ((uint8_t*)incoming.data()) + offset, - framesize); + const int expected_fct = (previous_fct + 1) % 250; + if (previous_fct != -1 and expected_fct != fct) { + etiLog.level(error) << "ETI wrong frame counter FCT=" << fct << " expected " << expected_fct; + throw FCTDiscontinuity(); + } + previous_fct = fct; - all_frames.emplace_back( - std::piecewise_construct, - std::make_tuple(std::move(buf)), - std::make_tuple()); + all_frames.push_back(std::move(frame)); - offset += framesize; + offset += framesize; + } } - } - for (auto &f : all_frames) { - size_t consumed_bytes = 0; + for (auto &f : all_frames) { + size_t consumed_bytes = 0; - f.second = get_md_one_frame( - static_cast<uint8_t*>(incoming.data()) + offset, - incoming.size() - offset, - &consumed_bytes); + f.metadata = get_md_one_frame( + static_cast<uint8_t*>(incoming.data()) + offset, + incoming.size() - offset, + &consumed_bytes); - offset += consumed_bytes; - } + offset += consumed_bytes; + } - for (auto &f : all_frames) { - edisender.push_frame(f); + for (auto &f : all_frames) { + edisender.push_frame(std::move(f)); + } } } - } - num_consecutive_resets++; + etiLog.level(info) << "Backoff " << backoff_after_reset_ms << + "ms due to ZMQ input (" << source_url << ") timeout"; + } + catch (const FCTDiscontinuity&) { + etiLog.level(info) << "Backoff " << backoff_after_reset_ms << "ms FCT discontinuity"; + } zmq_sock.close(); std::this_thread::sleep_for(std::chrono::milliseconds(backoff_after_reset_ms)); - etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " << - num_consecutive_resets << " consecutive resets."; } return 0; @@ -423,12 +462,20 @@ int main(int argc, char **argv) #endif " starting up"; + int ret = 1; + try { - return start(argc, argv); + ret = start(argc, argv); + + // To make sure things get printed to stderr + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + } + catch (const std::runtime_error &e) { + etiLog.level(error) << "Runtime error: " << e.what(); } - catch (std::runtime_error &e) { - etiLog.level(error) << "Error: " << e.what(); + catch (const std::logic_error &e) { + etiLog.level(error) << "Logic error! " << e.what(); } - return 1; + return ret; } |