diff options
-rw-r--r-- | src/zmq2edi/EDISender.cpp | 98 | ||||
-rw-r--r-- | src/zmq2edi/EDISender.h | 22 | ||||
-rw-r--r-- | src/zmq2edi/zmq2edi.cpp | 21 |
3 files changed, 86 insertions, 55 deletions
diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp index 2188f8a..4a70105 100644 --- a/src/zmq2edi/EDISender.cpp +++ b/src/zmq2edi/EDISender.cpp @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2019 + Copyright (C) 2020 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -31,6 +31,7 @@ #include <numeric> #include <map> #include <algorithm> +#include <limits> using namespace std; @@ -47,8 +48,7 @@ EDISender::~EDISender() } } -void EDISender::start(const edi::configuration_t& conf, - int delay_ms, bool drop_late_packets) +void EDISender::start(const edi::configuration_t& conf, int delay_ms, bool drop_late_packets) { edi_conf = conf; tist_delay_ms = delay_ms; @@ -56,14 +56,13 @@ void EDISender::start(const edi::configuration_t& conf, edi_sender = make_shared<edi::Sender>(edi_conf); - startTime = std::chrono::steady_clock::now(); running.store(true); process_thread = thread(&EDISender::process, this); } -void EDISender::push_frame(const frame_t& frame) +void EDISender::push_frame(frame_t&& frame) { - frames.push(frame); + frames.push(move(frame)); } void EDISender::print_configuration() @@ -76,8 +75,10 @@ void EDISender::print_configuration() } } -void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata) +void EDISender::send_eti_frame(frame_t& frame) { + uint8_t *p = frame.data.data(); + edi::TagDETI edi_tagDETI; edi::TagStarPTR edi_tagStarPtr("DETI"); map<int, edi::TagESTn> edi_subchannelToTag; @@ -88,12 +89,12 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata) edi_tagDETI.stat = p[0]; // LIDATA FCT - edi_tagDETI.dlfc = metadata.dlfc; + edi_tagDETI.dlfc = frame.metadata.dlfc; const int fct = p[4]; - if (metadata.dlfc % 250 != fct) { + if (frame.metadata.dlfc % 250 != fct) { etiLog.level(warn) << "Frame FCT=" << fct << - " does not correspond to DLFC=" << metadata.dlfc; + " does not correspond to DLFC=" << frame.metadata.dlfc; } bool ficf = (p[5] & 0x80) >> 7; @@ -182,25 +183,32 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata) const auto pps_offset = milliseconds(std::lrint((tist & 0xFFFFFF) / 16384.0)); const auto t_frame = system_clock::from_time_t( - metadata.edi_time + posix_timestamp_1_jan_2000 - metadata.utc_offset) + pps_offset; + frame.metadata.edi_time + posix_timestamp_1_jan_2000 - frame.metadata.utc_offset) + pps_offset; const auto t_release = t_frame + milliseconds(tist_delay_ms); const auto t_now = system_clock::now(); - const auto wait_time = t_release - t_now; - wait_times.push_back(duration_cast<microseconds>(wait_time).count()); + const bool late = t_release < t_now; + + buffering_stat_t stat; + stat.late = late; - if (t_release > t_now) { + if (not late) { + const auto wait_time = t_release - t_now; std::this_thread::sleep_for(wait_time); } - else if (drop_late) { + + stat.buffering_time_us = duration_cast<microseconds>(steady_clock::now() - frame.received_at).count(); + buffering_stats.push_back(std::move(stat)); + + if (late and drop_late) { return; } edi_tagDETI.tsta = tist; edi_tagDETI.atstf = 1; - edi_tagDETI.utco = metadata.utc_offset; - edi_tagDETI.seconds = metadata.edi_time; + edi_tagDETI.utco = frame.metadata.utc_offset; + edi_tagDETI.seconds = frame.metadata.edi_time; if (edi_sender and edi_conf.enabled()) { // put tags *ptr, DETI and all subchannels into one TagPacket @@ -221,53 +229,69 @@ void EDISender::process() frame_t frame; frames.wait_and_pop(frame); - if (not running.load() or frame.first.empty()) { + if (not running.load() or frame.data.empty()) { break; } - if (frame.first.size() == 6144) { - send_eti_frame(frame.first.data(), frame.second); + if (frame.data.size() == 6144) { + send_eti_frame(frame); } else { etiLog.level(warn) << "Ignoring short ETI frame, " - "DFLC=" << frame.second.dlfc << ", len=" << - frame.first.size(); + "DFLC=" << frame.metadata.dlfc << ", len=" << + frame.data.size(); } - if (wait_times.size() == 250) { // every six seconds - const double n = wait_times.size(); + if (buffering_stats.size() == 250) { // every six seconds + const double n = buffering_stats.size(); - double sum = accumulate(wait_times.begin(), wait_times.end(), 0); - size_t num_late = std::count_if(wait_times.begin(), wait_times.end(), - [](double v){ return v < 0; }); + size_t num_late = std::count_if(buffering_stats.begin(), buffering_stats.end(), + [](const buffering_stat_t& s){ return s.late; }); + + double sum = 0.0; + double min = std::numeric_limits<double>::max(); + double max = -std::numeric_limits<double>::max(); + for (const auto& s : buffering_stats) { + // convert to milliseconds + const double t = s.buffering_time_us / 1000.0; + sum += t; + + if (t < min) { + min = t; + } + + if (t > max) { + max = t; + } + } double mean = sum / n; double sq_sum = 0; - for (const auto t : wait_times) { + for (const auto& s : buffering_stats) { + const double t = s.buffering_time_us / 1000.0; sq_sum += (t-mean) * (t-mean); } double stdev = sqrt(sq_sum / n); - auto min_max = minmax_element(wait_times.begin(), wait_times.end()); /* Debug code stringstream ss; ss << "times:"; - for (const auto t : wait_times) { - ss << " " << t; + for (const auto t : buffering_stats) { + ss << " " << lrint(t.buffering_time_us / 1000.0); } etiLog.level(debug) << ss.str(); - */ + // */ - etiLog.level(info) << "Wait time statistics [microseconds]:" - " min: " << *min_max.first << - " max: " << *min_max.second << + etiLog.level(info) << "Buffering time statistics [milliseconds]:" + " min: " << min << + " max: " << max << " mean: " << mean << " stdev: " << stdev << " late: " << - num_late << " of " << wait_times.size() << " (" << + num_late << " of " << buffering_stats.size() << " (" << num_late * 100.0 / n << "%)"; - wait_times.clear(); + buffering_stats.clear(); } } } diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h index 3525b4b..c953563 100644 --- a/src/zmq2edi/EDISender.h +++ b/src/zmq2edi/EDISender.h @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2019 + Copyright (C) 2020 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -45,7 +45,11 @@ struct metadata_t { uint16_t dlfc; }; -using frame_t = std::pair<std::vector<uint8_t>, metadata_t>; +struct frame_t { + std::vector<uint8_t> data; + metadata_t metadata; + std::chrono::steady_clock::time_point received_at; +}; class EDISender { public: @@ -55,11 +59,11 @@ class EDISender { ~EDISender(); void start(const edi::configuration_t& conf, int delay_ms, bool drop_late_packets); - void push_frame(const frame_t& frame); + void push_frame(frame_t&& frame); void print_configuration(void); private: - void send_eti_frame(uint8_t* p, metadata_t metadata); + void send_eti_frame(frame_t& frame); void process(void); int tist_delay_ms; @@ -67,13 +71,15 @@ class EDISender { std::atomic<bool> running; std::thread process_thread; edi::configuration_t edi_conf; - std::chrono::steady_clock::time_point startTime; ThreadsafeQueue<frame_t> frames; std::shared_ptr<edi::Sender> edi_sender; - // For statistics about wait time before we transmit packets, - // in microseconds - std::vector<double> wait_times; + struct buffering_stat_t { + // Time between when we received the packets and when we transmit packets, in microseconds + double buffering_time_us = 0.0; + bool late = false; + }; + std::vector<buffering_stat_t> buffering_stats; }; diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index 885f01d..dacb573 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2019 + Copyright (C) 2020 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -347,6 +347,7 @@ int start(int argc, char **argv) // Event received: recv will not block zmq_sock.recv(&incoming); + const auto received_at = std::chrono::steady_clock::now(); zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); @@ -358,7 +359,8 @@ int start(int argc, char **argv) int offset = sizeof(dab_msg->version) + NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); - std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames; + std::vector<frame_t> all_frames; + all_frames.reserve(NUM_FRAMES_PER_ZMQ_MESSAGE); for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { @@ -367,18 +369,17 @@ int start(int argc, char **argv) error_count++; } else { - std::vector<uint8_t> buf(6144, 0x55); + frame_t frame; + frame.data.resize(6144, 0x55); + frame.received_at = received_at; const int framesize = dab_msg->buflen[i]; - memcpy(&buf.front(), + memcpy(frame.data.data(), ((uint8_t*)incoming.data()) + offset, framesize); - all_frames.emplace_back( - std::piecewise_construct, - std::make_tuple(std::move(buf)), - std::make_tuple()); + all_frames.push_back(std::move(frame)); offset += framesize; } @@ -387,7 +388,7 @@ int start(int argc, char **argv) for (auto &f : all_frames) { size_t consumed_bytes = 0; - f.second = get_md_one_frame( + f.metadata = get_md_one_frame( static_cast<uint8_t*>(incoming.data()) + offset, incoming.size() - offset, &consumed_bytes); @@ -396,7 +397,7 @@ int start(int argc, char **argv) } for (auto &f : all_frames) { - edisender.push_frame(f); + edisender.push_frame(std::move(f)); } } } |