aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-06-07 10:14:51 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-06-07 10:14:51 +0200
commit8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213 (patch)
treec0296c305409bb7fc373625aea05c2f57054eb5c
parent43f4a3a2a695c303bd4fdfbd7fec6def29284f2e (diff)
downloaddabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.tar.gz
dabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.tar.bz2
dabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.zip
Work on STI-D/EDI input
-rw-r--r--Makefile.am21
-rw-r--r--doc/advanced.mux17
-rw-r--r--doc/example.mux3
-rw-r--r--lib/ReedSolomon.cpp (renamed from src/ReedSolomon.cpp)0
-rw-r--r--lib/ReedSolomon.h (renamed from src/ReedSolomon.h)0
-rw-r--r--lib/Socket.cpp165
-rw-r--r--lib/Socket.h32
-rw-r--r--lib/ThreadsafeQueue.h (renamed from src/ThreadsafeQueue.h)14
-rw-r--r--src/ConfigParser.cpp61
-rw-r--r--src/DabMux.cpp2
-rw-r--r--src/dabOutput/edi/Config.h9
-rw-r--r--src/dabOutput/edi/PFT.cpp2
-rw-r--r--src/dabOutput/edi/Transport.cpp43
-rw-r--r--src/dabOutput/edi/Transport.h4
-rw-r--r--src/input/Udp.cpp8
15 files changed, 272 insertions, 109 deletions
diff --git a/Makefile.am b/Makefile.am
index 2773bbe..80d24e0 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -69,6 +69,8 @@ odr_dabmux_SOURCES =src/DabMux.cpp \
src/input/File.h \
src/input/Udp.cpp \
src/input/Udp.h \
+ src/input/Edi.cpp \
+ src/input/Edi.h \
src/dabOutput/dabOutput.h \
src/dabOutput/dabOutputFile.cpp \
src/dabOutput/dabOutputFifo.cpp \
@@ -107,11 +109,8 @@ odr_dabmux_SOURCES =src/DabMux.cpp \
src/MuxElements.cpp \
src/MuxElements.h \
src/PcDebug.h \
- src/ReedSolomon.h \
- src/ReedSolomon.cpp \
src/RemoteControl.cpp \
src/RemoteControl.h \
- src/ThreadsafeQueue.h \
src/crc.h \
src/crc.c \
src/fig/FIG.h \
@@ -161,8 +160,19 @@ odr_dabmux_SOURCES =src/DabMux.cpp \
src/PrbsGenerator.h \
src/utils.cpp \
src/utils.h \
+ lib/edi/STIDecoder.cpp \
+ lib/edi/STIDecoder.h \
+ lib/edi/STIWriter.cpp \
+ lib/edi/STIWriter.h \
+ lib/edi/PFT.cpp \
+ lib/edi/PFT.h \
+ lib/edi/common.cpp \
+ lib/edi/common.h \
+ lib/ReedSolomon.h \
+ lib/ReedSolomon.cpp \
lib/Socket.h \
lib/Socket.cpp \
+ lib/ThreadsafeQueue.h \
lib/zmq.hpp \
$(lib_fec_sources) \
$(lib_charset_sources)
@@ -201,14 +211,15 @@ odr_zmq2edi_SOURCES = src/zmq2edi/zmq2edi.cpp \
src/dabOutput/edi/TagPacket.h \
src/dabOutput/edi/Transport.cpp \
src/dabOutput/edi/Transport.h \
- src/ReedSolomon.h \
- src/ReedSolomon.cpp \
src/Log.h \
src/Log.cpp \
src/crc.h \
src/crc.c \
+ lib/ReedSolomon.h \
+ lib/ReedSolomon.cpp \
lib/Socket.h \
lib/Socket.cpp \
+ lib/ThreadsafeQueue.h \
lib/zmq.hpp \
$(lib_fec_sources)
diff --git a/doc/advanced.mux b/doc/advanced.mux
index fb67b82..b9cec05 100644
--- a/doc/advanced.mux
+++ b/doc/advanced.mux
@@ -163,7 +163,8 @@ subchannels {
sub-fu {
type audio
; example file input
- inputfile "funk.mp2"
+ inputproto zmq
+ inputuri "funk.mp2"
nonblock false
bitrate 128
id 10
@@ -188,7 +189,8 @@ subchannels {
; Receive STI-D(LI) carried in STI(PI, X) inside RTP using UDP.
; This is intended to be compatible with AVT audio encoders.
; EXPERIMENTAL!
- inputfile "sti-rtp://127.0.0.1:32010"
+ inputproto sti
+ inputuri "rtp://127.0.0.1:32010"
bitrate 96
id 3
protection 3
@@ -196,11 +198,12 @@ subchannels {
sub-ri {
type dabplus
; example file input
- ;inputfile "rick.dabp"
+ ;inputuri "rick.dabp"
; example zmq input:
; Accepts connections to port 9000 from any interface.
; Use ODR-AudioEnc as encoder
- inputfile "tcp://*:9000"
+ inputproto zmq
+ inputuri "tcp://*:9000"
bitrate 96
id 1
protection 1
@@ -256,7 +259,8 @@ subchannels {
; for audio types, you can use the ZeroMQ input (if compiled in)
; with the following configuration in combination with
; Toolame-DAB
- inputfile "tcp://*:9001"
+ inputproto zmq
+ inputuri "tcp://*:9001"
bitrate 96
id 1
protection 1
@@ -273,7 +277,8 @@ subchannels {
type data
; Use the default PRBS polynomial.
- inputfile "prbs://"
+ inputproto prbs
+ inputuri "prbs://"
; To use another polynomial, set it in the url as hexadecimal
; The default polynomial is G(x) = x^20 + x^17 + 1, represented as
diff --git a/doc/example.mux b/doc/example.mux
index 6c2bc18..31e072d 100644
--- a/doc/example.mux
+++ b/doc/example.mux
@@ -171,7 +171,8 @@ subchannels {
type dabplus
; Accepts connections to port 9000 from any interface.
; Use ODR-AudioEnc as encoder
- inputfile "tcp://*:9000"
+ inputproto zmq
+ inputuri "tcp://*:9000"
bitrate 96
id 1
protection 3
diff --git a/src/ReedSolomon.cpp b/lib/ReedSolomon.cpp
index 38d8ea8..38d8ea8 100644
--- a/src/ReedSolomon.cpp
+++ b/lib/ReedSolomon.cpp
diff --git a/src/ReedSolomon.h b/lib/ReedSolomon.h
index abcef62..abcef62 100644
--- a/src/ReedSolomon.h
+++ b/lib/ReedSolomon.h
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index fe3df44..9b404eb 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -42,13 +42,10 @@ void InetAddress::resolveUdpDestination(const std::string& destination, int port
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
- hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
+ hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
hints.ai_flags = 0;
hints.ai_protocol = 0;
- hints.ai_canonname = nullptr;
- hints.ai_addr = nullptr;
- hints.ai_next = nullptr;
struct addrinfo *result, *rp;
int s = getaddrinfo(destination.c_str(), service, &hints, &result);
@@ -77,19 +74,19 @@ UDPPacket::UDPPacket(size_t initSize) :
UDPSocket::UDPSocket() :
- listenSocket(INVALID_SOCKET)
+ m_sock(INVALID_SOCKET)
{
reinit(0, "");
}
UDPSocket::UDPSocket(int port) :
- listenSocket(INVALID_SOCKET)
+ m_sock(INVALID_SOCKET)
{
reinit(port, "");
}
UDPSocket::UDPSocket(int port, const std::string& name) :
- listenSocket(INVALID_SOCKET)
+ m_sock(INVALID_SOCKET)
{
reinit(port, name);
}
@@ -97,16 +94,28 @@ UDPSocket::UDPSocket(int port, const std::string& name) :
void UDPSocket::setBlocking(bool block)
{
- int res = fcntl(listenSocket, F_SETFL, block ? 0 : O_NONBLOCK);
+ int res = fcntl(m_sock, F_SETFL, block ? 0 : O_NONBLOCK);
if (res == -1) {
throw runtime_error(string("Can't change blocking state of socket: ") + strerror(errno));
}
}
+void UDPSocket::reinit(int port)
+{
+ return reinit(port, "");
+}
+
void UDPSocket::reinit(int port, const std::string& name)
{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
+ }
+
+ if (port == 0) {
+ // No need to bind to a given port, creating the
+ // socket is enough
+ m_sock = ::socket(AF_INET, SOCK_DGRAM, 0);
+ return;
}
char service[NI_MAXSERV];
@@ -114,7 +123,7 @@ void UDPSocket::reinit(int port, const std::string& name)
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
- hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
+ hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
hints.ai_protocol = 0; /* Any protocol */
@@ -141,7 +150,7 @@ void UDPSocket::reinit(int port, const std::string& name)
}
if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
- listenSocket = sfd;
+ m_sock = sfd;
break;
}
@@ -157,17 +166,17 @@ void UDPSocket::reinit(int port, const std::string& name)
void UDPSocket::close()
{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
}
- listenSocket = INVALID_SOCKET;
+ m_sock = INVALID_SOCKET;
}
UDPSocket::~UDPSocket()
{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
}
}
@@ -177,7 +186,7 @@ UDPPacket UDPSocket::receive(size_t max_size)
UDPPacket packet(max_size);
socklen_t addrSize;
addrSize = sizeof(*packet.address.as_sockaddr());
- ssize_t ret = recvfrom(listenSocket,
+ ssize_t ret = recvfrom(m_sock,
packet.buffer.data(),
packet.buffer.size(),
0,
@@ -186,7 +195,13 @@ UDPPacket UDPSocket::receive(size_t max_size)
if (ret == SOCKET_ERROR) {
packet.buffer.resize(0);
+
+ // This suppresses the -Wlogical-op warning
+#if EAGAIN == EWOULDBLOCK
if (errno == EAGAIN) {
+#else
+ if (errno == EAGAIN or errno == EWOULDBLOCK) {
+#endif
return 0;
}
throw runtime_error(string("Can't receive data: ") + strerror(errno));
@@ -198,24 +213,24 @@ UDPPacket UDPSocket::receive(size_t max_size)
void UDPSocket::send(UDPPacket& packet)
{
- int ret = sendto(listenSocket, packet.buffer.data(), packet.buffer.size(), 0,
+ const int ret = sendto(m_sock, packet.buffer.data(), packet.buffer.size(), 0,
packet.address.as_sockaddr(), sizeof(*packet.address.as_sockaddr()));
if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
- throw runtime_error(string("Can't send UDP packet") + strerror(errno));
+ throw runtime_error(string("Can't send UDP packet: ") + strerror(errno));
}
}
void UDPSocket::send(const std::vector<uint8_t>& data, InetAddress destination)
{
- int ret = sendto(listenSocket, &data[0], data.size(), 0,
+ const int ret = sendto(m_sock, data.data(), data.size(), 0,
destination.as_sockaddr(), sizeof(*destination.as_sockaddr()));
if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
- throw runtime_error(string("Can't send UDP packet") + strerror(errno));
+ throw runtime_error(string("Can't send UDP packet: ") + strerror(errno));
}
}
-void UDPSocket::joinGroup(char* groupname)
+void UDPSocket::joinGroup(const char* groupname, const char* if_addr)
{
ip_mreqn group;
if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) {
@@ -224,9 +239,15 @@ void UDPSocket::joinGroup(char* groupname)
if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) {
throw runtime_error("Group name is not a multicast address");
}
- group.imr_address.s_addr = htons(INADDR_ANY);;
+
+ if (if_addr) {
+ group.imr_address.s_addr = inet_addr(if_addr);
+ }
+ else {
+ group.imr_address.s_addr = htons(INADDR_ANY);
+ }
group.imr_ifindex = 0;
- if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
+ if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
== SOCKET_ERROR) {
throw runtime_error(string("Can't join multicast group") + strerror(errno));
}
@@ -239,7 +260,7 @@ void UDPSocket::setMulticastSource(const char* source_addr)
throw runtime_error(string("Can't parse source address") + strerror(errno));
}
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
+ if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
== SOCKET_ERROR) {
throw runtime_error(string("Can't set source address") + strerror(errno));
}
@@ -247,26 +268,82 @@ void UDPSocket::setMulticastSource(const char* source_addr)
void UDPSocket::setMulticastTTL(int ttl)
{
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
+ if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
== SOCKET_ERROR) {
throw runtime_error(string("Can't set multicast ttl") + strerror(errno));
}
}
+UDPReceiver::~UDPReceiver() {
+ m_stop = true;
+ m_sock.close();
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) {
+ m_port = port;
+ m_bindto = bindto;
+ m_mcastaddr = mcastaddr;
+ m_max_packets_queued = max_packets_queued;
+ m_thread = std::thread(&UDPReceiver::m_run, this);
+}
-TCPSocket::TCPSocket()
+std::vector<uint8_t> UDPReceiver::get_packet_buffer()
{
- if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
- throw std::runtime_error("Can't create TCP socket");
+ if (m_stop) {
+ throw runtime_error("UDP Receiver not running");
}
-#if defined(HAVE_SO_NOSIGPIPE)
- int val = 1;
- if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE,
- &val, sizeof(val)) < 0) {
- throw std::runtime_error("Can't set SO_NOSIGPIPE");
+ UDPPacket p;
+ m_packets.wait_and_pop(p);
+
+ return p.buffer;
+}
+
+void UDPReceiver::m_run()
+{
+ // Ensure that stop is set to true in case of exception or return
+ struct SetStopOnDestruct {
+ SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {}
+ ~SetStopOnDestruct() { m_stop = true; }
+ private: atomic<bool>& m_stop;
+ } autoSetStop(m_stop);
+
+ if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) {
+ m_sock.reinit(m_port, m_mcastaddr);
+ m_sock.setMulticastSource(m_bindto.c_str());
+ m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str());
}
-#endif
+ else {
+ m_sock.reinit(m_port, m_bindto);
+ }
+
+ while (not m_stop) {
+ constexpr size_t packsize = 8192;
+ try {
+ auto packet = m_sock.receive(packsize);
+ if (packet.buffer.size() == packsize) {
+ // TODO replace fprintf
+ fprintf(stderr, "Warning, possible UDP truncation\n");
+ }
+
+ // If this blocks, the UDP socket will lose incoming packets
+ m_packets.push_wait_if_full(packet, m_max_packets_queued);
+ }
+ catch (const std::runtime_error& e) {
+ // TODO replace fprintf
+ // TODO handle intr
+ fprintf(stderr, "Socket error: %s\n", e.what());
+ m_stop = true;
+ }
+ }
+}
+
+
+TCPSocket::TCPSocket()
+{
}
TCPSocket::~TCPSocket()
@@ -314,7 +391,7 @@ void TCPSocket::connect(const std::string& hostname, int port)
/* Obtain address(es) matching host/port */
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
- hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
+ hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = 0;
hints.ai_protocol = 0;
@@ -376,7 +453,7 @@ void TCPSocket::listen(int port, const string& name)
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
- hints.ai_family = AF_UNSPEC;
+ hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
hints.ai_protocol = 0;
@@ -410,6 +487,14 @@ void TCPSocket::listen(int port, const string& name)
freeaddrinfo(result);
+#if defined(HAVE_SO_NOSIGPIPE)
+ int val = 1;
+ if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE,
+ &val, sizeof(val)) < 0) {
+ throw std::runtime_error("Can't set SO_NOSIGPIPE");
+ }
+#endif
+
if (rp == nullptr) {
throw runtime_error("Could not bind");
}
@@ -683,7 +768,9 @@ TCPDataDispatcher::~TCPDataDispatcher()
m_running = false;
m_connections.clear();
m_listener_socket.close();
- m_listener_thread.join();
+ if (m_listener_thread.joinable()) {
+ m_listener_thread.join();
+ }
}
void TCPDataDispatcher::start(int port, const string& address)
diff --git a/lib/Socket.h b/lib/Socket.h
index 82ff5ad..2393584 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -104,13 +104,14 @@ class UDPSocket
const UDPSocket& operator=(const UDPSocket& other) = delete;
/** Close the already open socket, and create a new one. Throws a runtime_error on error. */
+ void reinit(int port);
void reinit(int port, const std::string& name);
void close(void);
void send(UDPPacket& packet);
void send(const std::vector<uint8_t>& data, InetAddress destination);
UDPPacket receive(size_t max_size);
- void joinGroup(char* groupname);
+ void joinGroup(const char* groupname, const char* if_addr = nullptr);
void setMulticastSource(const char* source_addr);
void setMulticastTTL(int ttl);
@@ -120,9 +121,36 @@ class UDPSocket
void setBlocking(bool block);
protected:
- SOCKET listenSocket;
+ SOCKET m_sock;
};
+/* Threaded UDP receiver */
+class UDPReceiver {
+ public:
+ UDPReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {}
+ ~UDPReceiver();
+ UDPReceiver(const UDPReceiver&) = delete;
+ UDPReceiver operator=(const UDPReceiver&) = delete;
+
+ // Start the receiver in a separate thread
+ void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued);
+
+ // Get the data contained in a UDP packet, blocks if none available
+ // In case of error, throws a runtime_error
+ std::vector<uint8_t> get_packet_buffer(void);
+
+ private:
+ void m_run(void);
+
+ int m_port;
+ std::string m_bindto;
+ std::string m_mcastaddr;
+ size_t m_max_packets_queued;
+ std::thread m_thread;
+ std::atomic<bool> m_stop;
+ ThreadsafeQueue<UDPPacket> m_packets;
+ UDPSocket m_sock;
+};
class TCPSocket {
public:
diff --git a/src/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h
index ab287b2..62f4c96 100644
--- a/src/ThreadsafeQueue.h
+++ b/lib/ThreadsafeQueue.h
@@ -12,20 +12,18 @@
element out.
*/
/*
- This file is part of ODR-DabMux.
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index fb49efc..3142bb3 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -40,6 +40,7 @@
#include "utils.h"
#include "DabMux.h"
#include "ManagementServer.h"
+#include "input/Edi.h"
#include "input/Prbs.h"
#include "input/Zmq.h"
#include "input/File.h"
@@ -876,34 +877,46 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,
type = pt.get<string>("type");
}
catch (const ptree_error &e) {
- stringstream ss;
- ss << "Subchannel with uid " << subchanuid << " has no type defined!";
- throw runtime_error(ss.str());
+ throw runtime_error("Subchannel with uid " + subchanuid + " has no type defined!");
}
- /* Both inputfile and inputuri are supported, and are equivalent.
- * inputuri has precedence
+ /* Up to v2.3.1, both inputfile and inputuri are supported, and are
+ * equivalent. inputuri has precedence.
+ *
+ * After that, either inputfile or the (inputproto, inputuri) pair must be given, but not both.
*/
string inputUri = pt.get<string>("inputuri", "");
+ string proto = pt.get<string>("inputproto", "");
- if (inputUri == "") {
+ if (inputUri.empty() and proto.empty()) {
try {
+ /* Old approach, derives proto from scheme used in the URL.
+ * This makes it impossible to distinguish between ZMQ tcp:// and
+ * EDI tcp://
+ */
inputUri = pt.get<string>("inputfile");
+ size_t protopos = inputUri.find("://");
+
+ if (protopos == string::npos) {
+ proto = "file";
+ }
+ else {
+ proto = inputUri.substr(0, protopos);
+
+ if (proto == "tcp" or proto == "epgm" or proto == "ipc") {
+ proto = "zmq";
+ }
+ else if (proto == "sti-rtp") {
+ proto = "sti";
+ }
+ }
}
catch (const ptree_error &e) {
- stringstream ss;
- ss << "Subchannel with uid " << subchanuid << " has no inputUri defined!";
- throw runtime_error(ss.str());
+ throw runtime_error("Subchannel with uid " + subchanuid + " has no input defined!");
}
}
-
- string proto;
- size_t protopos = inputUri.find("://");
- if (protopos == string::npos) {
- proto = "file";
- }
- else {
- proto = inputUri.substr(0, protopos);
+ else if (inputUri.empty() or proto.empty()) {
+ throw runtime_error("Must define both inputuri and inputproto for uid " + subchanuid);
}
subchan->inputUri = inputUri;
@@ -928,7 +941,7 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,
throw logic_error("Incomplete handling of file input");
}
}
- else if (proto == "tcp" or proto == "epgm" or proto == "ipc") {
+ else if (proto == "zmq") {
auto zmqconfig = setup_zmq_input(pt, subchanuid);
if (type == "audio") {
@@ -941,15 +954,11 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,
rcs.enrol(inzmq.get());
subchan->input = inzmq;
}
-
- if (proto == "epgm") {
- etiLog.level(warn) << "Using untested epgm:// zeromq input";
- }
- else if (proto == "ipc") {
- etiLog.level(warn) << "Using untested ipc:// zeromq input";
- }
}
- else if (proto == "sti-rtp") {
+ else if (proto == "edi") {
+ subchan->input = make_shared<Inputs::Edi>();
+ }
+ else if (proto == "stp") {
subchan->input = make_shared<Inputs::Sti_d_Rtp>();
}
else {
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index d749ed3..e726fd3 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -304,7 +304,7 @@ int main(int argc, char *argv[])
edi_conf.destinations.push_back(dest);
}
else if (proto == "tcp") {
- auto dest = make_shared<edi::tcp_destination_t>();
+ auto dest = make_shared<edi::tcp_server_t>();
dest->listen_port = pt_edi_dest.second.get<unsigned int>("listenport");
dest->max_frames_queued = pt_edi_dest.second.get<size_t>("max_frames_queued", 500);
edi_conf.destinations.push_back(dest);
diff --git a/src/dabOutput/edi/Config.h b/src/dabOutput/edi/Config.h
index 55d5f0f..0c7dce8 100644
--- a/src/dabOutput/edi/Config.h
+++ b/src/dabOutput/edi/Config.h
@@ -50,11 +50,18 @@ struct udp_destination_t : public destination_t {
};
// TCP server that can accept multiple connections
-struct tcp_destination_t : public destination_t {
+struct tcp_server_t : public destination_t {
unsigned int listen_port = 0;
size_t max_frames_queued = 1024;
};
+// TCP client that connects to one endpoint
+struct tcp_client_t : public destination_t {
+ std::string dest_addr;
+ unsigned int dest_port = 0;
+ size_t max_frames_queued = 1024;
+};
+
struct configuration_t {
unsigned chunk_len = 207; // RSk, data length of each chunk
unsigned fec = 0; // number of fragments that can be recovered
diff --git a/src/dabOutput/edi/PFT.cpp b/src/dabOutput/edi/PFT.cpp
index 5b93016..63dfa34 100644
--- a/src/dabOutput/edi/PFT.cpp
+++ b/src/dabOutput/edi/PFT.cpp
@@ -314,7 +314,7 @@ std::vector< PFTFragment > PFT::Assemble(AFPacket af_packet)
#if 0
fprintf(stderr, "* PFT pseq %d, findex %d, fcount %d, plen %d\n",
- m_pseq, findex, fcount, plen & ~0x8000);
+ m_pseq, findex, fcount, plen & ~0xC000);
#endif
}
diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp
index 6d3950f..187aabe 100644
--- a/src/dabOutput/edi/Transport.cpp
+++ b/src/dabOutput/edi/Transport.cpp
@@ -45,12 +45,16 @@ void configuration_t::print() const
}
etiLog.level(info) << " source port " << udp_dest->source_port;
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) {
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {
etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port;
etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued;
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) {
+ etiLog.level(info) << " TCP client connecting to " << tcp_dest->dest_addr << ":" << tcp_dest->dest_port;
+ etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued;
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
if (interleaver_enabled()) {
@@ -78,13 +82,18 @@ Sender::Sender(const configuration_t& conf) :
udp_sockets.emplace(udp_dest.get(), udp_socket);
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) {
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {
auto dispatcher = make_shared<Socket::TCPDataDispatcher>(tcp_dest->max_frames_queued);
dispatcher->start(tcp_dest->listen_port, "0.0.0.0");
tcp_dispatchers.emplace(tcp_dest.get(), dispatcher);
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) {
+ auto tcp_socket = make_shared<Socket::TCPSocket>();
+ tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port);
+ tcp_senders.emplace(tcp_dest.get(), tcp_socket);
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
@@ -111,7 +120,7 @@ void Sender::write(const TagPacket& tagpacket)
vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet);
if (m_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragment before interleaver %zu",
+ fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n",
edi_fragments.size());
}
@@ -128,22 +137,25 @@ void Sender::write(const TagPacket& tagpacket)
udp_sockets.at(udp_dest.get())->send(edi_frag, addr);
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) {
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
tcp_dispatchers.at(tcp_dest.get())->write(edi_frag);
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
+ tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size());
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
if (m_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
+ ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
}
}
if (m_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragments %zu",
+ fprintf(stderr, "EDI number of PFT fragments %zu\n",
edi_fragments.size());
}
}
@@ -156,17 +168,20 @@ void Sender::write(const TagPacket& tagpacket)
udp_sockets.at(udp_dest.get())->send(af_packet, addr);
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) {
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
tcp_dispatchers.at(tcp_dest.get())->write(af_packet);
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
+ tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size());
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
if (m_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(af_packet.begin(), af_packet.end(), debug_iterator);
+ ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ copy(af_packet.begin(), af_packet.end(), debug_iterator);
}
}
}
diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h
index 74126d1..9633275 100644
--- a/src/dabOutput/edi/Transport.h
+++ b/src/dabOutput/edi/Transport.h
@@ -36,6 +36,7 @@
#include <vector>
#include <unordered_map>
#include <stdexcept>
+#include <fstream>
#include <cstdint>
namespace edi {
@@ -62,7 +63,8 @@ class Sender {
edi::Interleaver edi_interleaver;
std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets;
- std::unordered_map<tcp_destination_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers;
+ std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers;
+ std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSocket>> tcp_senders;
};
}
diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp
index b4cced0..5d4f964 100644
--- a/src/input/Udp.cpp
+++ b/src/input/Udp.cpp
@@ -151,8 +151,8 @@ static uint16_t unpack2(const uint8_t *buf)
int Sti_d_Rtp::open(const std::string& name)
{
- // Skip the sti-rtp:// part if it is present
- const string endpoint = (name.substr(0, 10) == "sti-rtp://") ?
+ // Skip the rtp:// part if it is present
+ const string endpoint = (name.substr(0, 10) == "rtp://") ?
name.substr(10) : name;
// The endpoint should be address:port
@@ -160,8 +160,8 @@ int Sti_d_Rtp::open(const std::string& name)
if (colon_pos == string::npos) {
stringstream ss;
ss << "'" << name <<
- " is an invalid format for sti-rtp address: "
- "expected [sti-rtp://]address:port";
+ " is an invalid format for rtp address: "
+ "expected [rtp://]address:port";
throw invalid_argument(ss.str());
}