From 43f4a3a2a695c303bd4fdfbd7fec6def29284f2e Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 28 May 2019 16:56:43 +0200 Subject: Unify Socket abstractions --- src/input/Udp.cpp | 51 ++++++++++++++------------------------------------- 1 file changed, 14 insertions(+), 37 deletions(-) (limited to 'src/input/Udp.cpp') diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp index 2cb49e7..b4cced0 100644 --- a/src/input/Udp.cpp +++ b/src/input/Udp.cpp @@ -82,17 +82,8 @@ void Udp::openUdpSocket(const std::string& endpoint) throw out_of_range("can't use port number 0 in udp address"); } - if (m_sock.reinit(port, address) == -1) { - stringstream ss; - ss << "Could not init UDP socket: " << inetErrMsg; - throw runtime_error(ss.str()); - } - - if (m_sock.setBlocking(false) == -1) { - stringstream ss; - ss << "Could not set non-blocking UDP socket: " << inetErrMsg; - throw runtime_error(ss.str()); - } + m_sock.reinit(port, address); + m_sock.setBlocking(false); etiLog.level(info) << "Opened UDP port " << address << ":" << port; } @@ -100,17 +91,9 @@ void Udp::openUdpSocket(const std::string& endpoint) int Udp::readFrame(uint8_t* buffer, size_t size) { // Regardless of buffer contents, try receiving data. - UdpPacket packet(32768); - int ret = m_sock.receive(packet); - - if (ret == -1) { - stringstream ss; - ss << "Could not read from UDP socket: " << inetErrMsg; - throw runtime_error(ss.str()); - } + auto packet = m_sock.receive(32768); - std::copy(packet.getData(), packet.getData() + packet.getSize(), - back_inserter(m_buffer)); + std::copy(packet.buffer.cbegin(), packet.buffer.cend(), back_inserter(m_buffer)); // Take data from the buffer if it contains enough data, // in any case write the buffer @@ -136,7 +119,8 @@ int Udp::setBitrate(int bitrate) int Udp::close() { - return m_sock.close(); + m_sock.close(); + return 0; } @@ -190,29 +174,22 @@ int Sti_d_Rtp::open(const std::string& name) void Sti_d_Rtp::receive_packet() { - UdpPacket packet(32768); - int ret = m_sock.receive(packet); - - if (ret == -1) { - stringstream ss; - ss << "Could not read from UDP socket: " << inetErrMsg; - throw runtime_error(ss.str()); - } + auto packet = m_sock.receive(32768); - if (packet.getSize() == 0) { + if (packet.buffer.empty()) { // No packet was received return; } const size_t STI_FC_LEN = 8; - if (packet.getSize() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) { + if (packet.buffer.size() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) { etiLog.level(info) << "Received too small RTP packet for " << m_name; return; } - if (not rtpHeaderValid(packet.getData())) { + if (not rtpHeaderValid(packet.buffer.data())) { etiLog.level(info) << "Received invalid RTP header for " << m_name; return; @@ -220,7 +197,7 @@ void Sti_d_Rtp::receive_packet() // STI(PI, X) size_t index = RTP_HEADER_LEN; - const uint8_t *buf = packet.getData(); + const uint8_t *buf = packet.buffer.data(); // SYNC index++; // Advance over STAT @@ -242,7 +219,7 @@ void Sti_d_Rtp::receive_packet() m_name; return; } - if (packet.getSize() < index + DFS) { + if (packet.buffer.size() < index + DFS) { etiLog.level(info) << "Received STI too small for given DFS for " << m_name; return; @@ -270,9 +247,9 @@ void Sti_d_Rtp::receive_packet() uint16_t NST = unpack2(buf+index) & 0x7FF; // 11 bits index += 2; - if (packet.getSize() < index + 4*NST) { + if (packet.buffer.size() < index + 4*NST) { etiLog.level(info) << "Received STI too small to contain NST for " << - m_name << " packet: " << packet.getSize() << " need " << + m_name << " packet: " << packet.buffer.size() << " need " << index + 4*NST; return; } -- cgit v1.2.3 From 8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 7 Jun 2019 10:14:51 +0200 Subject: Work on STI-D/EDI input --- Makefile.am | 21 +++-- doc/advanced.mux | 17 ++-- doc/example.mux | 3 +- lib/ReedSolomon.cpp | 116 ++++++++++++++++++++++++++ lib/ReedSolomon.h | 56 +++++++++++++ lib/Socket.cpp | 165 ++++++++++++++++++++++++++++--------- lib/Socket.h | 32 +++++++- lib/ThreadsafeQueue.h | 176 +++++++++++++++++++++++++++++++++++++++ src/ConfigParser.cpp | 61 ++++++++------ src/DabMux.cpp | 2 +- src/ReedSolomon.cpp | 116 -------------------------- src/ReedSolomon.h | 56 ------------- src/ThreadsafeQueue.h | 178 ---------------------------------------- src/dabOutput/edi/Config.h | 9 +- src/dabOutput/edi/PFT.cpp | 2 +- src/dabOutput/edi/Transport.cpp | 43 ++++++---- src/dabOutput/edi/Transport.h | 4 +- src/input/Udp.cpp | 8 +- 18 files changed, 614 insertions(+), 451 deletions(-) create mode 100644 lib/ReedSolomon.cpp create mode 100644 lib/ReedSolomon.h create mode 100644 lib/ThreadsafeQueue.h delete mode 100644 src/ReedSolomon.cpp delete mode 100644 src/ReedSolomon.h delete mode 100644 src/ThreadsafeQueue.h (limited to 'src/input/Udp.cpp') diff --git a/Makefile.am b/Makefile.am index 2773bbe..80d24e0 100644 --- a/Makefile.am +++ b/Makefile.am @@ -69,6 +69,8 @@ odr_dabmux_SOURCES =src/DabMux.cpp \ src/input/File.h \ src/input/Udp.cpp \ src/input/Udp.h \ + src/input/Edi.cpp \ + src/input/Edi.h \ src/dabOutput/dabOutput.h \ src/dabOutput/dabOutputFile.cpp \ src/dabOutput/dabOutputFifo.cpp \ @@ -107,11 +109,8 @@ odr_dabmux_SOURCES =src/DabMux.cpp \ src/MuxElements.cpp \ src/MuxElements.h \ src/PcDebug.h \ - src/ReedSolomon.h \ - src/ReedSolomon.cpp \ src/RemoteControl.cpp \ src/RemoteControl.h \ - src/ThreadsafeQueue.h \ src/crc.h \ src/crc.c \ src/fig/FIG.h \ @@ -161,8 +160,19 @@ odr_dabmux_SOURCES =src/DabMux.cpp \ src/PrbsGenerator.h \ src/utils.cpp \ src/utils.h \ + lib/edi/STIDecoder.cpp \ + lib/edi/STIDecoder.h \ + lib/edi/STIWriter.cpp \ + lib/edi/STIWriter.h \ + lib/edi/PFT.cpp \ + lib/edi/PFT.h \ + lib/edi/common.cpp \ + lib/edi/common.h \ + lib/ReedSolomon.h \ + lib/ReedSolomon.cpp \ lib/Socket.h \ lib/Socket.cpp \ + lib/ThreadsafeQueue.h \ lib/zmq.hpp \ $(lib_fec_sources) \ $(lib_charset_sources) @@ -201,14 +211,15 @@ odr_zmq2edi_SOURCES = src/zmq2edi/zmq2edi.cpp \ src/dabOutput/edi/TagPacket.h \ src/dabOutput/edi/Transport.cpp \ src/dabOutput/edi/Transport.h \ - src/ReedSolomon.h \ - src/ReedSolomon.cpp \ src/Log.h \ src/Log.cpp \ src/crc.h \ src/crc.c \ + lib/ReedSolomon.h \ + lib/ReedSolomon.cpp \ lib/Socket.h \ lib/Socket.cpp \ + lib/ThreadsafeQueue.h \ lib/zmq.hpp \ $(lib_fec_sources) diff --git a/doc/advanced.mux b/doc/advanced.mux index fb67b82..b9cec05 100644 --- a/doc/advanced.mux +++ b/doc/advanced.mux @@ -163,7 +163,8 @@ subchannels { sub-fu { type audio ; example file input - inputfile "funk.mp2" + inputproto zmq + inputuri "funk.mp2" nonblock false bitrate 128 id 10 @@ -188,7 +189,8 @@ subchannels { ; Receive STI-D(LI) carried in STI(PI, X) inside RTP using UDP. ; This is intended to be compatible with AVT audio encoders. ; EXPERIMENTAL! - inputfile "sti-rtp://127.0.0.1:32010" + inputproto sti + inputuri "rtp://127.0.0.1:32010" bitrate 96 id 3 protection 3 @@ -196,11 +198,12 @@ subchannels { sub-ri { type dabplus ; example file input - ;inputfile "rick.dabp" + ;inputuri "rick.dabp" ; example zmq input: ; Accepts connections to port 9000 from any interface. ; Use ODR-AudioEnc as encoder - inputfile "tcp://*:9000" + inputproto zmq + inputuri "tcp://*:9000" bitrate 96 id 1 protection 1 @@ -256,7 +259,8 @@ subchannels { ; for audio types, you can use the ZeroMQ input (if compiled in) ; with the following configuration in combination with ; Toolame-DAB - inputfile "tcp://*:9001" + inputproto zmq + inputuri "tcp://*:9001" bitrate 96 id 1 protection 1 @@ -273,7 +277,8 @@ subchannels { type data ; Use the default PRBS polynomial. - inputfile "prbs://" + inputproto prbs + inputuri "prbs://" ; To use another polynomial, set it in the url as hexadecimal ; The default polynomial is G(x) = x^20 + x^17 + 1, represented as diff --git a/doc/example.mux b/doc/example.mux index 6c2bc18..31e072d 100644 --- a/doc/example.mux +++ b/doc/example.mux @@ -171,7 +171,8 @@ subchannels { type dabplus ; Accepts connections to port 9000 from any interface. ; Use ODR-AudioEnc as encoder - inputfile "tcp://*:9000" + inputproto zmq + inputuri "tcp://*:9000" bitrate 96 id 1 protection 3 diff --git a/lib/ReedSolomon.cpp b/lib/ReedSolomon.cpp new file mode 100644 index 0000000..38d8ea8 --- /dev/null +++ b/lib/ReedSolomon.cpp @@ -0,0 +1,116 @@ +/* + Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right + of Canada (Communications Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#include "ReedSolomon.h" +#include +#include +#include +#include +#include // For galois.h ... +#include // For memcpy + +extern "C" { +#include "fec/fec.h" +} +#include + +#define SYMSIZE 8 + + +ReedSolomon::ReedSolomon(int N, int K, bool reverse, int gfpoly, int firstRoot, int primElem) +{ + setReverse(reverse); + + m_N = N; + m_K = K; + + const int symsize = SYMSIZE; + const int nroots = N - K; // For EDI PFT, this must be 48 + const int pad = ((1 << symsize) - 1) - N; // is 255-N + + rsData = init_rs_char(symsize, gfpoly, firstRoot, primElem, nroots, pad); + + if (rsData == nullptr) { + std::stringstream ss; + ss << "Invalid Reed-Solomon parameters! " << + "N=" << N << " ; K=" << K << " ; pad=" << pad; + throw std::invalid_argument(ss.str()); + } +} + + +ReedSolomon::~ReedSolomon() +{ + free_rs_char(rsData); +} + + +void ReedSolomon::setReverse(bool state) +{ + reverse = state; +} + + +int ReedSolomon::encode(void* data, void* fec, size_t size) +{ + uint8_t* input = reinterpret_cast(data); + uint8_t* output = reinterpret_cast(fec); + int ret = 0; + + if (reverse) { + std::vector buffer(m_N); + + memcpy(&buffer[0], input, m_K); + memcpy(&buffer[m_K], output, m_N - m_K); + + ret = decode_rs_char(rsData, &buffer[0], nullptr, 0); + if ((ret != 0) && (ret != -1)) { + memcpy(input, &buffer[0], m_K); + memcpy(output, &buffer[m_K], m_N - m_K); + } + } + else { + encode_rs_char(rsData, input, output); + } + + return ret; +} + + +int ReedSolomon::encode(void* data, size_t size) +{ + uint8_t* input = reinterpret_cast(data); + int ret = 0; + + if (reverse) { + ret = decode_rs_char(rsData, input, nullptr, 0); + } + else { + encode_rs_char(rsData, input, &input[m_K]); + } + + return ret; +} diff --git a/lib/ReedSolomon.h b/lib/ReedSolomon.h new file mode 100644 index 0000000..abcef62 --- /dev/null +++ b/lib/ReedSolomon.h @@ -0,0 +1,56 @@ +/* + Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right + of Canada (Communications Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include + +class ReedSolomon +{ +public: + ReedSolomon(int N, int K, + bool reverse = false, + int gfpoly = 0x11d, int firstRoot = 0, int primElem = 1); + ReedSolomon(const ReedSolomon& other) = delete; + ReedSolomon operator=(const ReedSolomon& other) = delete; + ~ReedSolomon(); + + void setReverse(bool state); + int encode(void* data, void* fec, size_t size); + int encode(void* data, size_t size); + +private: + int m_N; + int m_K; + + void* rsData; + bool reverse; +}; + diff --git a/lib/Socket.cpp b/lib/Socket.cpp index fe3df44..9b404eb 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -42,13 +42,10 @@ void InetAddress::resolveUdpDestination(const std::string& destination, int port struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_family = AF_INET; hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ hints.ai_flags = 0; hints.ai_protocol = 0; - hints.ai_canonname = nullptr; - hints.ai_addr = nullptr; - hints.ai_next = nullptr; struct addrinfo *result, *rp; int s = getaddrinfo(destination.c_str(), service, &hints, &result); @@ -77,19 +74,19 @@ UDPPacket::UDPPacket(size_t initSize) : UDPSocket::UDPSocket() : - listenSocket(INVALID_SOCKET) + m_sock(INVALID_SOCKET) { reinit(0, ""); } UDPSocket::UDPSocket(int port) : - listenSocket(INVALID_SOCKET) + m_sock(INVALID_SOCKET) { reinit(port, ""); } UDPSocket::UDPSocket(int port, const std::string& name) : - listenSocket(INVALID_SOCKET) + m_sock(INVALID_SOCKET) { reinit(port, name); } @@ -97,16 +94,28 @@ UDPSocket::UDPSocket(int port, const std::string& name) : void UDPSocket::setBlocking(bool block) { - int res = fcntl(listenSocket, F_SETFL, block ? 0 : O_NONBLOCK); + int res = fcntl(m_sock, F_SETFL, block ? 0 : O_NONBLOCK); if (res == -1) { throw runtime_error(string("Can't change blocking state of socket: ") + strerror(errno)); } } +void UDPSocket::reinit(int port) +{ + return reinit(port, ""); +} + void UDPSocket::reinit(int port, const std::string& name) { - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); + if (m_sock != INVALID_SOCKET) { + ::close(m_sock); + } + + if (port == 0) { + // No need to bind to a given port, creating the + // socket is enough + m_sock = ::socket(AF_INET, SOCK_DGRAM, 0); + return; } char service[NI_MAXSERV]; @@ -114,7 +123,7 @@ void UDPSocket::reinit(int port, const std::string& name) struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_family = AF_INET; hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */ hints.ai_protocol = 0; /* Any protocol */ @@ -141,7 +150,7 @@ void UDPSocket::reinit(int port, const std::string& name) } if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { - listenSocket = sfd; + m_sock = sfd; break; } @@ -157,17 +166,17 @@ void UDPSocket::reinit(int port, const std::string& name) void UDPSocket::close() { - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); + if (m_sock != INVALID_SOCKET) { + ::close(m_sock); } - listenSocket = INVALID_SOCKET; + m_sock = INVALID_SOCKET; } UDPSocket::~UDPSocket() { - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); + if (m_sock != INVALID_SOCKET) { + ::close(m_sock); } } @@ -177,7 +186,7 @@ UDPPacket UDPSocket::receive(size_t max_size) UDPPacket packet(max_size); socklen_t addrSize; addrSize = sizeof(*packet.address.as_sockaddr()); - ssize_t ret = recvfrom(listenSocket, + ssize_t ret = recvfrom(m_sock, packet.buffer.data(), packet.buffer.size(), 0, @@ -186,7 +195,13 @@ UDPPacket UDPSocket::receive(size_t max_size) if (ret == SOCKET_ERROR) { packet.buffer.resize(0); + + // This suppresses the -Wlogical-op warning +#if EAGAIN == EWOULDBLOCK if (errno == EAGAIN) { +#else + if (errno == EAGAIN or errno == EWOULDBLOCK) { +#endif return 0; } throw runtime_error(string("Can't receive data: ") + strerror(errno)); @@ -198,24 +213,24 @@ UDPPacket UDPSocket::receive(size_t max_size) void UDPSocket::send(UDPPacket& packet) { - int ret = sendto(listenSocket, packet.buffer.data(), packet.buffer.size(), 0, + const int ret = sendto(m_sock, packet.buffer.data(), packet.buffer.size(), 0, packet.address.as_sockaddr(), sizeof(*packet.address.as_sockaddr())); if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { - throw runtime_error(string("Can't send UDP packet") + strerror(errno)); + throw runtime_error(string("Can't send UDP packet: ") + strerror(errno)); } } void UDPSocket::send(const std::vector& data, InetAddress destination) { - int ret = sendto(listenSocket, &data[0], data.size(), 0, + const int ret = sendto(m_sock, data.data(), data.size(), 0, destination.as_sockaddr(), sizeof(*destination.as_sockaddr())); if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { - throw runtime_error(string("Can't send UDP packet") + strerror(errno)); + throw runtime_error(string("Can't send UDP packet: ") + strerror(errno)); } } -void UDPSocket::joinGroup(char* groupname) +void UDPSocket::joinGroup(const char* groupname, const char* if_addr) { ip_mreqn group; if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { @@ -224,9 +239,15 @@ void UDPSocket::joinGroup(char* groupname) if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { throw runtime_error("Group name is not a multicast address"); } - group.imr_address.s_addr = htons(INADDR_ANY);; + + if (if_addr) { + group.imr_address.s_addr = inet_addr(if_addr); + } + else { + group.imr_address.s_addr = htons(INADDR_ANY); + } group.imr_ifindex = 0; - if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) + if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) == SOCKET_ERROR) { throw runtime_error(string("Can't join multicast group") + strerror(errno)); } @@ -239,7 +260,7 @@ void UDPSocket::setMulticastSource(const char* source_addr) throw runtime_error(string("Can't parse source address") + strerror(errno)); } - if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) + if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) == SOCKET_ERROR) { throw runtime_error(string("Can't set source address") + strerror(errno)); } @@ -247,26 +268,82 @@ void UDPSocket::setMulticastSource(const char* source_addr) void UDPSocket::setMulticastTTL(int ttl) { - if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) + if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) == SOCKET_ERROR) { throw runtime_error(string("Can't set multicast ttl") + strerror(errno)); } } +UDPReceiver::~UDPReceiver() { + m_stop = true; + m_sock.close(); + if (m_thread.joinable()) { + m_thread.join(); + } +} + +void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) { + m_port = port; + m_bindto = bindto; + m_mcastaddr = mcastaddr; + m_max_packets_queued = max_packets_queued; + m_thread = std::thread(&UDPReceiver::m_run, this); +} -TCPSocket::TCPSocket() +std::vector UDPReceiver::get_packet_buffer() { - if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { - throw std::runtime_error("Can't create TCP socket"); + if (m_stop) { + throw runtime_error("UDP Receiver not running"); } -#if defined(HAVE_SO_NOSIGPIPE) - int val = 1; - if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, - &val, sizeof(val)) < 0) { - throw std::runtime_error("Can't set SO_NOSIGPIPE"); + UDPPacket p; + m_packets.wait_and_pop(p); + + return p.buffer; +} + +void UDPReceiver::m_run() +{ + // Ensure that stop is set to true in case of exception or return + struct SetStopOnDestruct { + SetStopOnDestruct(atomic& stop) : m_stop(stop) {} + ~SetStopOnDestruct() { m_stop = true; } + private: atomic& m_stop; + } autoSetStop(m_stop); + + if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) { + m_sock.reinit(m_port, m_mcastaddr); + m_sock.setMulticastSource(m_bindto.c_str()); + m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str()); } -#endif + else { + m_sock.reinit(m_port, m_bindto); + } + + while (not m_stop) { + constexpr size_t packsize = 8192; + try { + auto packet = m_sock.receive(packsize); + if (packet.buffer.size() == packsize) { + // TODO replace fprintf + fprintf(stderr, "Warning, possible UDP truncation\n"); + } + + // If this blocks, the UDP socket will lose incoming packets + m_packets.push_wait_if_full(packet, m_max_packets_queued); + } + catch (const std::runtime_error& e) { + // TODO replace fprintf + // TODO handle intr + fprintf(stderr, "Socket error: %s\n", e.what()); + m_stop = true; + } + } +} + + +TCPSocket::TCPSocket() +{ } TCPSocket::~TCPSocket() @@ -314,7 +391,7 @@ void TCPSocket::connect(const std::string& hostname, int port) /* Obtain address(es) matching host/port */ struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = 0; hints.ai_protocol = 0; @@ -376,7 +453,7 @@ void TCPSocket::listen(int port, const string& name) struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC; + hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */ hints.ai_protocol = 0; @@ -410,6 +487,14 @@ void TCPSocket::listen(int port, const string& name) freeaddrinfo(result); +#if defined(HAVE_SO_NOSIGPIPE) + int val = 1; + if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, + &val, sizeof(val)) < 0) { + throw std::runtime_error("Can't set SO_NOSIGPIPE"); + } +#endif + if (rp == nullptr) { throw runtime_error("Could not bind"); } @@ -683,7 +768,9 @@ TCPDataDispatcher::~TCPDataDispatcher() m_running = false; m_connections.clear(); m_listener_socket.close(); - m_listener_thread.join(); + if (m_listener_thread.joinable()) { + m_listener_thread.join(); + } } void TCPDataDispatcher::start(int port, const string& address) diff --git a/lib/Socket.h b/lib/Socket.h index 82ff5ad..2393584 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -104,13 +104,14 @@ class UDPSocket const UDPSocket& operator=(const UDPSocket& other) = delete; /** Close the already open socket, and create a new one. Throws a runtime_error on error. */ + void reinit(int port); void reinit(int port, const std::string& name); void close(void); void send(UDPPacket& packet); void send(const std::vector& data, InetAddress destination); UDPPacket receive(size_t max_size); - void joinGroup(char* groupname); + void joinGroup(const char* groupname, const char* if_addr = nullptr); void setMulticastSource(const char* source_addr); void setMulticastTTL(int ttl); @@ -120,9 +121,36 @@ class UDPSocket void setBlocking(bool block); protected: - SOCKET listenSocket; + SOCKET m_sock; }; +/* Threaded UDP receiver */ +class UDPReceiver { + public: + UDPReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {} + ~UDPReceiver(); + UDPReceiver(const UDPReceiver&) = delete; + UDPReceiver operator=(const UDPReceiver&) = delete; + + // Start the receiver in a separate thread + void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued); + + // Get the data contained in a UDP packet, blocks if none available + // In case of error, throws a runtime_error + std::vector get_packet_buffer(void); + + private: + void m_run(void); + + int m_port; + std::string m_bindto; + std::string m_mcastaddr; + size_t m_max_packets_queued; + std::thread m_thread; + std::atomic m_stop; + ThreadsafeQueue m_packets; + UDPSocket m_sock; +}; class TCPSocket { public: diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h new file mode 100644 index 0000000..62f4c96 --- /dev/null +++ b/lib/ThreadsafeQueue.h @@ -0,0 +1,176 @@ +/* + Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in + Right of Canada (Communications Research Center Canada) + + Copyright (C) 2018 + Matthias P. Braendli, matthias.braendli@mpb.li + + An implementation for a threadsafe queue, depends on C++11 + + When creating a ThreadsafeQueue, one can specify the minimal number + of elements it must contain before it is possible to take one + element out. + */ +/* + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +#pragma once + +#include +#include +#include +#include + +/* This queue is meant to be used by two threads. One producer + * that pushes elements into the queue, and one consumer that + * retrieves the elements. + * + * The queue can make the consumer block until an element + * is available, or a wakeup requested. + */ + +/* Class thrown by blocking pop to tell the consumer + * that there's a wakeup requested. */ +class ThreadsafeQueueWakeup {}; + +template +class ThreadsafeQueue +{ +public: + /* Push one element into the queue, and notify another thread that + * might be waiting. + * + * returns the new queue size. + */ + size_t push(T const& val) + { + std::unique_lock lock(the_mutex); + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + size_t push(T&& val) + { + std::unique_lock lock(the_mutex); + the_queue.emplace(std::move(val)); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + /* Push one element into the queue, but wait until the + * queue size goes below the threshold. + * + * Notify waiting thread. + * + * returns the new queue size. + */ + size_t push_wait_if_full(T const& val, size_t threshold) + { + std::unique_lock lock(the_mutex); + while (the_queue.size() >= threshold) { + the_tx_notification.wait(lock); + } + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + /* Trigger a wakeup event on a blocking consumer, which + * will receive a ThreadsafeQueueWakeup exception. + */ + void trigger_wakeup(void) + { + std::unique_lock lock(the_mutex); + wakeup_requested = true; + lock.unlock(); + the_rx_notification.notify_one(); + } + + /* Send a notification for the receiver thread */ + void notify(void) + { + the_rx_notification.notify_one(); + } + + bool empty() const + { + std::unique_lock lock(the_mutex); + return the_queue.empty(); + } + + size_t size() const + { + std::unique_lock lock(the_mutex); + return the_queue.size(); + } + + bool try_pop(T& popped_value) + { + std::unique_lock lock(the_mutex); + if (the_queue.empty()) { + return false; + } + + popped_value = the_queue.front(); + the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); + + return true; + } + + void wait_and_pop(T& popped_value, size_t prebuffering = 1) + { + std::unique_lock lock(the_mutex); + while (the_queue.size() < prebuffering and + not wakeup_requested) { + the_rx_notification.wait(lock); + } + + if (wakeup_requested) { + wakeup_requested = false; + throw ThreadsafeQueueWakeup(); + } + else { + std::swap(popped_value, the_queue.front()); + the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); + } + } + +private: + std::queue the_queue; + mutable std::mutex the_mutex; + std::condition_variable the_rx_notification; + std::condition_variable the_tx_notification; + bool wakeup_requested = false; +}; + diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index fb49efc..3142bb3 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -40,6 +40,7 @@ #include "utils.h" #include "DabMux.h" #include "ManagementServer.h" +#include "input/Edi.h" #include "input/Prbs.h" #include "input/Zmq.h" #include "input/File.h" @@ -876,34 +877,46 @@ static void setup_subchannel_from_ptree(shared_ptr& subchan, type = pt.get("type"); } catch (const ptree_error &e) { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << " has no type defined!"; - throw runtime_error(ss.str()); + throw runtime_error("Subchannel with uid " + subchanuid + " has no type defined!"); } - /* Both inputfile and inputuri are supported, and are equivalent. - * inputuri has precedence + /* Up to v2.3.1, both inputfile and inputuri are supported, and are + * equivalent. inputuri has precedence. + * + * After that, either inputfile or the (inputproto, inputuri) pair must be given, but not both. */ string inputUri = pt.get("inputuri", ""); + string proto = pt.get("inputproto", ""); - if (inputUri == "") { + if (inputUri.empty() and proto.empty()) { try { + /* Old approach, derives proto from scheme used in the URL. + * This makes it impossible to distinguish between ZMQ tcp:// and + * EDI tcp:// + */ inputUri = pt.get("inputfile"); + size_t protopos = inputUri.find("://"); + + if (protopos == string::npos) { + proto = "file"; + } + else { + proto = inputUri.substr(0, protopos); + + if (proto == "tcp" or proto == "epgm" or proto == "ipc") { + proto = "zmq"; + } + else if (proto == "sti-rtp") { + proto = "sti"; + } + } } catch (const ptree_error &e) { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << " has no inputUri defined!"; - throw runtime_error(ss.str()); + throw runtime_error("Subchannel with uid " + subchanuid + " has no input defined!"); } } - - string proto; - size_t protopos = inputUri.find("://"); - if (protopos == string::npos) { - proto = "file"; - } - else { - proto = inputUri.substr(0, protopos); + else if (inputUri.empty() or proto.empty()) { + throw runtime_error("Must define both inputuri and inputproto for uid " + subchanuid); } subchan->inputUri = inputUri; @@ -928,7 +941,7 @@ static void setup_subchannel_from_ptree(shared_ptr& subchan, throw logic_error("Incomplete handling of file input"); } } - else if (proto == "tcp" or proto == "epgm" or proto == "ipc") { + else if (proto == "zmq") { auto zmqconfig = setup_zmq_input(pt, subchanuid); if (type == "audio") { @@ -941,15 +954,11 @@ static void setup_subchannel_from_ptree(shared_ptr& subchan, rcs.enrol(inzmq.get()); subchan->input = inzmq; } - - if (proto == "epgm") { - etiLog.level(warn) << "Using untested epgm:// zeromq input"; - } - else if (proto == "ipc") { - etiLog.level(warn) << "Using untested ipc:// zeromq input"; - } } - else if (proto == "sti-rtp") { + else if (proto == "edi") { + subchan->input = make_shared(); + } + else if (proto == "stp") { subchan->input = make_shared(); } else { diff --git a/src/DabMux.cpp b/src/DabMux.cpp index d749ed3..e726fd3 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -304,7 +304,7 @@ int main(int argc, char *argv[]) edi_conf.destinations.push_back(dest); } else if (proto == "tcp") { - auto dest = make_shared(); + auto dest = make_shared(); dest->listen_port = pt_edi_dest.second.get("listenport"); dest->max_frames_queued = pt_edi_dest.second.get("max_frames_queued", 500); edi_conf.destinations.push_back(dest); diff --git a/src/ReedSolomon.cpp b/src/ReedSolomon.cpp deleted file mode 100644 index 38d8ea8..0000000 --- a/src/ReedSolomon.cpp +++ /dev/null @@ -1,116 +0,0 @@ -/* - Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right - of Canada (Communications Research Center Canada) - - Copyright (C) 2016 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include "ReedSolomon.h" -#include -#include -#include -#include -#include // For galois.h ... -#include // For memcpy - -extern "C" { -#include "fec/fec.h" -} -#include - -#define SYMSIZE 8 - - -ReedSolomon::ReedSolomon(int N, int K, bool reverse, int gfpoly, int firstRoot, int primElem) -{ - setReverse(reverse); - - m_N = N; - m_K = K; - - const int symsize = SYMSIZE; - const int nroots = N - K; // For EDI PFT, this must be 48 - const int pad = ((1 << symsize) - 1) - N; // is 255-N - - rsData = init_rs_char(symsize, gfpoly, firstRoot, primElem, nroots, pad); - - if (rsData == nullptr) { - std::stringstream ss; - ss << "Invalid Reed-Solomon parameters! " << - "N=" << N << " ; K=" << K << " ; pad=" << pad; - throw std::invalid_argument(ss.str()); - } -} - - -ReedSolomon::~ReedSolomon() -{ - free_rs_char(rsData); -} - - -void ReedSolomon::setReverse(bool state) -{ - reverse = state; -} - - -int ReedSolomon::encode(void* data, void* fec, size_t size) -{ - uint8_t* input = reinterpret_cast(data); - uint8_t* output = reinterpret_cast(fec); - int ret = 0; - - if (reverse) { - std::vector buffer(m_N); - - memcpy(&buffer[0], input, m_K); - memcpy(&buffer[m_K], output, m_N - m_K); - - ret = decode_rs_char(rsData, &buffer[0], nullptr, 0); - if ((ret != 0) && (ret != -1)) { - memcpy(input, &buffer[0], m_K); - memcpy(output, &buffer[m_K], m_N - m_K); - } - } - else { - encode_rs_char(rsData, input, output); - } - - return ret; -} - - -int ReedSolomon::encode(void* data, size_t size) -{ - uint8_t* input = reinterpret_cast(data); - int ret = 0; - - if (reverse) { - ret = decode_rs_char(rsData, input, nullptr, 0); - } - else { - encode_rs_char(rsData, input, &input[m_K]); - } - - return ret; -} diff --git a/src/ReedSolomon.h b/src/ReedSolomon.h deleted file mode 100644 index abcef62..0000000 --- a/src/ReedSolomon.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right - of Canada (Communications Research Center Canada) - - Copyright (C) 2016 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#pragma once - -#ifdef HAVE_CONFIG_H -# include -#endif - -#include - -class ReedSolomon -{ -public: - ReedSolomon(int N, int K, - bool reverse = false, - int gfpoly = 0x11d, int firstRoot = 0, int primElem = 1); - ReedSolomon(const ReedSolomon& other) = delete; - ReedSolomon operator=(const ReedSolomon& other) = delete; - ~ReedSolomon(); - - void setReverse(bool state); - int encode(void* data, void* fec, size_t size); - int encode(void* data, size_t size); - -private: - int m_N; - int m_K; - - void* rsData; - bool reverse; -}; - diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h deleted file mode 100644 index ab287b2..0000000 --- a/src/ThreadsafeQueue.h +++ /dev/null @@ -1,178 +0,0 @@ -/* - Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in - Right of Canada (Communications Research Center Canada) - - Copyright (C) 2018 - Matthias P. Braendli, matthias.braendli@mpb.li - - An implementation for a threadsafe queue, depends on C++11 - - When creating a ThreadsafeQueue, one can specify the minimal number - of elements it must contain before it is possible to take one - element out. - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#pragma once - -#include -#include -#include -#include - -/* This queue is meant to be used by two threads. One producer - * that pushes elements into the queue, and one consumer that - * retrieves the elements. - * - * The queue can make the consumer block until an element - * is available, or a wakeup requested. - */ - -/* Class thrown by blocking pop to tell the consumer - * that there's a wakeup requested. */ -class ThreadsafeQueueWakeup {}; - -template -class ThreadsafeQueue -{ -public: - /* Push one element into the queue, and notify another thread that - * might be waiting. - * - * returns the new queue size. - */ - size_t push(T const& val) - { - std::unique_lock lock(the_mutex); - the_queue.push(val); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - size_t push(T&& val) - { - std::unique_lock lock(the_mutex); - the_queue.emplace(std::move(val)); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - /* Push one element into the queue, but wait until the - * queue size goes below the threshold. - * - * Notify waiting thread. - * - * returns the new queue size. - */ - size_t push_wait_if_full(T const& val, size_t threshold) - { - std::unique_lock lock(the_mutex); - while (the_queue.size() >= threshold) { - the_tx_notification.wait(lock); - } - the_queue.push(val); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - /* Trigger a wakeup event on a blocking consumer, which - * will receive a ThreadsafeQueueWakeup exception. - */ - void trigger_wakeup(void) - { - std::unique_lock lock(the_mutex); - wakeup_requested = true; - lock.unlock(); - the_rx_notification.notify_one(); - } - - /* Send a notification for the receiver thread */ - void notify(void) - { - the_rx_notification.notify_one(); - } - - bool empty() const - { - std::unique_lock lock(the_mutex); - return the_queue.empty(); - } - - size_t size() const - { - std::unique_lock lock(the_mutex); - return the_queue.size(); - } - - bool try_pop(T& popped_value) - { - std::unique_lock lock(the_mutex); - if (the_queue.empty()) { - return false; - } - - popped_value = the_queue.front(); - the_queue.pop(); - - lock.unlock(); - the_tx_notification.notify_one(); - - return true; - } - - void wait_and_pop(T& popped_value, size_t prebuffering = 1) - { - std::unique_lock lock(the_mutex); - while (the_queue.size() < prebuffering and - not wakeup_requested) { - the_rx_notification.wait(lock); - } - - if (wakeup_requested) { - wakeup_requested = false; - throw ThreadsafeQueueWakeup(); - } - else { - std::swap(popped_value, the_queue.front()); - the_queue.pop(); - - lock.unlock(); - the_tx_notification.notify_one(); - } - } - -private: - std::queue the_queue; - mutable std::mutex the_mutex; - std::condition_variable the_rx_notification; - std::condition_variable the_tx_notification; - bool wakeup_requested = false; -}; - diff --git a/src/dabOutput/edi/Config.h b/src/dabOutput/edi/Config.h index 55d5f0f..0c7dce8 100644 --- a/src/dabOutput/edi/Config.h +++ b/src/dabOutput/edi/Config.h @@ -50,11 +50,18 @@ struct udp_destination_t : public destination_t { }; // TCP server that can accept multiple connections -struct tcp_destination_t : public destination_t { +struct tcp_server_t : public destination_t { unsigned int listen_port = 0; size_t max_frames_queued = 1024; }; +// TCP client that connects to one endpoint +struct tcp_client_t : public destination_t { + std::string dest_addr; + unsigned int dest_port = 0; + size_t max_frames_queued = 1024; +}; + struct configuration_t { unsigned chunk_len = 207; // RSk, data length of each chunk unsigned fec = 0; // number of fragments that can be recovered diff --git a/src/dabOutput/edi/PFT.cpp b/src/dabOutput/edi/PFT.cpp index 5b93016..63dfa34 100644 --- a/src/dabOutput/edi/PFT.cpp +++ b/src/dabOutput/edi/PFT.cpp @@ -314,7 +314,7 @@ std::vector< PFTFragment > PFT::Assemble(AFPacket af_packet) #if 0 fprintf(stderr, "* PFT pseq %d, findex %d, fcount %d, plen %d\n", - m_pseq, findex, fcount, plen & ~0x8000); + m_pseq, findex, fcount, plen & ~0xC000); #endif } diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp index 6d3950f..187aabe 100644 --- a/src/dabOutput/edi/Transport.cpp +++ b/src/dabOutput/edi/Transport.cpp @@ -45,12 +45,16 @@ void configuration_t::print() const } etiLog.level(info) << " source port " << udp_dest->source_port; } - else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { + else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port; etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued; } + else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { + etiLog.level(info) << " TCP client connecting to " << tcp_dest->dest_addr << ":" << tcp_dest->dest_port; + etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued; + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } if (interleaver_enabled()) { @@ -78,13 +82,18 @@ Sender::Sender(const configuration_t& conf) : udp_sockets.emplace(udp_dest.get(), udp_socket); } - else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { + else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { auto dispatcher = make_shared(tcp_dest->max_frames_queued); dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); } + else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { + auto tcp_socket = make_shared(); + tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port); + tcp_senders.emplace(tcp_dest.get(), tcp_socket); + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } @@ -111,7 +120,7 @@ void Sender::write(const TagPacket& tagpacket) vector edi_fragments = edi_pft.Assemble(af_packet); if (m_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragment before interleaver %zu", + fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n", edi_fragments.size()); } @@ -128,22 +137,25 @@ void Sender::write(const TagPacket& tagpacket) udp_sockets.at(udp_dest.get())->send(edi_frag, addr); } - else if (auto tcp_dest = dynamic_pointer_cast(dest)) { + else if (auto tcp_dest = dynamic_pointer_cast(dest)) { tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); } + else if (auto tcp_dest = dynamic_pointer_cast(dest)) { + tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size()); + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } if (m_conf.dump) { - std::ostream_iterator debug_iterator(edi_debug_file); - std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); + ostream_iterator debug_iterator(edi_debug_file); + copy(edi_frag.begin(), edi_frag.end(), debug_iterator); } } if (m_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragments %zu", + fprintf(stderr, "EDI number of PFT fragments %zu\n", edi_fragments.size()); } } @@ -156,17 +168,20 @@ void Sender::write(const TagPacket& tagpacket) udp_sockets.at(udp_dest.get())->send(af_packet, addr); } - else if (auto tcp_dest = dynamic_pointer_cast(dest)) { + else if (auto tcp_dest = dynamic_pointer_cast(dest)) { tcp_dispatchers.at(tcp_dest.get())->write(af_packet); } + else if (auto tcp_dest = dynamic_pointer_cast(dest)) { + tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size()); + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } if (m_conf.dump) { - std::ostream_iterator debug_iterator(edi_debug_file); - std::copy(af_packet.begin(), af_packet.end(), debug_iterator); + ostream_iterator debug_iterator(edi_debug_file); + copy(af_packet.begin(), af_packet.end(), debug_iterator); } } } diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h index 74126d1..9633275 100644 --- a/src/dabOutput/edi/Transport.h +++ b/src/dabOutput/edi/Transport.h @@ -36,6 +36,7 @@ #include #include #include +#include #include namespace edi { @@ -62,7 +63,8 @@ class Sender { edi::Interleaver edi_interleaver; std::unordered_map> udp_sockets; - std::unordered_map> tcp_dispatchers; + std::unordered_map> tcp_dispatchers; + std::unordered_map> tcp_senders; }; } diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp index b4cced0..5d4f964 100644 --- a/src/input/Udp.cpp +++ b/src/input/Udp.cpp @@ -151,8 +151,8 @@ static uint16_t unpack2(const uint8_t *buf) int Sti_d_Rtp::open(const std::string& name) { - // Skip the sti-rtp:// part if it is present - const string endpoint = (name.substr(0, 10) == "sti-rtp://") ? + // Skip the rtp:// part if it is present + const string endpoint = (name.substr(0, 10) == "rtp://") ? name.substr(10) : name; // The endpoint should be address:port @@ -160,8 +160,8 @@ int Sti_d_Rtp::open(const std::string& name) if (colon_pos == string::npos) { stringstream ss; ss << "'" << name << - " is an invalid format for sti-rtp address: " - "expected [sti-rtp://]address:port"; + " is an invalid format for rtp address: " + "expected [rtp://]address:port"; throw invalid_argument(ss.str()); } -- cgit v1.2.3