From 956814cc526bdd245e52c5004bf5661a57d848cc Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 6 May 2019 15:04:51 +0200 Subject: EDI: put more code in common between DabMux and ZMQ2EDI --- src/DabMultiplexer.cpp | 99 +---------------------- src/DabMultiplexer.h | 19 +---- src/DabMux.cpp | 26 ++---- src/dabOutput/dabOutput.h | 28 ------- src/dabOutput/edi/Config.h | 71 +++++++++++++++++ src/dabOutput/edi/Interleaver.cpp | 1 + src/dabOutput/edi/PFT.cpp | 30 ++++++- src/dabOutput/edi/PFT.h | 57 +++---------- src/dabOutput/edi/Transport.cpp | 164 ++++++++++++++++++++++++++++++++++++++ src/dabOutput/edi/Transport.h | 68 ++++++++++++++++ src/zmq2edi/EDISender.cpp | 110 ++----------------------- src/zmq2edi/EDISender.h | 20 ++--- src/zmq2edi/zmq2edi.cpp | 23 +++--- 13 files changed, 381 insertions(+), 335 deletions(-) create mode 100644 src/dabOutput/edi/Config.h create mode 100644 src/dabOutput/edi/Transport.cpp create mode 100644 src/dabOutput/edi/Transport.h (limited to 'src') diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp index 3a7f31f..9ff28a3 100644 --- a/src/DabMultiplexer.cpp +++ b/src/DabMultiplexer.cpp @@ -98,46 +98,10 @@ DabMultiplexer::~DabMultiplexer() rcs.remove_controllable(&m_clock_tai); } -void DabMultiplexer::set_edi_config(const edi_configuration_t& new_edi_conf) +void DabMultiplexer::set_edi_config(const edi::configuration_t& new_edi_conf) { edi_conf = new_edi_conf; - - if (edi_conf.verbose) { - etiLog.log(info, "Setup EDI"); - } - - if (edi_conf.dump) { - edi_debug_file.open("./edi.debug"); - } - - if (edi_conf.enabled()) { - for (auto& edi_destination : edi_conf.destinations) { - auto edi_output = std::make_shared(edi_destination.source_port); - - if (not edi_destination.source_addr.empty()) { - int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str()); - if (err) { - etiLog.level(error) << "EDI socket set source failed!"; - throw MuxInitException(); - } - err = edi_output->setMulticastTTL(edi_destination.ttl); - if (err) { - etiLog.level(error) << "EDI socket set TTL failed!"; - throw MuxInitException(); - } - } - - edi_destination.socket = edi_output; - } - } - - if (edi_conf.verbose) { - etiLog.log(info, "EDI set up"); - } - - // The AF Packet will be protected with reed-solomon and split in fragments - edi::PFT pft(edi_conf); - edi_pft = pft; + edi_sender = make_shared(edi_conf); } @@ -211,10 +175,6 @@ void DabMultiplexer::prepare(bool require_tai_clock) } } - if (edi_conf.interleaver_enabled()) { - edi_interleaver.SetLatency(edi_conf.latency_frames); - } - } @@ -749,7 +709,7 @@ void DabMultiplexer::mux_frame(std::vector >& outputs /********************************************************************** *********** Finalise and send EDI ******************************** **********************************************************************/ - if (edi_conf.enabled()) { + if (edi_sender and edi_conf.enabled()) { // put tags *ptr, DETI and all subchannels into one TagPacket edi_tagpacket.tag_items.push_back(&edi_tagStarPtr); edi_tagpacket.tag_items.push_back(&edi_tagDETI); @@ -758,58 +718,7 @@ void DabMultiplexer::mux_frame(std::vector >& outputs edi_tagpacket.tag_items.push_back(&tag.second); } - // Assemble into one AF Packet - edi::AFPacket edi_afpacket = edi_afPacketiser.Assemble(edi_tagpacket); - - if (edi_conf.enable_pft) { - // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) - vector edi_fragments = edi_pft.Assemble(edi_afpacket); - - if (edi_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragment before interleaver %zu", - edi_fragments.size()); - } - - if (edi_conf.interleaver_enabled()) { - edi_fragments = edi_interleaver.Interleave(edi_fragments); - } - - // Send over ethernet - for (const auto& edi_frag : edi_fragments) { - for (auto& dest : edi_conf.destinations) { - InetAddress addr; - addr.setAddress(dest.dest_addr.c_str()); - addr.setPort(edi_conf.dest_port); - - dest.socket->send(edi_frag, addr); - } - - if (edi_conf.dump) { - std::ostream_iterator debug_iterator(edi_debug_file); - std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); - } - } - - if (edi_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragments %zu", - edi_fragments.size()); - } - } - else { - // Send over ethernet - for (auto& dest : edi_conf.destinations) { - InetAddress addr; - addr.setAddress(dest.dest_addr.c_str()); - addr.setPort(edi_conf.dest_port); - - dest.socket->send(edi_afpacket, addr); - } - - if (edi_conf.dump) { - std::ostream_iterator debug_iterator(edi_debug_file); - std::copy(edi_afpacket.begin(), edi_afpacket.end(), debug_iterator); - } - } + edi_sender->write(edi_tagpacket); } #if _DEBUG diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h index 7090be7..386c23c 100644 --- a/src/DabMultiplexer.h +++ b/src/DabMultiplexer.h @@ -33,8 +33,7 @@ #include "dabOutput/edi/TagItems.h" #include "dabOutput/edi/TagPacket.h" #include "dabOutput/edi/AFPacket.h" -#include "dabOutput/edi/PFT.h" -#include "dabOutput/edi/Interleaver.h" +#include "dabOutput/edi/Transport.h" #include "fig/FIGCarousel.h" #include "crc.h" #include "utils.h" @@ -67,7 +66,7 @@ class DabMultiplexer : public RemoteControllable { void print_info(void); - void set_edi_config(const edi_configuration_t& new_edi_conf); + void set_edi_config(const edi::configuration_t& new_edi_conf); /* Remote control */ virtual void set_parameter(const std::string& parameter, @@ -88,7 +87,8 @@ class DabMultiplexer : public RemoteControllable { std::time_t edi_time; std::time_t edi_time_latched_for_mnsc; - edi_configuration_t edi_conf; + edi::configuration_t edi_conf; + std::shared_ptr edi_sender; uint32_t sync = 0x49C5F8; unsigned long currentFrame = 0; @@ -99,17 +99,6 @@ class DabMultiplexer : public RemoteControllable { bool m_tai_clock_required = false; ClockTAI m_clock_tai; - std::ofstream edi_debug_file; - - // The TagPacket will then be placed into an AFPacket - edi::AFPacketiser edi_afPacketiser; - - // The AF Packet will be protected with reed-solomon and split in fragments - edi::PFT edi_pft; - - // To mitigate for burst packet loss, PFT fragments can be sent out-of-order - edi::Interleaver edi_interleaver; - /* New FIG Carousel */ FIC::FIGCarousel fig_carousel; }; diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 3d19ee4..3d0a7d9 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -271,7 +271,7 @@ int main(int argc, char *argv[]) " starting up"; - edi_configuration_t edi_conf; + edi::configuration_t edi_conf; /******************** READ OUTPUT PARAMETERS ***************/ set all_output_names; @@ -293,12 +293,12 @@ int main(int argc, char *argv[]) if (outputuid == "edi") { ptree pt_edi = pt_outputs.get_child("edi"); for (auto pt_edi_dest : pt_edi.get_child("destinations")) { - edi_destination_t dest; - dest.dest_addr = pt_edi_dest.second.get("destination"); - dest.ttl = pt_edi_dest.second.get("ttl", 1); + auto dest = make_shared(); + dest->dest_addr = pt_edi_dest.second.get("destination"); + dest->ttl = pt_edi_dest.second.get("ttl", 1); - dest.source_addr = pt_edi_dest.second.get("source", ""); - dest.source_port = pt_edi_dest.second.get("sourceport"); + dest->source_addr = pt_edi_dest.second.get("source", ""); + dest->source_port = pt_edi_dest.second.get("sourceport"); edi_conf.destinations.push_back(dest); } @@ -460,19 +460,7 @@ int main(int argc, char *argv[]) printOutputs(outputs); if (edi_conf.enabled()) { - etiLog.level(info) << "EDI"; - etiLog.level(info) << " verbose " << edi_conf.verbose; - for (auto& edi_dest : edi_conf.destinations) { - etiLog.level(info) << " to " << edi_dest.dest_addr << ":" << edi_conf.dest_port; - if (not edi_dest.source_addr.empty()) { - etiLog.level(info) << " source " << edi_dest.source_addr; - etiLog.level(info) << " ttl " << edi_dest.ttl; - } - etiLog.level(info) << " source port " << edi_dest.source_port; - } - if (edi_conf.interleaver_enabled()) { - etiLog.level(info) << " interleave " << edi_conf.latency_frames * 24 << " ms"; - } + edi_conf.print(); } size_t limit = pt.get("general.nbframes", 0); diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h index e5a8a94..9cc18d7 100644 --- a/src/dabOutput/dabOutput.h +++ b/src/dabOutput/dabOutput.h @@ -48,34 +48,6 @@ #endif #include "dabOutput/metadata.h" -/** Configuration for EDI output */ - -// Can represent both unicast and multicast destinations -struct edi_destination_t { - std::string dest_addr; - std::string source_addr; - unsigned int source_port = 0; - unsigned int ttl = 10; - - std::shared_ptr socket; -}; - -struct edi_configuration_t { - unsigned chunk_len = 207; // RSk, data length of each chunk - unsigned fec = 0; // number of fragments that can be recovered - bool dump = false; // dump a file with the EDI packets - bool verbose = false; - bool enable_pft = false; // Enable protection and fragmentation - unsigned int tagpacket_alignment = 0; - std::vector destinations; - unsigned int dest_port = 0; // common destination port, because it's encoded in the transport layer - unsigned int latency_frames = 0; // if nonzero, enable interleaver with a latency of latency_frames * 24ms - - bool enabled() const { return destinations.size() > 0; } - bool interleaver_enabled() const { return latency_frames > 0; } -}; - - // Abstract base class for all outputs class DabOutput { diff --git a/src/dabOutput/edi/Config.h b/src/dabOutput/edi/Config.h new file mode 100644 index 0000000..d3678d9 --- /dev/null +++ b/src/dabOutput/edi/Config.h @@ -0,0 +1,71 @@ +/* + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + EDI output, + UDP and TCP transports and their configuration + + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#include "config.h" +#include +#include +#include +#include + +namespace edi { + +/** Configuration for EDI output */ + +struct destination_t { + virtual ~destination_t() {}; +}; + +// Can represent both unicast and multicast destinations +struct udp_destination_t : public destination_t { + std::string dest_addr; + std::string source_addr; + unsigned int source_port = 0; + unsigned int ttl = 10; +}; + +struct configuration_t { + unsigned chunk_len = 207; // RSk, data length of each chunk + unsigned fec = 0; // number of fragments that can be recovered + bool dump = false; // dump a file with the EDI packets + bool verbose = false; + bool enable_pft = false; // Enable protection and fragmentation + unsigned int tagpacket_alignment = 0; + std::vector > destinations; + unsigned int dest_port = 0; // common destination port, because it's encoded in the transport layer + unsigned int latency_frames = 0; // if nonzero, enable interleaver with a latency of latency_frames * 24ms + + bool enabled() const { return destinations.size() > 0; } + bool interleaver_enabled() const { return latency_frames > 0; } + + void print() const; +}; + +} + + diff --git a/src/dabOutput/edi/Interleaver.cpp b/src/dabOutput/edi/Interleaver.cpp index e31326b..f26a50e 100644 --- a/src/dabOutput/edi/Interleaver.cpp +++ b/src/dabOutput/edi/Interleaver.cpp @@ -30,6 +30,7 @@ */ #include "Interleaver.h" +#include namespace edi { diff --git a/src/dabOutput/edi/PFT.cpp b/src/dabOutput/edi/PFT.cpp index 1c885f9..5b93016 100644 --- a/src/dabOutput/edi/PFT.cpp +++ b/src/dabOutput/edi/PFT.cpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2014 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -50,6 +50,30 @@ using namespace std; // An integer division that rounds up, i.e. ceil(a/b) #define CEIL_DIV(a, b) (a % b == 0 ? a / b : a / b + 1) +PFT::PFT() { } + +PFT::PFT(const configuration_t &conf) : + m_k(conf.chunk_len), + m_m(conf.fec), + m_dest_port(conf.dest_port), + m_pseq(0), + m_num_chunks(0), + m_verbose(conf.verbose) + { + if (m_k > 207) { + etiLog.level(warn) << + "EDI PFT: maximum chunk size is 207."; + throw std::out_of_range("EDI PFT Chunk size too large."); + } + + if (m_m > 5) { + etiLog.level(warn) << + "EDI PFT: high number of recoverable fragments" + " may lead to large overhead"; + // See TS 102 821, 7.2.1 Known values, list entry for 'm' + } + } + RSBlock PFT::Protect(AFPacket af_packet) { RSBlock rs_block; @@ -98,7 +122,7 @@ RSBlock PFT::Protect(AFPacket af_packet) // Calculate RS for each chunk and assemble RS block for (size_t i = 0; i < af_packet.size(); i+= chunk_len) { vector chunk(207); - vector protection(ParityBytes); + vector protection(PARITYBYTES); // copy chunk_len bytes into new chunk memcpy(&chunk.front(), &af_packet[i], chunk_len); @@ -139,7 +163,7 @@ vector< vector > PFT::ProtectAndFragment(AFPacket af_packet) #endif // TS 102 821 7.2.2: s_max = MIN(floor(c*p/(m+1)), MTU - h)) - const size_t max_payload_size = ( m_num_chunks * ParityBytes ) / (m_m + 1); + const size_t max_payload_size = ( m_num_chunks * PARITYBYTES ) / (m_m + 1); // Calculate fragment count and size // TS 102 821 7.2.2: ceil((l + c*p + z) / s_max) diff --git a/src/dabOutput/edi/PFT.h b/src/dabOutput/edi/PFT.h index 05afdb1..4076bf3 100644 --- a/src/dabOutput/edi/PFT.h +++ b/src/dabOutput/edi/PFT.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2014 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -40,7 +40,7 @@ #include "AFPacket.h" #include "Log.h" #include "ReedSolomon.h" -#include "dabOutput/dabOutput.h" +#include "dabOutput/edi/Config.h" namespace edi { @@ -50,38 +50,10 @@ typedef std::vector PFTFragment; class PFT { public: - static const int ParityBytes = 48; - - PFT() : - m_k(207), - m_m(3), - m_dest_port(12000), - m_pseq(0), - m_num_chunks(0), - m_verbose(false) - { } - - PFT(const edi_configuration_t &conf) : - m_k(conf.chunk_len), - m_m(conf.fec), - m_dest_port(conf.dest_port), - m_pseq(0), - m_num_chunks(0), - m_verbose(conf.verbose) - { - if (m_k > 207) { - etiLog.level(warn) << - "EDI PFT: maximum chunk size is 207."; - throw std::out_of_range("EDI PFT Chunk size too large."); - } - - if (m_m > 5) { - etiLog.level(warn) << - "EDI PFT: high number of recoverable fragments" - " may lead to large overhead"; - // See TS 102 821, 7.2.1 Known values, list entry for 'm' - } - } + static constexpr int PARITYBYTES = 48; + + PFT(); + PFT(const configuration_t& conf); // return a list of PFT fragments with the correct // PFT headers @@ -94,17 +66,12 @@ class PFT std::vector< std::vector > ProtectAndFragment(AFPacket af_packet); private: - unsigned int m_k; // length of RS data word - unsigned int m_m; // number of fragments that can be recovered if lost - - unsigned int m_dest_port; // Destination port for transport header - - uint16_t m_pseq; - - size_t m_num_chunks; - - bool m_verbose; - + unsigned int m_k = 207; // length of RS data word + unsigned int m_m = 3; // number of fragments that can be recovered if lost + unsigned int m_dest_port = 12000; // Destination port for transport header + uint16_t m_pseq = 0; + size_t m_num_chunks = 0; + bool m_verbose = 0; }; } diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp new file mode 100644 index 0000000..d433239 --- /dev/null +++ b/src/dabOutput/edi/Transport.cpp @@ -0,0 +1,164 @@ +/* + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + EDI output, + UDP and TCP transports and their configuration + + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#include "Transport.h" +#include + +using namespace std; + +namespace edi { + +void configuration_t::print() const +{ + etiLog.level(info) << "EDI"; + etiLog.level(info) << " verbose " << verbose; + for (auto edi_dest : destinations) { + if (auto udp_dest = dynamic_pointer_cast(edi_dest)) { + etiLog.level(info) << " to " << udp_dest->dest_addr << ":" << dest_port; + if (not udp_dest->source_addr.empty()) { + etiLog.level(info) << " source " << udp_dest->source_addr; + etiLog.level(info) << " ttl " << udp_dest->ttl; + } + etiLog.level(info) << " source port " << udp_dest->source_port; + } + else { + throw std::logic_error("EDI destination not implemented"); + } + } + if (interleaver_enabled()) { + etiLog.level(info) << " interleave " << latency_frames * 24 << " ms"; + } +} + + +Sender::Sender(const configuration_t& conf) : + m_conf(conf), + edi_pft(m_conf) +{ + if (m_conf.verbose) { + etiLog.log(info, "Setup EDI"); + } + + for (const auto& edi_dest : m_conf.destinations) { + if (const auto udp_dest = dynamic_pointer_cast(edi_dest)) { + auto udp_socket = std::make_shared(udp_dest->source_port); + + if (not udp_dest->source_addr.empty()) { + int err = udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); + if (err) { + throw runtime_error("EDI socket set source failed!"); + } + err = udp_socket->setMulticastTTL(udp_dest->ttl); + if (err) { + throw runtime_error("EDI socket set TTL failed!"); + } + } + + udp_sockets.emplace(udp_dest.get(), udp_socket); + } + } + + if (m_conf.interleaver_enabled()) { + edi_interleaver.SetLatency(m_conf.latency_frames); + } + + if (m_conf.dump) { + edi_debug_file.open("./edi.debug"); + } + + if (m_conf.verbose) { + etiLog.log(info, "EDI set up"); + } +} + +void Sender::write(const TagPacket& tagpacket) +{ + // Assemble into one AF Packet + edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket); + + if (m_conf.enable_pft) { + // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) + vector edi_fragments = edi_pft.Assemble(af_packet); + + if (m_conf.verbose) { + fprintf(stderr, "EDI number of PFT fragment before interleaver %zu", + edi_fragments.size()); + } + + if (m_conf.interleaver_enabled()) { + edi_fragments = edi_interleaver.Interleave(edi_fragments); + } + + // Send over ethernet + for (const auto& edi_frag : edi_fragments) { + for (auto& dest : m_conf.destinations) { + if (const auto& udp_dest = dynamic_pointer_cast(dest)) { + InetAddress addr; + addr.setAddress(udp_dest->dest_addr.c_str()); + addr.setPort(m_conf.dest_port); + + udp_sockets.at(udp_dest.get())->send(edi_frag, addr); + } + else { + throw std::logic_error("EDI destination not implemented"); + } + } + + if (m_conf.dump) { + std::ostream_iterator debug_iterator(edi_debug_file); + std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); + } + } + + if (m_conf.verbose) { + fprintf(stderr, "EDI number of PFT fragments %zu", + edi_fragments.size()); + } + } + else { + // Send over ethernet + for (auto& dest : m_conf.destinations) { + if (const auto& udp_dest = dynamic_pointer_cast(dest)) { + InetAddress addr; + addr.setAddress(udp_dest->dest_addr.c_str()); + addr.setPort(m_conf.dest_port); + + udp_sockets.at(udp_dest.get())->send(af_packet, addr); + } + else { + throw std::logic_error("EDI destination not implemented"); + } + } + + if (m_conf.dump) { + std::ostream_iterator debug_iterator(edi_debug_file); + std::copy(af_packet.begin(), af_packet.end(), debug_iterator); + } + } +} + +} diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h new file mode 100644 index 0000000..3c48c96 --- /dev/null +++ b/src/dabOutput/edi/Transport.h @@ -0,0 +1,68 @@ +/* + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + EDI output, + UDP and TCP transports and their configuration + + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#include "config.h" +#include "dabOutput/edi/Config.h" +#include "AFPacket.h" +#include "PFT.h" +#include "Interleaver.h" +#include +#include +#include +#include +#include "dabOutput/dabOutput.h" + +namespace edi { + +/** Configuration for EDI output */ + +class Sender { + public: + Sender(const configuration_t& conf); + + void write(const TagPacket& tagpacket); + + private: + configuration_t m_conf; + std::ofstream edi_debug_file; + + // The TagPacket will then be placed into an AFPacket + edi::AFPacketiser edi_afPacketiser; + + // The AF Packet will be protected with reed-solomon and split in fragments + edi::PFT edi_pft; + + // To mitigate for burst packet loss, PFT fragments can be sent out-of-order + edi::Interleaver edi_interleaver; + + std::unordered_map> udp_sockets; +}; + +} + diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp index 0df633f..2128abf 100644 --- a/src/zmq2edi/EDISender.cpp +++ b/src/zmq2edi/EDISender.cpp @@ -47,51 +47,14 @@ EDISender::~EDISender() } } -void EDISender::start(const edi_configuration_t& conf, +void EDISender::start(const edi::configuration_t& conf, int delay_ms, bool drop_late_packets) { edi_conf = conf; tist_delay_ms = delay_ms; drop_late = drop_late_packets; - if (edi_conf.verbose) { - etiLog.log(info, "Setup EDI"); - } - - if (edi_conf.dump) { - edi_debug_file.open("./edi.debug"); - } - - if (edi_conf.enabled()) { - for (auto& edi_destination : edi_conf.destinations) { - auto edi_output = make_shared(edi_destination.source_port); - - if (not edi_destination.source_addr.empty()) { - int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str()); - if (err) { - throw runtime_error("EDI socket set source failed!"); - } - err = edi_output->setMulticastTTL(edi_destination.ttl); - if (err) { - throw runtime_error("EDI socket set TTL failed!"); - } - } - - edi_destination.socket = edi_output; - } - } - - if (edi_conf.verbose) { - etiLog.log(info, "EDI set up"); - } - - // The AF Packet will be protected with reed-solomon and split in fragments - edi::PFT pft(edi_conf); - edi_pft = pft; - - if (edi_conf.interleaver_enabled()) { - edi_interleaver.SetLatency(edi_conf.latency_frames); - } + edi_sender = make_shared(edi_conf); startTime = std::chrono::steady_clock::now(); running.store(true); @@ -106,19 +69,7 @@ void EDISender::push_frame(const frame_t& frame) void EDISender::print_configuration() { if (edi_conf.enabled()) { - etiLog.level(info) << "EDI"; - etiLog.level(info) << " verbose " << edi_conf.verbose; - for (auto& edi_dest : edi_conf.destinations) { - etiLog.level(info) << " to " << edi_dest.dest_addr << ":" << edi_conf.dest_port; - if (not edi_dest.source_addr.empty()) { - etiLog.level(info) << " source " << edi_dest.source_addr; - etiLog.level(info) << " ttl " << edi_dest.ttl; - } - etiLog.level(info) << " source port " << edi_dest.source_port; - } - if (edi_conf.interleaver_enabled()) { - etiLog.level(info) << " interleave " << edi_conf.latency_frames * 24 << " ms"; - } + edi_conf.print(); } else { etiLog.level(info) << "EDI disabled"; @@ -251,7 +202,7 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata) edi_tagDETI.utco = metadata.utc_offset; edi_tagDETI.seconds = metadata.edi_time; - if (edi_conf.enabled()) { + if (edi_sender and edi_conf.enabled()) { // put tags *ptr, DETI and all subchannels into one TagPacket edi_tagpacket.tag_items.push_back(&edi_tagStarPtr); edi_tagpacket.tag_items.push_back(&edi_tagDETI); @@ -260,58 +211,7 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata) edi_tagpacket.tag_items.push_back(&tag.second); } - // Assemble into one AF Packet - edi::AFPacket edi_afpacket = edi_afPacketiser.Assemble(edi_tagpacket); - - if (edi_conf.enable_pft) { - // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) - vector edi_fragments = edi_pft.Assemble(edi_afpacket); - - if (edi_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n", - edi_fragments.size()); - } - - if (edi_conf.interleaver_enabled()) { - edi_fragments = edi_interleaver.Interleave(edi_fragments); - } - - // Send over ethernet - for (const auto& edi_frag : edi_fragments) { - for (auto& dest : edi_conf.destinations) { - InetAddress addr; - addr.setAddress(dest.dest_addr.c_str()); - addr.setPort(edi_conf.dest_port); - - dest.socket->send(edi_frag, addr); - } - - if (edi_conf.dump) { - std::ostream_iterator debug_iterator(edi_debug_file); - std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); - } - } - - if (edi_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragments %zu\n", - edi_fragments.size()); - } - } - else { - // Send over ethernet - for (auto& dest : edi_conf.destinations) { - InetAddress addr; - addr.setAddress(dest.dest_addr.c_str()); - addr.setPort(edi_conf.dest_port); - - dest.socket->send(edi_afpacket, addr); - } - - if (edi_conf.dump) { - std::ostream_iterator debug_iterator(edi_debug_file); - std::copy(edi_afpacket.begin(), edi_afpacket.end(), debug_iterator); - } - } + edi_sender->write(edi_tagpacket); } } diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h index 4c2af54..bb9c8bc 100644 --- a/src/zmq2edi/EDISender.h +++ b/src/zmq2edi/EDISender.h @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -36,9 +36,7 @@ #include "dabOutput/dabOutput.h" #include "dabOutput/edi/TagItems.h" #include "dabOutput/edi/TagPacket.h" -#include "dabOutput/edi/AFPacket.h" -#include "dabOutput/edi/PFT.h" -#include "dabOutput/edi/Interleaver.h" +#include "dabOutput/edi/Transport.h" // This metadata gets transmitted in the zmq stream struct metadata_t { @@ -55,7 +53,7 @@ class EDISender { EDISender(const EDISender& other) = delete; EDISender& operator=(const EDISender& other) = delete; ~EDISender(); - void start(const edi_configuration_t& conf, + void start(const edi::configuration_t& conf, int delay_ms, bool drop_late_packets); void push_frame(const frame_t& frame); void print_configuration(void); @@ -68,19 +66,11 @@ class EDISender { bool drop_late; std::atomic running; std::thread process_thread; - edi_configuration_t edi_conf; + edi::configuration_t edi_conf; std::chrono::steady_clock::time_point startTime; ThreadsafeQueue frames; - std::ofstream edi_debug_file; - // The TagPacket will then be placed into an AFPacket - edi::AFPacketiser edi_afPacketiser; - - // The AF Packet will be protected with reed-solomon and split in fragments - edi::PFT edi_pft; - - // To mitigate for burst packet loss, PFT fragments can be sent out-of-order - edi::Interleaver edi_interleaver; + std::shared_ptr edi_sender; // For statistics about wait time before we transmit packets, // in microseconds diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index 3364faa..ee5776e 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -40,7 +40,7 @@ constexpr size_t MAX_ERROR_COUNT = 10; constexpr long ZMQ_TIMEOUT_MS = 1000; -static edi_configuration_t edi_conf; +static edi::configuration_t edi_conf; static EDISender edisender; @@ -155,7 +155,7 @@ static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_b /* There is some state inside the parsing of destination arguments, * because several destinations can be given. */ -static edi_destination_t edi_destination; +static std::shared_ptr edi_destination; static bool source_port_set = false; static bool source_addr_set = false; static bool ttl_set = false; @@ -168,9 +168,8 @@ static void add_edi_destination(void) std::to_string(edi_conf.destinations.size() + 1)); } - edi_conf.destinations.push_back(edi_destination); - edi_destination_t newdest; - edi_destination = newdest; + edi_conf.destinations.push_back(move(edi_destination)); + edi_destination.reset(); source_port_set = false; source_addr_set = false; @@ -180,33 +179,37 @@ static void add_edi_destination(void) static void parse_destination_args(char option) { + if (not edi_destination) { + edi_destination = std::make_shared(); + } + switch (option) { case 's': if (source_port_set) { add_edi_destination(); } - edi_destination.source_port = std::stoi(optarg); + edi_destination->source_port = std::stoi(optarg); source_port_set = true; break; case 'S': if (source_addr_set) { add_edi_destination(); } - edi_destination.source_addr = optarg; + edi_destination->source_addr = optarg; source_addr_set = true; break; case 't': if (ttl_set) { add_edi_destination(); } - edi_destination.ttl = std::stoi(optarg); + edi_destination->ttl = std::stoi(optarg); ttl_set = true; break; case 'd': if (dest_addr_set) { add_edi_destination(); } - edi_destination.dest_addr = optarg; + edi_destination->dest_addr = optarg; dest_addr_set = true; break; default: -- cgit v1.2.3