aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/stats_zmq2edi_munin.py148
-rw-r--r--lib/edi/common.cpp17
-rw-r--r--src/input/Zmq.cpp4
-rw-r--r--src/input/Zmq.h4
-rw-r--r--src/zmq2edi/EDISender.cpp98
-rw-r--r--src/zmq2edi/EDISender.h22
-rw-r--r--src/zmq2edi/zmq2edi.cpp167
7 files changed, 346 insertions, 114 deletions
diff --git a/doc/stats_zmq2edi_munin.py b/doc/stats_zmq2edi_munin.py
new file mode 100644
index 0000000..cf84afc
--- /dev/null
+++ b/doc/stats_zmq2edi_munin.py
@@ -0,0 +1,148 @@
+#!/usr/bin/env python3
+#
+# A munin plugin for ODR-ZMQ2EDI
+#
+# Reads the logfile, and the previously rotated logfile (suffixed by .1) and
+# analyses the output. Generates a graph with percentage of frames late, and a
+# graph with min/max wait time.
+#
+# Copy this to /etc/munin/plugins/stats_zmq2edi_munin
+# and make it executable (chmod +x)
+#
+# Then make sure that zmq2edi log output gets written to LOGFILE below,
+# and setup up a logrotate script to rotate the log. The rotated log
+# filename must be appended with .1
+
+# Every six seconds a line is output. We are polled in 5 min = 300s intervals
+NUM_LINES = int(300 / 6)
+LOGFILE = "/var/log/supervisor/zmq2edi.log"
+
+import time
+import sys
+import os
+import re
+
+munin_config = """
+multigraph wait_time_zmq2edi
+graph_title zmq2edi wait_time
+graph_order high low
+graph_args --base 1000
+graph_vlabel max/min wait times during last ${graph_period}
+graph_category zmq2edi
+graph_info This graph shows the min and max wait times
+
+high.info Max wait time
+high.label Max wait time ms
+high.min 0
+high.warning 1:
+low.info Min wait time
+low.label Min wait time ms
+low.min -6000
+low.warning 1:
+
+multigraph late_packets_zmq2edi
+graph_title EDI packets delivered too late
+graph_order late
+graph_args --base 1000
+graph_vlabel late packets during last ${graph_period}
+graph_category zmq2edi
+graph_info This graph shows the number late EDI packets (250 packets = 6 seconds)
+
+late.info Number of late packets
+late.label Number of late packets
+late.min 0
+late.max %s
+late.warning 0:0
+""" % (NUM_LINES * 250,)
+
+def parse_logs():
+ # example lines:
+ # Buffering time statistics [milliseconds]: min: 907.799 max: 981.409 mean: 944.335 stdev: 26.827 late: 0 of 250 (0%)
+ # Values might also be in scientific form, e.g. -1.80938e+07
+ re_logline = re.compile(r"""Buffering time statistics.* min: (.+) max: (.+) mean: (.+) stdev: (.+) late: (.+) of 250""", flags=re.ASCII)
+
+ # The following lines are output at startup and during a reset respectively:
+ startup_pattern = "starting up"
+ backoff_pattern = "Backoff"
+
+ lines = []
+
+ # Check that the file exists and was last written to in the previous 2* 6s,
+ # otherwise assume the tool isn't running
+ if not os.path.exists(LOGFILE) or (time.time() - os.stat(LOGFILE).st_mtime) > 12:
+ num_late = None
+ t_min_period = None
+ t_max_period = None
+ else:
+ # Keep only the last NUM_LINES
+
+ # Read the previously rotated logfile too to make sure we have enough data
+ for fname in [LOGFILE+ ".1", LOGFILE]:
+ if os.path.exists(fname):
+ with open(fname, "r") as fd:
+ for line in fd:
+ lines.append(line)
+ if len(lines) > NUM_LINES:
+ del lines[0]
+
+ # Calculate min, max over the whole period, and sum the number of late
+ num_late = 0
+ t_min_period = None
+ t_max_period = None
+ num_statistics = 0
+
+ for line in lines:
+ if startup_pattern in line:
+ num_late += 250
+ elif backoff_pattern in line:
+ num_late += 250
+ else:
+ match = re_logline.search(line)
+ if match:
+ num_statistics += 1
+ t_min = float(match.group(1))
+ t_max = float(match.group(2))
+ t_mean = float(match.group(3))
+ stdev = float(match.group(4))
+ late = int(match.group(5))
+
+ if t_min_period is None or t_min < t_min_period:
+ t_min_period = t_min
+
+ if t_max_period is None or t_max > t_max_period:
+ t_max_period = t_max
+
+ if num_late is None:
+ num_late = 0
+
+ num_late += late
+
+ # The min can take extremely low values, we clamp it here to -6 seconds
+ # to keep the graph readable
+ if t_min_period is not None and t_min_period < -6000:
+ t_min_period = -6000
+
+ return num_late, round(t_min_period) if t_min_period is not None else None, round(t_max_period) if t_max_period is not None else None
+
+def muninify(value):
+ """ According to http://guide.munin-monitoring.org/en/latest/develop/plugins/plugin-concise.html#plugin-concise
+ "If the plugin - for any reason - has no value to report, then it may send the value U for undefined."
+ """
+ return 'U' if value is None else value
+
+# No arguments means that munin wants values
+if len(sys.argv) == 1:
+ num_late, t_min, t_max = parse_logs()
+
+ munin_values = "multigraph wait_time_zmq2edi\n"
+ munin_values += "high.value {}\n".format(muninify(t_max))
+ munin_values += "low.value {}\n".format(muninify(t_min))
+
+ munin_values += "multigraph late_packets_zmq2edi\n"
+ munin_values += "late.value {}\n".format(muninify(num_late))
+ print(munin_values)
+
+elif len(sys.argv) == 2 and sys.argv[1] == "config":
+ print(munin_config)
+else:
+ sys.exit(1)
diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp
index e4a51b4..470b3ba 100644
--- a/lib/edi/common.cpp
+++ b/lib/edi/common.cpp
@@ -103,7 +103,10 @@ std::chrono::system_clock::time_point frame_timestamp_t::to_system_clock() const
auto ts = chrono::system_clock::from_time_t(to_unix_epoch());
// PPS offset in seconds = tsta / 16384000
- ts += chrono::nanoseconds(std::lrint(tsta / 0.016384));
+ // We cannot use nanosecond resolution because not all platforms use a
+ // system_clock that has nanosecond precision. It's not really important,
+ // as this function is only used for debugging.
+ ts += chrono::microseconds(std::lrint(tsta / 16.384));
return ts;
}
@@ -239,7 +242,9 @@ decode_state_t TagDispatcher::decode_afpacket(
return {false, 0};
}
- if (m_last_seq + (uint16_t)1 != seq) {
+ // SEQ wraps at 0xFFFF, unsigned integer overflow is intentional
+ const uint16_t expected_seq = m_last_seq + 1;
+ if (expected_seq != seq) {
etiLog.level(warn) << "EDI AF Packet sequence error, " << seq;
}
m_last_seq = seq;
@@ -307,15 +312,17 @@ bool TagDispatcher::decode_tagpacket(const vector<uint8_t> &payload)
uint32_t taglength = read_32b(payload.begin() + i + 4);
if (taglength % 8 != 0) {
- etiLog.log(warn, "Invalid tag length: not multiple of 8!");
+ etiLog.log(warn, "Invalid EDI tag length, not multiple of 8!");
break;
}
taglength /= 8;
length = taglength;
- if (i + 8 + taglength >= payload.size()) {
- etiLog.log(warn, "Invalid tag length: tag larger than tagpacket!");
+ const size_t calculated_length = i + 8 + taglength;
+ if (calculated_length > payload.size()) {
+ etiLog.log(warn, "Invalid EDI tag length: tag larger %zu than tagpacket %zu!",
+ calculated_length, payload.size());
break;
}
diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp
index 603f514..305653b 100644
--- a/src/input/Zmq.cpp
+++ b/src/input/Zmq.cpp
@@ -369,7 +369,7 @@ int ZmqMPEG::readFromSocket(size_t framesize)
if ( msg.size() >= sizeof(zmq_frame_header_t) and
msg.size() == ZMQ_FRAME_SIZE(frame) and
frame->version == 1 and
- frame->encoder == ZMQ_ENCODER_TOOLAME) {
+ frame->encoder == ZMQ_ENCODER_MPEG_L2) {
datalen = frame->datasize;
data = ZMQ_FRAME_DATA(frame);
@@ -439,7 +439,7 @@ int ZmqAAC::readFromSocket(size_t framesize)
if ( msg.size() >= sizeof(zmq_frame_header_t) and
msg.size() == ZMQ_FRAME_SIZE(frame) and
frame->version == 1 and
- frame->encoder == ZMQ_ENCODER_FDK) {
+ frame->encoder == ZMQ_ENCODER_AACPLUS) {
datalen = frame->datasize;
data = ZMQ_FRAME_DATA(frame);
diff --git a/src/input/Zmq.h b/src/input/Zmq.h
index f4992f1..c101da0 100644
--- a/src/input/Zmq.h
+++ b/src/input/Zmq.h
@@ -119,8 +119,8 @@ struct dab_input_zmq_config_t
std::string curve_encoder_keyfile;
};
-#define ZMQ_ENCODER_FDK 1
-#define ZMQ_ENCODER_TOOLAME 2
+#define ZMQ_ENCODER_AACPLUS 1
+#define ZMQ_ENCODER_MPEG_L2 2
/* This defines the on-wire representation of a ZMQ message header.
*
diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp
index 2188f8a..4a70105 100644
--- a/src/zmq2edi/EDISender.cpp
+++ b/src/zmq2edi/EDISender.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2019
+ Copyright (C) 2020
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -31,6 +31,7 @@
#include <numeric>
#include <map>
#include <algorithm>
+#include <limits>
using namespace std;
@@ -47,8 +48,7 @@ EDISender::~EDISender()
}
}
-void EDISender::start(const edi::configuration_t& conf,
- int delay_ms, bool drop_late_packets)
+void EDISender::start(const edi::configuration_t& conf, int delay_ms, bool drop_late_packets)
{
edi_conf = conf;
tist_delay_ms = delay_ms;
@@ -56,14 +56,13 @@ void EDISender::start(const edi::configuration_t& conf,
edi_sender = make_shared<edi::Sender>(edi_conf);
- startTime = std::chrono::steady_clock::now();
running.store(true);
process_thread = thread(&EDISender::process, this);
}
-void EDISender::push_frame(const frame_t& frame)
+void EDISender::push_frame(frame_t&& frame)
{
- frames.push(frame);
+ frames.push(move(frame));
}
void EDISender::print_configuration()
@@ -76,8 +75,10 @@ void EDISender::print_configuration()
}
}
-void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
+void EDISender::send_eti_frame(frame_t& frame)
{
+ uint8_t *p = frame.data.data();
+
edi::TagDETI edi_tagDETI;
edi::TagStarPTR edi_tagStarPtr("DETI");
map<int, edi::TagESTn> edi_subchannelToTag;
@@ -88,12 +89,12 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
edi_tagDETI.stat = p[0];
// LIDATA FCT
- edi_tagDETI.dlfc = metadata.dlfc;
+ edi_tagDETI.dlfc = frame.metadata.dlfc;
const int fct = p[4];
- if (metadata.dlfc % 250 != fct) {
+ if (frame.metadata.dlfc % 250 != fct) {
etiLog.level(warn) << "Frame FCT=" << fct <<
- " does not correspond to DLFC=" << metadata.dlfc;
+ " does not correspond to DLFC=" << frame.metadata.dlfc;
}
bool ficf = (p[5] & 0x80) >> 7;
@@ -182,25 +183,32 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
const auto pps_offset = milliseconds(std::lrint((tist & 0xFFFFFF) / 16384.0));
const auto t_frame = system_clock::from_time_t(
- metadata.edi_time + posix_timestamp_1_jan_2000 - metadata.utc_offset) + pps_offset;
+ frame.metadata.edi_time + posix_timestamp_1_jan_2000 - frame.metadata.utc_offset) + pps_offset;
const auto t_release = t_frame + milliseconds(tist_delay_ms);
const auto t_now = system_clock::now();
- const auto wait_time = t_release - t_now;
- wait_times.push_back(duration_cast<microseconds>(wait_time).count());
+ const bool late = t_release < t_now;
+
+ buffering_stat_t stat;
+ stat.late = late;
- if (t_release > t_now) {
+ if (not late) {
+ const auto wait_time = t_release - t_now;
std::this_thread::sleep_for(wait_time);
}
- else if (drop_late) {
+
+ stat.buffering_time_us = duration_cast<microseconds>(steady_clock::now() - frame.received_at).count();
+ buffering_stats.push_back(std::move(stat));
+
+ if (late and drop_late) {
return;
}
edi_tagDETI.tsta = tist;
edi_tagDETI.atstf = 1;
- edi_tagDETI.utco = metadata.utc_offset;
- edi_tagDETI.seconds = metadata.edi_time;
+ edi_tagDETI.utco = frame.metadata.utc_offset;
+ edi_tagDETI.seconds = frame.metadata.edi_time;
if (edi_sender and edi_conf.enabled()) {
// put tags *ptr, DETI and all subchannels into one TagPacket
@@ -221,53 +229,69 @@ void EDISender::process()
frame_t frame;
frames.wait_and_pop(frame);
- if (not running.load() or frame.first.empty()) {
+ if (not running.load() or frame.data.empty()) {
break;
}
- if (frame.first.size() == 6144) {
- send_eti_frame(frame.first.data(), frame.second);
+ if (frame.data.size() == 6144) {
+ send_eti_frame(frame);
}
else {
etiLog.level(warn) << "Ignoring short ETI frame, "
- "DFLC=" << frame.second.dlfc << ", len=" <<
- frame.first.size();
+ "DFLC=" << frame.metadata.dlfc << ", len=" <<
+ frame.data.size();
}
- if (wait_times.size() == 250) { // every six seconds
- const double n = wait_times.size();
+ if (buffering_stats.size() == 250) { // every six seconds
+ const double n = buffering_stats.size();
- double sum = accumulate(wait_times.begin(), wait_times.end(), 0);
- size_t num_late = std::count_if(wait_times.begin(), wait_times.end(),
- [](double v){ return v < 0; });
+ size_t num_late = std::count_if(buffering_stats.begin(), buffering_stats.end(),
+ [](const buffering_stat_t& s){ return s.late; });
+
+ double sum = 0.0;
+ double min = std::numeric_limits<double>::max();
+ double max = -std::numeric_limits<double>::max();
+ for (const auto& s : buffering_stats) {
+ // convert to milliseconds
+ const double t = s.buffering_time_us / 1000.0;
+ sum += t;
+
+ if (t < min) {
+ min = t;
+ }
+
+ if (t > max) {
+ max = t;
+ }
+ }
double mean = sum / n;
double sq_sum = 0;
- for (const auto t : wait_times) {
+ for (const auto& s : buffering_stats) {
+ const double t = s.buffering_time_us / 1000.0;
sq_sum += (t-mean) * (t-mean);
}
double stdev = sqrt(sq_sum / n);
- auto min_max = minmax_element(wait_times.begin(), wait_times.end());
/* Debug code
stringstream ss;
ss << "times:";
- for (const auto t : wait_times) {
- ss << " " << t;
+ for (const auto t : buffering_stats) {
+ ss << " " << lrint(t.buffering_time_us / 1000.0);
}
etiLog.level(debug) << ss.str();
- */
+ // */
- etiLog.level(info) << "Wait time statistics [microseconds]:"
- " min: " << *min_max.first <<
- " max: " << *min_max.second <<
+ etiLog.level(info) << "Buffering time statistics [milliseconds]:"
+ " min: " << min <<
+ " max: " << max <<
" mean: " << mean <<
" stdev: " << stdev <<
" late: " <<
- num_late << " of " << wait_times.size() << " (" <<
+ num_late << " of " << buffering_stats.size() << " (" <<
num_late * 100.0 / n << "%)";
- wait_times.clear();
+ buffering_stats.clear();
}
}
}
diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h
index 3525b4b..c953563 100644
--- a/src/zmq2edi/EDISender.h
+++ b/src/zmq2edi/EDISender.h
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2019
+ Copyright (C) 2020
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -45,7 +45,11 @@ struct metadata_t {
uint16_t dlfc;
};
-using frame_t = std::pair<std::vector<uint8_t>, metadata_t>;
+struct frame_t {
+ std::vector<uint8_t> data;
+ metadata_t metadata;
+ std::chrono::steady_clock::time_point received_at;
+};
class EDISender {
public:
@@ -55,11 +59,11 @@ class EDISender {
~EDISender();
void start(const edi::configuration_t& conf,
int delay_ms, bool drop_late_packets);
- void push_frame(const frame_t& frame);
+ void push_frame(frame_t&& frame);
void print_configuration(void);
private:
- void send_eti_frame(uint8_t* p, metadata_t metadata);
+ void send_eti_frame(frame_t& frame);
void process(void);
int tist_delay_ms;
@@ -67,13 +71,15 @@ class EDISender {
std::atomic<bool> running;
std::thread process_thread;
edi::configuration_t edi_conf;
- std::chrono::steady_clock::time_point startTime;
ThreadsafeQueue<frame_t> frames;
std::shared_ptr<edi::Sender> edi_sender;
- // For statistics about wait time before we transmit packets,
- // in microseconds
- std::vector<double> wait_times;
+ struct buffering_stat_t {
+ // Time between when we received the packets and when we transmit packets, in microseconds
+ double buffering_time_us = 0.0;
+ bool late = false;
+ };
+ std::vector<buffering_stat_t> buffering_stats;
};
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
index f7d733c..20ddcbe 100644
--- a/src/zmq2edi/zmq2edi.cpp
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2019
+ Copyright (C) 2020
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -59,6 +59,8 @@ static void usage()
cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl;
cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl;
cerr << " Negative delay values are also allowed." << endl;
+ cerr << " -C <path to script> Before starting, run the given script, and only start if it returns 0." << endl;
+ cerr << " This is useful for checking that NTP is properly synchronised" << endl;
cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl;
cerr << " -p <destination port> Set the destination port." << endl;
cerr << " -P Disable PFT and send AFPackets." << endl;
@@ -222,6 +224,8 @@ static void parse_destination_args(char option)
}
}
+class FCTDiscontinuity { };
+
int start(int argc, char **argv)
{
edi_conf.enable_pft = true;
@@ -234,13 +238,17 @@ int start(int argc, char **argv)
int delay_ms = 500;
bool drop_late_packets = false;
uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF;
+ std::string startupcheck;
int ch = 0;
while (ch != -1) {
- ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:b:w:xh");
+ ch = getopt(argc, argv, "C:d:p:s:S:t:Pf:i:Dva:b:w:xh");
switch (ch) {
case -1:
break;
+ case 'C':
+ startupcheck = optarg;
+ break;
case 'd':
case 's':
case 'S':
@@ -298,6 +306,25 @@ int start(int argc, char **argv)
}
}
+ if (not startupcheck.empty()) {
+ etiLog.level(info) << "Running startup check '" << startupcheck << "'";
+ int wstatus = system(startupcheck.c_str());
+
+ if (WIFEXITED(wstatus)) {
+ if (WEXITSTATUS(wstatus) == 0) {
+ etiLog.level(info) << "Startup check ok";
+ }
+ else {
+ etiLog.level(error) << "Startup check failed, returned " << WEXITSTATUS(wstatus);
+ return 1;
+ }
+ }
+ else {
+ etiLog.level(error) << "Startup check failed, child didn't terminate normally";
+ return 1;
+ }
+ }
+
add_edi_destination();
if (optind >= argc) {
@@ -325,88 +352,100 @@ int start(int argc, char **argv)
zmq::context_t zmq_ctx(1);
etiLog.level(info) << "Opening ZMQ input: " << source_url;
- size_t num_consecutive_resets = 0;
while (true) {
zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
zmq_sock.connect(source_url);
zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
size_t error_count = 0;
+ int previous_fct = -1;
+
+ try {
+ while (error_count < MAX_ERROR_COUNT) {
+ zmq::message_t incoming;
+ zmq::pollitem_t items[1];
+ items[0].socket = zmq_sock;
+ items[0].events = ZMQ_POLLIN;
+ const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
+ if (num_events == 0) { // timeout
+ error_count++;
+ }
+ else {
+ // Event received: recv will not block
+ zmq_sock.recv(&incoming);
+ const auto received_at = std::chrono::steady_clock::now();
- while (error_count < MAX_ERROR_COUNT) {
- zmq::message_t incoming;
- zmq::pollitem_t items[1];
- items[0].socket = zmq_sock;
- items[0].events = ZMQ_POLLIN;
- const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
- if (num_events == 0) { // timeout
- error_count++;
- }
- else {
- num_consecutive_resets = 0;
+ zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
- // Event received: recv will not block
- zmq_sock.recv(&incoming);
+ if (dab_msg->version != 1) {
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
+ error_count++;
+ }
- zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
+ int offset = sizeof(dab_msg->version) +
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
- if (dab_msg->version != 1) {
- etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
- error_count++;
- }
+ std::vector<frame_t> all_frames;
+ all_frames.reserve(NUM_FRAMES_PER_ZMQ_MESSAGE);
- int offset = sizeof(dab_msg->version) +
- NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
+ etiLog.level(error) << "ZeroMQ buffer " << i <<
+ " has invalid length " << dab_msg->buflen[i];
+ error_count++;
+ }
+ else {
+ frame_t frame;
+ frame.data.resize(6144, 0x55);
+ frame.received_at = received_at;
- std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames;
+ const int framesize = dab_msg->buflen[i];
- for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
- if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
- etiLog.level(error) << "ZeroMQ buffer " << i <<
- " has invalid length " << dab_msg->buflen[i];
- error_count++;
- }
- else {
- std::vector<uint8_t> buf(6144, 0x55);
+ memcpy(frame.data.data(),
+ ((uint8_t*)incoming.data()) + offset,
+ framesize);
- const int framesize = dab_msg->buflen[i];
+ const int fct = frame.data[4];
- memcpy(&buf.front(),
- ((uint8_t*)incoming.data()) + offset,
- framesize);
+ const int expected_fct = (previous_fct + 1) % 250;
+ if (previous_fct != -1 and expected_fct != fct) {
+ etiLog.level(error) << "ETI wrong frame counter FCT=" << fct << " expected " << expected_fct;
+ throw FCTDiscontinuity();
+ }
+ previous_fct = fct;
- all_frames.emplace_back(
- std::piecewise_construct,
- std::make_tuple(std::move(buf)),
- std::make_tuple());
+ all_frames.push_back(std::move(frame));
- offset += framesize;
+ offset += framesize;
+ }
}
- }
- for (auto &f : all_frames) {
- size_t consumed_bytes = 0;
+ for (auto &f : all_frames) {
+ size_t consumed_bytes = 0;
- f.second = get_md_one_frame(
- static_cast<uint8_t*>(incoming.data()) + offset,
- incoming.size() - offset,
- &consumed_bytes);
+ f.metadata = get_md_one_frame(
+ static_cast<uint8_t*>(incoming.data()) + offset,
+ incoming.size() - offset,
+ &consumed_bytes);
- offset += consumed_bytes;
- }
+ offset += consumed_bytes;
+ }
- for (auto &f : all_frames) {
- edisender.push_frame(f);
+ for (auto &f : all_frames) {
+ edisender.push_frame(std::move(f));
+ }
}
}
- }
- num_consecutive_resets++;
+ etiLog.level(info) << "Backoff " << backoff_after_reset_ms <<
+ "ms due to ZMQ input (" << source_url << ") timeout";
+ }
+ catch (const FCTDiscontinuity&) {
+ etiLog.level(info) << "Backoff " << backoff_after_reset_ms << "ms FCT discontinuity";
+ }
zmq_sock.close();
std::this_thread::sleep_for(std::chrono::milliseconds(backoff_after_reset_ms));
- etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " <<
- num_consecutive_resets << " consecutive resets.";
}
return 0;
@@ -423,12 +462,20 @@ int main(int argc, char **argv)
#endif
" starting up";
+ int ret = 1;
+
try {
- return start(argc, argv);
+ ret = start(argc, argv);
+
+ // To make sure things get printed to stderr
+ std::this_thread::sleep_for(std::chrono::milliseconds(300));
+ }
+ catch (const std::runtime_error &e) {
+ etiLog.level(error) << "Runtime error: " << e.what();
}
- catch (std::runtime_error &e) {
- etiLog.level(error) << "Error: " << e.what();
+ catch (const std::logic_error &e) {
+ etiLog.level(error) << "Logic error! " << e.what();
}
- return 1;
+ return ret;
}