aboutsummaryrefslogtreecommitdiffstats
path: root/src/zmq2edi
diff options
context:
space:
mode:
Diffstat (limited to 'src/zmq2edi')
-rw-r--r--src/zmq2edi/EDISender.cpp71
-rw-r--r--src/zmq2edi/EDISender.h4
-rw-r--r--src/zmq2edi/zmq2edi.cpp11
3 files changed, 59 insertions, 27 deletions
diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp
index c9033e9..6027067 100644
--- a/src/zmq2edi/EDISender.cpp
+++ b/src/zmq2edi/EDISender.cpp
@@ -25,6 +25,8 @@
along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <numeric>
+#include <algorithm>
#include <math.h>
#include "EDISender.h"
#include "Log.h"
@@ -59,16 +61,16 @@ void EDISender::start(const edi_configuration_t& conf, int delay_ms)
if (edi_conf.enabled()) {
for (auto& edi_destination : edi_conf.destinations) {
- auto edi_output = std::make_shared<UdpSocket>(edi_destination.source_port);
+ auto edi_output = make_shared<UdpSocket>(edi_destination.source_port);
if (not edi_destination.source_addr.empty()) {
int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str());
if (err) {
- throw std::runtime_error("EDI socket set source failed!");
+ throw runtime_error("EDI socket set source failed!");
}
err = edi_output->setMulticastTTL(edi_destination.ttl);
if (err) {
- throw std::runtime_error("EDI socket set TTL failed!");
+ throw runtime_error("EDI socket set TTL failed!");
}
}
@@ -124,7 +126,7 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
{
edi::TagDETI edi_tagDETI;
edi::TagStarPTR edi_tagStarPtr;
- std::map<int, edi::TagESTn> edi_subchannelToTag;
+ map<int, edi::TagESTn> edi_subchannelToTag;
// The above Tag Items will be assembled into a TAG Packet
edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment);
@@ -140,8 +142,6 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
" does not correspond to DLFC=" << metadata.dlfc;
}
- etiLog.level(debug) << "tx " << fct;
-
bool ficf = (p[5] & 0x80) >> 7;
edi_tagDETI.ficf = ficf;
@@ -164,8 +164,8 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
ficl = 24;
}
- std::vector<uint32_t> sad(nst);
- std::vector<uint32_t> stl(nst);
+ vector<uint32_t> sad(nst);
+ vector<uint32_t> stl(nst);
// Loop over STC subchannels:
for (int i=0; i < nst; i++) {
// EDI stream index is 1-indexed
@@ -224,26 +224,31 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
// Wait until our time is tist_delay after the TIST before
// we release that frame
+ using namespace std::chrono;
+
const auto seconds = metadata.edi_time;
- const auto pps_offset = std::chrono::milliseconds(
- std::lrint((tist & 0xFFFFFF) / 16384.0));
- const auto t_frame = std::chrono::system_clock::from_time_t(
+ const auto pps_offset = milliseconds(std::lrint((tist & 0xFFFFFF) / 16384.0));
+ const auto t_frame = system_clock::from_time_t(
seconds + posix_timestamp_1_jan_2000) + pps_offset;
- const auto t_release = t_frame + std::chrono::milliseconds(tist_delay_ms);
- const auto t_now = chrono::system_clock::now();
+
+ const auto t_release = t_frame + milliseconds(tist_delay_ms);
+ const auto t_now = system_clock::now();
const auto wait_time = t_release - t_now;
- const auto duration_0 = std::chrono::milliseconds(0);
+ const auto duration_0 = milliseconds(0);
/*
etiLog.level(debug) << "seconds " << seconds + posix_timestamp_1_jan_2000;
- etiLog.level(debug) << "now " << chrono::system_clock::to_time_t(t_now);
+ etiLog.level(debug) << "now " << system_clock::to_time_t(t_now);
etiLog.level(debug) << "wait " << wait_time.count();
*/
if (wait_time > duration_0) {
std::this_thread::sleep_for(wait_time);
+ wait_times.push_back(duration_cast<microseconds>(wait_time).count());
+ }
+ else {
+ wait_times.push_back(0);
}
-
edi_tagDETI.tsta = tist;
edi_tagDETI.atstf = 1;
@@ -264,8 +269,7 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
if (edi_conf.enable_pft) {
// Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
- std::vector<edi::PFTFragment> edi_fragments =
- edi_pft.Assemble(edi_afpacket);
+ vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(edi_afpacket);
if (edi_conf.verbose) {
fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n",
@@ -326,5 +330,36 @@ void EDISender::process()
}
send_eti_frame(frame.first.data(), frame.second);
+
+ if (wait_times.size() == 250) { // every six seconds
+ const double n = wait_times.size();
+
+ double sum = accumulate(wait_times.begin(), wait_times.end(), 0);
+ double mean = sum / n;
+
+ double sq_sum = 0;
+ for (const auto t : wait_times) {
+ sq_sum += (t-mean) * (t-mean);
+ }
+ double stdev = sqrt(sq_sum / n);
+ auto min_max = minmax_element(wait_times.begin(), wait_times.end());
+
+ /* Debug code
+ stringstream ss;
+ ss << "times:";
+ for (const auto t : wait_times) {
+ ss << " " << t;
+ }
+ etiLog.level(debug) << ss.str();
+ */
+
+ etiLog.level(info) << "Wait time statistics:"
+ " min: " << *min_max.first <<
+ " max: " << *min_max.second <<
+ " mean: " << mean <<
+ " stdev: " << stdev;
+
+ wait_times.clear();
+ }
}
}
diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h
index c269652..f5959b9 100644
--- a/src/zmq2edi/EDISender.h
+++ b/src/zmq2edi/EDISender.h
@@ -80,4 +80,8 @@ class EDISender {
// To mitigate for burst packet loss, PFT fragments can be sent out-of-order
edi::Interleaver edi_interleaver;
+ // For statistics about wait time before we transmit packets,
+ // in microseconds
+ std::vector<double> wait_times;
+
};
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
index a915363..2d1963b 100644
--- a/src/zmq2edi/zmq2edi.cpp
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -50,7 +50,7 @@ void usage(void)
cerr << "Where the options are:" << endl;
cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl;
- cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds before current system time." << endl;
+ cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl;
cerr << " -d <destination ip> sets the destination ip." << endl;
cerr << " -p <destination port> sets the destination port." << endl;
cerr << " -s <source port> sets the source port." << endl;
@@ -224,7 +224,7 @@ int start(int argc, char **argv)
edi_conf.destinations.push_back(edi_destination);
- etiLog.level(info) << "Setting up EDI Sender withe delay " << delay_ms << " ms";
+ etiLog.level(info) << "Setting up EDI Sender with delay " << delay_ms << " ms";
edisender.start(edi_conf, delay_ms);
edisender.print_configuration();
@@ -239,7 +239,6 @@ int start(int argc, char **argv)
etiLog.level(info) << "Entering main loop";
size_t frame_count = 0;
- size_t loop_counter = 0;
size_t error_count = 0;
while (error_count < 10)
{
@@ -299,12 +298,6 @@ int start(int argc, char **argv)
edisender.push_frame(f);
frame_count++;
}
-
- loop_counter++;
- if (loop_counter > 250) {
- etiLog.level(info) << "Transmitted " << frame_count << " ETI frames";
- loop_counter = 0;
- }
}
return error_count > 0 ? 2 : 0;