summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2017-08-11 13:57:51 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2017-08-11 13:57:51 +0200
commit83ec06a70ac83473e1de1168ad0d88462292fd1f (patch)
tree3641f813932f745e494d6a9221bf0e97ad9e3213
parent232790942327e272d2a3ce407b2fa7cfcb77e05d (diff)
downloaddabmux-83ec06a70ac83473e1de1168ad0d88462292fd1f.tar.gz
dabmux-83ec06a70ac83473e1de1168ad0d88462292fd1f.tar.bz2
dabmux-83ec06a70ac83473e1de1168ad0d88462292fd1f.zip
Add delay logic to odr-zmq2edi
-rwxr-xr-xdoc/stats_dabmux_multi.py2
-rw-r--r--src/Makefile.am4
-rw-r--r--src/zmq2edi/EDISender.cpp330
-rw-r--r--src/zmq2edi/EDISender.h83
-rw-r--r--src/zmq2edi/zmq2edi.cpp275
5 files changed, 433 insertions, 261 deletions
diff --git a/doc/stats_dabmux_multi.py b/doc/stats_dabmux_multi.py
index 05f52af..664f2f1 100755
--- a/doc/stats_dabmux_multi.py
+++ b/doc/stats_dabmux_multi.py
@@ -32,7 +32,7 @@ low.warning 1:
multigraph over_underruns_{ident}
graph_title Contribution {ident} over/underruns
graph_order underruns overruns
-graph_args --base 1000
+graph_args --base 1000 --logarithmic
graph_vlabel number of underruns/overruns during last ${{graph_period}}
graph_category dabmux
graph_info This graph shows the number of under/overruns for the {ident} ZMQ input
diff --git a/src/Makefile.am b/src/Makefile.am
index 2e0bb9c..badc0b9 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -126,6 +126,7 @@ odr_zmq2farsync_CFLAGS = -Wall $(GITVERSION_FLAGS) -I$(FARSYNC_DIR)
odr_zmq2farsync_CXXFLAGS = -Wall -std=c++11 $(GITVERSION_FLAGS) -I$(FARSYNC_DIR)
odr_zmq2edi_SOURCES = zmq2edi/zmq2edi.cpp \
+ zmq2edi/EDISender.h zmq2edi/EDISender.cpp \
dabOutput/dabOutput.h \
dabOutput/metadata.h dabOutput/metadata.cpp \
dabOutput/edi/AFPacket.cpp dabOutput/edi/AFPacket.h \
@@ -144,7 +145,8 @@ odr_zmq2edi_SOURCES = zmq2edi/zmq2edi.cpp \
Log.h Log.cpp \
crc.h crc.c \
zmq.hpp
-odr_zmq2edi_LDADD = $(ZMQ_LIBS)
+odr_zmq2edi_LDADD = $(ZMQ_LIBS) \
+ -lpthread -lboost_thread -lboost_system
odr_zmq2edi_CFLAGS = -Wall $(GITVERSION_FLAGS)
odr_zmq2edi_CXXFLAGS = -Wall -std=c++11 $(GITVERSION_FLAGS)
diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp
new file mode 100644
index 0000000..c9033e9
--- /dev/null
+++ b/src/zmq2edi/EDISender.cpp
@@ -0,0 +1,330 @@
+/*
+ Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
+ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2017
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <math.h>
+#include "EDISender.h"
+#include "Log.h"
+
+using namespace std;
+
+EDISender::~EDISender()
+{
+ if (running.load()) {
+ running.store(false);
+
+ // Unblock thread
+ frame_t emptyframe;
+ frames.push(emptyframe);
+
+ process_thread.join();
+ }
+}
+
+void EDISender::start(const edi_configuration_t& conf, int delay_ms)
+{
+ edi_conf = conf;
+ tist_delay_ms = delay_ms;
+
+ if (edi_conf.verbose) {
+ etiLog.log(info, "Setup EDI");
+ }
+
+ if (edi_conf.dump) {
+ edi_debug_file.open("./edi.debug");
+ }
+
+ if (edi_conf.enabled()) {
+ for (auto& edi_destination : edi_conf.destinations) {
+ auto edi_output = std::make_shared<UdpSocket>(edi_destination.source_port);
+
+ if (not edi_destination.source_addr.empty()) {
+ int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str());
+ if (err) {
+ throw std::runtime_error("EDI socket set source failed!");
+ }
+ err = edi_output->setMulticastTTL(edi_destination.ttl);
+ if (err) {
+ throw std::runtime_error("EDI socket set TTL failed!");
+ }
+ }
+
+ edi_destination.socket = edi_output;
+ }
+ }
+
+ if (edi_conf.verbose) {
+ etiLog.log(info, "EDI set up");
+ }
+
+ // The AF Packet will be protected with reed-solomon and split in fragments
+ edi::PFT pft(edi_conf);
+ edi_pft = pft;
+
+ if (edi_conf.interleaver_enabled()) {
+ edi_interleaver.SetLatency(edi_conf.latency_frames);
+ }
+
+ startTime = std::chrono::steady_clock::now();
+ running.store(true);
+ process_thread = thread(&EDISender::process, this);
+}
+
+void EDISender::push_frame(const frame_t& frame)
+{
+ frames.push(frame);
+}
+
+void EDISender::print_configuration()
+{
+ if (edi_conf.enabled()) {
+ etiLog.level(info) << "EDI";
+ etiLog.level(info) << " verbose " << edi_conf.verbose;
+ for (auto& edi_dest : edi_conf.destinations) {
+ etiLog.level(info) << " to " << edi_dest.dest_addr << ":" << edi_conf.dest_port;
+ if (not edi_dest.source_addr.empty()) {
+ etiLog.level(info) << " source " << edi_dest.source_addr;
+ etiLog.level(info) << " ttl " << edi_dest.ttl;
+ }
+ etiLog.level(info) << " source port " << edi_dest.source_port;
+ }
+ if (edi_conf.interleaver_enabled()) {
+ etiLog.level(info) << " interleave " << edi_conf.latency_frames * 24 << " ms";
+ }
+ }
+ else {
+ etiLog.level(info) << "EDI disabled";
+ }
+}
+
+void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
+{
+ edi::TagDETI edi_tagDETI;
+ edi::TagStarPTR edi_tagStarPtr;
+ std::map<int, edi::TagESTn> edi_subchannelToTag;
+ // The above Tag Items will be assembled into a TAG Packet
+ edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment);
+
+ // SYNC
+ edi_tagDETI.stat = p[0];
+
+ // LIDATA FCT
+ edi_tagDETI.dlfc = metadata.dlfc;
+
+ const int fct = p[4];
+ if (metadata.dlfc % 250 != fct) {
+ etiLog.level(warn) << "Frame FCT=" << fct <<
+ " does not correspond to DLFC=" << metadata.dlfc;
+ }
+
+ etiLog.level(debug) << "tx " << fct;
+
+ bool ficf = (p[5] & 0x80) >> 7;
+ edi_tagDETI.ficf = ficf;
+
+ const int nst = p[5] & 0x7F;
+
+ edi_tagDETI.fp = (p[6] & 0xE0) >> 5;
+ const int mid = (p[6] & 0x18) >> 3;
+ edi_tagDETI.mid = mid;
+ //const int fl = (p[6] & 0x07) * 256 + p[7];
+
+ int ficl = 0;
+ if (ficf == 0) {
+ etiLog.level(warn) << "Not FIC in data stream!";
+ return;
+ }
+ else if (mid == 3) {
+ ficl = 32;
+ }
+ else {
+ ficl = 24;
+ }
+
+ std::vector<uint32_t> sad(nst);
+ std::vector<uint32_t> stl(nst);
+ // Loop over STC subchannels:
+ for (int i=0; i < nst; i++) {
+ // EDI stream index is 1-indexed
+ const int edi_stream_id = i + 1;
+
+ uint32_t scid = (p[8 + 4*i] & 0xFC) >> 2;
+ sad[i] = (p[8+4*i] & 0x03) * 256 + p[9+4*i];
+ uint32_t tpl = (p[10+4*i] & 0xFC) >> 2;
+ stl[i] = (p[10+4*i] & 0x03) * 256 + \
+ p[11+4*i];
+
+ edi::TagESTn tag_ESTn;
+ tag_ESTn.id = edi_stream_id;
+ tag_ESTn.scid = scid;
+ tag_ESTn.sad = sad[i];
+ tag_ESTn.tpl = tpl;
+ tag_ESTn.rfa = 0; // two bits
+ tag_ESTn.mst_length = stl[i];
+ tag_ESTn.mst_data = nullptr;
+
+ edi_subchannelToTag[i] = tag_ESTn;
+ }
+
+ const uint16_t mnsc = p[8 + 4*nst] * 256 + \
+ p[8 + 4*nst + 1];
+ edi_tagDETI.mnsc = mnsc;
+
+ /*const uint16_t crc1 = p[8 + 4*nst + 2]*256 + \
+ p[8 + 4*nst + 3]; */
+
+ edi_tagDETI.fic_data = p + 12 + 4*nst;
+ edi_tagDETI.fic_length = ficl * 4;
+
+ // loop over MSC subchannels
+ int offset = 0;
+ for (int i=0; i < nst; i++) {
+ edi::TagESTn& tag = edi_subchannelToTag[i];
+ tag.mst_data = (p + 12 + 4*nst + ficf*ficl*4 + offset);
+
+ offset += stl[i] * 8;
+ }
+
+ /*
+ const uint16_t crc2 = p[12 + 4*nst + ficf*ficl*4 + offset] * 256 + \
+ p[12 + 4*nst + ficf*ficl*4 + offset + 1]; */
+
+ // TIST
+ const size_t tist_ix = 12 + 4*nst + ficf*ficl*4 + offset + 4;
+ uint32_t tist = (uint32_t)(p[tist_ix]) << 24 |
+ (uint32_t)(p[tist_ix+1]) << 16 |
+ (uint32_t)(p[tist_ix+2]) << 8 |
+ (uint32_t)(p[tist_ix+3]);
+
+ std::time_t posix_timestamp_1_jan_2000 = 946684800;
+
+ // Wait until our time is tist_delay after the TIST before
+ // we release that frame
+
+ const auto seconds = metadata.edi_time;
+ const auto pps_offset = std::chrono::milliseconds(
+ std::lrint((tist & 0xFFFFFF) / 16384.0));
+ const auto t_frame = std::chrono::system_clock::from_time_t(
+ seconds + posix_timestamp_1_jan_2000) + pps_offset;
+ const auto t_release = t_frame + std::chrono::milliseconds(tist_delay_ms);
+ const auto t_now = chrono::system_clock::now();
+ const auto wait_time = t_release - t_now;
+ const auto duration_0 = std::chrono::milliseconds(0);
+
+ /*
+ etiLog.level(debug) << "seconds " << seconds + posix_timestamp_1_jan_2000;
+ etiLog.level(debug) << "now " << chrono::system_clock::to_time_t(t_now);
+ etiLog.level(debug) << "wait " << wait_time.count();
+ */
+
+ if (wait_time > duration_0) {
+ std::this_thread::sleep_for(wait_time);
+ }
+
+
+ edi_tagDETI.tsta = tist;
+ edi_tagDETI.atstf = 1;
+ edi_tagDETI.utco = metadata.utc_offset;
+ edi_tagDETI.seconds = metadata.edi_time;
+
+ if (edi_conf.enabled()) {
+ // put tags *ptr, DETI and all subchannels into one TagPacket
+ edi_tagpacket.tag_items.push_back(&edi_tagStarPtr);
+ edi_tagpacket.tag_items.push_back(&edi_tagDETI);
+
+ for (auto& tag : edi_subchannelToTag) {
+ edi_tagpacket.tag_items.push_back(&tag.second);
+ }
+
+ // Assemble into one AF Packet
+ edi::AFPacket edi_afpacket = edi_afPacketiser.Assemble(edi_tagpacket);
+
+ if (edi_conf.enable_pft) {
+ // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
+ std::vector<edi::PFTFragment> edi_fragments =
+ edi_pft.Assemble(edi_afpacket);
+
+ if (edi_conf.verbose) {
+ fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n",
+ edi_fragments.size());
+ }
+
+ if (edi_conf.interleaver_enabled()) {
+ edi_fragments = edi_interleaver.Interleave(edi_fragments);
+ }
+
+ // Send over ethernet
+ for (const auto& edi_frag : edi_fragments) {
+ for (auto& dest : edi_conf.destinations) {
+ InetAddress addr;
+ addr.setAddress(dest.dest_addr.c_str());
+ addr.setPort(edi_conf.dest_port);
+
+ dest.socket->send(edi_frag, addr);
+ }
+
+ if (edi_conf.dump) {
+ std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
+ }
+ }
+
+ if (edi_conf.verbose) {
+ fprintf(stderr, "EDI number of PFT fragments %zu\n",
+ edi_fragments.size());
+ }
+ }
+ else {
+ // Send over ethernet
+ for (auto& dest : edi_conf.destinations) {
+ InetAddress addr;
+ addr.setAddress(dest.dest_addr.c_str());
+ addr.setPort(edi_conf.dest_port);
+
+ dest.socket->send(edi_afpacket, addr);
+ }
+
+ if (edi_conf.dump) {
+ std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ std::copy(edi_afpacket.begin(), edi_afpacket.end(), debug_iterator);
+ }
+ }
+ }
+}
+
+void EDISender::process()
+{
+ while (running.load()) {
+ frame_t frame;
+ frames.wait_and_pop(frame);
+
+ if (not running.load()) {
+ break;
+ }
+
+ send_eti_frame(frame.first.data(), frame.second);
+ }
+}
diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h
new file mode 100644
index 0000000..c269652
--- /dev/null
+++ b/src/zmq2edi/EDISender.h
@@ -0,0 +1,83 @@
+/*
+ Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
+ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2017
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+#include <iostream>
+#include <iterator>
+#include <thread>
+#include <vector>
+#include <chrono>
+#include <atomic>
+#include "ThreadsafeQueue.h"
+#include "dabOutput/dabOutput.h"
+#include "dabOutput/edi/TagItems.h"
+#include "dabOutput/edi/TagPacket.h"
+#include "dabOutput/edi/AFPacket.h"
+#include "dabOutput/edi/PFT.h"
+#include "dabOutput/edi/Interleaver.h"
+
+// This metadata gets transmitted in the zmq stream
+struct metadata_t {
+ uint32_t edi_time;
+ int16_t utc_offset;
+ uint16_t dlfc;
+};
+
+using frame_t = std::pair<std::vector<uint8_t>, metadata_t>;
+
+class EDISender {
+ public:
+ EDISender() = default;
+ EDISender(const EDISender& other) = delete;
+ EDISender& operator=(const EDISender& other) = delete;
+ ~EDISender();
+ void start(const edi_configuration_t& conf, int delay_ms);
+ void push_frame(const frame_t& frame);
+ void print_configuration(void);
+
+ private:
+ void send_eti_frame(uint8_t* p, metadata_t metadata);
+ void process(void);
+
+ int tist_delay_ms;
+ std::atomic<bool> running;
+ std::thread process_thread;
+ edi_configuration_t edi_conf;
+ std::chrono::steady_clock::time_point startTime;
+ ThreadsafeQueue<frame_t> frames;
+ std::ofstream edi_debug_file;
+
+ // The TagPacket will then be placed into an AFPacket
+ edi::AFPacketiser edi_afPacketiser;
+
+ // The AF Packet will be protected with reed-solomon and split in fragments
+ edi::PFT edi_pft;
+
+ // To mitigate for burst packet loss, PFT fragments can be sent out-of-order
+ edi::Interleaver edi_interleaver;
+
+};
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
index 1597f87..a915363 100644
--- a/src/zmq2edi/zmq2edi.cpp
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -25,40 +25,21 @@
along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "dabOutput/dabOutput.h"
-#include "dabOutput/edi/TagItems.h"
-#include "dabOutput/edi/TagPacket.h"
-#include "dabOutput/edi/AFPacket.h"
-#include "dabOutput/edi/PFT.h"
-#include "dabOutput/edi/Interleaver.h"
#include "Log.h"
#include "zmq.hpp"
-#include "math.h"
+#include <math.h>
#include <getopt.h>
#include <string.h>
#include <iostream>
#include <iterator>
#include <vector>
-static edi_configuration_t edi_conf;
-
-static std::ofstream edi_debug_file;
-
-// The TagPacket will then be placed into an AFPacket
-static edi::AFPacketiser edi_afPacketiser;
-
-// The AF Packet will be protected with reed-solomon and split in fragments
-static edi::PFT edi_pft;
+#include "EDISender.h"
+#include "dabOutput/dabOutput.h"
-// To mitigate for burst packet loss, PFT fragments can be sent out-of-order
-static edi::Interleaver edi_interleaver;
+static edi_configuration_t edi_conf;
-// This metadata gets transmitted in the zmq stream
-struct metadata_t {
- uint32_t edi_time;
- int16_t utc_offset;
- uint16_t dlfc;
-};
+static EDISender edisender;
void usage(void)
{
@@ -69,6 +50,7 @@ void usage(void)
cerr << "Where the options are:" << endl;
cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl;
+ cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds before current system time." << endl;
cerr << " -d <destination ip> sets the destination ip." << endl;
cerr << " -p <destination port> sets the destination port." << endl;
cerr << " -s <source port> sets the source port." << endl;
@@ -82,69 +64,6 @@ void usage(void)
cerr << " -a <tagpacket alignement> sets the alignment of the TAG Packet (default 8)." << endl;
}
-static void edi_setup(void) {
- if (edi_conf.verbose) {
- etiLog.log(info, "Setup EDI");
- }
-
- if (edi_conf.dump) {
- edi_debug_file.open("./edi.debug");
- }
-
- if (edi_conf.enabled()) {
- for (auto& edi_destination : edi_conf.destinations) {
- auto edi_output = std::make_shared<UdpSocket>(edi_destination.source_port);
-
- if (not edi_destination.source_addr.empty()) {
- int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str());
- if (err) {
- throw std::runtime_error("EDI socket set source failed!");
- }
- err = edi_output->setMulticastTTL(edi_destination.ttl);
- if (err) {
- throw std::runtime_error("EDI socket set TTL failed!");
- }
- }
-
- edi_destination.socket = edi_output;
- }
- }
-
- if (edi_conf.verbose) {
- etiLog.log(info, "EDI set up");
- }
-
- // The AF Packet will be protected with reed-solomon and split in fragments
- edi::PFT pft(edi_conf);
- edi_pft = pft;
-
- if (edi_conf.interleaver_enabled()) {
- edi_interleaver.SetLatency(edi_conf.latency_frames);
- }
-}
-
-static void print_edi_conf(void)
-{
- if (edi_conf.enabled()) {
- etiLog.level(info) << "EDI";
- etiLog.level(info) << " verbose " << edi_conf.verbose;
- for (auto& edi_dest : edi_conf.destinations) {
- etiLog.level(info) << " to " << edi_dest.dest_addr << ":" << edi_conf.dest_port;
- if (not edi_dest.source_addr.empty()) {
- etiLog.level(info) << " source " << edi_dest.source_addr;
- etiLog.level(info) << " ttl " << edi_dest.ttl;
- }
- etiLog.level(info) << " source port " << edi_dest.source_port;
- }
- if (edi_conf.interleaver_enabled()) {
- etiLog.level(info) << " interleave " << edi_conf.latency_frames * 24 << " ms";
- }
- }
- else {
- etiLog.level(info) << "EDI disabled";
- }
-}
-
static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_bytes)
{
size_t remaining = size;
@@ -222,171 +141,6 @@ static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_b
throw std::runtime_error("Insufficient data");
}
-static void send_eti_frame(uint8_t* p, metadata_t metadata)
-{
- edi::TagDETI edi_tagDETI;
- edi::TagStarPTR edi_tagStarPtr;
- std::map<int, edi::TagESTn> edi_subchannelToTag;
- // The above Tag Items will be assembled into a TAG Packet
- edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment);
-
- // SYNC
- edi_tagDETI.stat = p[0];
-
- // LIDATA FCT
- edi_tagDETI.dlfc = metadata.dlfc;
-
- const int fct = p[4];
- if (metadata.dlfc % 250 != fct) {
- etiLog.level(warn) << "Frame FCT=" << fct << " does not correspond to DLFC=" << metadata.dlfc;
- }
-
- bool ficf = (p[5] & 0x80) >> 7;
- edi_tagDETI.ficf = ficf;
-
- const int nst = p[5] & 0x7F;
-
- edi_tagDETI.fp = (p[6] & 0xE0) >> 5;
- const int mid = (p[6] & 0x18) >> 3;
- edi_tagDETI.mid = mid;
- //const int fl = (p[6] & 0x07) * 256 + p[7];
-
- int ficl = 0;
- if (ficf == 0) {
- etiLog.level(warn) << "Not FIC in data stream!";
- return;
- }
- else if (mid == 3) {
- ficl = 32;
- }
- else {
- ficl = 24;
- }
-
- std::vector<uint32_t> sad(nst);
- std::vector<uint32_t> stl(nst);
- // Loop over STC subchannels:
- for (int i=0; i < nst; i++) {
- // EDI stream index is 1-indexed
- const int edi_stream_id = i + 1;
-
- uint32_t scid = (p[8 + 4*i] & 0xFC) >> 2;
- sad[i] = (p[8+4*i] & 0x03) * 256 + p[9+4*i];
- uint32_t tpl = (p[10+4*i] & 0xFC) >> 2;
- stl[i] = (p[10+4*i] & 0x03) * 256 + \
- p[11+4*i];
-
- edi::TagESTn tag_ESTn;
- tag_ESTn.id = edi_stream_id;
- tag_ESTn.scid = scid;
- tag_ESTn.sad = sad[i];
- tag_ESTn.tpl = tpl;
- tag_ESTn.rfa = 0; // two bits
- tag_ESTn.mst_length = stl[i];
- tag_ESTn.mst_data = nullptr;
-
- edi_subchannelToTag[i] = tag_ESTn;
- }
-
- const uint16_t mnsc = p[8 + 4*nst] * 256 + \
- p[8 + 4*nst + 1];
- edi_tagDETI.mnsc = mnsc;
-
- /*const uint16_t crc1 = p[8 + 4*nst + 2]*256 + \
- p[8 + 4*nst + 3]; */
-
- edi_tagDETI.fic_data = p + 12 + 4*nst;
- edi_tagDETI.fic_length = ficl * 4;
-
- // loop over MSC subchannels
- int offset = 0;
- for (int i=0; i < nst; i++) {
- edi::TagESTn& tag = edi_subchannelToTag[i];
- tag.mst_data = (p + 12 + 4*nst + ficf*ficl*4 + offset);
-
- offset += stl[i] * 8;
- }
-
- /*
- const uint16_t crc2 = p[12 + 4*nst + ficf*ficl*4 + offset] * 256 + \
- p[12 + 4*nst + ficf*ficl*4 + offset + 1]; */
-
- // TIST
- const size_t tist_ix = 12 + 4*nst + ficf*ficl*4 + offset + 4;
- uint32_t tist = (uint32_t)(p[tist_ix]) << 24 |
- (uint32_t)(p[tist_ix+1]) << 16 |
- (uint32_t)(p[tist_ix+2]) << 8 |
- (uint32_t)(p[tist_ix+3]);
-
- edi_tagDETI.tsta = tist;
- edi_tagDETI.atstf = 1;
- edi_tagDETI.utco = metadata.utc_offset;
- edi_tagDETI.seconds = metadata.edi_time;
-
- if (edi_conf.enabled()) {
- // put tags *ptr, DETI and all subchannels into one TagPacket
- edi_tagpacket.tag_items.push_back(&edi_tagStarPtr);
- edi_tagpacket.tag_items.push_back(&edi_tagDETI);
-
- for (auto& tag : edi_subchannelToTag) {
- edi_tagpacket.tag_items.push_back(&tag.second);
- }
-
- // Assemble into one AF Packet
- edi::AFPacket edi_afpacket = edi_afPacketiser.Assemble(edi_tagpacket);
-
- if (edi_conf.enable_pft) {
- // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
- std::vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(edi_afpacket);
-
- if (edi_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n",
- edi_fragments.size());
- }
-
- if (edi_conf.interleaver_enabled()) {
- edi_fragments = edi_interleaver.Interleave(edi_fragments);
- }
-
- // Send over ethernet
- for (const auto& edi_frag : edi_fragments) {
- for (auto& dest : edi_conf.destinations) {
- InetAddress addr;
- addr.setAddress(dest.dest_addr.c_str());
- addr.setPort(edi_conf.dest_port);
-
- dest.socket->send(edi_frag, addr);
- }
-
- if (edi_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
- }
- }
-
- if (edi_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragments %zu\n",
- edi_fragments.size());
- }
- }
- else {
- // Send over ethernet
- for (auto& dest : edi_conf.destinations) {
- InetAddress addr;
- addr.setAddress(dest.dest_addr.c_str());
- addr.setPort(edi_conf.dest_port);
-
- dest.socket->send(edi_afpacket, addr);
- }
-
- if (edi_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(edi_afpacket.begin(), edi_afpacket.end(), debug_iterator);
- }
- }
- }
-}
-
int start(int argc, char **argv)
{
edi_destination_t edi_destination;
@@ -398,9 +152,11 @@ int start(int argc, char **argv)
return 1;
}
+ int delay_ms = 500;
+
char ch = 0;
while (ch != -1) {
- ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:");
+ ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:w:");
switch (ch) {
case -1:
break;
@@ -451,11 +207,11 @@ int start(int argc, char **argv)
case 'a':
edi_conf.tagpacket_alignment = std::stoi(optarg);
break;
+ case 'w':
+ delay_ms = std::stoi(optarg);
+ break;
case 'h':
- usage();
- return 1;
default:
- etiLog.log(error, "Option '%c' not understood", ch);
usage();
return 1;
}
@@ -468,8 +224,9 @@ int start(int argc, char **argv)
edi_conf.destinations.push_back(edi_destination);
- print_edi_conf();
- edi_setup();
+ etiLog.level(info) << "Setting up EDI Sender withe delay " << delay_ms << " ms";
+ edisender.start(edi_conf, delay_ms);
+ edisender.print_configuration();
const char* source_url = argv[optind];
@@ -539,7 +296,7 @@ int start(int argc, char **argv)
}
for (auto &f : all_frames) {
- send_eti_frame(f.first.data(), f.second);
+ edisender.push_frame(f);
frame_count++;
}