diff options
| -rw-r--r-- | ChangeLog | 4 | ||||
| -rw-r--r-- | Makefile.am | 43 | ||||
| -rw-r--r-- | README.md | 11 | ||||
| -rw-r--r-- | doc/TIMESTAMPS.rst | 2 | ||||
| -rw-r--r-- | man/odr-dabmux.1 | 2 | ||||
| -rw-r--r-- | man/odr-zmq2edi.1 | 75 | ||||
| -rw-r--r-- | src/zmq2edi/README.md | 8 | ||||
| -rw-r--r-- | src/zmq2edi/Sender.cpp | 320 | ||||
| -rw-r--r-- | src/zmq2edi/Sender.h | 98 | ||||
| -rw-r--r-- | src/zmq2edi/zmq2edi.cpp | 527 | 
10 files changed, 15 insertions, 1075 deletions
| @@ -1,6 +1,10 @@  This file contains information about the changes done to  ODR-DabMux in this repository +upcoming: +	Make compatible with easydab again +	Remove odr-zmq2edi +  2024-05-05: Matthias P. Braendli <matthias@mpb.li>  	(v4.5.0):  		Switch project to C++17. diff --git a/Makefile.am b/Makefile.am index 0c730dd..be2eed3 100644 --- a/Makefile.am +++ b/Makefile.am @@ -25,7 +25,7 @@ else  GITVERSION_FLAGS =  endif -bin_PROGRAMS=odr-dabmux zmqinput-keygen odr-zmq2edi +bin_PROGRAMS=odr-dabmux zmqinput-keygen  if HAVE_OUTPUT_RAW_TEST  bin_PROGRAMS+=odr-zmq2farsync @@ -206,46 +206,7 @@ odr_zmq2farsync_LDADD    = $(ZMQ_LIBS)  odr_zmq2farsync_CFLAGS   = -Wall $(ZMQ_CPPFLAGS) $(PTHREAD_CFLAGS) $(GITVERSION_FLAGS) $(INCLUDE)  odr_zmq2farsync_CXXFLAGS = -Wall $(PTHREAD_CFLAGS) $(PTHREAD_LIBS) $(ZMQ_CPPFLAGS) $(GITVERSION_FLAGS) $(INCLUDE) -odr_zmq2edi_SOURCES  = src/zmq2edi/zmq2edi.cpp \ -					   src/zmq2edi/Sender.h \ -					   src/zmq2edi/Sender.cpp \ -					   src/dabOutput/dabOutput.h \ -					   src/dabOutput/metadata.h \ -					   src/dabOutput/metadata.cpp \ -					   lib/edioutput/AFPacket.cpp \ -					   lib/edioutput/AFPacket.h \ -					   lib/edioutput/EDIConfig.h \ -					   lib/edioutput/PFT.cpp \ -					   lib/edioutput/PFT.h \ -					   lib/edioutput/TagItems.cpp \ -					   lib/edioutput/TagItems.h \ -					   lib/edioutput/TagPacket.cpp \ -					   lib/edioutput/TagPacket.h \ -					   lib/edioutput/Transport.cpp \ -					   lib/edioutput/Transport.h \ -					   lib/Globals.cpp \ -					   lib/Log.h \ -					   lib/Log.cpp \ -					   lib/RemoteControl.cpp \ -					   lib/RemoteControl.h \ -					   lib/Json.h \ -					   lib/Json.cpp \ -					   lib/crc.h \ -					   lib/crc.c \ -					   lib/ReedSolomon.h \ -					   lib/ReedSolomon.cpp \ -					   lib/Socket.h \ -					   lib/Socket.cpp \ -					   lib/ThreadsafeQueue.h \ -					   lib/zmq.hpp \ -					   $(lib_fec_sources) - -odr_zmq2edi_LDADD    = $(ZMQ_LIBS) $(PTHREAD_CFLAGS) $(PTHREAD_LIBS) $(BOOST_SYSTEM_LIB) -odr_zmq2edi_CFLAGS   = -Wall $(ZMQ_CPPFLAGS) $(PTHREAD_CFLAGS) $(GITVERSION_FLAGS) $(INCLUDE) -odr_zmq2edi_CXXFLAGS = -Wall $(PTHREAD_LIBS) $(ZMQ_CPPFLAGS) $(GITVERSION_FLAGS) $(INCLUDE) - -man_MANS = 	man/odr-dabmux.1 \ -		man/odr-zmq2edi.1 +man_MANS = man/odr-dabmux.1  EXTRA_DIST	= COPYING NEWS README.md INSTALL.md LICENCE AUTHORS ChangeLog TODO.md doc \  			  lib/fec/README.md lib/fec/LICENSE \ @@ -30,11 +30,7 @@ Features of ODR-DabMux:  Additional tools: -`odr-zmq2edi`, a tool that can convert a ZeroMQ ETI stream -to an EDI stream. - -`odr-zmq2farsync`, a tool that can drive a FarSync card from -a ZeroMQ ETI stream. +`odr-zmq2farsync`, a tool that can drive a FarSync card from a ZeroMQ ETI stream.  The `src/` directory contains the source code of ODR-DabMux and the additional  tools. @@ -45,6 +41,11 @@ configuration files, and the munin and xymon scripts for the statistics server.  The `lib/` directory contains source code of libraries needed to build  ODR-DabMux. +Up to v4.5, this repository also contained +`odr-zmq2edi`, a tool that can convert a ZeroMQ ETI stream to an EDI or ZMQ stream. +This was superseded by `digris-zmq-converter` in the +[digris-edi-zmq-bridge](https://github.com/digris/digris-edi-zmq-bridge) repository. +  Install  ======= diff --git a/doc/TIMESTAMPS.rst b/doc/TIMESTAMPS.rst index 7f48b58..f3d5031 100644 --- a/doc/TIMESTAMPS.rst +++ b/doc/TIMESTAMPS.rst @@ -28,3 +28,5 @@ Running ODR-DabMux against the absolute timestamp firmware has uncovered a few i  **Important** Do not combine odr-zmq2edi with odr-dabmux of a different version! +Do not combine digris-zmq-converter with odr-dabmux older than v4! + diff --git a/man/odr-dabmux.1 b/man/odr-dabmux.1 index 1dca3cf..44a58dc 100644 --- a/man/odr-dabmux.1 +++ b/man/odr-dabmux.1 @@ -19,6 +19,6 @@ The configuration of the multiplexer is given in a configuration file, whose  format is defined in the example files in the doc/ folder inside the ODR-DabMux  repository.  .SH SEE ALSO -odr\-audioenc(1), odr\-zmq2edi(1), odr\-dabmod(1) +odr\-audioenc(1), odr\-dabmod(1)  A user guide for the mmbTools is available http://www.opendigitalradio.org/ diff --git a/man/odr-zmq2edi.1 b/man/odr-zmq2edi.1 deleted file mode 100644 index d06e34b..0000000 --- a/man/odr-zmq2edi.1 +++ /dev/null @@ -1,75 +0,0 @@ -.TH ODR-ZMQ2EDI "1" "May 2024" "odr-zmq2edi" "User Commands" -.SH NAME -\fBodr-zmq2edi\fR \- Convert an ZeroMQ stream to EDI -.SH SYNOPSIS -odr\-zmq2edi [options] <source> -.PP -<source> is a ZMQ URL that points to a ODR\-DabMux ZMQ output. -.SH DESCRIPTION -zmq2edi can receive a ZMQ ETI stream from ODR-DabMux and generate an EDI or ZMQ stream. -It buffers and releases frames according to their timestamp. -This is quite useful if your modulator wants EDI input, and your network is not  -good enough making you want to use something based on TCP. -.PP -The input socket will be reset if no data is received for 10 seconds. -It is best practice to run this tool under a process supervisor that will restart it automatically. -.SH OPTIONS -.SS "The following options can be given only once:" -.TP -\fB\-w\fR <delay> -Keep every ETI frame until TIST is <delay> milliseconds after current system time. -Negative delay values are also allowed. -.TP -\fB\-C\fR <path to script> -Before starting, run the given script, and only start if it returns 0. -This is useful for checking that NTP is properly synchronised -.TP -\fB\-x\fR -Drop frames where for which the wait time would be negative, i.e. frames that arrived too late. -.TP -\fB\-P\fR -Disable PFT and send AFPackets. -.TP -\fB\-f\fR <fec> -Set the FEC. -.TP -\fB\-i\fR <spread> -Configure the UDP packet spread/interleaver with given percentage: 0% send all fragments -at once, 100% spread over 24ms, >100% spread and interleave. Default 95% -.TP -\fB\-D\fR -Dump the EDI to edi.debug file. -.TP -\fB\-v\fR -Enables verbose mode. -.TP -\fB\-a\fR <alignement> -Set the alignment of the TAG Packet (default 8). -.TP -\fB\-b\fR <backoff> -Number of milliseconds to backoff after an input reset (default 5000). -.SS -The following options can be given several times, when more than one UDP destination is desired: -.TP -\fB\-d\fR <destination ip> -Set the destination ip. -.TP -\fB\-p\fR <destination port> -Set the destination port. -.TP -\fB\-s\fR <source port> -Set the source port. -.TP -\fB\-S\fR <source ip> -Select the source IP in case we want to use multicast. -.TP -\fB\-t\fR <ttl> -Set the packet's TTL. -.TP -\fB\-Z\fR <url> -Add a zmq output to URL, e.g. tcp:*:9876 to listen for connections on port 9876 - -.SH SEE ALSO -odr\-dabmux(1), odr\-audioenc(1), odr\-dabmod(1) - -A user guide for the mmbTools is available http://www.opendigitalradio.org/ diff --git a/src/zmq2edi/README.md b/src/zmq2edi/README.md deleted file mode 100644 index e509479..0000000 --- a/src/zmq2edi/README.md +++ /dev/null @@ -1,8 +0,0 @@ -Convert an ZeroMQ stream to EDI -=============================== - -This *zmq2edi* tool can receive a ZMQ ETI stream from -ODR-DabMux and generate and EDI stream. - -Quite useful if your modulator wants EDI input, and your network is not good -enough making you want to use something based on TCP. diff --git a/src/zmq2edi/Sender.cpp b/src/zmq2edi/Sender.cpp deleted file mode 100644 index fe46846..0000000 --- a/src/zmq2edi/Sender.cpp +++ /dev/null @@ -1,320 +0,0 @@ -/* -   Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, -   2011, 2012 Her Majesty the Queen in Right of Canada (Communications -   Research Center Canada) - -   Copyright (C) 2024 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "Sender.h" -#include "Log.h" -#include <cmath> -#include <numeric> -#include <map> -#include <algorithm> -#include <limits> - -using namespace std; - -Sender::Sender() : -    zmq_ctx(2) -{ -} - -Sender::~Sender() -{ -    if (running.load()) { -        running.store(false); - -        // Unblock thread -        frame_t emptyframe; -        frames.push(std::move(emptyframe)); - -        process_thread.join(); -    } -} - -void Sender::start(const edi::configuration_t& conf, -        const zmq_send_config_t& zmq_conf, -        int delay_ms, bool drop_late_packets) -{ -    edi_conf = conf; -    tist_delay_ms = delay_ms; -    drop_late = drop_late_packets; - -    edi_sender = make_shared<edi::Sender>(edi_conf); - -    for (const auto& url : zmq_conf.urls) { -        zmq::socket_t zmq_sock(zmq_ctx, ZMQ_PUB); -        zmq_sock.bind(url.c_str()); -        zmq_sockets.emplace_back(std::move(zmq_sock)); -    } - -    running.store(true); -    process_thread = thread(&Sender::process, this); -} - -void Sender::push_frame(frame_t&& frame) -{ -    frames.push(std::move(frame)); -} - -void Sender::print_configuration() -{ -    if (edi_conf.enabled()) { -        edi_conf.print(); -    } -    else { -        etiLog.level(info) << "EDI disabled"; -    } -} - -void Sender::send_eti_frame(frame_t& frame) -{ -    uint8_t *p = frame.data.data(); - -    edi::TagDETI edi_tagDETI; -    edi::TagStarPTR edi_tagStarPtr("DETI"); -    map<int, edi::TagESTn> edi_subchannelToTag; -    // The above Tag Items will be assembled into a TAG Packet -    edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment); - -    // SYNC -    edi_tagDETI.stat = p[0]; - -    // LIDATA FCT -    edi_tagDETI.dlfc = frame.metadata.dlfc; - -    const int fct = p[4]; -    if (frame.metadata.dlfc % 250 != fct) { -        etiLog.level(warn) << "Frame FCT=" << fct << -            " does not correspond to DLFC=" << frame.metadata.dlfc; -    } - -    bool ficf = (p[5] & 0x80) >> 7; -    edi_tagDETI.ficf = ficf; - -    const int nst = p[5] & 0x7F; - -    edi_tagDETI.fp = (p[6] & 0xE0) >> 5; -    const int mid = (p[6] & 0x18) >> 3; -    edi_tagDETI.mid = mid; -    //const int fl = (p[6] & 0x07) * 256 + p[7]; - -    int ficl = 0; -    if (ficf == 0) { -        etiLog.level(warn) << "Not FIC in data stream!"; -        return; -    } -    else if (mid == 3) { -        ficl = 32; -    } -    else { -        ficl = 24; -    } - -    vector<uint32_t> sad(nst); -    vector<uint32_t> stl(nst); -    // Loop over STC subchannels: -    for (int i=0; i < nst; i++) { -        // EDI stream index is 1-indexed -        const int edi_stream_id = i + 1; - -        uint32_t scid = (p[8 + 4*i] & 0xFC) >> 2; -        sad[i] = (p[8+4*i] & 0x03) * 256 + p[9+4*i]; -        uint32_t tpl = (p[10+4*i] & 0xFC) >> 2; -        stl[i] = (p[10+4*i] & 0x03) * 256 + \ -                 p[11+4*i]; - -        edi::TagESTn tag_ESTn; -        tag_ESTn.id = edi_stream_id; -        tag_ESTn.scid = scid; -        tag_ESTn.sad = sad[i]; -        tag_ESTn.tpl = tpl; -        tag_ESTn.rfa = 0; // two bits -        tag_ESTn.mst_length = stl[i]; -        tag_ESTn.mst_data = nullptr; - -        edi_subchannelToTag[i] = tag_ESTn; -    } - -    uint16_t mnsc = 0; -    std::memcpy(&mnsc, p + 8 + 4*nst, sizeof(uint16_t)); -    edi_tagDETI.mnsc = mnsc; - -    /*const uint16_t crc1 = p[8 + 4*nst + 2]*256 + \ -                          p[8 + 4*nst + 3]; */ - -    edi_tagDETI.fic_data = p + 12 + 4*nst; -    edi_tagDETI.fic_length = ficl * 4; - -    // loop over MSC subchannels -    int offset = 0; -    for (int i=0; i < nst; i++) { -        edi::TagESTn& tag = edi_subchannelToTag[i]; -        tag.mst_data = (p + 12 + 4*nst + ficf*ficl*4 + offset); - -        offset += stl[i] * 8; -    } - -    /* -    const uint16_t crc2 = p[12 + 4*nst + ficf*ficl*4 + offset] * 256 + \ -                          p[12 + 4*nst + ficf*ficl*4 + offset + 1]; */ - -    // TIST -    const size_t tist_ix = 12 + 4*nst + ficf*ficl*4 + offset + 4; -    uint32_t tist = (uint32_t)(p[tist_ix]) << 24 | -                    (uint32_t)(p[tist_ix+1]) << 16 | -                    (uint32_t)(p[tist_ix+2]) << 8 | -                    (uint32_t)(p[tist_ix+3]); - -    std::time_t posix_timestamp_1_jan_2000 = 946684800; - -    // Wait until our time is tist_delay after the TIST before -    // we release that frame - -    using namespace std::chrono; - -    const auto pps_offset = milliseconds(std::lrint((tist & 0xFFFFFF) / 16384.0)); -    const auto t_frame = system_clock::from_time_t( -            frame.metadata.edi_time + posix_timestamp_1_jan_2000 - frame.metadata.utc_offset) + pps_offset; - -    const auto t_release = t_frame + milliseconds(tist_delay_ms); -    const auto t_now = system_clock::now(); - -    const bool late = t_release < t_now; - -    buffering_stat_t stat; -    stat.late = late; - -    if (not late) { -        const auto wait_time = t_release - t_now; -        std::this_thread::sleep_for(wait_time); -    } - -    stat.buffering_time_us = duration_cast<microseconds>(steady_clock::now() - frame.received_at).count(); -    buffering_stats.push_back(std::move(stat)); - -    if (late and drop_late) { -        return; -    } - -    edi_tagDETI.tsta = tist; -    edi_tagDETI.atstf = 1; -    edi_tagDETI.utco = frame.metadata.utc_offset; -    edi_tagDETI.seconds = frame.metadata.edi_time; - -    if (edi_sender and edi_conf.enabled()) { -        // put tags *ptr, DETI and all subchannels into one TagPacket -        edi_tagpacket.tag_items.push_back(&edi_tagStarPtr); -        edi_tagpacket.tag_items.push_back(&edi_tagDETI); - -        for (auto& tag : edi_subchannelToTag) { -            edi_tagpacket.tag_items.push_back(&tag.second); -        } - -        edi_sender->write(edi_tagpacket); -    } - -    if (not frame.original_zmq_message.empty()) { -        for (auto& sock : zmq_sockets) { -            const auto send_result = sock.send(frame.original_zmq_message, zmq::send_flags::dontwait); -            if (not send_result.has_value()) { -                num_zmq_send_errors++; -            } -        } -    } -} - -void Sender::process() -{ -    while (running.load()) { -        frame_t frame; -        frames.wait_and_pop(frame); - -        if (not running.load() or frame.data.empty()) { -            break; -        } - -        if (frame.data.size() == 6144) { -            send_eti_frame(frame); -        } -        else { -            etiLog.level(warn) << "Ignoring short ETI frame, " -                "DFLC=" << frame.metadata.dlfc << ", len=" << -                frame.data.size(); -        } - -        if (buffering_stats.size() == 250) { // every six seconds -            const double n = buffering_stats.size(); - -            size_t num_late = std::count_if(buffering_stats.begin(), buffering_stats.end(), -                    [](const buffering_stat_t& s){ return s.late; }); - -            double sum = 0.0; -            double min = std::numeric_limits<double>::max(); -            double max = -std::numeric_limits<double>::max(); -            for (const auto& s : buffering_stats) { -                // convert to milliseconds -                const double t = s.buffering_time_us / 1000.0; -                sum += t; - -                if (t < min) { -                    min = t; -                } - -                if (t > max) { -                    max = t; -                } -            } -            double mean = sum / n; - -            double sq_sum = 0; -            for (const auto& s : buffering_stats) { -                const double t = s.buffering_time_us / 1000.0; -                sq_sum += (t-mean) * (t-mean); -            } -            double stdev = sqrt(sq_sum / n); - -            /* Debug code -            stringstream ss; -            ss << "times:"; -            for (const auto t : buffering_stats) { -                ss << " " << lrint(t.buffering_time_us / 1000.0); -            } -            etiLog.level(debug) << ss.str(); -            // */ - -            etiLog.level(info) << "Buffering time statistics [milliseconds]:" -                " min: " << min << -                " max: " << max << -                " mean: " << mean << -                " stdev: " << stdev << -                " late: " << -                num_late << " of " << buffering_stats.size() << " (" << -                num_late * 100.0 / n << "%) " << -                "Num ZMQ send errors: " << num_zmq_send_errors; - -            buffering_stats.clear(); -        } -    } -} diff --git a/src/zmq2edi/Sender.h b/src/zmq2edi/Sender.h deleted file mode 100644 index 6dfd615..0000000 --- a/src/zmq2edi/Sender.h +++ /dev/null @@ -1,98 +0,0 @@ -/* -   Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, -   2011, 2012 Her Majesty the Queen in Right of Canada (Communications -   Research Center Canada) - -   Copyright (C) 2024 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#pragma once -#include <iostream> -#include <iterator> -#include <thread> -#include <vector> -#include <chrono> -#include <atomic> -#include "ThreadsafeQueue.h" -#include "dabOutput/dabOutput.h" -#include "edioutput/TagItems.h" -#include "edioutput/TagPacket.h" -#include "edioutput/Transport.h" - -// This metadata gets transmitted in the zmq stream -struct metadata_t { -    uint32_t edi_time = 0; -    int16_t utc_offset = 0; -    uint16_t dlfc = 0; -}; - -struct frame_t { -    // Since a zmq message actually contains 4 frames, the -    // original_zmq_msg is only non-empty for the first of the -    // four calls to Sender::send_edi_frame(). -    zmq::message_t original_zmq_message; -    std::vector<uint8_t> data; -    metadata_t metadata; -    std::chrono::steady_clock::time_point received_at; -}; - -struct zmq_send_config_t { -    std::vector<std::string> urls; -}; - -class Sender { -    public: -        Sender(); -        Sender(const Sender& other) = delete; -        Sender& operator=(const Sender& other) = delete; -        ~Sender(); -        void start(const edi::configuration_t& conf, -                const zmq_send_config_t& zmq_conf, -                int delay_ms, bool drop_late_packets); -        void push_frame(frame_t&& frame); -        void print_configuration(void); - -    private: -        void send_eti_frame(frame_t& frame); -        void process(void); - -        int tist_delay_ms; -        bool drop_late; -        std::atomic<bool> running; -        std::thread process_thread; -        edi::configuration_t edi_conf; -        ThreadsafeQueue<frame_t> frames; - -        std::shared_ptr<edi::Sender> edi_sender; - -        zmq::context_t zmq_ctx; -        std::vector<zmq::socket_t> zmq_sockets; - -        struct buffering_stat_t { -            // Time between when we received the packets and when we transmit packets, in microseconds -            double buffering_time_us = 0.0; -            bool late = false; -        }; -        std::vector<buffering_stat_t> buffering_stats; -        size_t num_zmq_send_errors = 0; - -}; diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp deleted file mode 100644 index 41d92b5..0000000 --- a/src/zmq2edi/zmq2edi.cpp +++ /dev/null @@ -1,527 +0,0 @@ -/* -   Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, -   2011, 2012 Her Majesty the Queen in Right of Canada (Communications -   Research Center Canada) - -   Copyright (C) 2024 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "Log.h" -#include "zmq.hpp" -#include <getopt.h> -#include <cmath> -#include <cstring> -#include <chrono> -#include <iostream> -#include <iterator> -#include <thread> -#include <vector> - -#include "Sender.h" -#include "dabOutput/dabOutput.h" - -constexpr size_t MAX_ERROR_COUNT = 10; -constexpr long ZMQ_TIMEOUT_MS = 1000; -constexpr long DEFAULT_BACKOFF = 5000; - -static edi::configuration_t edi_conf; - -static Sender edisender; - -static void usage() -{ -    using namespace std; - -    cerr << "Usage:" << endl; -    cerr << "odr-zmq2edi [options] <source>" << endl << endl; - -    cerr << "ODR-ZMQ2EDI can output to both EDI and ZMQ. It buffers and releases frames according to their timestamp." << endl; - -    cerr << "Options:" << endl; -    cerr << "The following options can be given only once:" << endl; -    cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl; -    cerr << " -w <delay>            Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl; -    cerr << "                       Negative delay values are also allowed." << endl; -    cerr << " -C <path to script>   Before starting, run the given script, and only start if it returns 0." << endl; -    cerr << "                       This is useful for checking that NTP is properly synchronised" << endl; -    cerr << " -x                    Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl; -    cerr << " -b <backoff>          Number of milliseconds to backoff after an input reset (default " << DEFAULT_BACKOFF << ")." << endl << endl; - -    cerr << " ZMQ Output options:" << endl; -    cerr << " -Z <url>              Add a zmq output to URL, e.g. tcp:*:9876 to listen for connections on port 9876 " << endl << endl; - -    cerr << " EDI Output options:" << endl; -    cerr << " -v                    Enables verbose mode." << endl; -    cerr << " -P                    Disable PFT and send AFPackets." << endl; -    cerr << " -f <fec>              Set the FEC." << endl; -    cerr << " -i <spread>           Configure the UDP packet spread/interleaver with given percentage: 0% send all fragments at once, 100% spread over 24ms, >100% spread and interleave. Default 95%\n"; -    cerr << " -D                    Dump the EDI to edi.debug file." << endl; -    cerr << " -a <alignement>       Set the alignment of the TAG Packet (default 8)." << endl; - -    cerr << "The following options can be given several times, when more than EDI/UDP destination is desired:" << endl; -    cerr << " -d <destination ip>   Set the destination ip." << endl; -    cerr << " -p <destination port> Set the destination port." << endl; -    cerr << " -s <source port>      Set the source port." << endl; -    cerr << " -S <source ip>        Select the source IP in case we want to use multicast." << endl; -    cerr << " -t <ttl>              Set the packet's TTL." << endl << endl; - -    cerr << "The input socket will be reset if no data is received for " << -        (int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl; -    cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl; -} - -static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_bytes) -{ -    size_t remaining = size; -    if (remaining < 3) { -        etiLog.level(warn) << "Insufficient data to parse metadata"; -        throw std::runtime_error("Insufficient data"); -    } - -    metadata_t md; -    bool utc_offset_received = false; -    bool edi_time_received = false; -    bool dlfc_received = false; - -    while (remaining) { -        uint8_t id = buf[0]; -        uint16_t len = (((uint16_t)buf[1]) << 8) + buf[2]; - -        if (id == static_cast<uint8_t>(output_metadata_id_e::separation_marker)) { -            if (len != 0) { -                etiLog.level(warn) << "Invalid length " << len << " for metadata: separation_marker"; -            } - -            if (not utc_offset_received or not edi_time_received or not dlfc_received) { -                throw std::runtime_error("Incomplete metadata received"); -            } - -            remaining -= 3; -            *consumed_bytes = size - remaining; -            return md; -        } -        else if (id == static_cast<uint8_t>(output_metadata_id_e::utc_offset)) { -            if (len != 2) { -                etiLog.level(warn) << "Invalid length " << len << " for metadata: utc_offset"; -            } -            if (remaining < 2) { -                throw std::runtime_error("Insufficient data for utc_offset"); -            } -            uint16_t utco; -            std::memcpy(&utco, buf + 3, sizeof(utco)); -            md.utc_offset = ntohs(utco); -            utc_offset_received = true; -            remaining -= 5; -            buf += 5; -        } -        else if (id == static_cast<uint8_t>(output_metadata_id_e::edi_time)) { -            if (len != 4) { -                etiLog.level(warn) << "Invalid length " << len << " for metadata: edi_time"; -            } -            if (remaining < 4) { -                throw std::runtime_error("Insufficient data for edi_time"); -            } -            uint32_t edi_time; -            std::memcpy(&edi_time, buf + 3, sizeof(edi_time)); -            md.edi_time = ntohl(edi_time); -            edi_time_received = true; -            remaining -= 7; -            buf += 7; -        } -        else if (id == static_cast<uint8_t>(output_metadata_id_e::dlfc)) { -            if (len != 2) { -                etiLog.level(warn) << "Invalid length " << len << " for metadata: dlfc"; -            } -            if (remaining < 2) { -                throw std::runtime_error("Insufficient data for dlfc"); -            } -            uint16_t dlfc; -            std::memcpy(&dlfc, buf + 3, sizeof(dlfc)); -            md.dlfc = ntohs(dlfc); -            dlfc_received = true; -            remaining -= 5; -            buf += 5; -        } -    } - -    throw std::runtime_error("Insufficient data"); -} - -/* There is some state inside the parsing of destination arguments, - * because several destinations can be given.  */ - -static std::shared_ptr<edi::udp_destination_t> edi_destination; -static bool dest_port_set = false; -static bool source_port_set = false; -static bool source_addr_set = false; -static bool ttl_set = false; -static bool dest_addr_set = false; - -static void add_edi_destination(void) -{ -    if (not dest_addr_set) { -        throw std::runtime_error("Destination address not specified for destination number " + -                std::to_string(edi_conf.destinations.size() + 1)); -    } - -    edi_conf.destinations.push_back(std::move(edi_destination)); -    edi_destination = std::make_shared<edi::udp_destination_t>(); - -    dest_port_set = false; -    source_port_set = false; -    source_addr_set = false; -    ttl_set = false; -    dest_addr_set = false; -} - -static void parse_destination_args(char option) -{ -    if (not edi_destination) { -        edi_destination = std::make_shared<edi::udp_destination_t>(); -    } - -    switch (option) { -        case 'p': -            if (dest_port_set) { -                add_edi_destination(); -            } -            edi_destination->dest_port = std::stoi(optarg); -            dest_port_set = true; -            break; -        case 's': -            if (source_port_set) { -                add_edi_destination(); -            } -            edi_destination->source_port = std::stoi(optarg); -            source_port_set = true; -            break; -        case 'S': -            if (source_addr_set) { -                add_edi_destination(); -            } -            edi_destination->source_addr = optarg; -            source_addr_set = true; -            break; -        case 't': -            if (ttl_set) { -                add_edi_destination(); -            } -            edi_destination->ttl = std::stoi(optarg); -            ttl_set = true; -            break; -        case 'd': -            if (dest_addr_set) { -                add_edi_destination(); -            } -            edi_destination->dest_addr = optarg; -            dest_addr_set = true; -            break; -        default: -            throw std::logic_error("parse_destination_args invalid"); -    } -} - -class FCTDiscontinuity { }; - -int start(int argc, char **argv) -{ -    edi_conf.enable_pft = true; - -    if (argc == 0) { -        usage(); -        return 1; -    } - -    int delay_ms = 500; -    bool drop_late_packets = false; -    uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF; -    std::string startupcheck; - -    zmq_send_config_t zmq_conf; - -    int ch = 0; -    while (ch != -1) { -        ch = getopt(argc, argv, "C:d:p:s:S:t:Pf:i:Dva:b:w:xhZ:"); -        switch (ch) { -            case -1: -                break; -            case 'C': -                startupcheck = optarg; -                break; -            case 'd': -            case 's': -            case 'S': -            case 't': -            case 'p': -                parse_destination_args(ch); -                break; -            case 'P': -                edi_conf.enable_pft = false; -                break; -            case 'f': -                edi_conf.fec = std::stoi(optarg); -                break; -            case 'i': -                { -                    int spread_percent = std::stoi(optarg); -                    if (spread_percent < 0) { -                        throw std::runtime_error("EDI output: negative spread value is invalid."); -                    } - -                    edi_conf.fragment_spreading_factor = (double)spread_percent / 100.0; -                    if (edi_conf.fragment_spreading_factor > 30000) { -                        throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!"); -                    } -                } -                break; -            case 'D': -                edi_conf.dump = true; -                break; -            case 'v': -                edi_conf.verbose = true; -                break; -            case 'a': -                edi_conf.tagpacket_alignment = std::stoi(optarg); -                break; -            case 'b': -                backoff_after_reset_ms = std::stoi(optarg); -                break; -            case 'w': -                delay_ms = std::stoi(optarg); -                break; -            case 'x': -                drop_late_packets = true; -                break; -            case 'Z': -                zmq_conf.urls.push_back(optarg); -                break; -            case 'h': -            default: -                usage(); -                return 1; -        } -    } - -    if (dest_addr_set) { -        add_edi_destination(); -    } - -    if (optind >= argc) { -        etiLog.level(error) << "source option is missing"; -        return 1; -    } - -    if (edi_conf.destinations.empty() and zmq_conf.urls.empty()) { -        etiLog.level(error) << "No destinations set"; -        return 1; -    } - -    if (not edi_conf.destinations.empty()) { -        edisender.print_configuration(); -    } - -    if (not zmq_conf.urls.empty()) { -        etiLog.level(info) << "Setting up ZMQ to:"; -        for (const auto& url : zmq_conf.urls) { -            etiLog.level(info) << " " << url; -        } -    } - - -    if (not startupcheck.empty()) { -        etiLog.level(info) << "Running startup check '" << startupcheck << "'"; -        int wstatus = system(startupcheck.c_str()); - -        if (WIFEXITED(wstatus)) { -            if (WEXITSTATUS(wstatus) == 0) { -                etiLog.level(info) << "Startup check ok"; -            } -            else { -                etiLog.level(error) << "Startup check failed, returned " << WEXITSTATUS(wstatus); -                return 1; -            } -        } -        else { -            etiLog.level(error) << "Startup check failed, child didn't terminate normally"; -            return 1; -        } -    } - -    etiLog.level(info) << "Setting up Sender with delay " << delay_ms << " ms. " << -        (drop_late_packets ? "Will" : "Will not") << " drop late packets"; -    edisender.start(edi_conf, zmq_conf, delay_ms, drop_late_packets); - -    const char* source_url = argv[optind]; - -    zmq::context_t zmq_ctx(1); -    etiLog.level(info) << "Opening ZMQ input: " << source_url; - -    while (true) { -        zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); -        zmq_sock.connect(source_url); -        zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages - -        size_t error_count = 0; -        int previous_fct = -1; - -        try { -            while (error_count < MAX_ERROR_COUNT) { -                zmq::message_t incoming; -                zmq::pollitem_t items[1]; -                items[0].socket = zmq_sock; -                items[0].events = ZMQ_POLLIN; -                const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); -                if (num_events == 0) { // timeout -                    error_count++; -                } -                else { -                    // Event received: recv will not block -                    const auto recv_result = zmq_sock.recv(incoming, zmq::recv_flags::none); -                    if (not recv_result.has_value()) { -                        continue; -                    } - -                    const auto received_at = std::chrono::steady_clock::now(); - -                    // Casting incoming.data() to zmq_dab_message_t* is not allowed, because -                    // it might be misaligned -                    zmq_dab_message_t dab_msg; -                    memcpy(&dab_msg, incoming.data(), ZMQ_DAB_MESSAGE_HEAD_LENGTH); - -                    if (dab_msg.version != 1) { -                        etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg.version; -                        error_count++; -                    } - -                    int offset = sizeof(dab_msg.version) + -                        NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg.buflen); - -                    std::vector<frame_t> all_frames; -                    all_frames.reserve(NUM_FRAMES_PER_ZMQ_MESSAGE); - -                    for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { -                        if (dab_msg.buflen[i] <= 0 or dab_msg.buflen[i] > 6144) { -                            etiLog.level(error) << "ZeroMQ buffer " << i << -                                " has invalid length " << dab_msg.buflen[i]; -                            error_count++; -                        } -                        else { -                            frame_t frame; -                            frame.data.resize(6144, 0x55); -                            frame.received_at = received_at; - -                            const int framesize = dab_msg.buflen[i]; - -                            memcpy(frame.data.data(), -                                    ((uint8_t*)incoming.data()) + offset, -                                    framesize); - -                            const int fct = frame.data[4]; - -                            const int expected_fct = (previous_fct + 1) % 250; -                            if (previous_fct != -1 and expected_fct != fct) { -                                etiLog.level(error) << "ETI wrong frame counter FCT=" << fct << " expected " << expected_fct; -                                throw FCTDiscontinuity(); -                            } -                            previous_fct = fct; - -                            all_frames.push_back(std::move(frame)); - -                            offset += framesize; -                        } -                    } - -                    for (auto &f : all_frames) { -                        size_t consumed_bytes = 0; - -                        f.metadata = get_md_one_frame( -                                static_cast<uint8_t*>(incoming.data()) + offset, -                                incoming.size() - offset, -                                &consumed_bytes); - -                        offset += consumed_bytes; -                    } - -                    if (not all_frames.empty()) { -                        all_frames[0].original_zmq_message = std::move(incoming); -                    } - -                    for (auto &f : all_frames) { -                        edisender.push_frame(std::move(f)); -                    } -                } -            } - -            etiLog.level(info) << "Backoff " << backoff_after_reset_ms << -                "ms due to ZMQ input (" << source_url << ") timeout"; -        } -        catch (const FCTDiscontinuity&) { -            etiLog.level(info) << "Backoff " << backoff_after_reset_ms << "ms FCT discontinuity"; -        } - -        zmq_sock.close(); -        std::this_thread::sleep_for(std::chrono::milliseconds(backoff_after_reset_ms)); -    } - -    return 0; -} - -int main(int argc, char **argv) -{ -    // Version handling is done very early to ensure nothing else but the version gets printed out -    if (argc == 2 and strcmp(argv[1], "--version") == 0) { -        fprintf(stdout, "%s\n", -#if defined(GITVERSION) -                GITVERSION -#else -                PACKAGE_VERSION -#endif -               ); -        return 0; -    } - -    etiLog.level(info) << "ZMQ2EDI converter from " << -        PACKAGE_NAME << " " << -#if defined(GITVERSION) -        GITVERSION << -#else -        PACKAGE_VERSION << -#endif -        " starting up"; - -    int ret = 1; - -    try { -        ret = start(argc, argv); -    } -    catch (const std::runtime_error &e) { -        etiLog.level(error) << "Runtime error: " << e.what(); -    } -    catch (const std::logic_error &e) { -        etiLog.level(error) << "Logic error! " << e.what(); -    } - -    // To make sure things get printed to stderr -    std::this_thread::sleep_for(std::chrono::milliseconds(300)); - -    return ret; -} | 
