aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2016-11-04 14:27:50 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2016-11-04 14:27:50 +0100
commit51491533a312884862849082b3507e49c1829d22 (patch)
tree09afd1164350517eeacb53254b33db4249b1e41b
parent7068a697b235f1ae05bc1a5cf93e7eeefbe7a1df (diff)
downloaddabmux-51491533a312884862849082b3507e49c1829d22.tar.gz
dabmux-51491533a312884862849082b3507e49c1829d22.tar.bz2
dabmux-51491533a312884862849082b3507e49c1829d22.zip
Add new UDP input
-rw-r--r--src/ConfigParser.cpp30
-rw-r--r--src/Makefile.am1
-rw-r--r--src/UdpSocket.cpp18
-rw-r--r--src/UdpSocket.h10
-rw-r--r--src/input/Udp.cpp134
-rw-r--r--src/input/Udp.h52
6 files changed, 232 insertions, 13 deletions
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index 2a8d3da..e48200a 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -56,6 +56,7 @@
#include "input/Prbs.h"
#include "input/Zmq.h"
#include "input/File.h"
+#include "input/Udp.h"
#ifdef _WIN32
@@ -922,16 +923,15 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan,
dabProtection* protection = &subchan->protection;
const bool nonblock = pt.get("nonblock", false);
+ if (nonblock) {
+ etiLog.level(warn) << "The nonblock option is not supported";
+ }
if (type == "dabplus" or type == "audio") {
subchan->type = subchannel_type_t::Audio;
subchan->bitrate = 0;
if (proto == "file") {
- if (nonblock) {
- // TODO
- }
-
if (type == "audio") {
subchan->input = make_shared<Inputs::MPEGFile>();
}
@@ -946,10 +946,6 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan,
proto == "epgm" ||
proto == "ipc") {
- if (nonblock) {
- etiLog.level(warn) << "The nonblock option is meaningless for the zmq input";
- }
-
auto zmqconfig = setup_zmq_input(pt, subchanuid);
if (type == "audio") {
@@ -983,6 +979,24 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan,
subchan->type = subchannel_type_t::DataDmb;
subchan->bitrate = DEFAULT_DATA_BITRATE;
}
+ else if (type == "data") {
+ if (proto == "udp") {
+ subchan->input = make_shared<Inputs::Udp>();
+ } else if (proto == "file") {
+ // TODO
+ } else if (proto == "fifo") {
+ // TODO
+ } else {
+ stringstream ss;
+ ss << "Subchannel with uid " << subchanuid <<
+ ": Invalid protocol for data input (" <<
+ proto << ")" << endl;
+ throw runtime_error(ss.str());
+ }
+
+ subchan->type = subchannel_type_t::DataDmb;
+ subchan->bitrate = DEFAULT_DATA_BITRATE;
+ }
else {
stringstream ss;
ss << "Subchannel with uid " << subchanuid << " has unknown type!";
diff --git a/src/Makefile.am b/src/Makefile.am
index b356566..084cf7b 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -51,6 +51,7 @@ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \
input/Prbs.cpp input/Prbs.h \
input/Zmq.cpp input/Zmq.h \
input/File.cpp input/File.h \
+ input/Udp.cpp input/Udp.h \
dabOutput/dabOutput.h \
dabOutput/dabOutputFile.cpp \
dabOutput/dabOutputFifo.cpp \
diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp
index 020e3f5..ccdd7ed 100644
--- a/src/UdpSocket.cpp
+++ b/src/UdpSocket.cpp
@@ -37,19 +37,19 @@ using namespace std;
UdpSocket::UdpSocket() :
listenSocket(INVALID_SOCKET)
{
- init_sock(0, "");
+ reinit(0, "");
}
UdpSocket::UdpSocket(int port) :
listenSocket(INVALID_SOCKET)
{
- init_sock(port, "");
+ reinit(port, "");
}
UdpSocket::UdpSocket(int port, const std::string& name) :
listenSocket(INVALID_SOCKET)
{
- init_sock(port, name);
+ reinit(port, name);
}
@@ -67,7 +67,7 @@ int UdpSocket::setBlocking(bool block)
return 0;
}
-int UdpSocket::init_sock(int port, const std::string& name)
+int UdpSocket::reinit(int port, const std::string& name)
{
if (listenSocket != INVALID_SOCKET) {
::close(listenSocket);
@@ -98,6 +98,16 @@ int UdpSocket::init_sock(int port, const std::string& name)
return 0;
}
+int UdpSocket::close()
+{
+ if (listenSocket != INVALID_SOCKET) {
+ ::close(listenSocket);
+ }
+
+ listenSocket = INVALID_SOCKET;
+
+ return 0;
+}
UdpSocket::~UdpSocket()
{
diff --git a/src/UdpSocket.h b/src/UdpSocket.h
index 535499e..dfeaac1 100644
--- a/src/UdpSocket.h
+++ b/src/UdpSocket.h
@@ -80,6 +80,15 @@ class UdpSocket
UdpSocket(const UdpSocket& other) = delete;
const UdpSocket& operator=(const UdpSocket& other) = delete;
+ /** reinitialise socket. Close the already open socket, and
+ * create a new one
+ */
+ int reinit(int port, const std::string& name);
+
+ /** Close the socket
+ */
+ int close(void);
+
/** Send an UDP packet.
* @param packet The UDP packet to be sent. It includes the data and the
* destination address
@@ -111,7 +120,6 @@ class UdpSocket
int setBlocking(bool block);
protected:
- int init_sock(int port, const std::string& name);
/// The address on which the socket is bound.
InetAddress address;
diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp
new file mode 100644
index 0000000..e534a06
--- /dev/null
+++ b/src/input/Udp.cpp
@@ -0,0 +1,134 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2016
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "input/Udp.h"
+
+#include <stdexcept>
+#include <sstream>
+#include <string.h>
+#include <limits.h>
+#include <stdlib.h>
+#include <errno.h>
+#include "utils.h"
+
+using namespace std;
+
+namespace Inputs {
+
+int Udp::open(const std::string& name)
+{
+ // Skip the udp:// part if it is present
+ const string endpoint = (name.substr(0, 6) == "udp://") ?
+ name.substr(6) : name;
+
+ // The endpoint should be address:port
+ const auto colon_pos = endpoint.find_first_of(":");
+ if (colon_pos == string::npos) {
+ stringstream ss;
+ ss << "'" << name <<
+ " is an invalid format for udp address: "
+ "expected [udp://]address:port";
+ throw invalid_argument(ss.str());
+ }
+
+ const auto address = endpoint.substr(0, colon_pos);
+ const auto port_str = endpoint.substr(colon_pos + 1);
+
+ const long port = strtol(port_str.c_str(), nullptr, 10);
+
+ if ((port == LONG_MIN or port == LONG_MAX) and errno == ERANGE) {
+ throw out_of_range("udp input: port out of range");
+ }
+ else if (port == 0 and errno != 0) {
+ stringstream ss;
+ ss << "udp input port parse error: " << strerror(errno);
+ throw invalid_argument(ss.str());
+ }
+
+ if (port == 0) {
+ throw out_of_range("can't use port number 0 in udp address");
+ }
+
+ if (m_sock.reinit(port, address) == -1) {
+ stringstream ss;
+ ss << "Could not init UDP socket: " << inetErrMsg;
+ throw runtime_error(ss.str());
+ }
+
+ if (m_sock.setBlocking(false) == -1) {
+ stringstream ss;
+ ss << "Could not set non-blocking UDP socket: " << inetErrMsg;
+ throw runtime_error(ss.str());
+ }
+
+ return 0;
+}
+
+int Udp::readFrame(void* buffer, int size)
+{
+ uint8_t* data = reinterpret_cast<uint8_t*>(buffer);
+
+ // Regardless of buffer contents, try receiving data.
+ UdpPacket packet;
+ int ret = m_sock.receive(packet);
+
+ if (ret == -1) {
+ stringstream ss;
+ ss << "Could not read from UDP socket: " << inetErrMsg;
+ throw runtime_error(ss.str());
+ }
+
+ std::copy(packet.getData(), packet.getData() + packet.getSize(),
+ back_inserter(m_buffer));
+
+ // Take data from the buffer if it contains enough data,
+ // in any case write the buffer
+ if (m_buffer.size() >= (size_t)size) {
+ std::copy(m_buffer.begin(), m_buffer.begin() + size, data);
+ }
+ else {
+ memset(data, 0x0, size);
+ }
+
+ return size;
+}
+
+int Udp::setBitrate(int bitrate)
+{
+ if (bitrate <= 0) {
+ etiLog.log(error, "Invalid bitrate (%i)\n", bitrate);
+ return -1;
+ }
+
+ return bitrate;
+}
+
+int Udp::close()
+{
+ return m_sock.close();
+}
+
+};
diff --git a/src/input/Udp.h b/src/input/Udp.h
new file mode 100644
index 0000000..b6705e9
--- /dev/null
+++ b/src/input/Udp.h
@@ -0,0 +1,52 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2016
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+#include "input/inputs.h"
+#include "UdpSocket.h"
+
+namespace Inputs {
+
+class Udp : public InputBase {
+ public:
+ virtual int open(const std::string& name);
+ virtual int readFrame(void* buffer, int size);
+ virtual int setBitrate(int bitrate);
+ virtual int close();
+
+ private:
+ UdpSocket m_sock;
+
+ // The content of the UDP packets gets written into the
+ // buffer, and the UDP packet boundaries disappear there.
+ std::vector<uint8_t> m_buffer;
+};
+
+};
+