diff options
-rw-r--r-- | Makefile.am | 21 | ||||
-rw-r--r-- | doc/advanced.mux | 17 | ||||
-rw-r--r-- | doc/example.mux | 3 | ||||
-rw-r--r-- | lib/ReedSolomon.cpp (renamed from src/ReedSolomon.cpp) | 0 | ||||
-rw-r--r-- | lib/ReedSolomon.h (renamed from src/ReedSolomon.h) | 0 | ||||
-rw-r--r-- | lib/Socket.cpp | 165 | ||||
-rw-r--r-- | lib/Socket.h | 32 | ||||
-rw-r--r-- | lib/ThreadsafeQueue.h (renamed from src/ThreadsafeQueue.h) | 14 | ||||
-rw-r--r-- | src/ConfigParser.cpp | 61 | ||||
-rw-r--r-- | src/DabMux.cpp | 2 | ||||
-rw-r--r-- | src/dabOutput/edi/Config.h | 9 | ||||
-rw-r--r-- | src/dabOutput/edi/PFT.cpp | 2 | ||||
-rw-r--r-- | src/dabOutput/edi/Transport.cpp | 43 | ||||
-rw-r--r-- | src/dabOutput/edi/Transport.h | 4 | ||||
-rw-r--r-- | src/input/Udp.cpp | 8 |
15 files changed, 272 insertions, 109 deletions
diff --git a/Makefile.am b/Makefile.am index 2773bbe..80d24e0 100644 --- a/Makefile.am +++ b/Makefile.am @@ -69,6 +69,8 @@ odr_dabmux_SOURCES =src/DabMux.cpp \ src/input/File.h \ src/input/Udp.cpp \ src/input/Udp.h \ + src/input/Edi.cpp \ + src/input/Edi.h \ src/dabOutput/dabOutput.h \ src/dabOutput/dabOutputFile.cpp \ src/dabOutput/dabOutputFifo.cpp \ @@ -107,11 +109,8 @@ odr_dabmux_SOURCES =src/DabMux.cpp \ src/MuxElements.cpp \ src/MuxElements.h \ src/PcDebug.h \ - src/ReedSolomon.h \ - src/ReedSolomon.cpp \ src/RemoteControl.cpp \ src/RemoteControl.h \ - src/ThreadsafeQueue.h \ src/crc.h \ src/crc.c \ src/fig/FIG.h \ @@ -161,8 +160,19 @@ odr_dabmux_SOURCES =src/DabMux.cpp \ src/PrbsGenerator.h \ src/utils.cpp \ src/utils.h \ + lib/edi/STIDecoder.cpp \ + lib/edi/STIDecoder.h \ + lib/edi/STIWriter.cpp \ + lib/edi/STIWriter.h \ + lib/edi/PFT.cpp \ + lib/edi/PFT.h \ + lib/edi/common.cpp \ + lib/edi/common.h \ + lib/ReedSolomon.h \ + lib/ReedSolomon.cpp \ lib/Socket.h \ lib/Socket.cpp \ + lib/ThreadsafeQueue.h \ lib/zmq.hpp \ $(lib_fec_sources) \ $(lib_charset_sources) @@ -201,14 +211,15 @@ odr_zmq2edi_SOURCES = src/zmq2edi/zmq2edi.cpp \ src/dabOutput/edi/TagPacket.h \ src/dabOutput/edi/Transport.cpp \ src/dabOutput/edi/Transport.h \ - src/ReedSolomon.h \ - src/ReedSolomon.cpp \ src/Log.h \ src/Log.cpp \ src/crc.h \ src/crc.c \ + lib/ReedSolomon.h \ + lib/ReedSolomon.cpp \ lib/Socket.h \ lib/Socket.cpp \ + lib/ThreadsafeQueue.h \ lib/zmq.hpp \ $(lib_fec_sources) diff --git a/doc/advanced.mux b/doc/advanced.mux index fb67b82..b9cec05 100644 --- a/doc/advanced.mux +++ b/doc/advanced.mux @@ -163,7 +163,8 @@ subchannels { sub-fu { type audio ; example file input - inputfile "funk.mp2" + inputproto zmq + inputuri "funk.mp2" nonblock false bitrate 128 id 10 @@ -188,7 +189,8 @@ subchannels { ; Receive STI-D(LI) carried in STI(PI, X) inside RTP using UDP. ; This is intended to be compatible with AVT audio encoders. ; EXPERIMENTAL! - inputfile "sti-rtp://127.0.0.1:32010" + inputproto sti + inputuri "rtp://127.0.0.1:32010" bitrate 96 id 3 protection 3 @@ -196,11 +198,12 @@ subchannels { sub-ri { type dabplus ; example file input - ;inputfile "rick.dabp" + ;inputuri "rick.dabp" ; example zmq input: ; Accepts connections to port 9000 from any interface. ; Use ODR-AudioEnc as encoder - inputfile "tcp://*:9000" + inputproto zmq + inputuri "tcp://*:9000" bitrate 96 id 1 protection 1 @@ -256,7 +259,8 @@ subchannels { ; for audio types, you can use the ZeroMQ input (if compiled in) ; with the following configuration in combination with ; Toolame-DAB - inputfile "tcp://*:9001" + inputproto zmq + inputuri "tcp://*:9001" bitrate 96 id 1 protection 1 @@ -273,7 +277,8 @@ subchannels { type data ; Use the default PRBS polynomial. - inputfile "prbs://" + inputproto prbs + inputuri "prbs://" ; To use another polynomial, set it in the url as hexadecimal ; The default polynomial is G(x) = x^20 + x^17 + 1, represented as diff --git a/doc/example.mux b/doc/example.mux index 6c2bc18..31e072d 100644 --- a/doc/example.mux +++ b/doc/example.mux @@ -171,7 +171,8 @@ subchannels { type dabplus ; Accepts connections to port 9000 from any interface. ; Use ODR-AudioEnc as encoder - inputfile "tcp://*:9000" + inputproto zmq + inputuri "tcp://*:9000" bitrate 96 id 1 protection 3 diff --git a/src/ReedSolomon.cpp b/lib/ReedSolomon.cpp index 38d8ea8..38d8ea8 100644 --- a/src/ReedSolomon.cpp +++ b/lib/ReedSolomon.cpp diff --git a/src/ReedSolomon.h b/lib/ReedSolomon.h index abcef62..abcef62 100644 --- a/src/ReedSolomon.h +++ b/lib/ReedSolomon.h diff --git a/lib/Socket.cpp b/lib/Socket.cpp index fe3df44..9b404eb 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -42,13 +42,10 @@ void InetAddress::resolveUdpDestination(const std::string& destination, int port struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_family = AF_INET; hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ hints.ai_flags = 0; hints.ai_protocol = 0; - hints.ai_canonname = nullptr; - hints.ai_addr = nullptr; - hints.ai_next = nullptr; struct addrinfo *result, *rp; int s = getaddrinfo(destination.c_str(), service, &hints, &result); @@ -77,19 +74,19 @@ UDPPacket::UDPPacket(size_t initSize) : UDPSocket::UDPSocket() : - listenSocket(INVALID_SOCKET) + m_sock(INVALID_SOCKET) { reinit(0, ""); } UDPSocket::UDPSocket(int port) : - listenSocket(INVALID_SOCKET) + m_sock(INVALID_SOCKET) { reinit(port, ""); } UDPSocket::UDPSocket(int port, const std::string& name) : - listenSocket(INVALID_SOCKET) + m_sock(INVALID_SOCKET) { reinit(port, name); } @@ -97,16 +94,28 @@ UDPSocket::UDPSocket(int port, const std::string& name) : void UDPSocket::setBlocking(bool block) { - int res = fcntl(listenSocket, F_SETFL, block ? 0 : O_NONBLOCK); + int res = fcntl(m_sock, F_SETFL, block ? 0 : O_NONBLOCK); if (res == -1) { throw runtime_error(string("Can't change blocking state of socket: ") + strerror(errno)); } } +void UDPSocket::reinit(int port) +{ + return reinit(port, ""); +} + void UDPSocket::reinit(int port, const std::string& name) { - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); + if (m_sock != INVALID_SOCKET) { + ::close(m_sock); + } + + if (port == 0) { + // No need to bind to a given port, creating the + // socket is enough + m_sock = ::socket(AF_INET, SOCK_DGRAM, 0); + return; } char service[NI_MAXSERV]; @@ -114,7 +123,7 @@ void UDPSocket::reinit(int port, const std::string& name) struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_family = AF_INET; hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */ hints.ai_protocol = 0; /* Any protocol */ @@ -141,7 +150,7 @@ void UDPSocket::reinit(int port, const std::string& name) } if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { - listenSocket = sfd; + m_sock = sfd; break; } @@ -157,17 +166,17 @@ void UDPSocket::reinit(int port, const std::string& name) void UDPSocket::close() { - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); + if (m_sock != INVALID_SOCKET) { + ::close(m_sock); } - listenSocket = INVALID_SOCKET; + m_sock = INVALID_SOCKET; } UDPSocket::~UDPSocket() { - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); + if (m_sock != INVALID_SOCKET) { + ::close(m_sock); } } @@ -177,7 +186,7 @@ UDPPacket UDPSocket::receive(size_t max_size) UDPPacket packet(max_size); socklen_t addrSize; addrSize = sizeof(*packet.address.as_sockaddr()); - ssize_t ret = recvfrom(listenSocket, + ssize_t ret = recvfrom(m_sock, packet.buffer.data(), packet.buffer.size(), 0, @@ -186,7 +195,13 @@ UDPPacket UDPSocket::receive(size_t max_size) if (ret == SOCKET_ERROR) { packet.buffer.resize(0); + + // This suppresses the -Wlogical-op warning +#if EAGAIN == EWOULDBLOCK if (errno == EAGAIN) { +#else + if (errno == EAGAIN or errno == EWOULDBLOCK) { +#endif return 0; } throw runtime_error(string("Can't receive data: ") + strerror(errno)); @@ -198,24 +213,24 @@ UDPPacket UDPSocket::receive(size_t max_size) void UDPSocket::send(UDPPacket& packet) { - int ret = sendto(listenSocket, packet.buffer.data(), packet.buffer.size(), 0, + const int ret = sendto(m_sock, packet.buffer.data(), packet.buffer.size(), 0, packet.address.as_sockaddr(), sizeof(*packet.address.as_sockaddr())); if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { - throw runtime_error(string("Can't send UDP packet") + strerror(errno)); + throw runtime_error(string("Can't send UDP packet: ") + strerror(errno)); } } void UDPSocket::send(const std::vector<uint8_t>& data, InetAddress destination) { - int ret = sendto(listenSocket, &data[0], data.size(), 0, + const int ret = sendto(m_sock, data.data(), data.size(), 0, destination.as_sockaddr(), sizeof(*destination.as_sockaddr())); if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { - throw runtime_error(string("Can't send UDP packet") + strerror(errno)); + throw runtime_error(string("Can't send UDP packet: ") + strerror(errno)); } } -void UDPSocket::joinGroup(char* groupname) +void UDPSocket::joinGroup(const char* groupname, const char* if_addr) { ip_mreqn group; if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { @@ -224,9 +239,15 @@ void UDPSocket::joinGroup(char* groupname) if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { throw runtime_error("Group name is not a multicast address"); } - group.imr_address.s_addr = htons(INADDR_ANY);; + + if (if_addr) { + group.imr_address.s_addr = inet_addr(if_addr); + } + else { + group.imr_address.s_addr = htons(INADDR_ANY); + } group.imr_ifindex = 0; - if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) + if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) == SOCKET_ERROR) { throw runtime_error(string("Can't join multicast group") + strerror(errno)); } @@ -239,7 +260,7 @@ void UDPSocket::setMulticastSource(const char* source_addr) throw runtime_error(string("Can't parse source address") + strerror(errno)); } - if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) + if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) == SOCKET_ERROR) { throw runtime_error(string("Can't set source address") + strerror(errno)); } @@ -247,26 +268,82 @@ void UDPSocket::setMulticastSource(const char* source_addr) void UDPSocket::setMulticastTTL(int ttl) { - if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) + if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) == SOCKET_ERROR) { throw runtime_error(string("Can't set multicast ttl") + strerror(errno)); } } +UDPReceiver::~UDPReceiver() { + m_stop = true; + m_sock.close(); + if (m_thread.joinable()) { + m_thread.join(); + } +} + +void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) { + m_port = port; + m_bindto = bindto; + m_mcastaddr = mcastaddr; + m_max_packets_queued = max_packets_queued; + m_thread = std::thread(&UDPReceiver::m_run, this); +} -TCPSocket::TCPSocket() +std::vector<uint8_t> UDPReceiver::get_packet_buffer() { - if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { - throw std::runtime_error("Can't create TCP socket"); + if (m_stop) { + throw runtime_error("UDP Receiver not running"); } -#if defined(HAVE_SO_NOSIGPIPE) - int val = 1; - if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, - &val, sizeof(val)) < 0) { - throw std::runtime_error("Can't set SO_NOSIGPIPE"); + UDPPacket p; + m_packets.wait_and_pop(p); + + return p.buffer; +} + +void UDPReceiver::m_run() +{ + // Ensure that stop is set to true in case of exception or return + struct SetStopOnDestruct { + SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {} + ~SetStopOnDestruct() { m_stop = true; } + private: atomic<bool>& m_stop; + } autoSetStop(m_stop); + + if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) { + m_sock.reinit(m_port, m_mcastaddr); + m_sock.setMulticastSource(m_bindto.c_str()); + m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str()); } -#endif + else { + m_sock.reinit(m_port, m_bindto); + } + + while (not m_stop) { + constexpr size_t packsize = 8192; + try { + auto packet = m_sock.receive(packsize); + if (packet.buffer.size() == packsize) { + // TODO replace fprintf + fprintf(stderr, "Warning, possible UDP truncation\n"); + } + + // If this blocks, the UDP socket will lose incoming packets + m_packets.push_wait_if_full(packet, m_max_packets_queued); + } + catch (const std::runtime_error& e) { + // TODO replace fprintf + // TODO handle intr + fprintf(stderr, "Socket error: %s\n", e.what()); + m_stop = true; + } + } +} + + +TCPSocket::TCPSocket() +{ } TCPSocket::~TCPSocket() @@ -314,7 +391,7 @@ void TCPSocket::connect(const std::string& hostname, int port) /* Obtain address(es) matching host/port */ struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = 0; hints.ai_protocol = 0; @@ -376,7 +453,7 @@ void TCPSocket::listen(int port, const string& name) struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC; + hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */ hints.ai_protocol = 0; @@ -410,6 +487,14 @@ void TCPSocket::listen(int port, const string& name) freeaddrinfo(result); +#if defined(HAVE_SO_NOSIGPIPE) + int val = 1; + if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, + &val, sizeof(val)) < 0) { + throw std::runtime_error("Can't set SO_NOSIGPIPE"); + } +#endif + if (rp == nullptr) { throw runtime_error("Could not bind"); } @@ -683,7 +768,9 @@ TCPDataDispatcher::~TCPDataDispatcher() m_running = false; m_connections.clear(); m_listener_socket.close(); - m_listener_thread.join(); + if (m_listener_thread.joinable()) { + m_listener_thread.join(); + } } void TCPDataDispatcher::start(int port, const string& address) diff --git a/lib/Socket.h b/lib/Socket.h index 82ff5ad..2393584 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -104,13 +104,14 @@ class UDPSocket const UDPSocket& operator=(const UDPSocket& other) = delete; /** Close the already open socket, and create a new one. Throws a runtime_error on error. */ + void reinit(int port); void reinit(int port, const std::string& name); void close(void); void send(UDPPacket& packet); void send(const std::vector<uint8_t>& data, InetAddress destination); UDPPacket receive(size_t max_size); - void joinGroup(char* groupname); + void joinGroup(const char* groupname, const char* if_addr = nullptr); void setMulticastSource(const char* source_addr); void setMulticastTTL(int ttl); @@ -120,9 +121,36 @@ class UDPSocket void setBlocking(bool block); protected: - SOCKET listenSocket; + SOCKET m_sock; }; +/* Threaded UDP receiver */ +class UDPReceiver { + public: + UDPReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {} + ~UDPReceiver(); + UDPReceiver(const UDPReceiver&) = delete; + UDPReceiver operator=(const UDPReceiver&) = delete; + + // Start the receiver in a separate thread + void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued); + + // Get the data contained in a UDP packet, blocks if none available + // In case of error, throws a runtime_error + std::vector<uint8_t> get_packet_buffer(void); + + private: + void m_run(void); + + int m_port; + std::string m_bindto; + std::string m_mcastaddr; + size_t m_max_packets_queued; + std::thread m_thread; + std::atomic<bool> m_stop; + ThreadsafeQueue<UDPPacket> m_packets; + UDPSocket m_sock; +}; class TCPSocket { public: diff --git a/src/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index ab287b2..62f4c96 100644 --- a/src/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -12,20 +12,18 @@ element out. */ /* - This file is part of ODR-DabMux. + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, + This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. + along with this program. If not, see <https://www.gnu.org/licenses/>. */ #pragma once diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index fb49efc..3142bb3 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -40,6 +40,7 @@ #include "utils.h" #include "DabMux.h" #include "ManagementServer.h" +#include "input/Edi.h" #include "input/Prbs.h" #include "input/Zmq.h" #include "input/File.h" @@ -876,34 +877,46 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan, type = pt.get<string>("type"); } catch (const ptree_error &e) { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << " has no type defined!"; - throw runtime_error(ss.str()); + throw runtime_error("Subchannel with uid " + subchanuid + " has no type defined!"); } - /* Both inputfile and inputuri are supported, and are equivalent. - * inputuri has precedence + /* Up to v2.3.1, both inputfile and inputuri are supported, and are + * equivalent. inputuri has precedence. + * + * After that, either inputfile or the (inputproto, inputuri) pair must be given, but not both. */ string inputUri = pt.get<string>("inputuri", ""); + string proto = pt.get<string>("inputproto", ""); - if (inputUri == "") { + if (inputUri.empty() and proto.empty()) { try { + /* Old approach, derives proto from scheme used in the URL. + * This makes it impossible to distinguish between ZMQ tcp:// and + * EDI tcp:// + */ inputUri = pt.get<string>("inputfile"); + size_t protopos = inputUri.find("://"); + + if (protopos == string::npos) { + proto = "file"; + } + else { + proto = inputUri.substr(0, protopos); + + if (proto == "tcp" or proto == "epgm" or proto == "ipc") { + proto = "zmq"; + } + else if (proto == "sti-rtp") { + proto = "sti"; + } + } } catch (const ptree_error &e) { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << " has no inputUri defined!"; - throw runtime_error(ss.str()); + throw runtime_error("Subchannel with uid " + subchanuid + " has no input defined!"); } } - - string proto; - size_t protopos = inputUri.find("://"); - if (protopos == string::npos) { - proto = "file"; - } - else { - proto = inputUri.substr(0, protopos); + else if (inputUri.empty() or proto.empty()) { + throw runtime_error("Must define both inputuri and inputproto for uid " + subchanuid); } subchan->inputUri = inputUri; @@ -928,7 +941,7 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan, throw logic_error("Incomplete handling of file input"); } } - else if (proto == "tcp" or proto == "epgm" or proto == "ipc") { + else if (proto == "zmq") { auto zmqconfig = setup_zmq_input(pt, subchanuid); if (type == "audio") { @@ -941,15 +954,11 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan, rcs.enrol(inzmq.get()); subchan->input = inzmq; } - - if (proto == "epgm") { - etiLog.level(warn) << "Using untested epgm:// zeromq input"; - } - else if (proto == "ipc") { - etiLog.level(warn) << "Using untested ipc:// zeromq input"; - } } - else if (proto == "sti-rtp") { + else if (proto == "edi") { + subchan->input = make_shared<Inputs::Edi>(); + } + else if (proto == "stp") { subchan->input = make_shared<Inputs::Sti_d_Rtp>(); } else { diff --git a/src/DabMux.cpp b/src/DabMux.cpp index d749ed3..e726fd3 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -304,7 +304,7 @@ int main(int argc, char *argv[]) edi_conf.destinations.push_back(dest); } else if (proto == "tcp") { - auto dest = make_shared<edi::tcp_destination_t>(); + auto dest = make_shared<edi::tcp_server_t>(); dest->listen_port = pt_edi_dest.second.get<unsigned int>("listenport"); dest->max_frames_queued = pt_edi_dest.second.get<size_t>("max_frames_queued", 500); edi_conf.destinations.push_back(dest); diff --git a/src/dabOutput/edi/Config.h b/src/dabOutput/edi/Config.h index 55d5f0f..0c7dce8 100644 --- a/src/dabOutput/edi/Config.h +++ b/src/dabOutput/edi/Config.h @@ -50,11 +50,18 @@ struct udp_destination_t : public destination_t { }; // TCP server that can accept multiple connections -struct tcp_destination_t : public destination_t { +struct tcp_server_t : public destination_t { unsigned int listen_port = 0; size_t max_frames_queued = 1024; }; +// TCP client that connects to one endpoint +struct tcp_client_t : public destination_t { + std::string dest_addr; + unsigned int dest_port = 0; + size_t max_frames_queued = 1024; +}; + struct configuration_t { unsigned chunk_len = 207; // RSk, data length of each chunk unsigned fec = 0; // number of fragments that can be recovered diff --git a/src/dabOutput/edi/PFT.cpp b/src/dabOutput/edi/PFT.cpp index 5b93016..63dfa34 100644 --- a/src/dabOutput/edi/PFT.cpp +++ b/src/dabOutput/edi/PFT.cpp @@ -314,7 +314,7 @@ std::vector< PFTFragment > PFT::Assemble(AFPacket af_packet) #if 0 fprintf(stderr, "* PFT pseq %d, findex %d, fcount %d, plen %d\n", - m_pseq, findex, fcount, plen & ~0x8000); + m_pseq, findex, fcount, plen & ~0xC000); #endif } diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp index 6d3950f..187aabe 100644 --- a/src/dabOutput/edi/Transport.cpp +++ b/src/dabOutput/edi/Transport.cpp @@ -45,12 +45,16 @@ void configuration_t::print() const } etiLog.level(info) << " source port " << udp_dest->source_port; } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) { + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) { etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port; etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued; } + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { + etiLog.level(info) << " TCP client connecting to " << tcp_dest->dest_addr << ":" << tcp_dest->dest_port; + etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued; + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } if (interleaver_enabled()) { @@ -78,13 +82,18 @@ Sender::Sender(const configuration_t& conf) : udp_sockets.emplace(udp_dest.get(), udp_socket); } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) { + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) { auto dispatcher = make_shared<Socket::TCPDataDispatcher>(tcp_dest->max_frames_queued); dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); } + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { + auto tcp_socket = make_shared<Socket::TCPSocket>(); + tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port); + tcp_senders.emplace(tcp_dest.get(), tcp_socket); + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } @@ -111,7 +120,7 @@ void Sender::write(const TagPacket& tagpacket) vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet); if (m_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragment before interleaver %zu", + fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n", edi_fragments.size()); } @@ -128,22 +137,25 @@ void Sender::write(const TagPacket& tagpacket) udp_sockets.at(udp_dest.get())->send(edi_frag, addr); } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) { + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); } + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { + tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size()); + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } if (m_conf.dump) { - std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); + ostream_iterator<uint8_t> debug_iterator(edi_debug_file); + copy(edi_frag.begin(), edi_frag.end(), debug_iterator); } } if (m_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragments %zu", + fprintf(stderr, "EDI number of PFT fragments %zu\n", edi_fragments.size()); } } @@ -156,17 +168,20 @@ void Sender::write(const TagPacket& tagpacket) udp_sockets.at(udp_dest.get())->send(af_packet, addr); } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) { + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { tcp_dispatchers.at(tcp_dest.get())->write(af_packet); } + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { + tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size()); + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } if (m_conf.dump) { - std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - std::copy(af_packet.begin(), af_packet.end(), debug_iterator); + ostream_iterator<uint8_t> debug_iterator(edi_debug_file); + copy(af_packet.begin(), af_packet.end(), debug_iterator); } } } diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h index 74126d1..9633275 100644 --- a/src/dabOutput/edi/Transport.h +++ b/src/dabOutput/edi/Transport.h @@ -36,6 +36,7 @@ #include <vector> #include <unordered_map> #include <stdexcept> +#include <fstream> #include <cstdint> namespace edi { @@ -62,7 +63,8 @@ class Sender { edi::Interleaver edi_interleaver; std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets; - std::unordered_map<tcp_destination_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers; + std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers; + std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSocket>> tcp_senders; }; } diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp index b4cced0..5d4f964 100644 --- a/src/input/Udp.cpp +++ b/src/input/Udp.cpp @@ -151,8 +151,8 @@ static uint16_t unpack2(const uint8_t *buf) int Sti_d_Rtp::open(const std::string& name) { - // Skip the sti-rtp:// part if it is present - const string endpoint = (name.substr(0, 10) == "sti-rtp://") ? + // Skip the rtp:// part if it is present + const string endpoint = (name.substr(0, 10) == "rtp://") ? name.substr(10) : name; // The endpoint should be address:port @@ -160,8 +160,8 @@ int Sti_d_Rtp::open(const std::string& name) if (colon_pos == string::npos) { stringstream ss; ss << "'" << name << - " is an invalid format for sti-rtp address: " - "expected [sti-rtp://]address:port"; + " is an invalid format for rtp address: " + "expected [rtp://]address:port"; throw invalid_argument(ss.str()); } |