From c8c9d5abf8238c3531ecae8fb272fdf5bbffd336 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 8 Jul 2019 12:08:44 +0200 Subject: Unify Socket library with other mmbTools --- src/EtiReader.h | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) (limited to 'src/EtiReader.h') diff --git a/src/EtiReader.h b/src/EtiReader.h index 38f7903..8548654 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -38,9 +38,6 @@ #include "SubchannelSource.h" #include "TimestampDecoder.h" #include "lib/edi/ETIDecoder.hpp" -#ifdef HAVE_EDI -# include "lib/UdpSocket.h" -#endif #include #include @@ -211,9 +208,9 @@ class EdiTransport { enum class Proto { UDP, TCP }; Proto m_proto; - UdpReceiver m_udp_rx; + Socket::UDPReceiver m_udp_rx; std::vector m_tcpbuffer; - TCPClient m_tcpclient; + Socket::TCPClient m_tcpclient; EdiDecoder::ETIDecoder& m_decoder; }; #endif -- cgit v1.2.3 From 93fc176f6c42136d344f9feb7dcbcb48c0ab72ce Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 8 Jul 2019 15:40:43 +0200 Subject: Update EDI input library --- Makefile.am | 2 + lib/edi/ETIDecoder.cpp | 251 +++++------------------------------------------ lib/edi/ETIDecoder.hpp | 49 ++++----- src/EtiReader.cpp | 10 +- src/EtiReader.h | 6 +- src/SubchannelSource.cpp | 4 +- src/SubchannelSource.h | 2 +- 7 files changed, 58 insertions(+), 266 deletions(-) (limited to 'src/EtiReader.h') diff --git a/Makefile.am b/Makefile.am index 668acaf..4e43bad 100644 --- a/Makefile.am +++ b/Makefile.am @@ -650,6 +650,8 @@ endif if COMPILE_EDI odr_dabmod_SOURCES += lib/edi/buffer_unpack.hpp \ + lib/edi/common.hpp \ + lib/edi/common.cpp \ lib/edi/eti.hpp \ lib/edi/eti.cpp \ lib/edi/ETIDecoder.hpp \ diff --git a/lib/edi/ETIDecoder.cpp b/lib/edi/ETIDecoder.cpp index a5d817e..a1b801b 100644 --- a/lib/edi/ETIDecoder.cpp +++ b/lib/edi/ETIDecoder.cpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2017 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -30,242 +30,40 @@ namespace EdiDecoder { using namespace std; -ETIDecoder::ETIDecoder(DataCollector& data_collector, bool verbose) : +ETIDecoder::ETIDecoder(ETIDataCollector& data_collector, bool verbose) : m_data_collector(data_collector), - m_last_seq(0) + m_dispatcher(std::bind(&ETIDecoder::packet_completed, this), verbose) { - m_pft.setVerbose(verbose); + using std::placeholders::_1; + using std::placeholders::_2; + m_dispatcher.register_tag("*ptr", + std::bind(&ETIDecoder::decode_starptr, this, _1, _2)); + m_dispatcher.register_tag("deti", + std::bind(&ETIDecoder::decode_deti, this, _1, _2)); + m_dispatcher.register_tag("est", + std::bind(&ETIDecoder::decode_estn, this, _1, _2)); + m_dispatcher.register_tag("*dmy", + std::bind(&ETIDecoder::decode_stardmy, this, _1, _2)); } void ETIDecoder::push_bytes(const vector &buf) { - copy(buf.begin(), buf.end(), back_inserter(m_input_data)); - - while (m_input_data.size() > 2) { - if (m_input_data[0] == 'A' and m_input_data[1] == 'F') { - const decode_state_t st = decode_afpacket(m_input_data); - - if (st.num_bytes_consumed == 0 and not st.complete) { - // We need to refill our buffer - break; - } - - if (st.num_bytes_consumed) { - vector remaining_data; - copy(m_input_data.begin() + st.num_bytes_consumed, - m_input_data.end(), - back_inserter(remaining_data)); - m_input_data = remaining_data; - } - - if (st.complete) { - m_data_collector.assemble(); - } - - } - else if (m_input_data[0] == 'P' and m_input_data[1] == 'F') { - PFT::Fragment fragment; - const size_t fragment_bytes = fragment.loadData(m_input_data); - - if (fragment_bytes == 0) { - // We need to refill our buffer - break; - } - - vector remaining_data; - copy(m_input_data.begin() + fragment_bytes, - m_input_data.end(), - back_inserter(remaining_data)); - m_input_data = remaining_data; - - if (fragment.isValid()) { - m_pft.pushPFTFrag(fragment); - } - - auto af = m_pft.getNextAFPacket(); - if (not af.empty()) { - decode_state_t st = decode_afpacket(af); - - if (st.complete) { - m_data_collector.assemble(); - } - } - - } - else { - etiLog.log(warn,"Unknown %c!", *m_input_data.data()); - m_input_data.erase(m_input_data.begin()); - } - } + m_dispatcher.push_bytes(buf); } void ETIDecoder::push_packet(const vector &buf) { - if (buf.size() < 2) { - throw std::invalid_argument("Not enough bytes to read EDI packet header"); - } - - if (buf[0] == 'A' and buf[1] == 'F') { - const decode_state_t st = decode_afpacket(buf); - - if (st.complete) { - m_data_collector.assemble(); - } - - } - else if (buf[0] == 'P' and buf[1] == 'F') { - PFT::Fragment fragment; - fragment.loadData(buf); - - if (fragment.isValid()) { - m_pft.pushPFTFrag(fragment); - } - - auto af = m_pft.getNextAFPacket(); - if (not af.empty()) { - const decode_state_t st = decode_afpacket(af); - - if (st.complete) { - m_data_collector.assemble(); - } - } - } - else { - const char packettype[3] = {(char)buf[0], (char)buf[1], '\0'}; - std::stringstream ss; - ss << "Unknown EDI packet "; - ss << packettype; - throw std::invalid_argument(ss.str()); - } + m_dispatcher.push_packet(buf); } void ETIDecoder::setMaxDelay(int num_af_packets) { - m_pft.setMaxDelay(num_af_packets); + m_dispatcher.setMaxDelay(num_af_packets); } #define AFPACKET_HEADER_LEN 10 // includes SYNC -ETIDecoder::decode_state_t ETIDecoder::decode_afpacket( - const std::vector &input_data) -{ - if (input_data.size() < AFPACKET_HEADER_LEN) { - return {false, 0}; - } - - // read length from packet - uint32_t taglength = read_32b(input_data.begin() + 2); - uint16_t seq = read_16b(input_data.begin() + 6); - - const size_t crclength = 2; - if (input_data.size() < AFPACKET_HEADER_LEN + taglength + crclength) { - return {false, 0}; - } - - if (m_last_seq + 1 != seq) { - etiLog.level(warn) << "EDI AF Packet sequence error, " << seq; - } - m_last_seq = seq; - - bool has_crc = (input_data[8] & 0x80) ? true : false; - uint8_t major_revision = (input_data[8] & 0x70) >> 4; - uint8_t minor_revision = input_data[8] & 0x0F; - if (major_revision != 1 or minor_revision != 0) { - throw invalid_argument("EDI AF Packet has wrong revision " + - to_string(major_revision) + "." + to_string(minor_revision)); - } - uint8_t pt = input_data[9]; - if (pt != 'T') { - // only support Tag - return {false, 0}; - } - - - if (not has_crc) { - throw invalid_argument("AF packet not supported, has no CRC"); - } - - uint16_t crc = 0xffff; - for (size_t i = 0; i < AFPACKET_HEADER_LEN + taglength; i++) { - crc = crc16(crc, &input_data[i], 1); - } - crc ^= 0xffff; - - uint16_t packet_crc = read_16b(input_data.begin() + AFPACKET_HEADER_LEN + taglength); - - if (packet_crc != crc) { - throw invalid_argument( - "AF Packet crc wrong"); - } - else { - vector payload(taglength); - copy(input_data.begin() + AFPACKET_HEADER_LEN, - input_data.begin() + AFPACKET_HEADER_LEN + taglength, - payload.begin()); - - return {decode_tagpacket(payload), - AFPACKET_HEADER_LEN + taglength + 2}; - } -} - -bool ETIDecoder::decode_tagpacket(const vector &payload) -{ - size_t length = 0; - - bool success = true; - - for (size_t i = 0; i + 8 < payload.size(); i += 8 + length) { - char tag_sz[5]; - tag_sz[4] = '\0'; - copy(payload.begin() + i, payload.begin() + i + 4, tag_sz); - - string tag(tag_sz); - - uint32_t taglength = read_32b(payload.begin() + i + 4); - - if (taglength % 8 != 0) { - etiLog.log(warn, "Invalid tag length!"); - break; - } - taglength /= 8; - - length = taglength; - - vector tag_value(taglength); - copy( payload.begin() + i+8, - payload.begin() + i+8+taglength, - tag_value.begin()); - - bool tagsuccess = false; - if (tag == "*ptr") { - tagsuccess = decode_starptr(tag_value); - } - else if (tag == "deti") { - tagsuccess = decode_deti(tag_value); - } - else if (tag.substr(0, 3) == "est") { - uint8_t n = tag_sz[3]; - tagsuccess = decode_estn(tag_value, n); - } - else if (tag == "*dmy") { - tagsuccess = decode_stardmy(tag_value); - } - else { - etiLog.log(warn, "Unknown TAG %s", tag.c_str()); - break; - } - - if (not tagsuccess) { - etiLog.log(warn, "Error decoding TAG %s", tag.c_str()); - success = tagsuccess; - break; - } - } - - return success; -} - -bool ETIDecoder::decode_starptr(const vector &value) +bool ETIDecoder::decode_starptr(const vector &value, uint16_t) { if (value.size() != 0x40 / 8) { etiLog.log(warn, "Incorrect length %02lx for *PTR", value.size()); @@ -285,7 +83,7 @@ bool ETIDecoder::decode_starptr(const vector &value) return true; } -bool ETIDecoder::decode_deti(const vector &value) +bool ETIDecoder::decode_deti(const vector &value, uint16_t) { /* uint16_t detiHeader = fct | (fcth << 8) | (rfudf << 13) | (ficf << 14) | (atstf << 15); @@ -364,7 +162,7 @@ bool ETIDecoder::decode_deti(const vector &value) fic.begin()); i += fic_length; - m_data_collector.update_fic(fic); + m_data_collector.update_fic(move(fic)); } if (rfudf) { @@ -385,7 +183,7 @@ bool ETIDecoder::decode_deti(const vector &value) return true; } -bool ETIDecoder::decode_estn(const vector &value, uint8_t n) +bool ETIDecoder::decode_estn(const vector &value, uint16_t n) { uint32_t sstc = read_24b(value.begin()); @@ -404,14 +202,19 @@ bool ETIDecoder::decode_estn(const vector &value, uint8_t n) value.end(), back_inserter(stc.mst)); - m_data_collector.add_subchannel(stc); + m_data_collector.add_subchannel(move(stc)); return true; } -bool ETIDecoder::decode_stardmy(const vector& /*value*/) +bool ETIDecoder::decode_stardmy(const vector& /*value*/, uint16_t) { return true; } +void ETIDecoder::packet_completed() +{ + m_data_collector.assemble(); +} + } diff --git a/lib/edi/ETIDecoder.hpp b/lib/edi/ETIDecoder.hpp index 37a564f..f5d0b81 100644 --- a/lib/edi/ETIDecoder.hpp +++ b/lib/edi/ETIDecoder.hpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2017 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -20,12 +20,12 @@ */ #pragma once -#include +#include "eti.hpp" +#include "common.hpp" +#include #include #include #include -#include "PFT.hpp" -#include "eti.hpp" namespace EdiDecoder { @@ -33,7 +33,7 @@ namespace EdiDecoder { // EDI. // // Number of streams is given separately, and frame length -// is calculated in the DataCollector +// is calculated in the ETIDataCollector struct eti_fc_data { bool atstf; uint32_t tsta; @@ -58,10 +58,10 @@ struct eti_stc_data { }; /* A class that receives multiplex data must implement the interface described - * in the DataCollector. This can be e.g. a converter to ETI, or something that + * in the ETIDataCollector. This can be e.g. a converter to ETI, or something that * prepares data structures for a modulator. */ -class DataCollector { +class ETIDataCollector { public: // Tell the ETIWriter what EDI protocol we receive in *ptr. // This is not part of the ETI data, but is used as check @@ -73,21 +73,19 @@ class DataCollector { // Update the data for the frame characterisation virtual void update_fc_data(const eti_fc_data& fc_data) = 0; - virtual void update_fic(const std::vector& fic) = 0; + virtual void update_fic(std::vector&& fic) = 0; virtual void update_err(uint8_t err) = 0; // In addition to TSTA in ETI, EDI also transports more time // stamp information. - virtual void update_edi_time( - uint32_t utco, - uint32_t seconds) = 0; + virtual void update_edi_time(uint32_t utco, uint32_t seconds) = 0; virtual void update_mnsc(uint16_t mnsc) = 0; virtual void update_rfu(uint16_t rfu) = 0; - virtual void add_subchannel(const eti_stc_data& stc) = 0; + virtual void add_subchannel(eti_stc_data&& stc) = 0; // Tell the ETIWriter that the AFPacket is complete virtual void assemble(void) = 0; @@ -101,7 +99,7 @@ class DataCollector { */ class ETIDecoder { public: - ETIDecoder(DataCollector& data_collector, bool verbose); + ETIDecoder(ETIDataCollector& data_collector, bool verbose); /* Push bytes into the decoder. The buf can contain more * than a single packet. This is useful when reading from streams @@ -120,27 +118,16 @@ class ETIDecoder { void setMaxDelay(int num_af_packets); private: - struct decode_state_t { - decode_state_t(bool _complete, size_t _num_bytes_consumed) : - complete(_complete), num_bytes_consumed(_num_bytes_consumed) {} - bool complete; - size_t num_bytes_consumed; - }; - - decode_state_t decode_afpacket(const std::vector &input_data); - bool decode_tagpacket(const std::vector &payload); - bool decode_starptr(const std::vector &value); - bool decode_deti(const std::vector &value); - bool decode_estn(const std::vector &value, uint8_t n); - bool decode_stardmy(const std::vector &value); - - DataCollector& m_data_collector; + bool decode_starptr(const std::vector &value, uint16_t); + bool decode_deti(const std::vector &value, uint16_t); + bool decode_estn(const std::vector &value, uint16_t n); + bool decode_stardmy(const std::vector &value, uint16_t); - PFT::PFT m_pft; + void packet_completed(); - uint16_t m_last_seq; + ETIDataCollector& m_data_collector; + TagDispatcher m_dispatcher; - std::vector m_input_data; }; } diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index 93008bb..05e243c 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -238,7 +238,7 @@ int EtiReader::loadEtiData(const Buffer& dataIn) unsigned size = mySources[i]->framesize(); PDEBUG("Writting %i bytes of subchannel data\n", size); Buffer subch(size, in); - mySources[i]->loadSubchannelData(subch); + mySources[i]->loadSubchannelData(move(subch)); input_size -= size; framesize -= size; in += size; @@ -428,12 +428,12 @@ void EdiReader::update_fc_data(const EdiDecoder::eti_fc_data& fc_data) m_fc_valid = true; } -void EdiReader::update_fic(const std::vector& fic) +void EdiReader::update_fic(std::vector&& fic) { if (not m_proto_valid) { throw std::logic_error("Cannot update FIC before protocol"); } - m_fic = fic; + m_fic = move(fic); } void EdiReader::update_edi_time( @@ -469,7 +469,7 @@ void EdiReader::update_rfu(uint16_t rfu) m_rfu = rfu; } -void EdiReader::add_subchannel(const EdiDecoder::eti_stc_data& stc) +void EdiReader::add_subchannel(EdiDecoder::eti_stc_data&& stc) { if (not m_proto_valid) { throw std::logic_error("Cannot add subchannel before protocol"); @@ -485,7 +485,7 @@ void EdiReader::add_subchannel(const EdiDecoder::eti_stc_data& stc) throw std::invalid_argument( "EDI: MST data length inconsistent with FIC"); } - source->loadSubchannelData(stc.mst); + source->loadSubchannelData(move(stc.mst)); if (m_sources.size() > 64) { throw std::invalid_argument("Too many subchannels"); diff --git a/src/EtiReader.h b/src/EtiReader.h index 8548654..99ca715 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -115,7 +115,7 @@ private: /* The EdiReader extracts the necessary data using the EDI input library in * lib/edi */ -class EdiReader : public EtiSource, public EdiDecoder::DataCollector +class EdiReader : public EtiSource, public EdiDecoder::ETIDataCollector { public: EdiReader(double& tist_offset_s); @@ -139,7 +139,7 @@ public: // Update the data for the frame characterisation virtual void update_fc_data(const EdiDecoder::eti_fc_data& fc_data); - virtual void update_fic(const std::vector& fic); + virtual void update_fic(std::vector&& fic); virtual void update_err(uint8_t err); @@ -153,7 +153,7 @@ public: virtual void update_rfu(uint16_t rfu); - virtual void add_subchannel(const EdiDecoder::eti_stc_data& stc); + virtual void add_subchannel(EdiDecoder::eti_stc_data&& stc); // Gets called by the EDI library to tell us that all data for a frame was given to us virtual void assemble(void); diff --git a/src/SubchannelSource.cpp b/src/SubchannelSource.cpp index b4d6750..443b5d2 100644 --- a/src/SubchannelSource.cpp +++ b/src/SubchannelSource.cpp @@ -1046,9 +1046,9 @@ size_t SubchannelSource::protectionOption() const return 0; } -void SubchannelSource::loadSubchannelData(const Buffer& data) +void SubchannelSource::loadSubchannelData(Buffer&& data) { - d_buffer = data; + d_buffer = std::move(data); } int SubchannelSource::process(Buffer* outputData) diff --git a/src/SubchannelSource.h b/src/SubchannelSource.h index b4ca697..68e6ff2 100644 --- a/src/SubchannelSource.h +++ b/src/SubchannelSource.h @@ -59,7 +59,7 @@ public: size_t protectionOption() const; const std::vector& get_rules() const; - void loadSubchannelData(const Buffer& data); + void loadSubchannelData(Buffer&& data); int process(Buffer* outputData); const char* name() { return "SubchannelSource"; } -- cgit v1.2.3 From 32f9e6e8bfa584f0dfb155c0bb7cdc843614af5c Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Thu, 5 Sep 2019 11:01:56 +0200 Subject: Rework EDI input - Use same main loop for both ETI and EDI inputs - Test SFN functionality with EDI input - Add log.show_process_time setting for process time printout --- .travis.yml | 7 - INSTALL | 1 - Makefile.am | 22 ++- configure.ac | 8 - doc/example.ini | 3 + src/ConfigParser.cpp | 2 + src/ConfigParser.h | 1 + src/DabMod.cpp | 480 ++++++++++++++++++++++++--------------------------- src/DabModulator.cpp | 2 +- src/EtiReader.cpp | 14 +- src/EtiReader.h | 11 +- src/Flowgraph.cpp | 9 +- src/Flowgraph.h | 7 +- src/OutputMemory.cpp | 1 + src/Utils.cpp | 3 - 15 files changed, 274 insertions(+), 297 deletions(-) (limited to 'src/EtiReader.h') diff --git a/.travis.yml b/.travis.yml index 990f0a9..ae8c4ef 100644 --- a/.travis.yml +++ b/.travis.yml @@ -47,13 +47,6 @@ matrix: compiler: gcc addons: *linuxaddons - - env: MATRIX_EVAL="CC=gcc-9 CXX=g++-9" CONF="--disable-output-uhd --enable-edi" - os: linux - dist: xenial - sudo: required - compiler: gcc - addons: *linuxaddons - - env: MATRIX_EVAL="CC=gcc-9 CXX=g++-9" CONF="--disable-output-uhd --enable-trace" os: linux dist: xenial diff --git a/INSTALL b/INSTALL index 0132f1c..03ddda1 100644 --- a/INSTALL +++ b/INSTALL @@ -32,7 +32,6 @@ The configure script can be launch with a variety of options: This is meant for distribution package maintainers who want to use their own march option, and for people running into compilation issues due to -march=native. (e.g. GCC bug 70132 on ARM systems) - --enable-edi Enable the EDI input. Debugging options: You should not enable any debug option if you need good performance. --enable-trace Create debugging files for each DSP block for data analysis diff --git a/Makefile.am b/Makefile.am index 34d2d91..9644c16 100644 --- a/Makefile.am +++ b/Makefile.am @@ -114,7 +114,16 @@ odr_dabmod_SOURCES = src/DabMod.cpp \ lib/fec/fec.h \ lib/fec/init_rs_char.c \ lib/fec/init_rs.h \ - lib/fec/rs-common.h + lib/fec/rs-common.h \ + lib/edi/buffer_unpack.hpp \ + lib/edi/common.hpp \ + lib/edi/common.cpp \ + lib/edi/eti.hpp \ + lib/edi/eti.cpp \ + lib/edi/ETIDecoder.hpp \ + lib/edi/ETIDecoder.cpp \ + lib/edi/PFT.hpp \ + lib/edi/PFT.cpp if !COMPILE_FOR_EASYDABV3 odr_dabmod_SOURCES += \ @@ -161,14 +170,3 @@ odr_dabmod_SOURCES += \ odr_dabmod_LDADD += $(UHD_LIBS) endif -if COMPILE_EDI -odr_dabmod_SOURCES += lib/edi/buffer_unpack.hpp \ - lib/edi/common.hpp \ - lib/edi/common.cpp \ - lib/edi/eti.hpp \ - lib/edi/eti.cpp \ - lib/edi/ETIDecoder.hpp \ - lib/edi/ETIDecoder.cpp \ - lib/edi/PFT.hpp \ - lib/edi/PFT.cpp -endif diff --git a/configure.ac b/configure.ac index 270a3ff..7e05620 100644 --- a/configure.ac +++ b/configure.ac @@ -49,9 +49,6 @@ AC_ARG_ENABLE([trace], AC_ARG_ENABLE([zeromq], [AS_HELP_STRING([--disable-zeromq], [Disable ZeroMQ input, output and remote control])], [], [enable_zeromq=yes]) -AC_ARG_ENABLE([edi], - [AS_HELP_STRING([--enable-edi], [Enable EDI input])], - [], [enable_edi=no]) AC_ARG_ENABLE([native], [AS_HELP_STRING([--disable-native], [Do not compile with -march=native])], [], [enable_native=yes]) @@ -124,11 +121,6 @@ AS_IF([test "x$enable_output_uhd" = "xyes"], AS_IF([test "x$enable_soapysdr" = "xyes"], [AC_DEFINE(HAVE_SOAPYSDR, [1], [Define if SoapySDR output is enabled])]) -AS_IF([test "x$enable_edi" = "xyes"], - [AC_DEFINE(HAVE_EDI, [1], [Define if EDI input is enabled]) ]) - -AM_CONDITIONAL([COMPILE_EDI], [test "x$enable_edi" = "xyes"]) - AS_IF([test "x$enable_easydabv3" = "xyes"], AC_DEFINE(BUILD_FOR_EASYDABV3, [1], [Define if we are building for EasyDABv3])) diff --git a/doc/example.ini b/doc/example.ini index b3e2eb3..72e3386 100644 --- a/doc/example.ini +++ b/doc/example.ini @@ -22,6 +22,9 @@ syslog=0 filelog=0 filename=odr-dabmod.log +; If you don't want to see the flowgraph processing time, set: +;show_process_time=0 + [input] ; A file or fifo input is using transport=file transport=file diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 80103c4..d5d1995 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -144,6 +144,8 @@ static void parse_configfile( etiLog.register_backend(make_shared(trace_filename)); } + mod_settings.showProcessTime = pt.GetInteger("log.show_process_time", + mod_settings.showProcessTime); // modulator parameters: const string gainMode_setting = pt.Get("modulator.gainmode", "var"); diff --git a/src/ConfigParser.h b/src/ConfigParser.h index ee961fa..7e706c0 100644 --- a/src/ConfigParser.h +++ b/src/ConfigParser.h @@ -86,6 +86,7 @@ struct mod_settings_t { Output::SDRDeviceConfig sdr_device_config; #endif + bool showProcessTime = true; }; void parse_args(int argc, char **argv, mod_settings_t& mod_settings); diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 81882e4..922f9e4 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -59,7 +59,6 @@ #include "OutputZeroMQ.h" #include "InputReader.h" #include "PcDebug.h" -#include "TimestampDecoder.h" #include "FIRFilter.h" #include "RemoteControl.h" #include "ConfigParser.h" @@ -94,12 +93,16 @@ void signalHandler(int signalNb) struct modulator_data { + // For ETI std::shared_ptr inputReader; - Buffer data; - uint64_t framecount = 0; + std::shared_ptr etiReader; + + // For EDI + std::shared_ptr ediInput; - Flowgraph* flowgraph = nullptr; - EtiReader* etiReader = nullptr; + // Common to both EDI and EDI + uint64_t framecount = 0; + Flowgraph *flowgraph = nullptr; }; enum class run_modulator_state_t { @@ -299,205 +302,137 @@ int launch_modulator(int argc, char* argv[]) etiLog.level(error) << "Could not set priority for modulator:" << r; } + shared_ptr inputReader; + shared_ptr ediInput; + if (mod_settings.inputTransport == "edi") { -#ifdef HAVE_EDI - EdiReader ediReader(mod_settings.tist_offset_s); - EdiDecoder::ETIDecoder ediInput(ediReader, false); - if (mod_settings.edi_max_delay_ms > 0.0f) { - // setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames - ediInput.setMaxDelay(lroundf(mod_settings.edi_max_delay_ms / 24.0f)); - } - EdiTransport ediTransport(ediInput); + ediInput = make_shared(mod_settings.tist_offset_s, mod_settings.edi_max_delay_ms); - ediTransport.Open(mod_settings.inputName); - if (not ediTransport.isEnabled()) { + ediInput->ediTransport.Open(mod_settings.inputName); + if (not ediInput->ediTransport.isEnabled()) { throw runtime_error("inputTransport is edi, but ediTransport is not enabled"); } - Flowgraph flowgraph; - - auto modulator = make_shared(ediReader, mod_settings); - rcs.enrol(modulator.get()); + } + else if (mod_settings.inputTransport == "file") { + auto inputFileReader = make_shared(); - if (format_converter) { - flowgraph.connect(modulator, format_converter); - flowgraph.connect(format_converter, output); - } - else { - flowgraph.connect(modulator, output); + // Opening ETI input file + if (inputFileReader->Open(mod_settings.inputName, mod_settings.loop) == -1) { + throw std::runtime_error("Unable to open input"); } - size_t framecount = 0; - - bool first_frame = true; - - auto frame_received_tp = chrono::steady_clock::now(); - - while (running) { - while (running and not ediReader.isFrameReady()) { - try { - bool packet_received = ediTransport.rxPacket(); - if (packet_received) { - frame_received_tp = chrono::steady_clock::now(); - } - } - catch (const std::runtime_error& e) { - etiLog.level(warn) << "EDI input: " << e.what(); - running = 0; - break; - } - - if (frame_received_tp + chrono::seconds(10) < chrono::steady_clock::now()) { - etiLog.level(error) << "No EDI data received in 10 seconds."; - running = 0; - break; - } - } - - if (not running) { - break; - } - - if (first_frame) { - if (ediReader.getFp() != 0) { - // Do not start the flowgraph before we get to FP 0 - // to ensure all blocks are properly aligned. - ediReader.clearFrame(); - continue; - } - else { - first_frame = false; - } - } - - framecount++; - flowgraph.run(); - ediReader.clearFrame(); - - /* Check every once in a while if the remote control - * is still working */ - if ((framecount % 250) == 0) { - rcs.check_faults(); - } - } -#else + inputReader = inputFileReader; + } + else if (mod_settings.inputTransport == "zeromq") { +#if !defined(HAVE_ZEROMQ) throw std::runtime_error("Unable to open input: " - "EDI input transport selected, but not compiled in!"); -#endif // HAVE_EDI + "ZeroMQ input transport selected, but not compiled in!"); +#else + auto inputZeroMQReader = make_shared(); + inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); + rcs.enrol(inputZeroMQReader.get()); + inputReader = inputZeroMQReader; +#endif + } + else if (mod_settings.inputTransport == "tcp") { + auto inputTcpReader = make_shared(); + inputTcpReader->Open(mod_settings.inputName); + inputReader = inputTcpReader; } else { - shared_ptr inputReader; + throw std::runtime_error("Unable to open input: " + "invalid input transport " + mod_settings.inputTransport + " selected!"); + } - if (mod_settings.inputTransport == "file") { - auto inputFileReader = make_shared(); + bool run_again = true; - // Opening ETI input file - if (inputFileReader->Open(mod_settings.inputName, mod_settings.loop) == -1) { - throw std::runtime_error("Unable to open input"); - } + while (run_again) { + Flowgraph flowgraph(mod_settings.showProcessTime); - inputReader = inputFileReader; + modulator_data m; + m.ediInput = ediInput; + m.inputReader = inputReader; + m.flowgraph = &flowgraph; + + shared_ptr modulator; + if (inputReader) { + m.etiReader = make_shared(mod_settings.tist_offset_s); + modulator = make_shared(*m.etiReader, mod_settings); } - else if (mod_settings.inputTransport == "zeromq") { -#if !defined(HAVE_ZEROMQ) - throw std::runtime_error("Unable to open input: " - "ZeroMQ input transport selected, but not compiled in!"); -#else - auto inputZeroMQReader = make_shared(); - inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); - rcs.enrol(inputZeroMQReader.get()); - inputReader = inputZeroMQReader; -#endif + else if (ediInput) { + modulator = make_shared(ediInput->ediReader, mod_settings); } - else if (mod_settings.inputTransport == "tcp") { - auto inputTcpReader = make_shared(); - inputTcpReader->Open(mod_settings.inputName); - inputReader = inputTcpReader; + + rcs.enrol(modulator.get()); + + if (format_converter) { + flowgraph.connect(modulator, format_converter); + flowgraph.connect(format_converter, output); } else { - throw std::runtime_error("Unable to open input: " - "invalid input transport " + mod_settings.inputTransport + " selected!"); + flowgraph.connect(modulator, output); } - bool run_again = true; - - while (run_again) { - Flowgraph flowgraph; - - modulator_data m; - m.inputReader = inputReader; - m.flowgraph = &flowgraph; - m.data.setLength(6144); - - EtiReader etiReader(mod_settings.tist_offset_s); - m.etiReader = &etiReader; - - auto input = make_shared(&m.data); - auto modulator = make_shared(etiReader, mod_settings); - rcs.enrol(modulator.get()); - - if (format_converter) { - flowgraph.connect(modulator, format_converter); - flowgraph.connect(format_converter, output); - } - else { - flowgraph.connect(modulator, output); - } - + if (inputReader) { etiLog.level(info) << inputReader->GetPrintableInfo(); + } - run_modulator_state_t st = run_modulator(m); - etiLog.log(trace, "DABMOD,run_modulator() = %d", st); - - switch (st) { - case run_modulator_state_t::failure: - etiLog.level(error) << "Modulator failure."; - run_again = false; - ret = 1; - break; - case run_modulator_state_t::again: - etiLog.level(warn) << "Restart modulator."; - run_again = false; - if (auto in = dynamic_pointer_cast(inputReader)) { - if (in->Open(mod_settings.inputName, mod_settings.loop) == -1) { - etiLog.level(error) << "Unable to open input file!"; - ret = 1; - } - else { - run_again = true; - } + run_modulator_state_t st = run_modulator(m); + etiLog.log(trace, "DABMOD,run_modulator() = %d", st); + + switch (st) { + case run_modulator_state_t::failure: + etiLog.level(error) << "Modulator failure."; + run_again = false; + ret = 1; + break; + case run_modulator_state_t::again: + etiLog.level(warn) << "Restart modulator."; + run_again = false; + if (auto in = dynamic_pointer_cast(inputReader)) { + if (in->Open(mod_settings.inputName, mod_settings.loop) == -1) { + etiLog.level(error) << "Unable to open input file!"; + ret = 1; } -#if defined(HAVE_ZEROMQ) - else if (auto in_zmq = dynamic_pointer_cast(inputReader)) { + else { run_again = true; - // Create a new input reader - rcs.remove_controllable(in_zmq.get()); - auto inputZeroMQReader = make_shared(); - inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); - rcs.enrol(inputZeroMQReader.get()); - inputReader = inputZeroMQReader; } + } +#if defined(HAVE_ZEROMQ) + else if (auto in_zmq = dynamic_pointer_cast(inputReader)) { + run_again = true; + // Create a new input reader + rcs.remove_controllable(in_zmq.get()); + auto inputZeroMQReader = make_shared(); + inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); + rcs.enrol(inputZeroMQReader.get()); + inputReader = inputZeroMQReader; + } #endif - else if (dynamic_pointer_cast(inputReader)) { - // Keep the same inputReader, as there is no input buffer overflow - run_again = true; - } - break; - case run_modulator_state_t::reconfigure: - etiLog.level(warn) << "Detected change in ensemble configuration."; - /* We can keep the input in this care */ + else if (dynamic_pointer_cast(inputReader)) { + // Keep the same inputReader, as there is no input buffer overflow run_again = true; - break; - case run_modulator_state_t::normal_end: - default: - etiLog.level(info) << "modulator stopped."; - ret = 0; - run_again = false; - break; - } - - etiLog.level(info) << m.framecount << " DAB frames encoded"; - etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded"; + } + else if (ediInput) { + // In EDI, keep the same input + run_again = true; + } + break; + case run_modulator_state_t::reconfigure: + etiLog.level(warn) << "Detected change in ensemble configuration."; + /* We can keep the input in this case */ + run_again = true; + break; + case run_modulator_state_t::normal_end: + default: + etiLog.level(info) << "modulator stopped."; + ret = 0; + run_again = false; + break; } + + etiLog.level(info) << m.framecount << " DAB frames encoded"; + etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded"; } etiLog.level(info) << "Terminating"; @@ -516,57 +451,142 @@ static run_modulator_state_t run_modulator(modulator_data& m) { auto ret = run_modulator_state_t::failure; try { - bool first_frame = true; int last_eti_fct = -1; auto last_frame_received = chrono::steady_clock::now(); + Buffer data; + if (m.inputReader) { + data.setLength(6144); + } while (running) { - int framesize; - - PDEBUG("*****************************************\n"); - PDEBUG("* Starting main loop\n"); - PDEBUG("*****************************************\n"); - while ((framesize = m.inputReader->GetNextFrame(m.data.getData())) > 0) { - if (!running) { - break; - } + while (true) { + unsigned fct = 0; + unsigned fp = 0; + + /* Load ETI data from the source */ + if (m.inputReader) { + int framesize = m.inputReader->GetNextFrame(data.getData()); + + if (framesize == 0) { + if (dynamic_pointer_cast(m.inputReader)) { + etiLog.level(info) << "End of file reached."; + running = 0; + ret = run_modulator_state_t::normal_end; + break; + } +#if defined(HAVE_ZEROMQ) + else if (dynamic_pointer_cast(m.inputReader)) { + /* An empty frame marks a timeout. We ignore it, but we are + * now able to handle SIGINT properly. + * + * Also, we reconnect zmq every 10 seconds to avoid some + * issues, discussed in + * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection + * + * > It is possible that the PUB socket sees the error + * > while the SUB socket does not. + * > + * > The ZMTP RFC has a proposal for heartbeating that would + * > solve this problem. The current best solution is for + * > PUB sockets to send heartbeats (e.g. 1 per second) when + * > traffic is low, and for SUB sockets to disconnect / + * > reconnect if they stop getting these. + * + * We don't need a heartbeat, because our application is constant frame rate, + * the frames themselves can act as heartbeats. + */ + + const auto now = chrono::steady_clock::now(); + if (last_frame_received + chrono::seconds(10) < now) { + throw zmq_input_timeout(); + } + } +#endif // defined(HAVE_ZEROMQ) + else if (dynamic_pointer_cast(m.inputReader)) { + /* Same as for ZeroMQ */ + } + else { + throw logic_error("Unhandled framesize==0!"); + } + } + else if (framesize < 0) { + etiLog.level(error) << "Input read error."; + running = 0; + ret = run_modulator_state_t::normal_end; + break; + } + + const int eti_bytes_read = m.etiReader->loadEtiData(data); + if ((size_t)eti_bytes_read != data.getLength()) { + etiLog.level(error) << "ETI frame incompletely read"; + throw std::runtime_error("ETI read error"); + } - last_frame_received = chrono::steady_clock::now(); + fct = m.etiReader->getFct(); + fp = m.etiReader->getFp(); + } + else if (m.ediInput) { + while (running and not m.ediInput->ediReader.isFrameReady()) { + try { + bool packet_received = m.ediInput->ediTransport.rxPacket(); + if (packet_received) { + last_frame_received = chrono::steady_clock::now(); + } + } + catch (const std::runtime_error& e) { + etiLog.level(warn) << "EDI input: " << e.what(); + running = 0; + break; + } - m.framecount++; + if (last_frame_received + chrono::seconds(10) < chrono::steady_clock::now()) { + etiLog.level(error) << "No EDI data received in 10 seconds."; + running = 0; + break; + } + } - PDEBUG("*****************************************\n"); - PDEBUG("* Read frame %lu\n", m.framecount); - PDEBUG("*****************************************\n"); + if (!running) { + break; + } - const int eti_bytes_read = m.etiReader->loadEtiData(m.data); - if ((size_t)eti_bytes_read != m.data.getLength()) { - etiLog.level(error) << "ETI frame incompletely read"; - throw std::runtime_error("ETI read error"); + fct = m.ediInput->ediReader.getFct(); + fp = m.ediInput->ediReader.getFp(); } - if (first_frame) { - if (m.etiReader->getFp() != 0) { + const unsigned expected_fct = (last_eti_fct + 1) % 250; + if (last_eti_fct == -1) { + if (fp != 0) { // Do not start the flowgraph before we get to FP 0 // to ensure all blocks are properly aligned. + if (m.ediInput) { + m.ediInput->ediReader.clearFrame(); + } continue; } else { - first_frame = false; + last_eti_fct = fct; + m.framecount++; + m.flowgraph->run(); } } - - // Check for ETI FCT continuity - const unsigned expected_fct = (last_eti_fct + 1) % 250; - const unsigned fct = m.etiReader->getFct(); - if (last_eti_fct != -1 and expected_fct != fct) { + else if (fct == expected_fct) { + last_eti_fct = fct; + m.framecount++; + m.flowgraph->run(); + } + else { etiLog.level(info) << "ETI FCT discontinuity, expected " << - expected_fct << " received " << m.etiReader->getFct(); + expected_fct << " received " << fct; + if (m.ediInput) { + m.ediInput->ediReader.clearFrame(); + } return run_modulator_state_t::again; } - last_eti_fct = fct; - m.flowgraph->run(); + if (m.ediInput) { + m.ediInput->ediReader.clearFrame(); + } /* Check every once in a while if the remote control * is still working */ @@ -574,52 +594,6 @@ static run_modulator_state_t run_modulator(modulator_data& m) rcs.check_faults(); } } - if (framesize == 0) { - if (dynamic_pointer_cast(m.inputReader)) { - etiLog.level(info) << "End of file reached."; - running = 0; - ret = run_modulator_state_t::normal_end; - } -#if defined(HAVE_ZEROMQ) - else if (dynamic_pointer_cast(m.inputReader)) { - /* An empty frame marks a timeout. We ignore it, but we are - * now able to handle SIGINT properly. - * - * Also, we reconnect zmq every 10 seconds to avoid some - * issues, discussed in - * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection - * - * > It is possible that the PUB socket sees the error - * > while the SUB socket does not. - * > - * > The ZMTP RFC has a proposal for heartbeating that would - * > solve this problem. The current best solution is for - * > PUB sockets to send heartbeats (e.g. 1 per second) when - * > traffic is low, and for SUB sockets to disconnect / - * > reconnect if they stop getting these. - * - * We don't need a heartbeat, because our application is constant frame rate, - * the frames themselves can act as heartbeats. - */ - - const auto now = chrono::steady_clock::now(); - if (last_frame_received + chrono::seconds(10) < now) { - throw zmq_input_timeout(); - } - } -#endif // defined(HAVE_ZEROMQ) - else if (dynamic_pointer_cast(m.inputReader)) { - /* Same as for ZeroMQ */ - } - else { - throw logic_error("Unhandled framesize==0!"); - } - } - else { - etiLog.level(error) << "Input read error."; - running = 0; - ret = run_modulator_state_t::normal_end; - } } } catch (const zmq_input_timeout&) { diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp index 7e3ccf0..aa4f2a8 100644 --- a/src/DabModulator.cpp +++ b/src/DabModulator.cpp @@ -132,7 +132,7 @@ int DabModulator::process(Buffer* dataOut) const unsigned mode = m_settings.dabMode; setMode(mode); - myFlowgraph = make_shared(); + myFlowgraph = make_shared(m_settings.showProcessTime); //////////////////////////////////////////////////////////////// // CIF data initialisation //////////////////////////////////////////////////////////////// diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index 719966b..25c1ada 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -312,7 +312,6 @@ uint32_t EtiReader::getPPSOffset() return timestamp; } -#ifdef HAVE_EDI EdiReader::EdiReader( double& tist_offset_s) : m_timestamp_decoder(tist_offset_s) @@ -654,4 +653,15 @@ bool EdiTransport::rxPacket() } throw logic_error("Incomplete rxPacket implementation!"); } -#endif // HAVE_EDI + +EdiInput::EdiInput(double& tist_offset_s, float edi_max_delay_ms) : + ediReader(tist_offset_s), + decoder(ediReader, false), + ediTransport(decoder) +{ + if (edi_max_delay_ms > 0.0f) { + // setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames + decoder.setMaxDelay(lroundf(edi_max_delay_ms / 24.0f)); + } +} + diff --git a/src/EtiReader.h b/src/EtiReader.h index 99ca715..28fb2ac 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -111,7 +111,6 @@ private: std::vector > mySources; }; -#ifdef HAVE_EDI /* The EdiReader extracts the necessary data using the EDI input library in * lib/edi */ @@ -213,5 +212,13 @@ class EdiTransport { Socket::TCPClient m_tcpclient; EdiDecoder::ETIDecoder& m_decoder; }; -#endif + +// EdiInput wraps an EdiReader, an EdiDecoder::ETIDecoder and an EdiTransport +class EdiInput { + public: + EdiInput(double& tist_offset_s, float edi_max_delay_ms); + EdiReader ediReader; + EdiDecoder::ETIDecoder decoder; + EdiTransport ediTransport; +}; diff --git a/src/Flowgraph.cpp b/src/Flowgraph.cpp index 4c83fe8..3d4cdcc 100644 --- a/src/Flowgraph.cpp +++ b/src/Flowgraph.cpp @@ -43,8 +43,7 @@ using EdgeIterator = std::vector >::iterator; Node::Node(shared_ptr plugin) : - myPlugin(plugin), - myProcessTime(0) + myPlugin(plugin) { PDEBUG("Node::Node(plugin(%s): %p) @ %p\n", plugin->name(), plugin.get(), this); @@ -237,8 +236,8 @@ Edge::~Edge() -Flowgraph::Flowgraph() : - myProcessTime(0) +Flowgraph::Flowgraph(bool showProcessTime) : + myShowProcessTime(showProcessTime) { PDEBUG("Flowgraph::Flowgraph() @ %p\n", this); } @@ -248,7 +247,7 @@ Flowgraph::~Flowgraph() { PDEBUG("Flowgraph::~Flowgraph() @ %p\n", this); - if (myProcessTime) { + if (myShowProcessTime and myProcessTime) { stringstream ss; ss << "Process time:\n"; diff --git a/src/Flowgraph.h b/src/Flowgraph.h index 389359b..753070b 100644 --- a/src/Flowgraph.h +++ b/src/Flowgraph.h @@ -71,7 +71,7 @@ protected: #endif std::shared_ptr myPlugin; - time_t myProcessTime; + time_t myProcessTime = 0; }; @@ -94,7 +94,7 @@ protected: class Flowgraph { public: - Flowgraph(); + Flowgraph(bool showProcessTime); virtual ~Flowgraph(); Flowgraph(const Flowgraph&) = delete; Flowgraph& operator=(const Flowgraph&) = delete; @@ -106,7 +106,8 @@ public: protected: std::vector > nodes; std::vector > edges; - time_t myProcessTime; + time_t myProcessTime = 0; + bool myShowProcessTime; }; diff --git a/src/OutputMemory.cpp b/src/OutputMemory.cpp index 5f24095..d6ef917 100644 --- a/src/OutputMemory.cpp +++ b/src/OutputMemory.cpp @@ -27,6 +27,7 @@ #include "OutputMemory.h" #include "PcDebug.h" #include "Log.h" +#include "TimestampDecoder.h" #include #include diff --git a/src/Utils.cpp b/src/Utils.cpp index 50af4fb..6f4f3a3 100644 --- a/src/Utils.cpp +++ b/src/Utils.cpp @@ -46,9 +46,6 @@ static void printHeader() #if defined(HAVE_ZEROMQ) "zeromq " << #endif -#ifdef HAVE_EDI - "EDI " << -#endif #if defined(HAVE_OUTPUT_UHD) "output_uhd " << #endif -- cgit v1.2.3