diff options
-rw-r--r-- | doc/stats_zmq2edi_munin.py | 148 | ||||
-rw-r--r-- | lib/edi/common.cpp | 17 | ||||
-rw-r--r-- | src/input/Zmq.cpp | 4 | ||||
-rw-r--r-- | src/input/Zmq.h | 4 | ||||
-rw-r--r-- | src/zmq2edi/EDISender.cpp | 98 | ||||
-rw-r--r-- | src/zmq2edi/EDISender.h | 22 | ||||
-rw-r--r-- | src/zmq2edi/zmq2edi.cpp | 167 |
7 files changed, 346 insertions, 114 deletions
diff --git a/doc/stats_zmq2edi_munin.py b/doc/stats_zmq2edi_munin.py new file mode 100644 index 0000000..cf84afc --- /dev/null +++ b/doc/stats_zmq2edi_munin.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +# +# A munin plugin for ODR-ZMQ2EDI +# +# Reads the logfile, and the previously rotated logfile (suffixed by .1) and +# analyses the output. Generates a graph with percentage of frames late, and a +# graph with min/max wait time. +# +# Copy this to /etc/munin/plugins/stats_zmq2edi_munin +# and make it executable (chmod +x) +# +# Then make sure that zmq2edi log output gets written to LOGFILE below, +# and setup up a logrotate script to rotate the log. The rotated log +# filename must be appended with .1 + +# Every six seconds a line is output. We are polled in 5 min = 300s intervals +NUM_LINES = int(300 / 6) +LOGFILE = "/var/log/supervisor/zmq2edi.log" + +import time +import sys +import os +import re + +munin_config = """ +multigraph wait_time_zmq2edi +graph_title zmq2edi wait_time +graph_order high low +graph_args --base 1000 +graph_vlabel max/min wait times during last ${graph_period} +graph_category zmq2edi +graph_info This graph shows the min and max wait times + +high.info Max wait time +high.label Max wait time ms +high.min 0 +high.warning 1: +low.info Min wait time +low.label Min wait time ms +low.min -6000 +low.warning 1: + +multigraph late_packets_zmq2edi +graph_title EDI packets delivered too late +graph_order late +graph_args --base 1000 +graph_vlabel late packets during last ${graph_period} +graph_category zmq2edi +graph_info This graph shows the number late EDI packets (250 packets = 6 seconds) + +late.info Number of late packets +late.label Number of late packets +late.min 0 +late.max %s +late.warning 0:0 +""" % (NUM_LINES * 250,) + +def parse_logs(): + # example lines: + # Buffering time statistics [milliseconds]: min: 907.799 max: 981.409 mean: 944.335 stdev: 26.827 late: 0 of 250 (0%) + # Values might also be in scientific form, e.g. -1.80938e+07 + re_logline = re.compile(r"""Buffering time statistics.* min: (.+) max: (.+) mean: (.+) stdev: (.+) late: (.+) of 250""", flags=re.ASCII) + + # The following lines are output at startup and during a reset respectively: + startup_pattern = "starting up" + backoff_pattern = "Backoff" + + lines = [] + + # Check that the file exists and was last written to in the previous 2* 6s, + # otherwise assume the tool isn't running + if not os.path.exists(LOGFILE) or (time.time() - os.stat(LOGFILE).st_mtime) > 12: + num_late = None + t_min_period = None + t_max_period = None + else: + # Keep only the last NUM_LINES + + # Read the previously rotated logfile too to make sure we have enough data + for fname in [LOGFILE+ ".1", LOGFILE]: + if os.path.exists(fname): + with open(fname, "r") as fd: + for line in fd: + lines.append(line) + if len(lines) > NUM_LINES: + del lines[0] + + # Calculate min, max over the whole period, and sum the number of late + num_late = 0 + t_min_period = None + t_max_period = None + num_statistics = 0 + + for line in lines: + if startup_pattern in line: + num_late += 250 + elif backoff_pattern in line: + num_late += 250 + else: + match = re_logline.search(line) + if match: + num_statistics += 1 + t_min = float(match.group(1)) + t_max = float(match.group(2)) + t_mean = float(match.group(3)) + stdev = float(match.group(4)) + late = int(match.group(5)) + + if t_min_period is None or t_min < t_min_period: + t_min_period = t_min + + if t_max_period is None or t_max > t_max_period: + t_max_period = t_max + + if num_late is None: + num_late = 0 + + num_late += late + + # The min can take extremely low values, we clamp it here to -6 seconds + # to keep the graph readable + if t_min_period is not None and t_min_period < -6000: + t_min_period = -6000 + + return num_late, round(t_min_period) if t_min_period is not None else None, round(t_max_period) if t_max_period is not None else None + +def muninify(value): + """ According to http://guide.munin-monitoring.org/en/latest/develop/plugins/plugin-concise.html#plugin-concise + "If the plugin - for any reason - has no value to report, then it may send the value U for undefined." + """ + return 'U' if value is None else value + +# No arguments means that munin wants values +if len(sys.argv) == 1: + num_late, t_min, t_max = parse_logs() + + munin_values = "multigraph wait_time_zmq2edi\n" + munin_values += "high.value {}\n".format(muninify(t_max)) + munin_values += "low.value {}\n".format(muninify(t_min)) + + munin_values += "multigraph late_packets_zmq2edi\n" + munin_values += "late.value {}\n".format(muninify(num_late)) + print(munin_values) + +elif len(sys.argv) == 2 and sys.argv[1] == "config": + print(munin_config) +else: + sys.exit(1) diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index e4a51b4..470b3ba 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -103,7 +103,10 @@ std::chrono::system_clock::time_point frame_timestamp_t::to_system_clock() const auto ts = chrono::system_clock::from_time_t(to_unix_epoch()); // PPS offset in seconds = tsta / 16384000 - ts += chrono::nanoseconds(std::lrint(tsta / 0.016384)); + // We cannot use nanosecond resolution because not all platforms use a + // system_clock that has nanosecond precision. It's not really important, + // as this function is only used for debugging. + ts += chrono::microseconds(std::lrint(tsta / 16.384)); return ts; } @@ -239,7 +242,9 @@ decode_state_t TagDispatcher::decode_afpacket( return {false, 0}; } - if (m_last_seq + (uint16_t)1 != seq) { + // SEQ wraps at 0xFFFF, unsigned integer overflow is intentional + const uint16_t expected_seq = m_last_seq + 1; + if (expected_seq != seq) { etiLog.level(warn) << "EDI AF Packet sequence error, " << seq; } m_last_seq = seq; @@ -307,15 +312,17 @@ bool TagDispatcher::decode_tagpacket(const vector<uint8_t> &payload) uint32_t taglength = read_32b(payload.begin() + i + 4); if (taglength % 8 != 0) { - etiLog.log(warn, "Invalid tag length: not multiple of 8!"); + etiLog.log(warn, "Invalid EDI tag length, not multiple of 8!"); break; } taglength /= 8; length = taglength; - if (i + 8 + taglength >= payload.size()) { - etiLog.log(warn, "Invalid tag length: tag larger than tagpacket!"); + const size_t calculated_length = i + 8 + taglength; + if (calculated_length > payload.size()) { + etiLog.log(warn, "Invalid EDI tag length: tag larger %zu than tagpacket %zu!", + calculated_length, payload.size()); break; } diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp index 603f514..305653b 100644 --- a/src/input/Zmq.cpp +++ b/src/input/Zmq.cpp @@ -369,7 +369,7 @@ int ZmqMPEG::readFromSocket(size_t framesize) if ( msg.size() >= sizeof(zmq_frame_header_t) and msg.size() == ZMQ_FRAME_SIZE(frame) and frame->version == 1 and - frame->encoder == ZMQ_ENCODER_TOOLAME) { + frame->encoder == ZMQ_ENCODER_MPEG_L2) { datalen = frame->datasize; data = ZMQ_FRAME_DATA(frame); @@ -439,7 +439,7 @@ int ZmqAAC::readFromSocket(size_t framesize) if ( msg.size() >= sizeof(zmq_frame_header_t) and msg.size() == ZMQ_FRAME_SIZE(frame) and frame->version == 1 and - frame->encoder == ZMQ_ENCODER_FDK) { + frame->encoder == ZMQ_ENCODER_AACPLUS) { datalen = frame->datasize; data = ZMQ_FRAME_DATA(frame); diff --git a/src/input/Zmq.h b/src/input/Zmq.h index f4992f1..c101da0 100644 --- a/src/input/Zmq.h +++ b/src/input/Zmq.h @@ -119,8 +119,8 @@ struct dab_input_zmq_config_t std::string curve_encoder_keyfile; }; -#define ZMQ_ENCODER_FDK 1 -#define ZMQ_ENCODER_TOOLAME 2 +#define ZMQ_ENCODER_AACPLUS 1 +#define ZMQ_ENCODER_MPEG_L2 2 /* This defines the on-wire representation of a ZMQ message header. * diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp index 2188f8a..4a70105 100644 --- a/src/zmq2edi/EDISender.cpp +++ b/src/zmq2edi/EDISender.cpp @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2019 + Copyright (C) 2020 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -31,6 +31,7 @@ #include <numeric> #include <map> #include <algorithm> +#include <limits> using namespace std; @@ -47,8 +48,7 @@ EDISender::~EDISender() } } -void EDISender::start(const edi::configuration_t& conf, - int delay_ms, bool drop_late_packets) +void EDISender::start(const edi::configuration_t& conf, int delay_ms, bool drop_late_packets) { edi_conf = conf; tist_delay_ms = delay_ms; @@ -56,14 +56,13 @@ void EDISender::start(const edi::configuration_t& conf, edi_sender = make_shared<edi::Sender>(edi_conf); - startTime = std::chrono::steady_clock::now(); running.store(true); process_thread = thread(&EDISender::process, this); } -void EDISender::push_frame(const frame_t& frame) +void EDISender::push_frame(frame_t&& frame) { - frames.push(frame); + frames.push(move(frame)); } void EDISender::print_configuration() @@ -76,8 +75,10 @@ void EDISender::print_configuration() } } -void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata) +void EDISender::send_eti_frame(frame_t& frame) { + uint8_t *p = frame.data.data(); + edi::TagDETI edi_tagDETI; edi::TagStarPTR edi_tagStarPtr("DETI"); map<int, edi::TagESTn> edi_subchannelToTag; @@ -88,12 +89,12 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata) edi_tagDETI.stat = p[0]; // LIDATA FCT - edi_tagDETI.dlfc = metadata.dlfc; + edi_tagDETI.dlfc = frame.metadata.dlfc; const int fct = p[4]; - if (metadata.dlfc % 250 != fct) { + if (frame.metadata.dlfc % 250 != fct) { etiLog.level(warn) << "Frame FCT=" << fct << - " does not correspond to DLFC=" << metadata.dlfc; + " does not correspond to DLFC=" << frame.metadata.dlfc; } bool ficf = (p[5] & 0x80) >> 7; @@ -182,25 +183,32 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata) const auto pps_offset = milliseconds(std::lrint((tist & 0xFFFFFF) / 16384.0)); const auto t_frame = system_clock::from_time_t( - metadata.edi_time + posix_timestamp_1_jan_2000 - metadata.utc_offset) + pps_offset; + frame.metadata.edi_time + posix_timestamp_1_jan_2000 - frame.metadata.utc_offset) + pps_offset; const auto t_release = t_frame + milliseconds(tist_delay_ms); const auto t_now = system_clock::now(); - const auto wait_time = t_release - t_now; - wait_times.push_back(duration_cast<microseconds>(wait_time).count()); + const bool late = t_release < t_now; + + buffering_stat_t stat; + stat.late = late; - if (t_release > t_now) { + if (not late) { + const auto wait_time = t_release - t_now; std::this_thread::sleep_for(wait_time); } - else if (drop_late) { + + stat.buffering_time_us = duration_cast<microseconds>(steady_clock::now() - frame.received_at).count(); + buffering_stats.push_back(std::move(stat)); + + if (late and drop_late) { return; } edi_tagDETI.tsta = tist; edi_tagDETI.atstf = 1; - edi_tagDETI.utco = metadata.utc_offset; - edi_tagDETI.seconds = metadata.edi_time; + edi_tagDETI.utco = frame.metadata.utc_offset; + edi_tagDETI.seconds = frame.metadata.edi_time; if (edi_sender and edi_conf.enabled()) { // put tags *ptr, DETI and all subchannels into one TagPacket @@ -221,53 +229,69 @@ void EDISender::process() frame_t frame; frames.wait_and_pop(frame); - if (not running.load() or frame.first.empty()) { + if (not running.load() or frame.data.empty()) { break; } - if (frame.first.size() == 6144) { - send_eti_frame(frame.first.data(), frame.second); + if (frame.data.size() == 6144) { + send_eti_frame(frame); } else { etiLog.level(warn) << "Ignoring short ETI frame, " - "DFLC=" << frame.second.dlfc << ", len=" << - frame.first.size(); + "DFLC=" << frame.metadata.dlfc << ", len=" << + frame.data.size(); } - if (wait_times.size() == 250) { // every six seconds - const double n = wait_times.size(); + if (buffering_stats.size() == 250) { // every six seconds + const double n = buffering_stats.size(); - double sum = accumulate(wait_times.begin(), wait_times.end(), 0); - size_t num_late = std::count_if(wait_times.begin(), wait_times.end(), - [](double v){ return v < 0; }); + size_t num_late = std::count_if(buffering_stats.begin(), buffering_stats.end(), + [](const buffering_stat_t& s){ return s.late; }); + + double sum = 0.0; + double min = std::numeric_limits<double>::max(); + double max = -std::numeric_limits<double>::max(); + for (const auto& s : buffering_stats) { + // convert to milliseconds + const double t = s.buffering_time_us / 1000.0; + sum += t; + + if (t < min) { + min = t; + } + + if (t > max) { + max = t; + } + } double mean = sum / n; double sq_sum = 0; - for (const auto t : wait_times) { + for (const auto& s : buffering_stats) { + const double t = s.buffering_time_us / 1000.0; sq_sum += (t-mean) * (t-mean); } double stdev = sqrt(sq_sum / n); - auto min_max = minmax_element(wait_times.begin(), wait_times.end()); /* Debug code stringstream ss; ss << "times:"; - for (const auto t : wait_times) { - ss << " " << t; + for (const auto t : buffering_stats) { + ss << " " << lrint(t.buffering_time_us / 1000.0); } etiLog.level(debug) << ss.str(); - */ + // */ - etiLog.level(info) << "Wait time statistics [microseconds]:" - " min: " << *min_max.first << - " max: " << *min_max.second << + etiLog.level(info) << "Buffering time statistics [milliseconds]:" + " min: " << min << + " max: " << max << " mean: " << mean << " stdev: " << stdev << " late: " << - num_late << " of " << wait_times.size() << " (" << + num_late << " of " << buffering_stats.size() << " (" << num_late * 100.0 / n << "%)"; - wait_times.clear(); + buffering_stats.clear(); } } } diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h index 3525b4b..c953563 100644 --- a/src/zmq2edi/EDISender.h +++ b/src/zmq2edi/EDISender.h @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2019 + Copyright (C) 2020 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -45,7 +45,11 @@ struct metadata_t { uint16_t dlfc; }; -using frame_t = std::pair<std::vector<uint8_t>, metadata_t>; +struct frame_t { + std::vector<uint8_t> data; + metadata_t metadata; + std::chrono::steady_clock::time_point received_at; +}; class EDISender { public: @@ -55,11 +59,11 @@ class EDISender { ~EDISender(); void start(const edi::configuration_t& conf, int delay_ms, bool drop_late_packets); - void push_frame(const frame_t& frame); + void push_frame(frame_t&& frame); void print_configuration(void); private: - void send_eti_frame(uint8_t* p, metadata_t metadata); + void send_eti_frame(frame_t& frame); void process(void); int tist_delay_ms; @@ -67,13 +71,15 @@ class EDISender { std::atomic<bool> running; std::thread process_thread; edi::configuration_t edi_conf; - std::chrono::steady_clock::time_point startTime; ThreadsafeQueue<frame_t> frames; std::shared_ptr<edi::Sender> edi_sender; - // For statistics about wait time before we transmit packets, - // in microseconds - std::vector<double> wait_times; + struct buffering_stat_t { + // Time between when we received the packets and when we transmit packets, in microseconds + double buffering_time_us = 0.0; + bool late = false; + }; + std::vector<buffering_stat_t> buffering_stats; }; diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index f7d733c..20ddcbe 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2019 + Copyright (C) 2020 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -59,6 +59,8 @@ static void usage() cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl; cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl; cerr << " Negative delay values are also allowed." << endl; + cerr << " -C <path to script> Before starting, run the given script, and only start if it returns 0." << endl; + cerr << " This is useful for checking that NTP is properly synchronised" << endl; cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl; cerr << " -p <destination port> Set the destination port." << endl; cerr << " -P Disable PFT and send AFPackets." << endl; @@ -222,6 +224,8 @@ static void parse_destination_args(char option) } } +class FCTDiscontinuity { }; + int start(int argc, char **argv) { edi_conf.enable_pft = true; @@ -234,13 +238,17 @@ int start(int argc, char **argv) int delay_ms = 500; bool drop_late_packets = false; uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF; + std::string startupcheck; int ch = 0; while (ch != -1) { - ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:b:w:xh"); + ch = getopt(argc, argv, "C:d:p:s:S:t:Pf:i:Dva:b:w:xh"); switch (ch) { case -1: break; + case 'C': + startupcheck = optarg; + break; case 'd': case 's': case 'S': @@ -298,6 +306,25 @@ int start(int argc, char **argv) } } + if (not startupcheck.empty()) { + etiLog.level(info) << "Running startup check '" << startupcheck << "'"; + int wstatus = system(startupcheck.c_str()); + + if (WIFEXITED(wstatus)) { + if (WEXITSTATUS(wstatus) == 0) { + etiLog.level(info) << "Startup check ok"; + } + else { + etiLog.level(error) << "Startup check failed, returned " << WEXITSTATUS(wstatus); + return 1; + } + } + else { + etiLog.level(error) << "Startup check failed, child didn't terminate normally"; + return 1; + } + } + add_edi_destination(); if (optind >= argc) { @@ -325,88 +352,100 @@ int start(int argc, char **argv) zmq::context_t zmq_ctx(1); etiLog.level(info) << "Opening ZMQ input: " << source_url; - size_t num_consecutive_resets = 0; while (true) { zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); zmq_sock.connect(source_url); zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages size_t error_count = 0; + int previous_fct = -1; + + try { + while (error_count < MAX_ERROR_COUNT) { + zmq::message_t incoming; + zmq::pollitem_t items[1]; + items[0].socket = zmq_sock; + items[0].events = ZMQ_POLLIN; + const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); + if (num_events == 0) { // timeout + error_count++; + } + else { + // Event received: recv will not block + zmq_sock.recv(&incoming); + const auto received_at = std::chrono::steady_clock::now(); - while (error_count < MAX_ERROR_COUNT) { - zmq::message_t incoming; - zmq::pollitem_t items[1]; - items[0].socket = zmq_sock; - items[0].events = ZMQ_POLLIN; - const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); - if (num_events == 0) { // timeout - error_count++; - } - else { - num_consecutive_resets = 0; + zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); - // Event received: recv will not block - zmq_sock.recv(&incoming); + if (dab_msg->version != 1) { + etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; + error_count++; + } - zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); + int offset = sizeof(dab_msg->version) + + NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); - if (dab_msg->version != 1) { - etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; - error_count++; - } + std::vector<frame_t> all_frames; + all_frames.reserve(NUM_FRAMES_PER_ZMQ_MESSAGE); - int offset = sizeof(dab_msg->version) + - NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); + for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { + if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { + etiLog.level(error) << "ZeroMQ buffer " << i << + " has invalid length " << dab_msg->buflen[i]; + error_count++; + } + else { + frame_t frame; + frame.data.resize(6144, 0x55); + frame.received_at = received_at; - std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames; + const int framesize = dab_msg->buflen[i]; - for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { - if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { - etiLog.level(error) << "ZeroMQ buffer " << i << - " has invalid length " << dab_msg->buflen[i]; - error_count++; - } - else { - std::vector<uint8_t> buf(6144, 0x55); + memcpy(frame.data.data(), + ((uint8_t*)incoming.data()) + offset, + framesize); - const int framesize = dab_msg->buflen[i]; + const int fct = frame.data[4]; - memcpy(&buf.front(), - ((uint8_t*)incoming.data()) + offset, - framesize); + const int expected_fct = (previous_fct + 1) % 250; + if (previous_fct != -1 and expected_fct != fct) { + etiLog.level(error) << "ETI wrong frame counter FCT=" << fct << " expected " << expected_fct; + throw FCTDiscontinuity(); + } + previous_fct = fct; - all_frames.emplace_back( - std::piecewise_construct, - std::make_tuple(std::move(buf)), - std::make_tuple()); + all_frames.push_back(std::move(frame)); - offset += framesize; + offset += framesize; + } } - } - for (auto &f : all_frames) { - size_t consumed_bytes = 0; + for (auto &f : all_frames) { + size_t consumed_bytes = 0; - f.second = get_md_one_frame( - static_cast<uint8_t*>(incoming.data()) + offset, - incoming.size() - offset, - &consumed_bytes); + f.metadata = get_md_one_frame( + static_cast<uint8_t*>(incoming.data()) + offset, + incoming.size() - offset, + &consumed_bytes); - offset += consumed_bytes; - } + offset += consumed_bytes; + } - for (auto &f : all_frames) { - edisender.push_frame(f); + for (auto &f : all_frames) { + edisender.push_frame(std::move(f)); + } } } - } - num_consecutive_resets++; + etiLog.level(info) << "Backoff " << backoff_after_reset_ms << + "ms due to ZMQ input (" << source_url << ") timeout"; + } + catch (const FCTDiscontinuity&) { + etiLog.level(info) << "Backoff " << backoff_after_reset_ms << "ms FCT discontinuity"; + } zmq_sock.close(); std::this_thread::sleep_for(std::chrono::milliseconds(backoff_after_reset_ms)); - etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " << - num_consecutive_resets << " consecutive resets."; } return 0; @@ -423,12 +462,20 @@ int main(int argc, char **argv) #endif " starting up"; + int ret = 1; + try { - return start(argc, argv); + ret = start(argc, argv); + + // To make sure things get printed to stderr + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + } + catch (const std::runtime_error &e) { + etiLog.level(error) << "Runtime error: " << e.what(); } - catch (std::runtime_error &e) { - etiLog.level(error) << "Error: " << e.what(); + catch (const std::logic_error &e) { + etiLog.level(error) << "Logic error! " << e.what(); } - return 1; + return ret; } |