diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile.am | 4 | ||||
| -rw-r--r-- | src/zmq2edi/EDISender.cpp | 330 | ||||
| -rw-r--r-- | src/zmq2edi/EDISender.h | 83 | ||||
| -rw-r--r-- | src/zmq2edi/zmq2edi.cpp | 275 | 
4 files changed, 432 insertions, 260 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 2e0bb9c..badc0b9 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -126,6 +126,7 @@ odr_zmq2farsync_CFLAGS   = -Wall $(GITVERSION_FLAGS) -I$(FARSYNC_DIR)  odr_zmq2farsync_CXXFLAGS = -Wall -std=c++11 $(GITVERSION_FLAGS) -I$(FARSYNC_DIR)  odr_zmq2edi_SOURCES  = zmq2edi/zmq2edi.cpp \ +					   zmq2edi/EDISender.h zmq2edi/EDISender.cpp \  					   dabOutput/dabOutput.h \  					   dabOutput/metadata.h dabOutput/metadata.cpp \  					   dabOutput/edi/AFPacket.cpp dabOutput/edi/AFPacket.h \ @@ -144,7 +145,8 @@ odr_zmq2edi_SOURCES  = zmq2edi/zmq2edi.cpp \  					   Log.h Log.cpp \  					   crc.h crc.c \  					   zmq.hpp -odr_zmq2edi_LDADD    = $(ZMQ_LIBS) +odr_zmq2edi_LDADD    = $(ZMQ_LIBS) \ +					  -lpthread -lboost_thread -lboost_system  odr_zmq2edi_CFLAGS   = -Wall $(GITVERSION_FLAGS)  odr_zmq2edi_CXXFLAGS = -Wall -std=c++11 $(GITVERSION_FLAGS) diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp new file mode 100644 index 0000000..c9033e9 --- /dev/null +++ b/src/zmq2edi/EDISender.cpp @@ -0,0 +1,330 @@ +/* +   Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, +   2011, 2012 Her Majesty the Queen in Right of Canada (Communications +   Research Center Canada) + +   Copyright (C) 2017 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org +   */ +/* +   This file is part of ODR-DabMux. + +   ODR-DabMux is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMux is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <math.h> +#include "EDISender.h" +#include "Log.h" + +using namespace std; + +EDISender::~EDISender() +{ +    if (running.load()) { +        running.store(false); + +        // Unblock thread +        frame_t emptyframe; +        frames.push(emptyframe); + +        process_thread.join(); +    } +} + +void EDISender::start(const edi_configuration_t& conf, int delay_ms) +{ +    edi_conf = conf; +    tist_delay_ms = delay_ms; + +    if (edi_conf.verbose) { +        etiLog.log(info, "Setup EDI"); +    } + +    if (edi_conf.dump) { +        edi_debug_file.open("./edi.debug"); +    } + +    if (edi_conf.enabled()) { +        for (auto& edi_destination : edi_conf.destinations) { +            auto edi_output = std::make_shared<UdpSocket>(edi_destination.source_port); + +            if (not edi_destination.source_addr.empty()) { +                int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str()); +                if (err) { +                    throw std::runtime_error("EDI socket set source failed!"); +                } +                err = edi_output->setMulticastTTL(edi_destination.ttl); +                if (err) { +                    throw std::runtime_error("EDI socket set TTL failed!"); +                } +            } + +            edi_destination.socket = edi_output; +        } +    } + +    if (edi_conf.verbose) { +        etiLog.log(info, "EDI set up"); +    } + +    // The AF Packet will be protected with reed-solomon and split in fragments +    edi::PFT pft(edi_conf); +    edi_pft = pft; + +    if (edi_conf.interleaver_enabled()) { +        edi_interleaver.SetLatency(edi_conf.latency_frames); +    } + +    startTime = std::chrono::steady_clock::now(); +    running.store(true); +    process_thread = thread(&EDISender::process, this); +} + +void EDISender::push_frame(const frame_t& frame) +{ +    frames.push(frame); +} + +void EDISender::print_configuration() +{ +    if (edi_conf.enabled()) { +        etiLog.level(info) << "EDI"; +        etiLog.level(info) << " verbose     " << edi_conf.verbose; +        for (auto& edi_dest : edi_conf.destinations) { +            etiLog.level(info) << " to " << edi_dest.dest_addr << ":" << edi_conf.dest_port; +            if (not edi_dest.source_addr.empty()) { +                etiLog.level(info) << "  source      " << edi_dest.source_addr; +                etiLog.level(info) << "  ttl         " << edi_dest.ttl; +            } +            etiLog.level(info) << "  source port " << edi_dest.source_port; +        } +        if (edi_conf.interleaver_enabled()) { +            etiLog.level(info) << " interleave     " << edi_conf.latency_frames * 24 << " ms"; +        } +    } +    else { +        etiLog.level(info) << "EDI disabled"; +    } +} + +void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata) +{ +    edi::TagDETI edi_tagDETI; +    edi::TagStarPTR edi_tagStarPtr; +    std::map<int, edi::TagESTn> edi_subchannelToTag; +    // The above Tag Items will be assembled into a TAG Packet +    edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment); + +    // SYNC +    edi_tagDETI.stat = p[0]; + +    // LIDATA FCT +    edi_tagDETI.dlfc = metadata.dlfc; + +    const int fct = p[4]; +    if (metadata.dlfc % 250 != fct) { +        etiLog.level(warn) << "Frame FCT=" << fct << +            " does not correspond to DLFC=" << metadata.dlfc; +    } + +    etiLog.level(debug) << "tx " << fct; + +    bool ficf = (p[5] & 0x80) >> 7; +    edi_tagDETI.ficf = ficf; + +    const int nst = p[5] & 0x7F; + +    edi_tagDETI.fp = (p[6] & 0xE0) >> 5; +    const int mid = (p[6] & 0x18) >> 3; +    edi_tagDETI.mid = mid; +    //const int fl = (p[6] & 0x07) * 256 + p[7]; + +    int ficl = 0; +    if (ficf == 0) { +        etiLog.level(warn) << "Not FIC in data stream!"; +        return; +    } +    else if (mid == 3) { +        ficl = 32; +    } +    else { +        ficl = 24; +    } + +    std::vector<uint32_t> sad(nst); +    std::vector<uint32_t> stl(nst); +    // Loop over STC subchannels: +    for (int i=0; i < nst; i++) { +        // EDI stream index is 1-indexed +        const int edi_stream_id = i + 1; + +        uint32_t scid = (p[8 + 4*i] & 0xFC) >> 2; +        sad[i] = (p[8+4*i] & 0x03) * 256 + p[9+4*i]; +        uint32_t tpl = (p[10+4*i] & 0xFC) >> 2; +        stl[i] = (p[10+4*i] & 0x03) * 256 + \ +                 p[11+4*i]; + +        edi::TagESTn tag_ESTn; +        tag_ESTn.id = edi_stream_id; +        tag_ESTn.scid = scid; +        tag_ESTn.sad = sad[i]; +        tag_ESTn.tpl = tpl; +        tag_ESTn.rfa = 0; // two bits +        tag_ESTn.mst_length = stl[i]; +        tag_ESTn.mst_data = nullptr; + +        edi_subchannelToTag[i] = tag_ESTn; +    } + +    const uint16_t mnsc = p[8 + 4*nst] * 256 + \ +                          p[8 + 4*nst + 1]; +    edi_tagDETI.mnsc = mnsc; + +    /*const uint16_t crc1 = p[8 + 4*nst + 2]*256 + \ +                          p[8 + 4*nst + 3]; */ + +    edi_tagDETI.fic_data = p + 12 + 4*nst; +    edi_tagDETI.fic_length = ficl * 4; + +    // loop over MSC subchannels +    int offset = 0; +    for (int i=0; i < nst; i++) { +        edi::TagESTn& tag = edi_subchannelToTag[i]; +        tag.mst_data = (p + 12 + 4*nst + ficf*ficl*4 + offset); + +        offset += stl[i] * 8; +    } + +    /* +    const uint16_t crc2 = p[12 + 4*nst + ficf*ficl*4 + offset] * 256 + \ +                          p[12 + 4*nst + ficf*ficl*4 + offset + 1]; */ + +    // TIST +    const size_t tist_ix = 12 + 4*nst + ficf*ficl*4 + offset + 4; +    uint32_t tist = (uint32_t)(p[tist_ix]) << 24 | +                    (uint32_t)(p[tist_ix+1]) << 16 | +                    (uint32_t)(p[tist_ix+2]) << 8 | +                    (uint32_t)(p[tist_ix+3]); + +    std::time_t posix_timestamp_1_jan_2000 = 946684800; + +    // Wait until our time is tist_delay after the TIST before +    // we release that frame + +    const auto seconds = metadata.edi_time; +    const auto pps_offset = std::chrono::milliseconds( +            std::lrint((tist & 0xFFFFFF) / 16384.0)); +    const auto t_frame = std::chrono::system_clock::from_time_t( +            seconds + posix_timestamp_1_jan_2000) + pps_offset; +    const auto t_release = t_frame + std::chrono::milliseconds(tist_delay_ms); +    const auto t_now = chrono::system_clock::now(); +    const auto wait_time = t_release - t_now; +    const auto duration_0 = std::chrono::milliseconds(0); + +    /* +    etiLog.level(debug) << "seconds " << seconds + posix_timestamp_1_jan_2000; +    etiLog.level(debug) << "now " << chrono::system_clock::to_time_t(t_now); +    etiLog.level(debug) << "wait " << wait_time.count(); +    */ + +    if (wait_time > duration_0) { +        std::this_thread::sleep_for(wait_time); +    } + + +    edi_tagDETI.tsta = tist; +    edi_tagDETI.atstf = 1; +    edi_tagDETI.utco = metadata.utc_offset; +    edi_tagDETI.seconds = metadata.edi_time; + +    if (edi_conf.enabled()) { +        // put tags *ptr, DETI and all subchannels into one TagPacket +        edi_tagpacket.tag_items.push_back(&edi_tagStarPtr); +        edi_tagpacket.tag_items.push_back(&edi_tagDETI); + +        for (auto& tag : edi_subchannelToTag) { +            edi_tagpacket.tag_items.push_back(&tag.second); +        } + +        // Assemble into one AF Packet +        edi::AFPacket edi_afpacket = edi_afPacketiser.Assemble(edi_tagpacket); + +        if (edi_conf.enable_pft) { +            // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) +            std::vector<edi::PFTFragment> edi_fragments = +                edi_pft.Assemble(edi_afpacket); + +            if (edi_conf.verbose) { +                fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n", +                        edi_fragments.size()); +            } + +            if (edi_conf.interleaver_enabled()) { +                edi_fragments = edi_interleaver.Interleave(edi_fragments); +            } + +            // Send over ethernet +            for (const auto& edi_frag : edi_fragments) { +                for (auto& dest : edi_conf.destinations) { +                    InetAddress addr; +                    addr.setAddress(dest.dest_addr.c_str()); +                    addr.setPort(edi_conf.dest_port); + +                    dest.socket->send(edi_frag, addr); +                } + +                if (edi_conf.dump) { +                    std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); +                    std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); +                } +            } + +            if (edi_conf.verbose) { +                fprintf(stderr, "EDI number of PFT fragments %zu\n", +                        edi_fragments.size()); +            } +        } +        else { +            // Send over ethernet +            for (auto& dest : edi_conf.destinations) { +                InetAddress addr; +                addr.setAddress(dest.dest_addr.c_str()); +                addr.setPort(edi_conf.dest_port); + +                dest.socket->send(edi_afpacket, addr); +            } + +            if (edi_conf.dump) { +                std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); +                std::copy(edi_afpacket.begin(), edi_afpacket.end(), debug_iterator); +            } +        } +    } +} + +void EDISender::process() +{ +    while (running.load()) { +        frame_t frame; +        frames.wait_and_pop(frame); + +        if (not running.load()) { +            break; +        } + +        send_eti_frame(frame.first.data(), frame.second); +    } +} diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h new file mode 100644 index 0000000..c269652 --- /dev/null +++ b/src/zmq2edi/EDISender.h @@ -0,0 +1,83 @@ +/* +   Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, +   2011, 2012 Her Majesty the Queen in Right of Canada (Communications +   Research Center Canada) + +   Copyright (C) 2017 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org +   */ +/* +   This file is part of ODR-DabMux. + +   ODR-DabMux is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMux is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#pragma once +#include <iostream> +#include <iterator> +#include <thread> +#include <vector> +#include <chrono> +#include <atomic> +#include "ThreadsafeQueue.h" +#include "dabOutput/dabOutput.h" +#include "dabOutput/edi/TagItems.h" +#include "dabOutput/edi/TagPacket.h" +#include "dabOutput/edi/AFPacket.h" +#include "dabOutput/edi/PFT.h" +#include "dabOutput/edi/Interleaver.h" + +// This metadata gets transmitted in the zmq stream +struct metadata_t { +    uint32_t edi_time; +    int16_t utc_offset; +    uint16_t dlfc; +}; + +using frame_t = std::pair<std::vector<uint8_t>, metadata_t>; + +class EDISender { +    public: +        EDISender() = default; +        EDISender(const EDISender& other) = delete; +        EDISender& operator=(const EDISender& other) = delete; +        ~EDISender(); +        void start(const edi_configuration_t& conf, int delay_ms); +        void push_frame(const frame_t& frame); +        void print_configuration(void); + +    private: +        void send_eti_frame(uint8_t* p, metadata_t metadata); +        void process(void); + +        int tist_delay_ms; +        std::atomic<bool> running; +        std::thread process_thread; +        edi_configuration_t edi_conf; +        std::chrono::steady_clock::time_point startTime; +        ThreadsafeQueue<frame_t> frames; +        std::ofstream edi_debug_file; + +        // The TagPacket will then be placed into an AFPacket +        edi::AFPacketiser edi_afPacketiser; + +        // The AF Packet will be protected with reed-solomon and split in fragments +        edi::PFT edi_pft; + +        // To mitigate for burst packet loss, PFT fragments can be sent out-of-order +        edi::Interleaver edi_interleaver; + +}; diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index 1597f87..a915363 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -25,40 +25,21 @@     along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>.  */ -#include "dabOutput/dabOutput.h" -#include "dabOutput/edi/TagItems.h" -#include "dabOutput/edi/TagPacket.h" -#include "dabOutput/edi/AFPacket.h" -#include "dabOutput/edi/PFT.h" -#include "dabOutput/edi/Interleaver.h"  #include "Log.h"  #include "zmq.hpp" -#include "math.h" +#include <math.h>  #include <getopt.h>  #include <string.h>  #include <iostream>  #include <iterator>  #include <vector> -static edi_configuration_t edi_conf; - -static std::ofstream edi_debug_file; - -// The TagPacket will then be placed into an AFPacket -static edi::AFPacketiser edi_afPacketiser; - -// The AF Packet will be protected with reed-solomon and split in fragments -static edi::PFT edi_pft; +#include "EDISender.h" +#include "dabOutput/dabOutput.h" -// To mitigate for burst packet loss, PFT fragments can be sent out-of-order -static edi::Interleaver edi_interleaver; +static edi_configuration_t edi_conf; -// This metadata gets transmitted in the zmq stream -struct metadata_t { -    uint32_t edi_time; -    int16_t utc_offset; -    uint16_t dlfc; -}; +static EDISender edisender;  void usage(void)  { @@ -69,6 +50,7 @@ void usage(void)      cerr << "Where the options are:" << endl;      cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl; +    cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds before current system time." << endl;      cerr << " -d <destination ip> sets the destination ip." << endl;      cerr << " -p <destination port> sets the destination port." << endl;      cerr << " -s <source port> sets the source port." << endl; @@ -82,69 +64,6 @@ void usage(void)      cerr << " -a <tagpacket alignement> sets the alignment of the TAG Packet (default 8)." << endl;  } -static void edi_setup(void) { -    if (edi_conf.verbose) { -        etiLog.log(info, "Setup EDI"); -    } - -    if (edi_conf.dump) { -        edi_debug_file.open("./edi.debug"); -    } - -    if (edi_conf.enabled()) { -        for (auto& edi_destination : edi_conf.destinations) { -            auto edi_output = std::make_shared<UdpSocket>(edi_destination.source_port); - -            if (not edi_destination.source_addr.empty()) { -                int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str()); -                if (err) { -                    throw std::runtime_error("EDI socket set source failed!"); -                } -                err = edi_output->setMulticastTTL(edi_destination.ttl); -                if (err) { -                    throw std::runtime_error("EDI socket set TTL failed!"); -                } -            } - -            edi_destination.socket = edi_output; -        } -    } - -    if (edi_conf.verbose) { -        etiLog.log(info, "EDI set up"); -    } - -    // The AF Packet will be protected with reed-solomon and split in fragments -    edi::PFT pft(edi_conf); -    edi_pft = pft; - -    if (edi_conf.interleaver_enabled()) { -        edi_interleaver.SetLatency(edi_conf.latency_frames); -    } -} - -static void print_edi_conf(void) -{ -    if (edi_conf.enabled()) { -        etiLog.level(info) << "EDI"; -        etiLog.level(info) << " verbose     " << edi_conf.verbose; -        for (auto& edi_dest : edi_conf.destinations) { -            etiLog.level(info) << " to " << edi_dest.dest_addr << ":" << edi_conf.dest_port; -            if (not edi_dest.source_addr.empty()) { -                etiLog.level(info) << "  source      " << edi_dest.source_addr; -                etiLog.level(info) << "  ttl         " << edi_dest.ttl; -            } -            etiLog.level(info) << "  source port " << edi_dest.source_port; -        } -        if (edi_conf.interleaver_enabled()) { -            etiLog.level(info) << " interleave     " << edi_conf.latency_frames * 24 << " ms"; -        } -    } -    else { -        etiLog.level(info) << "EDI disabled"; -    } -} -  static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_bytes)  {      size_t remaining = size; @@ -222,171 +141,6 @@ static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_b      throw std::runtime_error("Insufficient data");  } -static void send_eti_frame(uint8_t* p, metadata_t metadata) -{ -    edi::TagDETI edi_tagDETI; -    edi::TagStarPTR edi_tagStarPtr; -    std::map<int, edi::TagESTn> edi_subchannelToTag; -    // The above Tag Items will be assembled into a TAG Packet -    edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment); - -    // SYNC -    edi_tagDETI.stat = p[0]; - -    // LIDATA FCT -    edi_tagDETI.dlfc = metadata.dlfc; - -    const int fct = p[4]; -    if (metadata.dlfc % 250 != fct) { -        etiLog.level(warn) << "Frame FCT=" << fct << " does not correspond to DLFC=" << metadata.dlfc; -    } - -    bool ficf = (p[5] & 0x80) >> 7; -    edi_tagDETI.ficf = ficf; - -    const int nst = p[5] & 0x7F; - -    edi_tagDETI.fp = (p[6] & 0xE0) >> 5; -    const int mid = (p[6] & 0x18) >> 3; -    edi_tagDETI.mid = mid; -    //const int fl = (p[6] & 0x07) * 256 + p[7]; - -    int ficl = 0; -    if (ficf == 0) { -        etiLog.level(warn) << "Not FIC in data stream!"; -        return; -    } -    else if (mid == 3) { -        ficl = 32; -    } -    else { -        ficl = 24; -    } - -    std::vector<uint32_t> sad(nst); -    std::vector<uint32_t> stl(nst); -    // Loop over STC subchannels: -    for (int i=0; i < nst; i++) { -        // EDI stream index is 1-indexed -        const int edi_stream_id = i + 1; - -        uint32_t scid = (p[8 + 4*i] & 0xFC) >> 2; -        sad[i] = (p[8+4*i] & 0x03) * 256 + p[9+4*i]; -        uint32_t tpl = (p[10+4*i] & 0xFC) >> 2; -        stl[i] = (p[10+4*i] & 0x03) * 256 + \ -                 p[11+4*i]; - -        edi::TagESTn tag_ESTn; -        tag_ESTn.id = edi_stream_id; -        tag_ESTn.scid = scid; -        tag_ESTn.sad = sad[i]; -        tag_ESTn.tpl = tpl; -        tag_ESTn.rfa = 0; // two bits -        tag_ESTn.mst_length = stl[i]; -        tag_ESTn.mst_data = nullptr; - -        edi_subchannelToTag[i] = tag_ESTn; -    } - -    const uint16_t mnsc = p[8 + 4*nst] * 256 + \ -                          p[8 + 4*nst + 1]; -    edi_tagDETI.mnsc = mnsc; - -    /*const uint16_t crc1 = p[8 + 4*nst + 2]*256 + \ -                          p[8 + 4*nst + 3]; */ - -    edi_tagDETI.fic_data = p + 12 + 4*nst; -    edi_tagDETI.fic_length = ficl * 4; - -    // loop over MSC subchannels -    int offset = 0; -    for (int i=0; i < nst; i++) { -        edi::TagESTn& tag = edi_subchannelToTag[i]; -        tag.mst_data = (p + 12 + 4*nst + ficf*ficl*4 + offset); - -        offset += stl[i] * 8; -    } - -    /* -    const uint16_t crc2 = p[12 + 4*nst + ficf*ficl*4 + offset] * 256 + \ -                          p[12 + 4*nst + ficf*ficl*4 + offset + 1]; */ - -    // TIST -    const size_t tist_ix = 12 + 4*nst + ficf*ficl*4 + offset + 4; -    uint32_t tist = (uint32_t)(p[tist_ix]) << 24 | -                    (uint32_t)(p[tist_ix+1]) << 16 | -                    (uint32_t)(p[tist_ix+2]) << 8 | -                    (uint32_t)(p[tist_ix+3]); - -    edi_tagDETI.tsta = tist; -    edi_tagDETI.atstf = 1; -    edi_tagDETI.utco = metadata.utc_offset; -    edi_tagDETI.seconds = metadata.edi_time; - -    if (edi_conf.enabled()) { -        // put tags *ptr, DETI and all subchannels into one TagPacket -        edi_tagpacket.tag_items.push_back(&edi_tagStarPtr); -        edi_tagpacket.tag_items.push_back(&edi_tagDETI); - -        for (auto& tag : edi_subchannelToTag) { -            edi_tagpacket.tag_items.push_back(&tag.second); -        } - -        // Assemble into one AF Packet -        edi::AFPacket edi_afpacket = edi_afPacketiser.Assemble(edi_tagpacket); - -        if (edi_conf.enable_pft) { -            // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) -            std::vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(edi_afpacket); - -            if (edi_conf.verbose) { -                fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n", -                        edi_fragments.size()); -            } - -            if (edi_conf.interleaver_enabled()) { -                edi_fragments = edi_interleaver.Interleave(edi_fragments); -            } - -            // Send over ethernet -            for (const auto& edi_frag : edi_fragments) { -                for (auto& dest : edi_conf.destinations) { -                    InetAddress addr; -                    addr.setAddress(dest.dest_addr.c_str()); -                    addr.setPort(edi_conf.dest_port); - -                    dest.socket->send(edi_frag, addr); -                } - -                if (edi_conf.dump) { -                    std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -                    std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); -                } -            } - -            if (edi_conf.verbose) { -                fprintf(stderr, "EDI number of PFT fragments %zu\n", -                        edi_fragments.size()); -            } -        } -        else { -            // Send over ethernet -            for (auto& dest : edi_conf.destinations) { -                InetAddress addr; -                addr.setAddress(dest.dest_addr.c_str()); -                addr.setPort(edi_conf.dest_port); - -                dest.socket->send(edi_afpacket, addr); -            } - -            if (edi_conf.dump) { -                std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -                std::copy(edi_afpacket.begin(), edi_afpacket.end(), debug_iterator); -            } -        } -    } -} -  int start(int argc, char **argv)  {      edi_destination_t edi_destination; @@ -398,9 +152,11 @@ int start(int argc, char **argv)          return 1;      } +    int delay_ms = 500; +      char ch = 0;      while (ch != -1) { -        ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:"); +        ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:w:");          switch (ch) {              case -1:                  break; @@ -451,11 +207,11 @@ int start(int argc, char **argv)              case 'a':                  edi_conf.tagpacket_alignment = std::stoi(optarg);                  break; +            case 'w': +                delay_ms = std::stoi(optarg); +                break;              case 'h': -                usage(); -                return 1;              default: -                etiLog.log(error, "Option '%c' not understood", ch);                  usage();                  return 1;          } @@ -468,8 +224,9 @@ int start(int argc, char **argv)      edi_conf.destinations.push_back(edi_destination); -    print_edi_conf(); -    edi_setup(); +    etiLog.level(info) << "Setting up EDI Sender withe delay " << delay_ms << " ms"; +    edisender.start(edi_conf, delay_ms); +    edisender.print_configuration();      const char* source_url = argv[optind]; @@ -539,7 +296,7 @@ int start(int argc, char **argv)          }          for (auto &f : all_frames) { -            send_eti_frame(f.first.data(), f.second); +            edisender.push_frame(f);              frame_count++;          }  | 
