From 51491533a312884862849082b3507e49c1829d22 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 4 Nov 2016 14:27:50 +0100 Subject: Add new UDP input --- src/ConfigParser.cpp | 30 +++++++++--- src/Makefile.am | 1 + src/UdpSocket.cpp | 18 +++++-- src/UdpSocket.h | 10 +++- src/input/Udp.cpp | 134 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/input/Udp.h | 52 ++++++++++++++++++++ 6 files changed, 232 insertions(+), 13 deletions(-) create mode 100644 src/input/Udp.cpp create mode 100644 src/input/Udp.h diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 2a8d3da..e48200a 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -56,6 +56,7 @@ #include "input/Prbs.h" #include "input/Zmq.h" #include "input/File.h" +#include "input/Udp.h" #ifdef _WIN32 @@ -922,16 +923,15 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, dabProtection* protection = &subchan->protection; const bool nonblock = pt.get("nonblock", false); + if (nonblock) { + etiLog.level(warn) << "The nonblock option is not supported"; + } if (type == "dabplus" or type == "audio") { subchan->type = subchannel_type_t::Audio; subchan->bitrate = 0; if (proto == "file") { - if (nonblock) { - // TODO - } - if (type == "audio") { subchan->input = make_shared(); } @@ -946,10 +946,6 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, proto == "epgm" || proto == "ipc") { - if (nonblock) { - etiLog.level(warn) << "The nonblock option is meaningless for the zmq input"; - } - auto zmqconfig = setup_zmq_input(pt, subchanuid); if (type == "audio") { @@ -983,6 +979,24 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, subchan->type = subchannel_type_t::DataDmb; subchan->bitrate = DEFAULT_DATA_BITRATE; } + else if (type == "data") { + if (proto == "udp") { + subchan->input = make_shared(); + } else if (proto == "file") { + // TODO + } else if (proto == "fifo") { + // TODO + } else { + stringstream ss; + ss << "Subchannel with uid " << subchanuid << + ": Invalid protocol for data input (" << + proto << ")" << endl; + throw runtime_error(ss.str()); + } + + subchan->type = subchannel_type_t::DataDmb; + subchan->bitrate = DEFAULT_DATA_BITRATE; + } else { stringstream ss; ss << "Subchannel with uid " << subchanuid << " has unknown type!"; diff --git a/src/Makefile.am b/src/Makefile.am index b356566..084cf7b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -51,6 +51,7 @@ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \ input/Prbs.cpp input/Prbs.h \ input/Zmq.cpp input/Zmq.h \ input/File.cpp input/File.h \ + input/Udp.cpp input/Udp.h \ dabOutput/dabOutput.h \ dabOutput/dabOutputFile.cpp \ dabOutput/dabOutputFifo.cpp \ diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp index 020e3f5..ccdd7ed 100644 --- a/src/UdpSocket.cpp +++ b/src/UdpSocket.cpp @@ -37,19 +37,19 @@ using namespace std; UdpSocket::UdpSocket() : listenSocket(INVALID_SOCKET) { - init_sock(0, ""); + reinit(0, ""); } UdpSocket::UdpSocket(int port) : listenSocket(INVALID_SOCKET) { - init_sock(port, ""); + reinit(port, ""); } UdpSocket::UdpSocket(int port, const std::string& name) : listenSocket(INVALID_SOCKET) { - init_sock(port, name); + reinit(port, name); } @@ -67,7 +67,7 @@ int UdpSocket::setBlocking(bool block) return 0; } -int UdpSocket::init_sock(int port, const std::string& name) +int UdpSocket::reinit(int port, const std::string& name) { if (listenSocket != INVALID_SOCKET) { ::close(listenSocket); @@ -98,6 +98,16 @@ int UdpSocket::init_sock(int port, const std::string& name) return 0; } +int UdpSocket::close() +{ + if (listenSocket != INVALID_SOCKET) { + ::close(listenSocket); + } + + listenSocket = INVALID_SOCKET; + + return 0; +} UdpSocket::~UdpSocket() { diff --git a/src/UdpSocket.h b/src/UdpSocket.h index 535499e..dfeaac1 100644 --- a/src/UdpSocket.h +++ b/src/UdpSocket.h @@ -80,6 +80,15 @@ class UdpSocket UdpSocket(const UdpSocket& other) = delete; const UdpSocket& operator=(const UdpSocket& other) = delete; + /** reinitialise socket. Close the already open socket, and + * create a new one + */ + int reinit(int port, const std::string& name); + + /** Close the socket + */ + int close(void); + /** Send an UDP packet. * @param packet The UDP packet to be sent. It includes the data and the * destination address @@ -111,7 +120,6 @@ class UdpSocket int setBlocking(bool block); protected: - int init_sock(int port, const std::string& name); /// The address on which the socket is bound. InetAddress address; diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp new file mode 100644 index 0000000..e534a06 --- /dev/null +++ b/src/input/Udp.cpp @@ -0,0 +1,134 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#include "input/Udp.h" + +#include +#include +#include +#include +#include +#include +#include "utils.h" + +using namespace std; + +namespace Inputs { + +int Udp::open(const std::string& name) +{ + // Skip the udp:// part if it is present + const string endpoint = (name.substr(0, 6) == "udp://") ? + name.substr(6) : name; + + // The endpoint should be address:port + const auto colon_pos = endpoint.find_first_of(":"); + if (colon_pos == string::npos) { + stringstream ss; + ss << "'" << name << + " is an invalid format for udp address: " + "expected [udp://]address:port"; + throw invalid_argument(ss.str()); + } + + const auto address = endpoint.substr(0, colon_pos); + const auto port_str = endpoint.substr(colon_pos + 1); + + const long port = strtol(port_str.c_str(), nullptr, 10); + + if ((port == LONG_MIN or port == LONG_MAX) and errno == ERANGE) { + throw out_of_range("udp input: port out of range"); + } + else if (port == 0 and errno != 0) { + stringstream ss; + ss << "udp input port parse error: " << strerror(errno); + throw invalid_argument(ss.str()); + } + + if (port == 0) { + throw out_of_range("can't use port number 0 in udp address"); + } + + if (m_sock.reinit(port, address) == -1) { + stringstream ss; + ss << "Could not init UDP socket: " << inetErrMsg; + throw runtime_error(ss.str()); + } + + if (m_sock.setBlocking(false) == -1) { + stringstream ss; + ss << "Could not set non-blocking UDP socket: " << inetErrMsg; + throw runtime_error(ss.str()); + } + + return 0; +} + +int Udp::readFrame(void* buffer, int size) +{ + uint8_t* data = reinterpret_cast(buffer); + + // Regardless of buffer contents, try receiving data. + UdpPacket packet; + int ret = m_sock.receive(packet); + + if (ret == -1) { + stringstream ss; + ss << "Could not read from UDP socket: " << inetErrMsg; + throw runtime_error(ss.str()); + } + + std::copy(packet.getData(), packet.getData() + packet.getSize(), + back_inserter(m_buffer)); + + // Take data from the buffer if it contains enough data, + // in any case write the buffer + if (m_buffer.size() >= (size_t)size) { + std::copy(m_buffer.begin(), m_buffer.begin() + size, data); + } + else { + memset(data, 0x0, size); + } + + return size; +} + +int Udp::setBitrate(int bitrate) +{ + if (bitrate <= 0) { + etiLog.log(error, "Invalid bitrate (%i)\n", bitrate); + return -1; + } + + return bitrate; +} + +int Udp::close() +{ + return m_sock.close(); +} + +}; diff --git a/src/input/Udp.h b/src/input/Udp.h new file mode 100644 index 0000000..b6705e9 --- /dev/null +++ b/src/input/Udp.h @@ -0,0 +1,52 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#include +#include +#include "input/inputs.h" +#include "UdpSocket.h" + +namespace Inputs { + +class Udp : public InputBase { + public: + virtual int open(const std::string& name); + virtual int readFrame(void* buffer, int size); + virtual int setBitrate(int bitrate); + virtual int close(); + + private: + UdpSocket m_sock; + + // The content of the UDP packets gets written into the + // buffer, and the UDP packet boundaries disappear there. + std::vector m_buffer; +}; + +}; + -- cgit v1.2.3