From ea5594186bafa5489d6086a26d71b8f3d1ccf9cd Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 13 Jan 2017 11:53:15 +0100 Subject: Add threaded UDP input for EDI --- src/DabMod.cpp | 9 ++++++++- src/EtiReader.cpp | 38 ++++++++++---------------------------- src/EtiReader.h | 35 +++++++++++++++++++++++++---------- src/InputReader.h | 1 - src/Utils.cpp | 3 ++- 5 files changed, 45 insertions(+), 41 deletions(-) (limited to 'src') diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 8065a5a..ac7842f 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -627,6 +627,9 @@ int launch_modulator(int argc, char* argv[]) else if (inputName.substr(0, 6) == "tcp://") { inputTransport = "tcp"; } + else if (inputName.substr(0, 6) == "udp://") { + inputTransport = "edi"; + } } else { inputName = "/dev/stdin"; @@ -784,7 +787,11 @@ int launch_modulator(int argc, char* argv[]) } set_thread_name("modulator"); - if (ediUdpInput.isEnabled()) { + if (inputTransport == "edi") { + if (not ediUdpInput.isEnabled()) { + etiLog.level(error) << "inputTransport is edi, but ediUdpInput is not enabled"; + return -1; + } Flowgraph flowgraph; auto modulator = make_shared( diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index cc7b004..a1053c6 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -508,49 +508,31 @@ EdiUdpInput::EdiUdpInput(EdiDecoder::ETIDecoder& decoder) : m_port(0), m_decoder(decoder) { } -int EdiUdpInput::Open(const std::string& uri) +void EdiUdpInput::Open(const std::string& uri) { etiLog.level(info) << "Opening EDI :" << uri; - int ret = 1; - const std::regex re_udp("udp://:([0-9]+)"); std::smatch m; if (std::regex_match(uri, m, re_udp)) { m_port = std::stoi(m[1].str()); etiLog.level(info) << "EDI port :" << m_port; - ret = m_sock.reinit(m_port, "0.0.0.0"); - m_enabled = (ret == 0); + m_udp_rx.start(m_port); + m_enabled = true; } - - return ret; } bool EdiUdpInput::rxPacket() { - const size_t packsize = 8192; - UdpPacket packet(packsize); - - int ret = m_sock.receive(packet); - if (ret == 0) { - const auto &buf = packet.getBuffer(); - if (packet.getSize() == packsize) { - etiLog.log(warn, "Warning, possible UDP truncation"); - } - - m_decoder.push_packet(buf); + try { + auto udp_data = m_udp_rx.get_packet_buffer(); + m_decoder.push_packet(udp_data); return true; } - else { - if (inetErrNo == EINTR) { - return false; - } - else { - stringstream ss; - ss << "EDI UDP Socket error: " << inetErrMsg; - throw std::runtime_error(ss.str()); - } + catch (std::runtime_error& e) { + etiLog.level(warn) << "EDI input: " << e.what(); + return false; } } diff --git a/src/EtiReader.h b/src/EtiReader.h index cd04a16..1b75025 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -38,29 +38,40 @@ #include "TimestampDecoder.h" #include "lib/edi/ETIDecoder.hpp" #include "lib/UdpSocket.h" +#include "ThreadsafeQueue.h" #include #include #include #include - +/* The modulator uses this interface to get the necessary multiplex data, + * either from an ETI or an EDI source. + */ class EtiSource { public: + /* Get the DAB Transmission Mode. Valid values: 1, 2, 3 or 4 */ virtual unsigned getMode() = 0; + + /* Get the current Frame Phase */ virtual unsigned getFp() = 0; + /* Returns true if we have valid time stamps in the ETI*/ virtual bool sourceContainsTimestamp() = 0; virtual void calculateTimestamp(struct frame_timestamp& ts) = 0; + /* Return the FIC source to be used for modulation */ virtual std::shared_ptr& getFic(void); + + /* Return all subchannel sources containing MST data */ virtual const std::vector > getSubchannels() const = 0; protected: std::shared_ptr myFicSource; }; +/* The EtiReader extracts the necessary data for modulation from an ETI(NI) byte stream. */ class EtiReader : public EtiSource { public: @@ -72,25 +83,22 @@ public: virtual unsigned getFp(); /* Read ETI data from dataIn. Returns the number of bytes - * read from the buffer + * read from the buffer. */ int loadEtiData(const Buffer& dataIn); + virtual bool sourceContainsTimestamp(); virtual void calculateTimestamp(struct frame_timestamp& ts) { myTimestampDecoder.calculateTimestamp(ts); } - /* Returns true if we have valid time stamps in the ETI*/ - virtual bool sourceContainsTimestamp(); - virtual const std::vector > getSubchannels() const; private: /* Transform the ETI TIST to a PPS offset in units of 1/16384000 s */ uint32_t getPPSOffset(); - void sync(); int state; uint32_t nb_frames; uint16_t framesize; @@ -108,6 +116,9 @@ private: std::vector > mySources; }; +/* The EdiReader extracts the necessary data using the EDI input library in + * lib/edi + */ class EdiReader : public EtiSource, public EdiDecoder::DataCollector { public: @@ -147,7 +158,7 @@ public: virtual void add_subchannel(const EdiDecoder::eti_stc_data& stc); - // Tell the ETIWriter that the AFPacket is complete + // Gets called by the EDI library to tell us that all data for a frame was given to us virtual void assemble(void); private: bool m_proto_valid = false; @@ -172,11 +183,15 @@ private: std::map > m_sources; }; +/* The EDI input does not use the inputs defined in InputReader.h, as they were designed + * for ETI. It uses the EdiUdpInput which in turn uses a threaded receiver. + */ + class EdiUdpInput { public: EdiUdpInput(EdiDecoder::ETIDecoder& decoder); - int Open(const std::string& uri); + void Open(const std::string& uri); bool isEnabled(void) const { return m_enabled; } @@ -190,7 +205,7 @@ class EdiUdpInput { bool m_enabled; int m_port; - UdpSocket m_sock; + UdpReceiver m_udp_rx; EdiDecoder::ETIDecoder& m_decoder; }; diff --git a/src/InputReader.h b/src/InputReader.h index f0e7197..7d6b373 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -40,7 +40,6 @@ #endif #include "porting.h" #include "Log.h" -#include "lib/edi/ETIDecoder.hpp" #include "lib/UdpSocket.h" #include #include diff --git a/src/Utils.cpp b/src/Utils.cpp index a91077c..b93f2c1 100644 --- a/src/Utils.cpp +++ b/src/Utils.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2015 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -64,6 +64,7 @@ void printUsage(char* progName) fprintf(out, "input: ETI input filename (default: stdin), or\n"); fprintf(out, " tcp://source:port for ETI-over-TCP input, or\n"); fprintf(out, " zmq+tcp://source:port for ZMQ input.\n"); + fprintf(out, " udp://:port for EDI input.\n"); fprintf(out, "-f name: Use file output with given filename. (use /dev/stdout for standard output)\n"); fprintf(out, "-u device: Use UHD output with given device string. (use "" for default device)\n"); fprintf(out, "-F frequency: Set the transmit frequency when using UHD output. (mandatory option when using UHD)\n"); -- cgit v1.2.3