diff options
Diffstat (limited to 'src/zmq2edi')
-rw-r--r-- | src/zmq2edi/EDISender.cpp | 110 | ||||
-rw-r--r-- | src/zmq2edi/EDISender.h | 20 | ||||
-rw-r--r-- | src/zmq2edi/zmq2edi.cpp | 23 |
3 files changed, 23 insertions, 130 deletions
diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp index 0df633f..2128abf 100644 --- a/src/zmq2edi/EDISender.cpp +++ b/src/zmq2edi/EDISender.cpp @@ -47,51 +47,14 @@ EDISender::~EDISender() } } -void EDISender::start(const edi_configuration_t& conf, +void EDISender::start(const edi::configuration_t& conf, int delay_ms, bool drop_late_packets) { edi_conf = conf; tist_delay_ms = delay_ms; drop_late = drop_late_packets; - if (edi_conf.verbose) { - etiLog.log(info, "Setup EDI"); - } - - if (edi_conf.dump) { - edi_debug_file.open("./edi.debug"); - } - - if (edi_conf.enabled()) { - for (auto& edi_destination : edi_conf.destinations) { - auto edi_output = make_shared<UdpSocket>(edi_destination.source_port); - - if (not edi_destination.source_addr.empty()) { - int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str()); - if (err) { - throw runtime_error("EDI socket set source failed!"); - } - err = edi_output->setMulticastTTL(edi_destination.ttl); - if (err) { - throw runtime_error("EDI socket set TTL failed!"); - } - } - - edi_destination.socket = edi_output; - } - } - - if (edi_conf.verbose) { - etiLog.log(info, "EDI set up"); - } - - // The AF Packet will be protected with reed-solomon and split in fragments - edi::PFT pft(edi_conf); - edi_pft = pft; - - if (edi_conf.interleaver_enabled()) { - edi_interleaver.SetLatency(edi_conf.latency_frames); - } + edi_sender = make_shared<edi::Sender>(edi_conf); startTime = std::chrono::steady_clock::now(); running.store(true); @@ -106,19 +69,7 @@ void EDISender::push_frame(const frame_t& frame) void EDISender::print_configuration() { if (edi_conf.enabled()) { - etiLog.level(info) << "EDI"; - etiLog.level(info) << " verbose " << edi_conf.verbose; - for (auto& edi_dest : edi_conf.destinations) { - etiLog.level(info) << " to " << edi_dest.dest_addr << ":" << edi_conf.dest_port; - if (not edi_dest.source_addr.empty()) { - etiLog.level(info) << " source " << edi_dest.source_addr; - etiLog.level(info) << " ttl " << edi_dest.ttl; - } - etiLog.level(info) << " source port " << edi_dest.source_port; - } - if (edi_conf.interleaver_enabled()) { - etiLog.level(info) << " interleave " << edi_conf.latency_frames * 24 << " ms"; - } + edi_conf.print(); } else { etiLog.level(info) << "EDI disabled"; @@ -251,7 +202,7 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata) edi_tagDETI.utco = metadata.utc_offset; edi_tagDETI.seconds = metadata.edi_time; - if (edi_conf.enabled()) { + if (edi_sender and edi_conf.enabled()) { // put tags *ptr, DETI and all subchannels into one TagPacket edi_tagpacket.tag_items.push_back(&edi_tagStarPtr); edi_tagpacket.tag_items.push_back(&edi_tagDETI); @@ -260,58 +211,7 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata) edi_tagpacket.tag_items.push_back(&tag.second); } - // Assemble into one AF Packet - edi::AFPacket edi_afpacket = edi_afPacketiser.Assemble(edi_tagpacket); - - if (edi_conf.enable_pft) { - // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) - vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(edi_afpacket); - - if (edi_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n", - edi_fragments.size()); - } - - if (edi_conf.interleaver_enabled()) { - edi_fragments = edi_interleaver.Interleave(edi_fragments); - } - - // Send over ethernet - for (const auto& edi_frag : edi_fragments) { - for (auto& dest : edi_conf.destinations) { - InetAddress addr; - addr.setAddress(dest.dest_addr.c_str()); - addr.setPort(edi_conf.dest_port); - - dest.socket->send(edi_frag, addr); - } - - if (edi_conf.dump) { - std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); - } - } - - if (edi_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragments %zu\n", - edi_fragments.size()); - } - } - else { - // Send over ethernet - for (auto& dest : edi_conf.destinations) { - InetAddress addr; - addr.setAddress(dest.dest_addr.c_str()); - addr.setPort(edi_conf.dest_port); - - dest.socket->send(edi_afpacket, addr); - } - - if (edi_conf.dump) { - std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - std::copy(edi_afpacket.begin(), edi_afpacket.end(), debug_iterator); - } - } + edi_sender->write(edi_tagpacket); } } diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h index 4c2af54..bb9c8bc 100644 --- a/src/zmq2edi/EDISender.h +++ b/src/zmq2edi/EDISender.h @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -36,9 +36,7 @@ #include "dabOutput/dabOutput.h" #include "dabOutput/edi/TagItems.h" #include "dabOutput/edi/TagPacket.h" -#include "dabOutput/edi/AFPacket.h" -#include "dabOutput/edi/PFT.h" -#include "dabOutput/edi/Interleaver.h" +#include "dabOutput/edi/Transport.h" // This metadata gets transmitted in the zmq stream struct metadata_t { @@ -55,7 +53,7 @@ class EDISender { EDISender(const EDISender& other) = delete; EDISender& operator=(const EDISender& other) = delete; ~EDISender(); - void start(const edi_configuration_t& conf, + void start(const edi::configuration_t& conf, int delay_ms, bool drop_late_packets); void push_frame(const frame_t& frame); void print_configuration(void); @@ -68,19 +66,11 @@ class EDISender { bool drop_late; std::atomic<bool> running; std::thread process_thread; - edi_configuration_t edi_conf; + edi::configuration_t edi_conf; std::chrono::steady_clock::time_point startTime; ThreadsafeQueue<frame_t> frames; - std::ofstream edi_debug_file; - // The TagPacket will then be placed into an AFPacket - edi::AFPacketiser edi_afPacketiser; - - // The AF Packet will be protected with reed-solomon and split in fragments - edi::PFT edi_pft; - - // To mitigate for burst packet loss, PFT fragments can be sent out-of-order - edi::Interleaver edi_interleaver; + std::shared_ptr<edi::Sender> edi_sender; // For statistics about wait time before we transmit packets, // in microseconds diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index 3364faa..ee5776e 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -40,7 +40,7 @@ constexpr size_t MAX_ERROR_COUNT = 10; constexpr long ZMQ_TIMEOUT_MS = 1000; -static edi_configuration_t edi_conf; +static edi::configuration_t edi_conf; static EDISender edisender; @@ -155,7 +155,7 @@ static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_b /* There is some state inside the parsing of destination arguments, * because several destinations can be given. */ -static edi_destination_t edi_destination; +static std::shared_ptr<edi::udp_destination_t> edi_destination; static bool source_port_set = false; static bool source_addr_set = false; static bool ttl_set = false; @@ -168,9 +168,8 @@ static void add_edi_destination(void) std::to_string(edi_conf.destinations.size() + 1)); } - edi_conf.destinations.push_back(edi_destination); - edi_destination_t newdest; - edi_destination = newdest; + edi_conf.destinations.push_back(move(edi_destination)); + edi_destination.reset(); source_port_set = false; source_addr_set = false; @@ -180,33 +179,37 @@ static void add_edi_destination(void) static void parse_destination_args(char option) { + if (not edi_destination) { + edi_destination = std::make_shared<edi::udp_destination_t>(); + } + switch (option) { case 's': if (source_port_set) { add_edi_destination(); } - edi_destination.source_port = std::stoi(optarg); + edi_destination->source_port = std::stoi(optarg); source_port_set = true; break; case 'S': if (source_addr_set) { add_edi_destination(); } - edi_destination.source_addr = optarg; + edi_destination->source_addr = optarg; source_addr_set = true; break; case 't': if (ttl_set) { add_edi_destination(); } - edi_destination.ttl = std::stoi(optarg); + edi_destination->ttl = std::stoi(optarg); ttl_set = true; break; case 'd': if (dest_addr_set) { add_edi_destination(); } - edi_destination.dest_addr = optarg; + edi_destination->dest_addr = optarg; dest_addr_set = true; break; default: |