summaryrefslogtreecommitdiffstats
path: root/src/zmq2edi
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-05-06 15:04:51 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-05-06 15:04:51 +0200
commit956814cc526bdd245e52c5004bf5661a57d848cc (patch)
tree10a4e368432740fc0514ae9d1de572bb1c844768 /src/zmq2edi
parent8cb5b3eac1bb669b8828777489d54e9d9057fe6f (diff)
downloaddabmux-956814cc526bdd245e52c5004bf5661a57d848cc.tar.gz
dabmux-956814cc526bdd245e52c5004bf5661a57d848cc.tar.bz2
dabmux-956814cc526bdd245e52c5004bf5661a57d848cc.zip
EDI: put more code in common between DabMux and ZMQ2EDI
Diffstat (limited to 'src/zmq2edi')
-rw-r--r--src/zmq2edi/EDISender.cpp110
-rw-r--r--src/zmq2edi/EDISender.h20
-rw-r--r--src/zmq2edi/zmq2edi.cpp23
3 files changed, 23 insertions, 130 deletions
diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp
index 0df633f..2128abf 100644
--- a/src/zmq2edi/EDISender.cpp
+++ b/src/zmq2edi/EDISender.cpp
@@ -47,51 +47,14 @@ EDISender::~EDISender()
}
}
-void EDISender::start(const edi_configuration_t& conf,
+void EDISender::start(const edi::configuration_t& conf,
int delay_ms, bool drop_late_packets)
{
edi_conf = conf;
tist_delay_ms = delay_ms;
drop_late = drop_late_packets;
- if (edi_conf.verbose) {
- etiLog.log(info, "Setup EDI");
- }
-
- if (edi_conf.dump) {
- edi_debug_file.open("./edi.debug");
- }
-
- if (edi_conf.enabled()) {
- for (auto& edi_destination : edi_conf.destinations) {
- auto edi_output = make_shared<UdpSocket>(edi_destination.source_port);
-
- if (not edi_destination.source_addr.empty()) {
- int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str());
- if (err) {
- throw runtime_error("EDI socket set source failed!");
- }
- err = edi_output->setMulticastTTL(edi_destination.ttl);
- if (err) {
- throw runtime_error("EDI socket set TTL failed!");
- }
- }
-
- edi_destination.socket = edi_output;
- }
- }
-
- if (edi_conf.verbose) {
- etiLog.log(info, "EDI set up");
- }
-
- // The AF Packet will be protected with reed-solomon and split in fragments
- edi::PFT pft(edi_conf);
- edi_pft = pft;
-
- if (edi_conf.interleaver_enabled()) {
- edi_interleaver.SetLatency(edi_conf.latency_frames);
- }
+ edi_sender = make_shared<edi::Sender>(edi_conf);
startTime = std::chrono::steady_clock::now();
running.store(true);
@@ -106,19 +69,7 @@ void EDISender::push_frame(const frame_t& frame)
void EDISender::print_configuration()
{
if (edi_conf.enabled()) {
- etiLog.level(info) << "EDI";
- etiLog.level(info) << " verbose " << edi_conf.verbose;
- for (auto& edi_dest : edi_conf.destinations) {
- etiLog.level(info) << " to " << edi_dest.dest_addr << ":" << edi_conf.dest_port;
- if (not edi_dest.source_addr.empty()) {
- etiLog.level(info) << " source " << edi_dest.source_addr;
- etiLog.level(info) << " ttl " << edi_dest.ttl;
- }
- etiLog.level(info) << " source port " << edi_dest.source_port;
- }
- if (edi_conf.interleaver_enabled()) {
- etiLog.level(info) << " interleave " << edi_conf.latency_frames * 24 << " ms";
- }
+ edi_conf.print();
}
else {
etiLog.level(info) << "EDI disabled";
@@ -251,7 +202,7 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
edi_tagDETI.utco = metadata.utc_offset;
edi_tagDETI.seconds = metadata.edi_time;
- if (edi_conf.enabled()) {
+ if (edi_sender and edi_conf.enabled()) {
// put tags *ptr, DETI and all subchannels into one TagPacket
edi_tagpacket.tag_items.push_back(&edi_tagStarPtr);
edi_tagpacket.tag_items.push_back(&edi_tagDETI);
@@ -260,58 +211,7 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
edi_tagpacket.tag_items.push_back(&tag.second);
}
- // Assemble into one AF Packet
- edi::AFPacket edi_afpacket = edi_afPacketiser.Assemble(edi_tagpacket);
-
- if (edi_conf.enable_pft) {
- // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
- vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(edi_afpacket);
-
- if (edi_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n",
- edi_fragments.size());
- }
-
- if (edi_conf.interleaver_enabled()) {
- edi_fragments = edi_interleaver.Interleave(edi_fragments);
- }
-
- // Send over ethernet
- for (const auto& edi_frag : edi_fragments) {
- for (auto& dest : edi_conf.destinations) {
- InetAddress addr;
- addr.setAddress(dest.dest_addr.c_str());
- addr.setPort(edi_conf.dest_port);
-
- dest.socket->send(edi_frag, addr);
- }
-
- if (edi_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
- }
- }
-
- if (edi_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragments %zu\n",
- edi_fragments.size());
- }
- }
- else {
- // Send over ethernet
- for (auto& dest : edi_conf.destinations) {
- InetAddress addr;
- addr.setAddress(dest.dest_addr.c_str());
- addr.setPort(edi_conf.dest_port);
-
- dest.socket->send(edi_afpacket, addr);
- }
-
- if (edi_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(edi_afpacket.begin(), edi_afpacket.end(), debug_iterator);
- }
- }
+ edi_sender->write(edi_tagpacket);
}
}
diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h
index 4c2af54..bb9c8bc 100644
--- a/src/zmq2edi/EDISender.h
+++ b/src/zmq2edi/EDISender.h
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2018
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -36,9 +36,7 @@
#include "dabOutput/dabOutput.h"
#include "dabOutput/edi/TagItems.h"
#include "dabOutput/edi/TagPacket.h"
-#include "dabOutput/edi/AFPacket.h"
-#include "dabOutput/edi/PFT.h"
-#include "dabOutput/edi/Interleaver.h"
+#include "dabOutput/edi/Transport.h"
// This metadata gets transmitted in the zmq stream
struct metadata_t {
@@ -55,7 +53,7 @@ class EDISender {
EDISender(const EDISender& other) = delete;
EDISender& operator=(const EDISender& other) = delete;
~EDISender();
- void start(const edi_configuration_t& conf,
+ void start(const edi::configuration_t& conf,
int delay_ms, bool drop_late_packets);
void push_frame(const frame_t& frame);
void print_configuration(void);
@@ -68,19 +66,11 @@ class EDISender {
bool drop_late;
std::atomic<bool> running;
std::thread process_thread;
- edi_configuration_t edi_conf;
+ edi::configuration_t edi_conf;
std::chrono::steady_clock::time_point startTime;
ThreadsafeQueue<frame_t> frames;
- std::ofstream edi_debug_file;
- // The TagPacket will then be placed into an AFPacket
- edi::AFPacketiser edi_afPacketiser;
-
- // The AF Packet will be protected with reed-solomon and split in fragments
- edi::PFT edi_pft;
-
- // To mitigate for burst packet loss, PFT fragments can be sent out-of-order
- edi::Interleaver edi_interleaver;
+ std::shared_ptr<edi::Sender> edi_sender;
// For statistics about wait time before we transmit packets,
// in microseconds
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
index 3364faa..ee5776e 100644
--- a/src/zmq2edi/zmq2edi.cpp
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2018
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -40,7 +40,7 @@
constexpr size_t MAX_ERROR_COUNT = 10;
constexpr long ZMQ_TIMEOUT_MS = 1000;
-static edi_configuration_t edi_conf;
+static edi::configuration_t edi_conf;
static EDISender edisender;
@@ -155,7 +155,7 @@ static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_b
/* There is some state inside the parsing of destination arguments,
* because several destinations can be given. */
-static edi_destination_t edi_destination;
+static std::shared_ptr<edi::udp_destination_t> edi_destination;
static bool source_port_set = false;
static bool source_addr_set = false;
static bool ttl_set = false;
@@ -168,9 +168,8 @@ static void add_edi_destination(void)
std::to_string(edi_conf.destinations.size() + 1));
}
- edi_conf.destinations.push_back(edi_destination);
- edi_destination_t newdest;
- edi_destination = newdest;
+ edi_conf.destinations.push_back(move(edi_destination));
+ edi_destination.reset();
source_port_set = false;
source_addr_set = false;
@@ -180,33 +179,37 @@ static void add_edi_destination(void)
static void parse_destination_args(char option)
{
+ if (not edi_destination) {
+ edi_destination = std::make_shared<edi::udp_destination_t>();
+ }
+
switch (option) {
case 's':
if (source_port_set) {
add_edi_destination();
}
- edi_destination.source_port = std::stoi(optarg);
+ edi_destination->source_port = std::stoi(optarg);
source_port_set = true;
break;
case 'S':
if (source_addr_set) {
add_edi_destination();
}
- edi_destination.source_addr = optarg;
+ edi_destination->source_addr = optarg;
source_addr_set = true;
break;
case 't':
if (ttl_set) {
add_edi_destination();
}
- edi_destination.ttl = std::stoi(optarg);
+ edi_destination->ttl = std::stoi(optarg);
ttl_set = true;
break;
case 'd':
if (dest_addr_set) {
add_edi_destination();
}
- edi_destination.dest_addr = optarg;
+ edi_destination->dest_addr = optarg;
dest_addr_set = true;
break;
default: