aboutsummaryrefslogtreecommitdiffstats
path: root/src/zmq2edi
diff options
context:
space:
mode:
Diffstat (limited to 'src/zmq2edi')
-rw-r--r--src/zmq2edi/EDISender.cpp98
-rw-r--r--src/zmq2edi/EDISender.h22
-rw-r--r--src/zmq2edi/zmq2edi.cpp21
3 files changed, 86 insertions, 55 deletions
diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp
index 2188f8a..4a70105 100644
--- a/src/zmq2edi/EDISender.cpp
+++ b/src/zmq2edi/EDISender.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2019
+ Copyright (C) 2020
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -31,6 +31,7 @@
#include <numeric>
#include <map>
#include <algorithm>
+#include <limits>
using namespace std;
@@ -47,8 +48,7 @@ EDISender::~EDISender()
}
}
-void EDISender::start(const edi::configuration_t& conf,
- int delay_ms, bool drop_late_packets)
+void EDISender::start(const edi::configuration_t& conf, int delay_ms, bool drop_late_packets)
{
edi_conf = conf;
tist_delay_ms = delay_ms;
@@ -56,14 +56,13 @@ void EDISender::start(const edi::configuration_t& conf,
edi_sender = make_shared<edi::Sender>(edi_conf);
- startTime = std::chrono::steady_clock::now();
running.store(true);
process_thread = thread(&EDISender::process, this);
}
-void EDISender::push_frame(const frame_t& frame)
+void EDISender::push_frame(frame_t&& frame)
{
- frames.push(frame);
+ frames.push(move(frame));
}
void EDISender::print_configuration()
@@ -76,8 +75,10 @@ void EDISender::print_configuration()
}
}
-void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
+void EDISender::send_eti_frame(frame_t& frame)
{
+ uint8_t *p = frame.data.data();
+
edi::TagDETI edi_tagDETI;
edi::TagStarPTR edi_tagStarPtr("DETI");
map<int, edi::TagESTn> edi_subchannelToTag;
@@ -88,12 +89,12 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
edi_tagDETI.stat = p[0];
// LIDATA FCT
- edi_tagDETI.dlfc = metadata.dlfc;
+ edi_tagDETI.dlfc = frame.metadata.dlfc;
const int fct = p[4];
- if (metadata.dlfc % 250 != fct) {
+ if (frame.metadata.dlfc % 250 != fct) {
etiLog.level(warn) << "Frame FCT=" << fct <<
- " does not correspond to DLFC=" << metadata.dlfc;
+ " does not correspond to DLFC=" << frame.metadata.dlfc;
}
bool ficf = (p[5] & 0x80) >> 7;
@@ -182,25 +183,32 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
const auto pps_offset = milliseconds(std::lrint((tist & 0xFFFFFF) / 16384.0));
const auto t_frame = system_clock::from_time_t(
- metadata.edi_time + posix_timestamp_1_jan_2000 - metadata.utc_offset) + pps_offset;
+ frame.metadata.edi_time + posix_timestamp_1_jan_2000 - frame.metadata.utc_offset) + pps_offset;
const auto t_release = t_frame + milliseconds(tist_delay_ms);
const auto t_now = system_clock::now();
- const auto wait_time = t_release - t_now;
- wait_times.push_back(duration_cast<microseconds>(wait_time).count());
+ const bool late = t_release < t_now;
+
+ buffering_stat_t stat;
+ stat.late = late;
- if (t_release > t_now) {
+ if (not late) {
+ const auto wait_time = t_release - t_now;
std::this_thread::sleep_for(wait_time);
}
- else if (drop_late) {
+
+ stat.buffering_time_us = duration_cast<microseconds>(steady_clock::now() - frame.received_at).count();
+ buffering_stats.push_back(std::move(stat));
+
+ if (late and drop_late) {
return;
}
edi_tagDETI.tsta = tist;
edi_tagDETI.atstf = 1;
- edi_tagDETI.utco = metadata.utc_offset;
- edi_tagDETI.seconds = metadata.edi_time;
+ edi_tagDETI.utco = frame.metadata.utc_offset;
+ edi_tagDETI.seconds = frame.metadata.edi_time;
if (edi_sender and edi_conf.enabled()) {
// put tags *ptr, DETI and all subchannels into one TagPacket
@@ -221,53 +229,69 @@ void EDISender::process()
frame_t frame;
frames.wait_and_pop(frame);
- if (not running.load() or frame.first.empty()) {
+ if (not running.load() or frame.data.empty()) {
break;
}
- if (frame.first.size() == 6144) {
- send_eti_frame(frame.first.data(), frame.second);
+ if (frame.data.size() == 6144) {
+ send_eti_frame(frame);
}
else {
etiLog.level(warn) << "Ignoring short ETI frame, "
- "DFLC=" << frame.second.dlfc << ", len=" <<
- frame.first.size();
+ "DFLC=" << frame.metadata.dlfc << ", len=" <<
+ frame.data.size();
}
- if (wait_times.size() == 250) { // every six seconds
- const double n = wait_times.size();
+ if (buffering_stats.size() == 250) { // every six seconds
+ const double n = buffering_stats.size();
- double sum = accumulate(wait_times.begin(), wait_times.end(), 0);
- size_t num_late = std::count_if(wait_times.begin(), wait_times.end(),
- [](double v){ return v < 0; });
+ size_t num_late = std::count_if(buffering_stats.begin(), buffering_stats.end(),
+ [](const buffering_stat_t& s){ return s.late; });
+
+ double sum = 0.0;
+ double min = std::numeric_limits<double>::max();
+ double max = -std::numeric_limits<double>::max();
+ for (const auto& s : buffering_stats) {
+ // convert to milliseconds
+ const double t = s.buffering_time_us / 1000.0;
+ sum += t;
+
+ if (t < min) {
+ min = t;
+ }
+
+ if (t > max) {
+ max = t;
+ }
+ }
double mean = sum / n;
double sq_sum = 0;
- for (const auto t : wait_times) {
+ for (const auto& s : buffering_stats) {
+ const double t = s.buffering_time_us / 1000.0;
sq_sum += (t-mean) * (t-mean);
}
double stdev = sqrt(sq_sum / n);
- auto min_max = minmax_element(wait_times.begin(), wait_times.end());
/* Debug code
stringstream ss;
ss << "times:";
- for (const auto t : wait_times) {
- ss << " " << t;
+ for (const auto t : buffering_stats) {
+ ss << " " << lrint(t.buffering_time_us / 1000.0);
}
etiLog.level(debug) << ss.str();
- */
+ // */
- etiLog.level(info) << "Wait time statistics [microseconds]:"
- " min: " << *min_max.first <<
- " max: " << *min_max.second <<
+ etiLog.level(info) << "Buffering time statistics [milliseconds]:"
+ " min: " << min <<
+ " max: " << max <<
" mean: " << mean <<
" stdev: " << stdev <<
" late: " <<
- num_late << " of " << wait_times.size() << " (" <<
+ num_late << " of " << buffering_stats.size() << " (" <<
num_late * 100.0 / n << "%)";
- wait_times.clear();
+ buffering_stats.clear();
}
}
}
diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h
index 3525b4b..c953563 100644
--- a/src/zmq2edi/EDISender.h
+++ b/src/zmq2edi/EDISender.h
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2019
+ Copyright (C) 2020
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -45,7 +45,11 @@ struct metadata_t {
uint16_t dlfc;
};
-using frame_t = std::pair<std::vector<uint8_t>, metadata_t>;
+struct frame_t {
+ std::vector<uint8_t> data;
+ metadata_t metadata;
+ std::chrono::steady_clock::time_point received_at;
+};
class EDISender {
public:
@@ -55,11 +59,11 @@ class EDISender {
~EDISender();
void start(const edi::configuration_t& conf,
int delay_ms, bool drop_late_packets);
- void push_frame(const frame_t& frame);
+ void push_frame(frame_t&& frame);
void print_configuration(void);
private:
- void send_eti_frame(uint8_t* p, metadata_t metadata);
+ void send_eti_frame(frame_t& frame);
void process(void);
int tist_delay_ms;
@@ -67,13 +71,15 @@ class EDISender {
std::atomic<bool> running;
std::thread process_thread;
edi::configuration_t edi_conf;
- std::chrono::steady_clock::time_point startTime;
ThreadsafeQueue<frame_t> frames;
std::shared_ptr<edi::Sender> edi_sender;
- // For statistics about wait time before we transmit packets,
- // in microseconds
- std::vector<double> wait_times;
+ struct buffering_stat_t {
+ // Time between when we received the packets and when we transmit packets, in microseconds
+ double buffering_time_us = 0.0;
+ bool late = false;
+ };
+ std::vector<buffering_stat_t> buffering_stats;
};
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
index 885f01d..dacb573 100644
--- a/src/zmq2edi/zmq2edi.cpp
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2019
+ Copyright (C) 2020
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -347,6 +347,7 @@ int start(int argc, char **argv)
// Event received: recv will not block
zmq_sock.recv(&incoming);
+ const auto received_at = std::chrono::steady_clock::now();
zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
@@ -358,7 +359,8 @@ int start(int argc, char **argv)
int offset = sizeof(dab_msg->version) +
NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
- std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames;
+ std::vector<frame_t> all_frames;
+ all_frames.reserve(NUM_FRAMES_PER_ZMQ_MESSAGE);
for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
@@ -367,18 +369,17 @@ int start(int argc, char **argv)
error_count++;
}
else {
- std::vector<uint8_t> buf(6144, 0x55);
+ frame_t frame;
+ frame.data.resize(6144, 0x55);
+ frame.received_at = received_at;
const int framesize = dab_msg->buflen[i];
- memcpy(&buf.front(),
+ memcpy(frame.data.data(),
((uint8_t*)incoming.data()) + offset,
framesize);
- all_frames.emplace_back(
- std::piecewise_construct,
- std::make_tuple(std::move(buf)),
- std::make_tuple());
+ all_frames.push_back(std::move(frame));
offset += framesize;
}
@@ -387,7 +388,7 @@ int start(int argc, char **argv)
for (auto &f : all_frames) {
size_t consumed_bytes = 0;
- f.second = get_md_one_frame(
+ f.metadata = get_md_one_frame(
static_cast<uint8_t*>(incoming.data()) + offset,
incoming.size() - offset,
&consumed_bytes);
@@ -396,7 +397,7 @@ int start(int argc, char **argv)
}
for (auto &f : all_frames) {
- edisender.push_frame(f);
+ edisender.push_frame(std::move(f));
}
}
}