diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2017-01-13 11:53:15 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2017-01-13 11:53:15 +0100 |
commit | ea5594186bafa5489d6086a26d71b8f3d1ccf9cd (patch) | |
tree | a307b0882a867b415c68cd7d644241abe0c971e1 | |
parent | f908d28e72887b68391a246ceb328cb52dcb2aaa (diff) | |
download | dabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.tar.gz dabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.tar.bz2 dabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.zip |
Add threaded UDP input for EDI
-rw-r--r-- | lib/UdpSocket.cpp | 62 | ||||
-rw-r--r-- | lib/UdpSocket.h | 29 | ||||
-rw-r--r-- | src/DabMod.cpp | 9 | ||||
-rw-r--r-- | src/EtiReader.cpp | 38 | ||||
-rw-r--r-- | src/EtiReader.h | 35 | ||||
-rw-r--r-- | src/InputReader.h | 1 | ||||
-rw-r--r-- | src/Utils.cpp | 3 |
7 files changed, 134 insertions, 43 deletions
diff --git a/lib/UdpSocket.cpp b/lib/UdpSocket.cpp index 981d713..b88c731 100644 --- a/lib/UdpSocket.cpp +++ b/lib/UdpSocket.cpp @@ -167,8 +167,7 @@ int UdpSocket::send(const std::vector<uint8_t>& data, InetAddress destination) /** * Must be called to receive data on a multicast address. - * @param groupname The multica -st address to join. + * @param groupname The multicast address to join. * @return 0 if ok, -1 if error */ int UdpSocket::joinGroup(char* groupname) @@ -254,3 +253,62 @@ InetAddress UdpPacket::getAddress() return address; } +UdpReceiver::~UdpReceiver() { + m_stop = true; + m_sock.close(); + if (m_thread.joinable()) { + m_thread.join(); + } +} + +void UdpReceiver::start(int port) { + m_port = port; + m_thread = std::thread(&UdpReceiver::m_run, this); +} + +std::vector<uint8_t> UdpReceiver::get_packet_buffer() +{ + if (m_stop) { + throw runtime_error("UDP Receiver not running"); + } + + UdpPacket p; + m_packets.wait_and_pop(p); + + return p.getBuffer(); +} + +void UdpReceiver::m_run() +{ + // Ensure that stop is set to true in case of exception or return + struct SetStopOnDestruct { + SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {} + ~SetStopOnDestruct() { m_stop = true; } + private: atomic<bool>& m_stop; + } autoSetStop(m_stop); + + m_sock.reinit(m_port, "0.0.0.0"); + + const size_t packsize = 8192; + UdpPacket packet(packsize); + + while (not m_stop) { + int ret = m_sock.receive(packet); + if (ret == 0) { + if (packet.getSize() == packsize) { + // TODO replace fprintf + fprintf(stderr, "Warning, possible UDP truncation\n"); + } + m_packets.push(packet); + } + else + { + if (inetErrNo != EINTR) { + // TODO replace fprintf + fprintf(stderr, "Socket error: %s\n", inetErrMsg); + } + m_stop = true; + } + } +} + diff --git a/lib/UdpSocket.h b/lib/UdpSocket.h index f51e87c..81a7d2b 100644 --- a/lib/UdpSocket.h +++ b/lib/UdpSocket.h @@ -31,6 +31,7 @@ #endif #include "InetAddress.h" +#include "ThreadsafeQueue.h" #include <sys/socket.h> #include <netinet/in.h> #include <unistd.h> @@ -45,6 +46,8 @@ #include <stdlib.h> #include <iostream> #include <vector> +#include <thread> +#include <atomic> class UdpPacket; @@ -172,3 +175,29 @@ class UdpPacket InetAddress address; }; +/* Threaded UDP receiver */ +class UdpReceiver { + public: + UdpReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {} + ~UdpReceiver(); + UdpReceiver(const UdpReceiver&) = delete; + UdpReceiver operator=(const UdpReceiver&) = delete; + + // Start the receiver in a separate thread + void start(int port); + + // Get the data contained in a UDP packet, blocks if none available + // In case of error, throws a runtime_error + std::vector<uint8_t> get_packet_buffer(void); + + private: + void m_run(void); + + int m_port; + std::thread m_thread; + std::atomic<bool> m_stop; + ThreadsafeQueue<UdpPacket> m_packets; + UdpSocket m_sock; +}; + + diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 8065a5a..ac7842f 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -627,6 +627,9 @@ int launch_modulator(int argc, char* argv[]) else if (inputName.substr(0, 6) == "tcp://") { inputTransport = "tcp"; } + else if (inputName.substr(0, 6) == "udp://") { + inputTransport = "edi"; + } } else { inputName = "/dev/stdin"; @@ -784,7 +787,11 @@ int launch_modulator(int argc, char* argv[]) } set_thread_name("modulator"); - if (ediUdpInput.isEnabled()) { + if (inputTransport == "edi") { + if (not ediUdpInput.isEnabled()) { + etiLog.level(error) << "inputTransport is edi, but ediUdpInput is not enabled"; + return -1; + } Flowgraph flowgraph; auto modulator = make_shared<DabModulator>( diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index cc7b004..a1053c6 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -508,49 +508,31 @@ EdiUdpInput::EdiUdpInput(EdiDecoder::ETIDecoder& decoder) : m_port(0), m_decoder(decoder) { } -int EdiUdpInput::Open(const std::string& uri) +void EdiUdpInput::Open(const std::string& uri) { etiLog.level(info) << "Opening EDI :" << uri; - int ret = 1; - const std::regex re_udp("udp://:([0-9]+)"); std::smatch m; if (std::regex_match(uri, m, re_udp)) { m_port = std::stoi(m[1].str()); etiLog.level(info) << "EDI port :" << m_port; - ret = m_sock.reinit(m_port, "0.0.0.0"); - m_enabled = (ret == 0); + m_udp_rx.start(m_port); + m_enabled = true; } - - return ret; } bool EdiUdpInput::rxPacket() { - const size_t packsize = 8192; - UdpPacket packet(packsize); - - int ret = m_sock.receive(packet); - if (ret == 0) { - const auto &buf = packet.getBuffer(); - if (packet.getSize() == packsize) { - etiLog.log(warn, "Warning, possible UDP truncation"); - } - - m_decoder.push_packet(buf); + try { + auto udp_data = m_udp_rx.get_packet_buffer(); + m_decoder.push_packet(udp_data); return true; } - else { - if (inetErrNo == EINTR) { - return false; - } - else { - stringstream ss; - ss << "EDI UDP Socket error: " << inetErrMsg; - throw std::runtime_error(ss.str()); - } + catch (std::runtime_error& e) { + etiLog.level(warn) << "EDI input: " << e.what(); + return false; } } diff --git a/src/EtiReader.h b/src/EtiReader.h index cd04a16..1b75025 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -38,29 +38,40 @@ #include "TimestampDecoder.h" #include "lib/edi/ETIDecoder.hpp" #include "lib/UdpSocket.h" +#include "ThreadsafeQueue.h" #include <vector> #include <memory> #include <stdint.h> #include <sys/types.h> - +/* The modulator uses this interface to get the necessary multiplex data, + * either from an ETI or an EDI source. + */ class EtiSource { public: + /* Get the DAB Transmission Mode. Valid values: 1, 2, 3 or 4 */ virtual unsigned getMode() = 0; + + /* Get the current Frame Phase */ virtual unsigned getFp() = 0; + /* Returns true if we have valid time stamps in the ETI*/ virtual bool sourceContainsTimestamp() = 0; virtual void calculateTimestamp(struct frame_timestamp& ts) = 0; + /* Return the FIC source to be used for modulation */ virtual std::shared_ptr<FicSource>& getFic(void); + + /* Return all subchannel sources containing MST data */ virtual const std::vector<std::shared_ptr<SubchannelSource> > getSubchannels() const = 0; protected: std::shared_ptr<FicSource> myFicSource; }; +/* The EtiReader extracts the necessary data for modulation from an ETI(NI) byte stream. */ class EtiReader : public EtiSource { public: @@ -72,25 +83,22 @@ public: virtual unsigned getFp(); /* Read ETI data from dataIn. Returns the number of bytes - * read from the buffer + * read from the buffer. */ int loadEtiData(const Buffer& dataIn); + virtual bool sourceContainsTimestamp(); virtual void calculateTimestamp(struct frame_timestamp& ts) { myTimestampDecoder.calculateTimestamp(ts); } - /* Returns true if we have valid time stamps in the ETI*/ - virtual bool sourceContainsTimestamp(); - virtual const std::vector<std::shared_ptr<SubchannelSource> > getSubchannels() const; private: /* Transform the ETI TIST to a PPS offset in units of 1/16384000 s */ uint32_t getPPSOffset(); - void sync(); int state; uint32_t nb_frames; uint16_t framesize; @@ -108,6 +116,9 @@ private: std::vector<std::shared_ptr<SubchannelSource> > mySources; }; +/* The EdiReader extracts the necessary data using the EDI input library in + * lib/edi + */ class EdiReader : public EtiSource, public EdiDecoder::DataCollector { public: @@ -147,7 +158,7 @@ public: virtual void add_subchannel(const EdiDecoder::eti_stc_data& stc); - // Tell the ETIWriter that the AFPacket is complete + // Gets called by the EDI library to tell us that all data for a frame was given to us virtual void assemble(void); private: bool m_proto_valid = false; @@ -172,11 +183,15 @@ private: std::map<uint8_t, std::shared_ptr<SubchannelSource> > m_sources; }; +/* The EDI input does not use the inputs defined in InputReader.h, as they were designed + * for ETI. It uses the EdiUdpInput which in turn uses a threaded receiver. + */ + class EdiUdpInput { public: EdiUdpInput(EdiDecoder::ETIDecoder& decoder); - int Open(const std::string& uri); + void Open(const std::string& uri); bool isEnabled(void) const { return m_enabled; } @@ -190,7 +205,7 @@ class EdiUdpInput { bool m_enabled; int m_port; - UdpSocket m_sock; + UdpReceiver m_udp_rx; EdiDecoder::ETIDecoder& m_decoder; }; diff --git a/src/InputReader.h b/src/InputReader.h index f0e7197..7d6b373 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -40,7 +40,6 @@ #endif #include "porting.h" #include "Log.h" -#include "lib/edi/ETIDecoder.hpp" #include "lib/UdpSocket.h" #include <sys/socket.h> #include <netinet/in.h> diff --git a/src/Utils.cpp b/src/Utils.cpp index a91077c..b93f2c1 100644 --- a/src/Utils.cpp +++ b/src/Utils.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2015 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -64,6 +64,7 @@ void printUsage(char* progName) fprintf(out, "input: ETI input filename (default: stdin), or\n"); fprintf(out, " tcp://source:port for ETI-over-TCP input, or\n"); fprintf(out, " zmq+tcp://source:port for ZMQ input.\n"); + fprintf(out, " udp://:port for EDI input.\n"); fprintf(out, "-f name: Use file output with given filename. (use /dev/stdout for standard output)\n"); fprintf(out, "-u device: Use UHD output with given device string. (use "" for default device)\n"); fprintf(out, "-F frequency: Set the transmit frequency when using UHD output. (mandatory option when using UHD)\n"); |