diff options
| -rw-r--r-- | doc/advanced.mux | 4 | ||||
| -rw-r--r-- | doc/example.mux | 6 | ||||
| -rw-r--r-- | lib/edioutput/EDIConfig.h | 3 | ||||
| -rw-r--r-- | lib/edioutput/Transport.cpp | 32 | ||||
| -rw-r--r-- | lib/edioutput/Transport.h | 7 | ||||
| -rw-r--r-- | lib/fec/decode_rs.h | 12 | ||||
| -rw-r--r-- | src/DabMultiplexer.cpp | 2 | ||||
| -rw-r--r-- | src/DabMux.cpp | 13 | ||||
| -rw-r--r-- | src/input/Edi.cpp | 7 | ||||
| -rw-r--r-- | src/zmq2edi/EDISender.cpp | 390 | ||||
| -rw-r--r-- | src/zmq2edi/EDISender.h | 91 | ||||
| -rw-r--r-- | src/zmq2edi/zmq2edi.cpp | 419 | 
12 files changed, 58 insertions, 928 deletions
| diff --git a/doc/advanced.mux b/doc/advanced.mux index 0fc1b53..d2cc0fd 100644 --- a/doc/advanced.mux +++ b/doc/advanced.mux @@ -36,8 +36,8 @@ general {      ; number in seconds. Granularity: 24ms      ; tist_offset 0.480 -    ; Specify the TIST value for the frame with FCT==0, in microseconds -    ; tist_at_fct0 768000 +    ; Specify the TIST value for the frame with FCT==0, in milliseconds +    ; tist_at_fct0 768      ; The management server is a simple TCP server that can present      ; statistics data (buffers, overruns, underruns, etc) diff --git a/doc/example.mux b/doc/example.mux index 34cd2ee..ae12fb2 100644 --- a/doc/example.mux +++ b/doc/example.mux @@ -211,11 +211,13 @@ subchannels {          inputuri "tcp://0.0.0.0:9001"          ; For UDP, PFT should be enabled at the sender. -        ; Unicast UDP input: +        ; Unicast UDP input, bound to all interfaces:          ;inputuri "udp://:9001" +        ; Unicast UDP input, bound to interface with IP 192.168.0.10: +        ;inputuri "udp://192.168.0.10:9001"          ; Multicast UDP input:          ;inputuri "udp://@239.10.0.1:9001" -        ; Multicast UDP input with local interface (192.168.0.10) specification +        ; Multicast UDP input with local interface 192.168.0.10 specification          ;inputuri "udp://192.168.0.10@239.10.0.1:9001"          ; Two buffer-management types are available: prebuffering and timestamped. diff --git a/lib/edioutput/EDIConfig.h b/lib/edioutput/EDIConfig.h index 7016e87..de4217f 100644 --- a/lib/edioutput/EDIConfig.h +++ b/lib/edioutput/EDIConfig.h @@ -27,6 +27,7 @@  #pragma once +#include <optional>  #include <vector>  #include <string>  #include <memory> @@ -60,7 +61,7 @@ struct udp_destination_t : public destination_t {      uint16_t dest_port = 0;      std::string source_addr;      uint16_t source_port = 0; -    uint8_t ttl = 10; +    std::optional<uint8_t> ttl = std::nullopt;  };  // TCP server that can accept multiple connections diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp index e9559b5..3898213 100644 --- a/lib/edioutput/Transport.cpp +++ b/lib/edioutput/Transport.cpp @@ -41,10 +41,15 @@ void configuration_t::print() const          if (auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) {              etiLog.level(info) << " UDP to " << udp_dest->dest_addr << ":" << udp_dest->dest_port;              if (not udp_dest->source_addr.empty()) { -                etiLog.level(info) << "  source      " << udp_dest->source_addr; -                etiLog.level(info) << "  ttl         " << udp_dest->ttl; +                etiLog.level(info) << "  source address=" << udp_dest->source_addr;              } -            etiLog.level(info) << "  source port " << udp_dest->source_port; +            if (udp_dest->ttl) { +                etiLog.level(info) << "  ttl=" << (int)(*udp_dest->ttl); +            } +            else { +                etiLog.level(info) << "  ttl=(default)"; +            } +            etiLog.level(info) << "  source port=" << udp_dest->source_port;          }          else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {              etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port; @@ -80,7 +85,10 @@ Sender::Sender(const configuration_t& conf) :              if (not udp_dest->source_addr.empty()) {                  udp_socket.setMulticastSource(udp_dest->source_addr.c_str()); -                udp_socket.setMulticastTTL(udp_dest->ttl); +            } + +            if (udp_dest->ttl) { +                udp_socket.setMulticastTTL(*udp_dest->ttl);              }              auto sender = make_shared<udp_sender_t>( @@ -99,7 +107,7 @@ Sender::Sender(const configuration_t& conf) :                      make_shared<PFTSpreader>(tcp_dest->pft_settings, sender));          }          else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { -            auto sender = make_shared<tcp_send_client_t>(tcp_dest->dest_addr, tcp_dest->dest_port); +            auto sender = make_shared<tcp_send_client_t>(tcp_dest->dest_addr, tcp_dest->dest_port, m_conf.verbose);              m_pft_spreaders.emplace_back(                      make_shared<PFTSpreader>(tcp_dest->pft_settings, sender));          } @@ -199,7 +207,13 @@ void Sender::tcp_dispatcher_t::send_packet(const std::vector<uint8_t> &frame)  void Sender::tcp_send_client_t::send_packet(const std::vector<uint8_t> &frame)  { -    sock.sendall(frame); +    const auto error_stats = sock.sendall(frame); + +    if (verbose and error_stats.has_seen_new_errors) { +        etiLog.level(warn) << "TCP output " << dest_addr << ":" << dest_port +                          << " has " << error_stats.num_reconnects +                          << " reconnects: most recent error: " << error_stats.last_error; +    }  }  Sender::udp_sender_t::udp_sender_t(std::string dest_addr, @@ -221,7 +235,11 @@ Sender::tcp_dispatcher_t::tcp_dispatcher_t(uint16_t listen_port,  }  Sender::tcp_send_client_t::tcp_send_client_t(const std::string &dest_addr, -                                             uint16_t dest_port) : +                                             uint16_t dest_port, +                                             bool verbose) : +    dest_addr(dest_addr), +    dest_port(dest_port), +    verbose(verbose),      sock(dest_addr, dest_port)  {  } diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h index b8a9008..96784c0 100644 --- a/lib/edioutput/Transport.h +++ b/lib/edioutput/Transport.h @@ -118,8 +118,13 @@ class Sender {          struct tcp_send_client_t : public i_sender {              tcp_send_client_t(                      const std::string& dest_addr, -                    uint16_t dest_port); +                    uint16_t dest_port, +                    bool verbose); +            std::string dest_addr; +            uint16_t dest_port; +            bool verbose; +            size_t m_num_reconnects_prev = 0;              Socket::TCPSendClient sock;              virtual void send_packet(const std::vector<uint8_t> &frame) override;          }; diff --git a/lib/fec/decode_rs.h b/lib/fec/decode_rs.h index c165cf3..647b885 100644 --- a/lib/fec/decode_rs.h +++ b/lib/fec/decode_rs.h @@ -145,15 +145,15 @@        count++;      }      if (count != no_eras) { -      printf("count = %d no_eras = %d\n lambda(x) is WRONG\n",count,no_eras); +      fprintf(stderr, "count = %d no_eras = %d\n lambda(x) is WRONG\n",count,no_eras);        count = -1;        goto finish;      }  #if DEBUG >= 2 -    printf("\n Erasure positions as determined by roots of Eras Loc Poly:\n"); +    fprintf(stderr, "\n Erasure positions as determined by roots of Eras Loc Poly:\n");      for (i = 0; i < count; i++) -      printf("%d ", loc[i]); -    printf("\n"); +      fprintf(stderr, "%d ", loc[i]); +    fprintf(stderr, "\n");  #endif  #endif    } @@ -227,7 +227,7 @@        continue; /* Not a root */      /* store root (index-form) and error location number */  #if DEBUG>=2 -    printf("count %d root %d loc %d\n",count,i,k); +    fprintf(stderr, "count %d root %d loc %d\n",count,i,k);  #endif      root[count] = i;      loc[count] = k; @@ -279,7 +279,7 @@      }  #if DEBUG >= 1      if (den == 0) { -      printf("\n ERROR: denominator = 0\n"); +      fprintf(stderr, "\n ERROR: denominator = 0\n");        count = -1;        goto finish;      } diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp index 52f053a..bea82c2 100644 --- a/src/DabMultiplexer.cpp +++ b/src/DabMultiplexer.cpp @@ -77,8 +77,6 @@ uint64_t MuxTime::init(uint32_t tist_at_fct0_ms, double tist_offset)      const auto counter_offset = tist_at_fct0_ms / 24;      const auto offset_as_count = m_pps_offset_ms / 24; -    etiLog.level(debug) << "Init " << counter_offset << " " << offset_as_count; -      return (250 - counter_offset + offset_as_count) % 250;  } diff --git a/src/DabMux.cpp b/src/DabMux.cpp index bf525c1..4b9352f 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -352,10 +352,9 @@ int main(int argc, char *argv[])                      pft_settings.enable_pft = pt.get<bool>("enable_pft", default_enable_pft);                      pft_settings.fec = pt.get<unsigned int>("fec", default_fec);                      pft_settings.fragment_spreading_factor = default_spreading_factor; -                    auto override_spread_percent = pt.get_optional<int>("packet_spread"); -                    if (override_spread_percent) { +                    if (auto override_spread_percent = pt.get_optional<int>("packet_spread"))                          pft_settings.fragment_spreading_factor = check_spreading_factor(*override_spread_percent); -                    } +                      pft_settings.verbose = pt.get<bool>("verbose", edi_conf.verbose);                  }; @@ -364,12 +363,12 @@ int main(int argc, char *argv[])                      if (proto == "udp") {                          auto dest = make_shared<edi::udp_destination_t>();                          dest->dest_addr   = pt_edi_dest.second.get<string>("destination"); -                        dest->ttl         = pt_edi_dest.second.get<unsigned int>("ttl", 1); +                        if (auto ttl = pt_edi_dest.second.get_optional<unsigned int>("ttl")) +                            dest->ttl = *ttl;                          dest->source_addr = pt_edi_dest.second.get<string>("source", ""); -                        dest->source_port = pt_edi_dest.second.get<unsigned int>("sourceport"); - -                        dest->dest_port       = pt_edi_dest.second.get<unsigned int>("port", 0); +                        dest->source_port = pt_edi_dest.second.get<unsigned int>("sourceport", 0); +                        dest->dest_port   = pt_edi_dest.second.get<unsigned int>("port", 0);                          if (dest->dest_port == 0) {                              // Compatiblity: we have removed the transport and addressing in the                              // PFT layer, which removed the requirement that all outputs must share diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index 141641f..b100f32 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -80,6 +80,7 @@ Edi::~Edi() {  void Edi::open(const std::string& name)  {      const std::regex re_udp("udp://:([0-9]+)"); +    const std::regex re_udp_bindto("udp://([^:]+):([0-9]+)");      const std::regex re_udp_multicast("udp://@([0-9.]+):([0-9]+)");      const std::regex re_udp_multicast_bindto("udp://([0-9.])+@([0-9.]+):([0-9]+)");      const std::regex re_tcp("tcp://(.*):([0-9]+)"); @@ -98,6 +99,12 @@ void Edi::open(const std::string& name)          m_udp_sock.reinit(udp_port);          m_udp_sock.setBlocking(false);      } +    else if (std::regex_match(name, m, re_udp_bindto)) { +        const int udp_port = std::stoi(m[2].str()); +        m_input_used = InputUsed::UDP; +        m_udp_sock.reinit(udp_port, m[1].str()); +        m_udp_sock.setBlocking(false); +    }      else if (std::regex_match(name, m, re_udp_multicast_bindto)) {          const string bind_to = m[1].str();          const string multicast_address = m[2].str(); diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp deleted file mode 100644 index 06b7420..0000000 --- a/src/zmq2edi/EDISender.cpp +++ /dev/null @@ -1,390 +0,0 @@ -/* -   Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, -   2011, 2012 Her Majesty the Queen in Right of Canada (Communications -   Research Center Canada) - -   Copyright (C) 2018 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "EDISender.h" -#include "Log.h" -#include <cmath> -#include <numeric> -#include <map> -#include <algorithm> - -using namespace std; - -EDISender::~EDISender() -{ -    if (running.load()) { -        running.store(false); - -        // Unblock thread -        frame_t emptyframe; -        frames.push(emptyframe); - -        process_thread.join(); -    } -} - -void EDISender::start(const edi_configuration_t& conf, -        int delay_ms, int max_delay_ms) -{ -    edi_conf = conf; -    tist_delay_ms = delay_ms; -    tist_max_delay_ms = max_delay_ms; - -    if (edi_conf.verbose) { -        etiLog.log(info, "Setup EDI"); -    } - -    if (edi_conf.dump) { -        edi_debug_file.open("./edi.debug"); -    } - -    if (edi_conf.enabled()) { -        for (auto& edi_destination : edi_conf.destinations) { -            auto edi_output = make_shared<UdpSocket>(edi_destination.source_port); - -            if (not edi_destination.source_addr.empty()) { -                int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str()); -                if (err) { -                    throw runtime_error("EDI socket set source failed!"); -                } -                err = edi_output->setMulticastTTL(edi_destination.ttl); -                if (err) { -                    throw runtime_error("EDI socket set TTL failed!"); -                } -            } - -            edi_destination.socket = edi_output; -        } -    } - -    if (edi_conf.verbose) { -        etiLog.log(info, "EDI set up"); -    } - -    // The AF Packet will be protected with reed-solomon and split in fragments -    edi::PFT pft(edi_conf); -    edi_pft = pft; - -    if (edi_conf.interleaver_enabled()) { -        edi_interleaver.SetLatency(edi_conf.latency_frames); -    } - -    startTime = std::chrono::steady_clock::now(); -    running.store(true); -    process_thread = thread(&EDISender::process, this); -} - -void EDISender::push_frame(const frame_t& frame) -{ -    frames.push(frame); -} - -void EDISender::print_configuration() -{ -    if (edi_conf.enabled()) { -        etiLog.level(info) << "EDI"; -        etiLog.level(info) << " verbose     " << edi_conf.verbose; -        for (auto& edi_dest : edi_conf.destinations) { -            etiLog.level(info) << " to " << edi_dest.dest_addr << ":" << edi_conf.dest_port; -            if (not edi_dest.source_addr.empty()) { -                etiLog.level(info) << "  source      " << edi_dest.source_addr; -                etiLog.level(info) << "  ttl         " << edi_dest.ttl; -            } -            etiLog.level(info) << "  source port " << edi_dest.source_port; -        } -        if (edi_conf.interleaver_enabled()) { -            etiLog.level(info) << " interleave     " << edi_conf.latency_frames * 24 << " ms"; -        } -    } -    else { -        etiLog.level(info) << "EDI disabled"; -    } -} - -void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata) -{ -    edi::TagDETI edi_tagDETI; -    edi::TagStarPTR edi_tagStarPtr; -    map<int, edi::TagESTn> edi_subchannelToTag; -    // The above Tag Items will be assembled into a TAG Packet -    edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment); - -    // SYNC -    edi_tagDETI.stat = p[0]; - -    // LIDATA FCT -    edi_tagDETI.dlfc = metadata.dlfc; - -    const int fct = p[4]; -    if (metadata.dlfc % 250 != fct) { -        etiLog.level(warn) << "Frame FCT=" << fct << -            " does not correspond to DLFC=" << metadata.dlfc; -    } - -    bool ficf = (p[5] & 0x80) >> 7; -    edi_tagDETI.ficf = ficf; - -    const int nst = p[5] & 0x7F; - -    edi_tagDETI.fp = (p[6] & 0xE0) >> 5; -    const int mid = (p[6] & 0x18) >> 3; -    edi_tagDETI.mid = mid; -    //const int fl = (p[6] & 0x07) * 256 + p[7]; - -    int ficl = 0; -    if (ficf == 0) { -        etiLog.level(warn) << "Not FIC in data stream!"; -        return; -    } -    else if (mid == 3) { -        ficl = 32; -    } -    else { -        ficl = 24; -    } - -    vector<uint32_t> sad(nst); -    vector<uint32_t> stl(nst); -    // Loop over STC subchannels: -    for (int i=0; i < nst; i++) { -        // EDI stream index is 1-indexed -        const int edi_stream_id = i + 1; - -        uint32_t scid = (p[8 + 4*i] & 0xFC) >> 2; -        sad[i] = (p[8+4*i] & 0x03) * 256 + p[9+4*i]; -        uint32_t tpl = (p[10+4*i] & 0xFC) >> 2; -        stl[i] = (p[10+4*i] & 0x03) * 256 + \ -                 p[11+4*i]; - -        edi::TagESTn tag_ESTn; -        tag_ESTn.id = edi_stream_id; -        tag_ESTn.scid = scid; -        tag_ESTn.sad = sad[i]; -        tag_ESTn.tpl = tpl; -        tag_ESTn.rfa = 0; // two bits -        tag_ESTn.mst_length = stl[i]; -        tag_ESTn.mst_data = nullptr; - -        edi_subchannelToTag[i] = tag_ESTn; -    } - -    const uint16_t mnsc = p[8 + 4*nst] * 256 + \ -                          p[8 + 4*nst + 1]; -    edi_tagDETI.mnsc = mnsc; - -    /*const uint16_t crc1 = p[8 + 4*nst + 2]*256 + \ -                          p[8 + 4*nst + 3]; */ - -    edi_tagDETI.fic_data = p + 12 + 4*nst; -    edi_tagDETI.fic_length = ficl * 4; - -    // loop over MSC subchannels -    int offset = 0; -    for (int i=0; i < nst; i++) { -        edi::TagESTn& tag = edi_subchannelToTag[i]; -        tag.mst_data = (p + 12 + 4*nst + ficf*ficl*4 + offset); - -        offset += stl[i] * 8; -    } - -    /* -    const uint16_t crc2 = p[12 + 4*nst + ficf*ficl*4 + offset] * 256 + \ -                          p[12 + 4*nst + ficf*ficl*4 + offset + 1]; */ - -    // TIST -    const size_t tist_ix = 12 + 4*nst + ficf*ficl*4 + offset + 4; -    uint32_t tist = (uint32_t)(p[tist_ix]) << 24 | -                    (uint32_t)(p[tist_ix+1]) << 16 | -                    (uint32_t)(p[tist_ix+2]) << 8 | -                    (uint32_t)(p[tist_ix+3]); - -    std::time_t posix_timestamp_1_jan_2000 = 946684800; - -    // Wait until our time is tist_delay after the TIST before -    // we release that frame - -    using namespace std::chrono; - -    const auto seconds = metadata.edi_time; -    const auto pps_offset = milliseconds(std::lrint((tist & 0xFFFFFF) / 16384.0)); -    const auto t_frame = system_clock::from_time_t( -            seconds + posix_timestamp_1_jan_2000) + pps_offset; - -    const auto t_release = t_frame + milliseconds(tist_delay_ms); -    const auto t_now = system_clock::now(); - -    /* -    etiLog.level(debug) << "seconds " << seconds + posix_timestamp_1_jan_2000; -    etiLog.level(debug) << "now " << system_clock::to_time_t(t_now); -    etiLog.level(debug) << "wait " << wait_time.count(); -    */ - -    const auto wait_time = t_release - t_now; -    wait_times.push_back(duration_cast<microseconds>(wait_time).count()); - -    if (tist_max_delay_ms > 0) { -        const auto t_latest_release = t_frame + milliseconds(tist_max_delay_ms); - -        if (t_now > t_latest_release) { -            // drop frame -            num_dropped.fetch_add(1); -            return; -        } -    } - -    if (t_release > t_now) { -        std::this_thread::sleep_for(wait_time); -    } - -    edi_tagDETI.tsta = tist; -    edi_tagDETI.atstf = 1; -    edi_tagDETI.utco = metadata.utc_offset; -    edi_tagDETI.seconds = metadata.edi_time; - -    if (edi_conf.enabled()) { -        // put tags *ptr, DETI and all subchannels into one TagPacket -        edi_tagpacket.tag_items.push_back(&edi_tagStarPtr); -        edi_tagpacket.tag_items.push_back(&edi_tagDETI); - -        for (auto& tag : edi_subchannelToTag) { -            edi_tagpacket.tag_items.push_back(&tag.second); -        } - -        // Assemble into one AF Packet -        edi::AFPacket edi_afpacket = edi_afPacketiser.Assemble(edi_tagpacket); - -        if (edi_conf.enable_pft) { -            // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) -            vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(edi_afpacket); - -            if (edi_conf.verbose) { -                fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n", -                        edi_fragments.size()); -            } - -            if (edi_conf.interleaver_enabled()) { -                edi_fragments = edi_interleaver.Interleave(edi_fragments); -            } - -            // Send over ethernet -            for (const auto& edi_frag : edi_fragments) { -                for (auto& dest : edi_conf.destinations) { -                    InetAddress addr; -                    addr.setAddress(dest.dest_addr.c_str()); -                    addr.setPort(edi_conf.dest_port); - -                    dest.socket->send(edi_frag, addr); -                } - -                if (edi_conf.dump) { -                    std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -                    std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); -                } -            } - -            if (edi_conf.verbose) { -                fprintf(stderr, "EDI number of PFT fragments %zu\n", -                        edi_fragments.size()); -            } -        } -        else { -            // Send over ethernet -            for (auto& dest : edi_conf.destinations) { -                InetAddress addr; -                addr.setAddress(dest.dest_addr.c_str()); -                addr.setPort(edi_conf.dest_port); - -                dest.socket->send(edi_afpacket, addr); -            } - -            if (edi_conf.dump) { -                std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -                std::copy(edi_afpacket.begin(), edi_afpacket.end(), debug_iterator); -            } -        } -    } -} - -void EDISender::process() -{ -    while (running.load()) { -        frame_t frame; -        frames.wait_and_pop(frame); - -        if (not running.load() or frame.first.empty()) { -            break; -        } - -        if (frame.first.size() == 6144) { -            send_eti_frame(frame.first.data(), frame.second); -        } -        else { -            etiLog.level(warn) << "Ignoring short ETI frame, " -                "DFLC=" << frame.second.dlfc << ", len=" << -                frame.first.size(); -        } - -        if (wait_times.size() == 250) { // every six seconds -            const double n = wait_times.size(); - -            double sum = accumulate(wait_times.begin(), wait_times.end(), 0); -            size_t num_late = std::count_if(wait_times.begin(), wait_times.end(), -                    [](double v){ return v < 0; }); -            double mean = sum / n; - -            double sq_sum = 0; -            for (const auto t : wait_times) { -                sq_sum += (t-mean) * (t-mean); -            } -            double stdev = sqrt(sq_sum / n); -            auto min_max = minmax_element(wait_times.begin(), wait_times.end()); - -            /* Debug code -            stringstream ss; -            ss << "times:"; -            for (const auto t : wait_times) { -                ss << " " << t; -            } -            etiLog.level(debug) << ss.str(); -            */ - -            const size_t dropped = num_dropped.exchange(0); - -            etiLog.level(info) << "Wait time statistics [microseconds]:" -                " min: " << *min_max.first << -                " max: " << *min_max.second << -                " mean: " << mean << -                " stdev: " << stdev << -                " late: " << -                num_late << " of " << wait_times.size() << " (" << -                num_late * 100.0 / n << "%)" << -                " dropped: " << dropped; - -            wait_times.clear(); -        } -    } -} diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h deleted file mode 100644 index 44502c1..0000000 --- a/src/zmq2edi/EDISender.h +++ /dev/null @@ -1,91 +0,0 @@ -/* -   Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, -   2011, 2012 Her Majesty the Queen in Right of Canada (Communications -   Research Center Canada) - -   Copyright (C) 2018 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#pragma once -#include <iostream> -#include <iterator> -#include <thread> -#include <vector> -#include <chrono> -#include <atomic> -#include "ThreadsafeQueue.h" -#include "dabOutput/dabOutput.h" -#include "dabOutput/edi/TagItems.h" -#include "dabOutput/edi/TagPacket.h" -#include "dabOutput/edi/AFPacket.h" -#include "dabOutput/edi/PFT.h" -#include "dabOutput/edi/Interleaver.h" - -// This metadata gets transmitted in the zmq stream -struct metadata_t { -    uint32_t edi_time = 0; -    int16_t utc_offset = 0; -    uint16_t dlfc = 0; -}; - -using frame_t = std::pair<std::vector<uint8_t>, metadata_t>; - -class EDISender { -    public: -        EDISender() = default; -        EDISender(const EDISender& other) = delete; -        EDISender& operator=(const EDISender& other) = delete; -        ~EDISender(); -        void start(const edi_configuration_t& conf, int delay_ms, int max_delay_ms); -        void push_frame(const frame_t& frame); -        void print_configuration(void); - -    private: -        void send_eti_frame(uint8_t* p, metadata_t metadata); -        void process(void); - -        int tist_delay_ms = 0; -        int tist_max_delay_ms = 0; -        std::atomic<bool> running = ATOMIC_VAR_INIT(false); -        std::thread process_thread; -        edi_configuration_t edi_conf; -        std::chrono::steady_clock::time_point startTime; -        ThreadsafeQueue<frame_t> frames; -        std::ofstream edi_debug_file; - -        // The TagPacket will then be placed into an AFPacket -        edi::AFPacketiser edi_afPacketiser; - -        // The AF Packet will be protected with reed-solomon and split in fragments -        edi::PFT edi_pft; - -        // To mitigate for burst packet loss, PFT fragments can be sent out-of-order -        edi::Interleaver edi_interleaver; - -        // For statistics about wait time before we transmit packets, -        // in microseconds -        std::vector<double> wait_times; - -        // Number of frames dropped because their TIST was larger than max_delay -        std::atomic<size_t> num_dropped = ATOMIC_VAR_INIT(0); - -}; diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp deleted file mode 100644 index 63c3228..0000000 --- a/src/zmq2edi/zmq2edi.cpp +++ /dev/null @@ -1,419 +0,0 @@ -/* -   Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, -   2011, 2012 Her Majesty the Queen in Right of Canada (Communications -   Research Center Canada) - -   Copyright (C) 2018 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "Log.h" -#include "zmq.hpp" -#include <math.h> -#include <getopt.h> -#include <string.h> -#include <iostream> -#include <iterator> -#include <vector> - -#include "EDISender.h" -#include "dabOutput/dabOutput.h" - -constexpr size_t MAX_ERROR_COUNT = 10; -constexpr long ZMQ_TIMEOUT_MS = 1000; - -static edi_configuration_t edi_conf; - -static EDISender edisender; - -void usage(void) -{ -    using namespace std; - -    cerr << "Usage:" << endl; -    cerr << "odr-zmq2edi [options] <source>" << endl << endl; - -    cerr << "Options:" << endl; -    cerr << "The following options can be given only once:" << endl; -    cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl; -    cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl; -    cerr << " -W <max_delay> Drop ETI frames if TIST is <max_delay> later than current system time." << endl; -    cerr << " -p <destination port> sets the destination port." << endl; -    cerr << " -P Disable PFT and send AFPackets." << endl; -    cerr << " -f <fec> sets the FEC." << endl; -    cerr << " -i <interleave> enables the interleaved with this latency." << endl; -    cerr << " -D dumps the EDI to edi.debug file." << endl; -    cerr << " -v Enables verbose mode." << endl; -    cerr << " -a <tagpacket alignement> sets the alignment of the TAG Packet (default 8)." << endl << endl; - -    cerr << "The following options can be given several times, when more than once destination is addressed:" << endl; -    cerr << " -d <destination ip> sets the destination ip." << endl; -    cerr << " -s <source port> sets the source port." << endl; -    cerr << " -S <source ip> select the source IP in case we want to use multicast." << endl; -    cerr << " -t <ttl> set the packet's TTL." << endl << endl; - -    cerr << "odr-zmq2edi will quit if it does not receive data for " << -        (int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl; -    cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl; -} - -static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_bytes) -{ -    size_t remaining = size; -    if (remaining < 3) { -        etiLog.level(warn) << "Insufficient data to parse metadata"; -        throw std::runtime_error("Insufficient data"); -    } - -    metadata_t md; -    bool utc_offset_received = false; -    bool edi_time_received = false; -    bool dlfc_received = false; - -    while (remaining) { -        uint8_t id = buf[0]; -        uint16_t len = (((uint16_t)buf[1]) << 8) + buf[2]; - -        if (id == static_cast<uint8_t>(output_metadata_id_e::separation_marker)) { -            if (len != 0) { -                etiLog.level(warn) << "Invalid length " << len << " for metadata: separation_marker"; -            } - -            if (not utc_offset_received or not edi_time_received or not dlfc_received) { -                throw std::runtime_error("Incomplete metadata received"); -            } - -            remaining -= 3; -            *consumed_bytes = size - remaining; -            return md; -        } -        else if (id == static_cast<uint8_t>(output_metadata_id_e::utc_offset)) { -            if (len != 2) { -                etiLog.level(warn) << "Invalid length " << len << " for metadata: utc_offset"; -            } -            if (remaining < 2) { -                throw std::runtime_error("Insufficient data for utc_offset"); -            } -            uint16_t utco; -            std::memcpy(&utco, buf + 3, sizeof(utco)); -            md.utc_offset = ntohs(utco); -            utc_offset_received = true; -            remaining -= 5; -            buf += 5; -        } -        else if (id == static_cast<uint8_t>(output_metadata_id_e::edi_time)) { -            if (len != 4) { -                etiLog.level(warn) << "Invalid length " << len << " for metadata: edi_time"; -            } -            if (remaining < 4) { -                throw std::runtime_error("Insufficient data for edi_time"); -            } -            uint32_t edi_time; -            std::memcpy(&edi_time, buf + 3, sizeof(edi_time)); -            md.edi_time = ntohl(edi_time); -            edi_time_received = true; -            remaining -= 7; -            buf += 7; -        } -        else if (id == static_cast<uint8_t>(output_metadata_id_e::dlfc)) { -            if (len != 2) { -                etiLog.level(warn) << "Invalid length " << len << " for metadata: dlfc"; -            } -            if (remaining < 2) { -                throw std::runtime_error("Insufficient data for dlfc"); -            } -            uint16_t dlfc; -            std::memcpy(&dlfc, buf + 3, sizeof(dlfc)); -            md.dlfc = ntohs(dlfc); -            dlfc_received = true; -            remaining -= 5; -            buf += 5; -        } -    } - -    throw std::runtime_error("Insufficient data"); -} - -/* There is some state inside the parsing of destination arguments, - * because several destinations can be given.  */ - -static edi_destination_t edi_destination; -static bool source_port_set = false; -static bool source_addr_set = false; -static bool ttl_set = false; -static bool dest_addr_set = false; - -static void add_edi_destination(void) -{ -    if (not dest_addr_set) { -        throw std::runtime_error("Destination address not specified for destination number " + -                std::to_string(edi_conf.destinations.size() + 1)); -    } - -    edi_conf.destinations.push_back(edi_destination); -    edi_destination_t newdest; -    edi_destination = newdest; - -    source_port_set = false; -    source_addr_set = false; -    ttl_set = false; -    dest_addr_set = false; -} - -static void parse_destination_args(char option) -{ -    switch (option) { -        case 's': -            if (source_port_set) { -                add_edi_destination(); -            } -            edi_destination.source_port = std::stoi(optarg); -            source_port_set = true; -            break; -        case 'S': -            if (source_addr_set) { -                add_edi_destination(); -            } -            edi_destination.source_addr = optarg; -            source_addr_set = true; -            break; -        case 't': -            if (ttl_set) { -                add_edi_destination(); -            } -            edi_destination.ttl = std::stoi(optarg); -            ttl_set = true; -            break; -        case 'd': -            if (dest_addr_set) { -                add_edi_destination(); -            } -            edi_destination.dest_addr = optarg; -            dest_addr_set = true; -            break; -        default: -            throw std::logic_error("parse_destination_args invalid"); -    } -} - -int start(int argc, char **argv) -{ -    edi_conf.enable_pft = true; - -    if (argc == 0) { -        usage(); -        return 1; -    } - -    int delay_ms = 500; -    int max_delay_ms = 0; // no max delay - -    int ch = 0; -    while (ch != -1) { -        ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:w:W:"); -        switch (ch) { -            case -1: -                break; -            case 'd': -            case 's': -            case 'S': -            case 't': -                parse_destination_args(ch); -                break; -            case 'p': -                edi_conf.dest_port = std::stoi(optarg); -                break; -            case 'P': -                edi_conf.enable_pft = false; -                break; -            case 'f': -                edi_conf.fec = std::stoi(optarg); -                break; -            case 'i': -                { -                    double interleave_ms = std::stod(optarg); -                    if (interleave_ms != 0.0) { -                        if (interleave_ms < 0) { -                            throw std::runtime_error("EDI output: negative interleave value is invalid."); -                        } - -                        auto latency_rounded = lround(interleave_ms / 24.0); -                        if (latency_rounded * 24 > 30000) { -                            throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!"); -                        } - -                        edi_conf.latency_frames = latency_rounded; -                    } -                } -                break; -            case 'D': -                edi_conf.dump = true; -                break; -            case 'v': -                edi_conf.verbose = true; -                break; -            case 'a': -                edi_conf.tagpacket_alignment = std::stoi(optarg); -                break; -            case 'w': -                delay_ms = std::stoi(optarg); -                break; -            case 'W': -                max_delay_ms = std::stoi(optarg); -                break; -            case 'h': -            default: -                usage(); -                return 1; -        } -    } - -    add_edi_destination(); - -    if (optind >= argc) { -        etiLog.level(error) << "source option is missing"; -        return 1; -    } - -    if (edi_conf.dest_port == 0) { -        etiLog.level(error) << "No EDI destination port defined"; -        return 1; -    } - -    if (edi_conf.destinations.empty()) { -        etiLog.level(error) << "No EDI destinations set"; -        return 1; -    } - -    if (max_delay_ms > 0) { -        etiLog.level(info) << "Setting up EDI Sender with delay " << delay_ms << " ms and max delay " << max_delay_ms << " ms"; -    } -    else { -        etiLog.level(info) << "Setting up EDI Sender with delay " << delay_ms << " ms"; -    } -    edisender.start(edi_conf, delay_ms, max_delay_ms); -    edisender.print_configuration(); - -    const char* source_url = argv[optind]; - - -    size_t frame_count = 0; -    size_t error_count = 0; - -    etiLog.level(info) << "Opening ZMQ input: " << source_url; - -    zmq::context_t zmq_ctx(1); -    zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); -    zmq_sock.connect(source_url); -    zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages - -    while (error_count < MAX_ERROR_COUNT) { -        zmq::message_t incoming; -        zmq::pollitem_t items[1]; -        items[0].socket = zmq_sock; -        items[0].events = ZMQ_POLLIN; -        const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); -        if (num_events == 0) { // timeout -            error_count++; -        } -        else { -            // Event received: recv will not block -            zmq_sock.recv(&incoming); - -            zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); - -            if (dab_msg->version != 1) { -                etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; -                error_count++; -            } - -            int offset = sizeof(dab_msg->version) + -                NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); - -            std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames; - -            for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { -                if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { -                    etiLog.level(error) << "ZeroMQ buffer " << i << -                        " has invalid length " << dab_msg->buflen[i]; -                    error_count++; -                } -                else { -                    std::vector<uint8_t> buf(6144, 0x55); - -                    const int framesize = dab_msg->buflen[i]; - -                    memcpy(&buf.front(), -                            ((uint8_t*)incoming.data()) + offset, -                            framesize); - -                    all_frames.emplace_back( -                            std::piecewise_construct, -                            std::make_tuple(std::move(buf)), -                            std::make_tuple()); - -                    offset += framesize; -                } -            } - -            for (auto &f : all_frames) { -                size_t consumed_bytes = 0; - -                f.second = get_md_one_frame( -                        static_cast<uint8_t*>(incoming.data()) + offset, -                        incoming.size() - offset, -                        &consumed_bytes); - -                offset += consumed_bytes; -            } - -            for (auto &f : all_frames) { -                edisender.push_frame(f); -                frame_count++; -            } -        } -    } - -    etiLog.level(info) << "Quitting after " << frame_count << " frames transferred"; - -    return 0; -} - -int main(int argc, char **argv) -{ -    etiLog.level(info) << "ZMQ2EDI converter from " << -        PACKAGE_NAME << " " << -#if defined(GITVERSION) -        GITVERSION << -#else -        PACKAGE_VERSION << -#endif -        " starting up"; - -    try { -        return start(argc, argv); -    } -    catch (std::runtime_error &e) { -        etiLog.level(error) << "Error: " << e.what(); -    } - -    return 1; -} | 
