diff options
| -rw-r--r-- | Makefile.am | 22 | ||||
| -rw-r--r-- | src/DabMultiplexer.cpp | 99 | ||||
| -rw-r--r-- | src/DabMultiplexer.h | 19 | ||||
| -rw-r--r-- | src/DabMux.cpp | 26 | ||||
| -rw-r--r-- | src/dabOutput/dabOutput.h | 28 | ||||
| -rw-r--r-- | src/dabOutput/edi/Config.h | 71 | ||||
| -rw-r--r-- | src/dabOutput/edi/Interleaver.cpp | 1 | ||||
| -rw-r--r-- | src/dabOutput/edi/PFT.cpp | 30 | ||||
| -rw-r--r-- | src/dabOutput/edi/PFT.h | 57 | ||||
| -rw-r--r-- | src/dabOutput/edi/Transport.cpp | 164 | ||||
| -rw-r--r-- | src/dabOutput/edi/Transport.h | 68 | ||||
| -rw-r--r-- | src/zmq2edi/EDISender.cpp | 110 | ||||
| -rw-r--r-- | src/zmq2edi/EDISender.h | 20 | ||||
| -rw-r--r-- | src/zmq2edi/zmq2edi.cpp | 23 | 
14 files changed, 395 insertions, 343 deletions
| diff --git a/Makefile.am b/Makefile.am index 3dbb918..b756385 100644 --- a/Makefile.am +++ b/Makefile.am @@ -81,14 +81,17 @@ odr_dabmux_SOURCES  =src/DabMux.cpp \  					 src/dabOutput/metadata.cpp \  					 src/dabOutput/edi/AFPacket.cpp \  					 src/dabOutput/edi/AFPacket.h \ +					 src/dabOutput/edi/Config.h \ +					 src/dabOutput/edi/Interleaver.cpp \ +					 src/dabOutput/edi/Interleaver.h \ +					 src/dabOutput/edi/PFT.cpp \ +					 src/dabOutput/edi/PFT.h \  					 src/dabOutput/edi/TagItems.cpp \  					 src/dabOutput/edi/TagItems.h \  					 src/dabOutput/edi/TagPacket.cpp \  					 src/dabOutput/edi/TagPacket.h \ -					 src/dabOutput/edi/PFT.cpp \ -					 src/dabOutput/edi/PFT.h \ -					 src/dabOutput/edi/Interleaver.cpp \ -					 src/dabOutput/edi/Interleaver.h \ +					 src/dabOutput/edi/Transport.cpp \ +					 src/dabOutput/edi/Transport.h \  					 src/ClockTAI.h \  					 src/ClockTAI.cpp \  					 src/ConfigParser.cpp \ @@ -191,14 +194,17 @@ odr_zmq2edi_SOURCES  = src/zmq2edi/zmq2edi.cpp \  					   src/dabOutput/metadata.cpp \  					   src/dabOutput/edi/AFPacket.cpp \  					   src/dabOutput/edi/AFPacket.h \ +					   src/dabOutput/edi/Config.h \ +					   src/dabOutput/edi/Interleaver.cpp \ +					   src/dabOutput/edi/Interleaver.h \ +					   src/dabOutput/edi/PFT.cpp \ +					   src/dabOutput/edi/PFT.h \  					   src/dabOutput/edi/TagItems.cpp \  					   src/dabOutput/edi/TagItems.h \  					   src/dabOutput/edi/TagPacket.cpp \  					   src/dabOutput/edi/TagPacket.h \ -					   src/dabOutput/edi/PFT.cpp \ -					   src/dabOutput/edi/PFT.h \ -					   src/dabOutput/edi/Interleaver.cpp \ -					   src/dabOutput/edi/Interleaver.h \ +					   src/dabOutput/edi/Transport.cpp \ +					   src/dabOutput/edi/Transport.h \  					   src/InetAddress.h \  					   src/InetAddress.cpp \  					   src/UdpSocket.h \ diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp index 3a7f31f..9ff28a3 100644 --- a/src/DabMultiplexer.cpp +++ b/src/DabMultiplexer.cpp @@ -98,46 +98,10 @@ DabMultiplexer::~DabMultiplexer()      rcs.remove_controllable(&m_clock_tai);  } -void DabMultiplexer::set_edi_config(const edi_configuration_t& new_edi_conf) +void DabMultiplexer::set_edi_config(const edi::configuration_t& new_edi_conf)  {      edi_conf = new_edi_conf; - -    if (edi_conf.verbose) { -        etiLog.log(info, "Setup EDI"); -    } - -    if (edi_conf.dump) { -        edi_debug_file.open("./edi.debug"); -    } - -    if (edi_conf.enabled()) { -        for (auto& edi_destination : edi_conf.destinations) { -            auto edi_output = std::make_shared<UdpSocket>(edi_destination.source_port); - -            if (not edi_destination.source_addr.empty()) { -                int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str()); -                if (err) { -                    etiLog.level(error) << "EDI socket set source failed!"; -                    throw MuxInitException(); -                } -                err = edi_output->setMulticastTTL(edi_destination.ttl); -                if (err) { -                    etiLog.level(error) << "EDI socket set TTL failed!"; -                    throw MuxInitException(); -                } -            } - -            edi_destination.socket = edi_output; -        } -    } - -    if (edi_conf.verbose) { -        etiLog.log(info, "EDI set up"); -    } - -    // The AF Packet will be protected with reed-solomon and split in fragments -    edi::PFT pft(edi_conf); -    edi_pft = pft; +    edi_sender = make_shared<edi::Sender>(edi_conf);  } @@ -211,10 +175,6 @@ void DabMultiplexer::prepare(bool require_tai_clock)          }      } -    if (edi_conf.interleaver_enabled()) { -        edi_interleaver.SetLatency(edi_conf.latency_frames); -    } -  } @@ -749,7 +709,7 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs      /**********************************************************************       ***********   Finalise and send EDI   ********************************       **********************************************************************/ -    if (edi_conf.enabled()) { +    if (edi_sender and edi_conf.enabled()) {          // put tags *ptr, DETI and all subchannels into one TagPacket          edi_tagpacket.tag_items.push_back(&edi_tagStarPtr);          edi_tagpacket.tag_items.push_back(&edi_tagDETI); @@ -758,58 +718,7 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs              edi_tagpacket.tag_items.push_back(&tag.second);          } -        // Assemble into one AF Packet -        edi::AFPacket edi_afpacket = edi_afPacketiser.Assemble(edi_tagpacket); - -        if (edi_conf.enable_pft) { -            // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) -            vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(edi_afpacket); - -            if (edi_conf.verbose) { -                fprintf(stderr, "EDI number of PFT fragment before interleaver %zu", -                        edi_fragments.size()); -            } - -            if (edi_conf.interleaver_enabled()) { -                edi_fragments = edi_interleaver.Interleave(edi_fragments); -            } - -            // Send over ethernet -            for (const auto& edi_frag : edi_fragments) { -                for (auto& dest : edi_conf.destinations) { -                    InetAddress addr; -                    addr.setAddress(dest.dest_addr.c_str()); -                    addr.setPort(edi_conf.dest_port); - -                    dest.socket->send(edi_frag, addr); -                } - -                if (edi_conf.dump) { -                    std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -                    std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); -                } -            } - -            if (edi_conf.verbose) { -                fprintf(stderr, "EDI number of PFT fragments %zu", -                        edi_fragments.size()); -            } -        } -        else { -            // Send over ethernet -            for (auto& dest : edi_conf.destinations) { -                InetAddress addr; -                addr.setAddress(dest.dest_addr.c_str()); -                addr.setPort(edi_conf.dest_port); - -                dest.socket->send(edi_afpacket, addr); -            } - -            if (edi_conf.dump) { -                std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -                std::copy(edi_afpacket.begin(), edi_afpacket.end(), debug_iterator); -            } -        } +        edi_sender->write(edi_tagpacket);      }  #if _DEBUG diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h index 7090be7..386c23c 100644 --- a/src/DabMultiplexer.h +++ b/src/DabMultiplexer.h @@ -33,8 +33,7 @@  #include "dabOutput/edi/TagItems.h"  #include "dabOutput/edi/TagPacket.h"  #include "dabOutput/edi/AFPacket.h" -#include "dabOutput/edi/PFT.h" -#include "dabOutput/edi/Interleaver.h" +#include "dabOutput/edi/Transport.h"  #include "fig/FIGCarousel.h"  #include "crc.h"  #include "utils.h" @@ -67,7 +66,7 @@ class DabMultiplexer : public RemoteControllable {          void print_info(void); -        void set_edi_config(const edi_configuration_t& new_edi_conf); +        void set_edi_config(const edi::configuration_t& new_edi_conf);          /* Remote control */          virtual void set_parameter(const std::string& parameter, @@ -88,7 +87,8 @@ class DabMultiplexer : public RemoteControllable {          std::time_t edi_time;          std::time_t edi_time_latched_for_mnsc; -        edi_configuration_t edi_conf; +        edi::configuration_t edi_conf; +        std::shared_ptr<edi::Sender> edi_sender;          uint32_t sync = 0x49C5F8;          unsigned long currentFrame = 0; @@ -99,17 +99,6 @@ class DabMultiplexer : public RemoteControllable {          bool m_tai_clock_required = false;          ClockTAI m_clock_tai; -        std::ofstream edi_debug_file; - -        // The TagPacket will then be placed into an AFPacket -        edi::AFPacketiser edi_afPacketiser; - -        // The AF Packet will be protected with reed-solomon and split in fragments -        edi::PFT edi_pft; - -        // To mitigate for burst packet loss, PFT fragments can be sent out-of-order -        edi::Interleaver edi_interleaver; -          /* New FIG Carousel */          FIC::FIGCarousel fig_carousel;  }; diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 3d19ee4..3d0a7d9 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -271,7 +271,7 @@ int main(int argc, char *argv[])                  " starting up"; -        edi_configuration_t edi_conf; +        edi::configuration_t edi_conf;          /******************** READ OUTPUT PARAMETERS ***************/          set<string> all_output_names; @@ -293,12 +293,12 @@ int main(int argc, char *argv[])              if (outputuid == "edi") {                  ptree pt_edi = pt_outputs.get_child("edi");                  for (auto pt_edi_dest : pt_edi.get_child("destinations")) { -                    edi_destination_t dest; -                    dest.dest_addr   = pt_edi_dest.second.get<string>("destination"); -                    dest.ttl         = pt_edi_dest.second.get<unsigned int>("ttl", 1); +                    auto dest = make_shared<edi::udp_destination_t>(); +                    dest->dest_addr   = pt_edi_dest.second.get<string>("destination"); +                    dest->ttl         = pt_edi_dest.second.get<unsigned int>("ttl", 1); -                    dest.source_addr = pt_edi_dest.second.get<string>("source", ""); -                    dest.source_port = pt_edi_dest.second.get<unsigned int>("sourceport"); +                    dest->source_addr = pt_edi_dest.second.get<string>("source", ""); +                    dest->source_port = pt_edi_dest.second.get<unsigned int>("sourceport");                      edi_conf.destinations.push_back(dest);                  } @@ -460,19 +460,7 @@ int main(int argc, char *argv[])          printOutputs(outputs);          if (edi_conf.enabled()) { -            etiLog.level(info) << "EDI"; -            etiLog.level(info) << " verbose     " << edi_conf.verbose; -            for (auto& edi_dest : edi_conf.destinations) { -                etiLog.level(info) << " to " << edi_dest.dest_addr << ":" << edi_conf.dest_port; -                if (not edi_dest.source_addr.empty()) { -                    etiLog.level(info) << "  source      " << edi_dest.source_addr; -                    etiLog.level(info) << "  ttl         " << edi_dest.ttl; -                } -                etiLog.level(info) << "  source port " << edi_dest.source_port; -            } -            if (edi_conf.interleaver_enabled()) { -                etiLog.level(info) << " interleave     " << edi_conf.latency_frames * 24 << " ms"; -            } +            edi_conf.print();          }          size_t limit = pt.get("general.nbframes", 0); diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h index e5a8a94..9cc18d7 100644 --- a/src/dabOutput/dabOutput.h +++ b/src/dabOutput/dabOutput.h @@ -48,34 +48,6 @@  #endif  #include "dabOutput/metadata.h" -/** Configuration for EDI output */ - -// Can represent both unicast and multicast destinations -struct edi_destination_t { -    std::string dest_addr; -    std::string source_addr; -    unsigned int source_port = 0; -    unsigned int ttl = 10; - -    std::shared_ptr<UdpSocket> socket; -}; - -struct edi_configuration_t { -    unsigned chunk_len = 207;        // RSk, data length of each chunk -    unsigned fec       = 0;          // number of fragments that can be recovered -    bool dump          = false;      // dump a file with the EDI packets -    bool verbose       = false; -    bool enable_pft    = false;      // Enable protection and fragmentation -    unsigned int tagpacket_alignment = 0; -    std::vector<edi_destination_t> destinations; -    unsigned int dest_port = 0;      // common destination port, because it's encoded in the transport layer -    unsigned int latency_frames = 0; // if nonzero, enable interleaver with a latency of latency_frames * 24ms - -    bool enabled() const { return destinations.size() > 0; } -    bool interleaver_enabled() const { return latency_frames > 0; } -}; - -  // Abstract base class for all outputs  class DabOutput  { diff --git a/src/dabOutput/edi/Config.h b/src/dabOutput/edi/Config.h new file mode 100644 index 0000000..d3678d9 --- /dev/null +++ b/src/dabOutput/edi/Config.h @@ -0,0 +1,71 @@ +/* +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + +   EDI output, +   UDP and TCP transports and their configuration + +   */ +/* +   This file is part of ODR-DabMux. + +   ODR-DabMux is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMux is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. +   */ + +#pragma once + +#include "config.h" +#include <vector> +#include <string> +#include <memory> +#include <cstdint> + +namespace edi { + +/** Configuration for EDI output */ + +struct destination_t { +    virtual ~destination_t() {}; +}; + +// Can represent both unicast and multicast destinations +struct udp_destination_t : public destination_t { +    std::string dest_addr; +    std::string source_addr; +    unsigned int source_port = 0; +    unsigned int ttl = 10; +}; + +struct configuration_t { +    unsigned chunk_len = 207;        // RSk, data length of each chunk +    unsigned fec       = 0;          // number of fragments that can be recovered +    bool dump          = false;      // dump a file with the EDI packets +    bool verbose       = false; +    bool enable_pft    = false;      // Enable protection and fragmentation +    unsigned int tagpacket_alignment = 0; +    std::vector<std::shared_ptr<destination_t> > destinations; +    unsigned int dest_port = 0;      // common destination port, because it's encoded in the transport layer +    unsigned int latency_frames = 0; // if nonzero, enable interleaver with a latency of latency_frames * 24ms + +    bool enabled() const { return destinations.size() > 0; } +    bool interleaver_enabled() const { return latency_frames > 0; } + +    void print() const; +}; + +} + + diff --git a/src/dabOutput/edi/Interleaver.cpp b/src/dabOutput/edi/Interleaver.cpp index e31326b..f26a50e 100644 --- a/src/dabOutput/edi/Interleaver.cpp +++ b/src/dabOutput/edi/Interleaver.cpp @@ -30,6 +30,7 @@     */  #include "Interleaver.h" +#include <cassert>  namespace edi { diff --git a/src/dabOutput/edi/PFT.cpp b/src/dabOutput/edi/PFT.cpp index 1c885f9..5b93016 100644 --- a/src/dabOutput/edi/PFT.cpp +++ b/src/dabOutput/edi/PFT.cpp @@ -1,5 +1,5 @@  /* -   Copyright (C) 2014 +   Copyright (C) 2019     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -50,6 +50,30 @@ using namespace std;  // An integer division that rounds up, i.e. ceil(a/b)  #define CEIL_DIV(a, b) (a % b == 0  ? a / b : a / b + 1) +PFT::PFT() { } + +PFT::PFT(const configuration_t &conf) : +    m_k(conf.chunk_len), +    m_m(conf.fec), +    m_dest_port(conf.dest_port), +    m_pseq(0), +    m_num_chunks(0), +    m_verbose(conf.verbose) +    { +        if (m_k > 207) { +            etiLog.level(warn) << +                "EDI PFT: maximum chunk size is 207."; +            throw std::out_of_range("EDI PFT Chunk size too large."); +        } + +        if (m_m > 5) { +            etiLog.level(warn) << +                "EDI PFT: high number of recoverable fragments" +                " may lead to large overhead"; +            // See TS 102 821, 7.2.1 Known values, list entry for 'm' +        } +    } +  RSBlock PFT::Protect(AFPacket af_packet)  {      RSBlock rs_block; @@ -98,7 +122,7 @@ RSBlock PFT::Protect(AFPacket af_packet)      // Calculate RS for each chunk and assemble RS block      for (size_t i = 0; i < af_packet.size(); i+= chunk_len) {          vector<uint8_t> chunk(207); -        vector<uint8_t> protection(ParityBytes); +        vector<uint8_t> protection(PARITYBYTES);          // copy chunk_len bytes into new chunk          memcpy(&chunk.front(), &af_packet[i], chunk_len); @@ -139,7 +163,7 @@ vector< vector<uint8_t> > PFT::ProtectAndFragment(AFPacket af_packet)  #endif          // TS 102 821 7.2.2: s_max = MIN(floor(c*p/(m+1)), MTU - h)) -        const size_t max_payload_size = ( m_num_chunks * ParityBytes ) / (m_m + 1); +        const size_t max_payload_size = ( m_num_chunks * PARITYBYTES ) / (m_m + 1);          // Calculate fragment count and size          // TS 102 821 7.2.2: ceil((l + c*p + z) / s_max) diff --git a/src/dabOutput/edi/PFT.h b/src/dabOutput/edi/PFT.h index 05afdb1..4076bf3 100644 --- a/src/dabOutput/edi/PFT.h +++ b/src/dabOutput/edi/PFT.h @@ -1,5 +1,5 @@  /* -   Copyright (C) 2014 +   Copyright (C) 2019     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -40,7 +40,7 @@  #include "AFPacket.h"  #include "Log.h"  #include "ReedSolomon.h" -#include "dabOutput/dabOutput.h" +#include "dabOutput/edi/Config.h"  namespace edi { @@ -50,38 +50,10 @@ typedef std::vector<uint8_t> PFTFragment;  class PFT  {      public: -        static const int ParityBytes = 48; - -        PFT() : -            m_k(207), -            m_m(3), -            m_dest_port(12000), -            m_pseq(0), -            m_num_chunks(0), -            m_verbose(false) -        { } - -        PFT(const edi_configuration_t &conf) : -            m_k(conf.chunk_len), -            m_m(conf.fec), -            m_dest_port(conf.dest_port), -            m_pseq(0), -            m_num_chunks(0), -            m_verbose(conf.verbose) -        { -            if (m_k > 207) { -                etiLog.level(warn) << -                        "EDI PFT: maximum chunk size is 207."; -                throw std::out_of_range("EDI PFT Chunk size too large."); -            } - -            if (m_m > 5) { -                etiLog.level(warn) << -                    "EDI PFT: high number of recoverable fragments" -                    " may lead to large overhead"; -                // See TS 102 821, 7.2.1 Known values, list entry for 'm' -            } -        } +        static constexpr int PARITYBYTES = 48; + +        PFT(); +        PFT(const configuration_t& conf);          // return a list of PFT fragments with the correct          // PFT headers @@ -94,17 +66,12 @@ class PFT          std::vector< std::vector<uint8_t> > ProtectAndFragment(AFPacket af_packet);      private: -        unsigned int m_k; // length of RS data word -        unsigned int m_m; // number of fragments that can be recovered if lost - -        unsigned int m_dest_port; // Destination port for transport header - -        uint16_t m_pseq; - -        size_t m_num_chunks; - -        bool m_verbose; - +        unsigned int m_k = 207; // length of RS data word +        unsigned int m_m = 3; // number of fragments that can be recovered if lost +        unsigned int m_dest_port = 12000; // Destination port for transport header +        uint16_t m_pseq = 0; +        size_t m_num_chunks = 0; +        bool m_verbose = 0;  };  } diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp new file mode 100644 index 0000000..d433239 --- /dev/null +++ b/src/dabOutput/edi/Transport.cpp @@ -0,0 +1,164 @@ +/* +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + +   EDI output, +   UDP and TCP transports and their configuration + +   */ +/* +   This file is part of ODR-DabMux. + +   ODR-DabMux is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMux is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. +   */ + +#include "Transport.h" +#include <iterator> + +using namespace std; + +namespace edi { + +void configuration_t::print() const +{ +    etiLog.level(info) << "EDI"; +    etiLog.level(info) << " verbose     " << verbose; +    for (auto edi_dest : destinations) { +        if (auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) { +            etiLog.level(info) << " to " << udp_dest->dest_addr << ":" << dest_port; +            if (not udp_dest->source_addr.empty()) { +                etiLog.level(info) << "  source      " << udp_dest->source_addr; +                etiLog.level(info) << "  ttl         " << udp_dest->ttl; +            } +            etiLog.level(info) << "  source port " << udp_dest->source_port; +        } +        else { +            throw std::logic_error("EDI destination not implemented"); +        } +    } +    if (interleaver_enabled()) { +        etiLog.level(info) << " interleave     " << latency_frames * 24 << " ms"; +    } +} + + +Sender::Sender(const configuration_t& conf) : +    m_conf(conf), +    edi_pft(m_conf) +{ +    if (m_conf.verbose) { +        etiLog.log(info, "Setup EDI"); +    } + +    for (const auto& edi_dest : m_conf.destinations) { +        if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) { +            auto udp_socket = std::make_shared<UdpSocket>(udp_dest->source_port); + +            if (not udp_dest->source_addr.empty()) { +                int err = udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); +                if (err) { +                    throw runtime_error("EDI socket set source failed!"); +                } +                err = udp_socket->setMulticastTTL(udp_dest->ttl); +                if (err) { +                    throw runtime_error("EDI socket set TTL failed!"); +                } +            } + +            udp_sockets.emplace(udp_dest.get(), udp_socket); +        } +    } + +    if (m_conf.interleaver_enabled()) { +        edi_interleaver.SetLatency(m_conf.latency_frames); +    } + +    if (m_conf.dump) { +        edi_debug_file.open("./edi.debug"); +    } + +    if (m_conf.verbose) { +        etiLog.log(info, "EDI set up"); +    } +} + +void Sender::write(const TagPacket& tagpacket) +{ +    // Assemble into one AF Packet +    edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket); + +    if (m_conf.enable_pft) { +        // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) +        vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet); + +        if (m_conf.verbose) { +            fprintf(stderr, "EDI number of PFT fragment before interleaver %zu", +                    edi_fragments.size()); +        } + +        if (m_conf.interleaver_enabled()) { +            edi_fragments = edi_interleaver.Interleave(edi_fragments); +        } + +        // Send over ethernet +        for (const auto& edi_frag : edi_fragments) { +            for (auto& dest : m_conf.destinations) { +                if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { +                    InetAddress addr; +                    addr.setAddress(udp_dest->dest_addr.c_str()); +                    addr.setPort(m_conf.dest_port); + +                    udp_sockets.at(udp_dest.get())->send(edi_frag, addr); +                } +                else { +                    throw std::logic_error("EDI destination not implemented"); +                } +            } + +            if (m_conf.dump) { +                std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); +                std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); +            } +        } + +        if (m_conf.verbose) { +            fprintf(stderr, "EDI number of PFT fragments %zu", +                    edi_fragments.size()); +        } +    } +    else { +        // Send over ethernet +        for (auto& dest : m_conf.destinations) { +            if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { +                InetAddress addr; +                addr.setAddress(udp_dest->dest_addr.c_str()); +                addr.setPort(m_conf.dest_port); + +                udp_sockets.at(udp_dest.get())->send(af_packet, addr); +            } +            else { +                throw std::logic_error("EDI destination not implemented"); +            } +        } + +        if (m_conf.dump) { +            std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); +            std::copy(af_packet.begin(), af_packet.end(), debug_iterator); +        } +    } +} + +} diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h new file mode 100644 index 0000000..3c48c96 --- /dev/null +++ b/src/dabOutput/edi/Transport.h @@ -0,0 +1,68 @@ +/* +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + +   EDI output, +   UDP and TCP transports and their configuration + +   */ +/* +   This file is part of ODR-DabMux. + +   ODR-DabMux is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMux is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. +   */ + +#pragma once + +#include "config.h" +#include "dabOutput/edi/Config.h" +#include "AFPacket.h" +#include "PFT.h" +#include "Interleaver.h" +#include <vector> +#include <unordered_map> +#include <stdexcept> +#include <cstdint> +#include "dabOutput/dabOutput.h" + +namespace edi { + +/** Configuration for EDI output */ + +class Sender { +    public: +        Sender(const configuration_t& conf); + +        void write(const TagPacket& tagpacket); + +    private: +        configuration_t m_conf; +        std::ofstream edi_debug_file; + +        // The TagPacket will then be placed into an AFPacket +        edi::AFPacketiser edi_afPacketiser; + +        // The AF Packet will be protected with reed-solomon and split in fragments +        edi::PFT edi_pft; + +        // To mitigate for burst packet loss, PFT fragments can be sent out-of-order +        edi::Interleaver edi_interleaver; + +        std::unordered_map<udp_destination_t*, std::shared_ptr<UdpSocket>> udp_sockets; +}; + +} + diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp index 0df633f..2128abf 100644 --- a/src/zmq2edi/EDISender.cpp +++ b/src/zmq2edi/EDISender.cpp @@ -47,51 +47,14 @@ EDISender::~EDISender()      }  } -void EDISender::start(const edi_configuration_t& conf, +void EDISender::start(const edi::configuration_t& conf,          int delay_ms, bool drop_late_packets)  {      edi_conf = conf;      tist_delay_ms = delay_ms;      drop_late = drop_late_packets; -    if (edi_conf.verbose) { -        etiLog.log(info, "Setup EDI"); -    } - -    if (edi_conf.dump) { -        edi_debug_file.open("./edi.debug"); -    } - -    if (edi_conf.enabled()) { -        for (auto& edi_destination : edi_conf.destinations) { -            auto edi_output = make_shared<UdpSocket>(edi_destination.source_port); - -            if (not edi_destination.source_addr.empty()) { -                int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str()); -                if (err) { -                    throw runtime_error("EDI socket set source failed!"); -                } -                err = edi_output->setMulticastTTL(edi_destination.ttl); -                if (err) { -                    throw runtime_error("EDI socket set TTL failed!"); -                } -            } - -            edi_destination.socket = edi_output; -        } -    } - -    if (edi_conf.verbose) { -        etiLog.log(info, "EDI set up"); -    } - -    // The AF Packet will be protected with reed-solomon and split in fragments -    edi::PFT pft(edi_conf); -    edi_pft = pft; - -    if (edi_conf.interleaver_enabled()) { -        edi_interleaver.SetLatency(edi_conf.latency_frames); -    } +    edi_sender = make_shared<edi::Sender>(edi_conf);      startTime = std::chrono::steady_clock::now();      running.store(true); @@ -106,19 +69,7 @@ void EDISender::push_frame(const frame_t& frame)  void EDISender::print_configuration()  {      if (edi_conf.enabled()) { -        etiLog.level(info) << "EDI"; -        etiLog.level(info) << " verbose     " << edi_conf.verbose; -        for (auto& edi_dest : edi_conf.destinations) { -            etiLog.level(info) << " to " << edi_dest.dest_addr << ":" << edi_conf.dest_port; -            if (not edi_dest.source_addr.empty()) { -                etiLog.level(info) << "  source      " << edi_dest.source_addr; -                etiLog.level(info) << "  ttl         " << edi_dest.ttl; -            } -            etiLog.level(info) << "  source port " << edi_dest.source_port; -        } -        if (edi_conf.interleaver_enabled()) { -            etiLog.level(info) << " interleave     " << edi_conf.latency_frames * 24 << " ms"; -        } +        edi_conf.print();      }      else {          etiLog.level(info) << "EDI disabled"; @@ -251,7 +202,7 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)      edi_tagDETI.utco = metadata.utc_offset;      edi_tagDETI.seconds = metadata.edi_time; -    if (edi_conf.enabled()) { +    if (edi_sender and edi_conf.enabled()) {          // put tags *ptr, DETI and all subchannels into one TagPacket          edi_tagpacket.tag_items.push_back(&edi_tagStarPtr);          edi_tagpacket.tag_items.push_back(&edi_tagDETI); @@ -260,58 +211,7 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)              edi_tagpacket.tag_items.push_back(&tag.second);          } -        // Assemble into one AF Packet -        edi::AFPacket edi_afpacket = edi_afPacketiser.Assemble(edi_tagpacket); - -        if (edi_conf.enable_pft) { -            // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) -            vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(edi_afpacket); - -            if (edi_conf.verbose) { -                fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n", -                        edi_fragments.size()); -            } - -            if (edi_conf.interleaver_enabled()) { -                edi_fragments = edi_interleaver.Interleave(edi_fragments); -            } - -            // Send over ethernet -            for (const auto& edi_frag : edi_fragments) { -                for (auto& dest : edi_conf.destinations) { -                    InetAddress addr; -                    addr.setAddress(dest.dest_addr.c_str()); -                    addr.setPort(edi_conf.dest_port); - -                    dest.socket->send(edi_frag, addr); -                } - -                if (edi_conf.dump) { -                    std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -                    std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); -                } -            } - -            if (edi_conf.verbose) { -                fprintf(stderr, "EDI number of PFT fragments %zu\n", -                        edi_fragments.size()); -            } -        } -        else { -            // Send over ethernet -            for (auto& dest : edi_conf.destinations) { -                InetAddress addr; -                addr.setAddress(dest.dest_addr.c_str()); -                addr.setPort(edi_conf.dest_port); - -                dest.socket->send(edi_afpacket, addr); -            } - -            if (edi_conf.dump) { -                std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -                std::copy(edi_afpacket.begin(), edi_afpacket.end(), debug_iterator); -            } -        } +        edi_sender->write(edi_tagpacket);      }  } diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h index 4c2af54..bb9c8bc 100644 --- a/src/zmq2edi/EDISender.h +++ b/src/zmq2edi/EDISender.h @@ -3,7 +3,7 @@     2011, 2012 Her Majesty the Queen in Right of Canada (Communications     Research Center Canada) -   Copyright (C) 2018 +   Copyright (C) 2019     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -36,9 +36,7 @@  #include "dabOutput/dabOutput.h"  #include "dabOutput/edi/TagItems.h"  #include "dabOutput/edi/TagPacket.h" -#include "dabOutput/edi/AFPacket.h" -#include "dabOutput/edi/PFT.h" -#include "dabOutput/edi/Interleaver.h" +#include "dabOutput/edi/Transport.h"  // This metadata gets transmitted in the zmq stream  struct metadata_t { @@ -55,7 +53,7 @@ class EDISender {          EDISender(const EDISender& other) = delete;          EDISender& operator=(const EDISender& other) = delete;          ~EDISender(); -        void start(const edi_configuration_t& conf, +        void start(const edi::configuration_t& conf,                  int delay_ms, bool drop_late_packets);          void push_frame(const frame_t& frame);          void print_configuration(void); @@ -68,19 +66,11 @@ class EDISender {          bool drop_late;          std::atomic<bool> running;          std::thread process_thread; -        edi_configuration_t edi_conf; +        edi::configuration_t edi_conf;          std::chrono::steady_clock::time_point startTime;          ThreadsafeQueue<frame_t> frames; -        std::ofstream edi_debug_file; -        // The TagPacket will then be placed into an AFPacket -        edi::AFPacketiser edi_afPacketiser; - -        // The AF Packet will be protected with reed-solomon and split in fragments -        edi::PFT edi_pft; - -        // To mitigate for burst packet loss, PFT fragments can be sent out-of-order -        edi::Interleaver edi_interleaver; +        std::shared_ptr<edi::Sender> edi_sender;          // For statistics about wait time before we transmit packets,          // in microseconds diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index 3364faa..ee5776e 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -3,7 +3,7 @@     2011, 2012 Her Majesty the Queen in Right of Canada (Communications     Research Center Canada) -   Copyright (C) 2018 +   Copyright (C) 2019     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -40,7 +40,7 @@  constexpr size_t MAX_ERROR_COUNT = 10;  constexpr long ZMQ_TIMEOUT_MS = 1000; -static edi_configuration_t edi_conf; +static edi::configuration_t edi_conf;  static EDISender edisender; @@ -155,7 +155,7 @@ static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_b  /* There is some state inside the parsing of destination arguments,   * because several destinations can be given.  */ -static edi_destination_t edi_destination; +static std::shared_ptr<edi::udp_destination_t> edi_destination;  static bool source_port_set = false;  static bool source_addr_set = false;  static bool ttl_set = false; @@ -168,9 +168,8 @@ static void add_edi_destination(void)                  std::to_string(edi_conf.destinations.size() + 1));      } -    edi_conf.destinations.push_back(edi_destination); -    edi_destination_t newdest; -    edi_destination = newdest; +    edi_conf.destinations.push_back(move(edi_destination)); +    edi_destination.reset();      source_port_set = false;      source_addr_set = false; @@ -180,33 +179,37 @@ static void add_edi_destination(void)  static void parse_destination_args(char option)  { +    if (not edi_destination) { +        edi_destination = std::make_shared<edi::udp_destination_t>(); +    } +      switch (option) {          case 's':              if (source_port_set) {                  add_edi_destination();              } -            edi_destination.source_port = std::stoi(optarg); +            edi_destination->source_port = std::stoi(optarg);              source_port_set = true;              break;          case 'S':              if (source_addr_set) {                  add_edi_destination();              } -            edi_destination.source_addr = optarg; +            edi_destination->source_addr = optarg;              source_addr_set = true;              break;          case 't':              if (ttl_set) {                  add_edi_destination();              } -            edi_destination.ttl = std::stoi(optarg); +            edi_destination->ttl = std::stoi(optarg);              ttl_set = true;              break;          case 'd':              if (dest_addr_set) {                  add_edi_destination();              } -            edi_destination.dest_addr = optarg; +            edi_destination->dest_addr = optarg;              dest_addr_set = true;              break;          default: | 
