diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ConfigParser.cpp | 61 | ||||
-rw-r--r-- | src/DabMultiplexer.h | 3 | ||||
-rw-r--r-- | src/DabMux.cpp | 5 | ||||
-rw-r--r-- | src/InetAddress.cpp | 155 | ||||
-rw-r--r-- | src/InetAddress.h | 78 | ||||
-rw-r--r-- | src/ReedSolomon.cpp | 116 | ||||
-rw-r--r-- | src/ReedSolomon.h | 56 | ||||
-rw-r--r-- | src/TcpSocket.cpp | 359 | ||||
-rw-r--r-- | src/TcpSocket.h | 164 | ||||
-rw-r--r-- | src/ThreadsafeQueue.h | 178 | ||||
-rw-r--r-- | src/UdpSocket.cpp | 256 | ||||
-rw-r--r-- | src/UdpSocket.h | 174 | ||||
-rw-r--r-- | src/dabOutput/dabOutput.h | 21 | ||||
-rw-r--r-- | src/dabOutput/dabOutputTcp.cpp | 2 | ||||
-rw-r--r-- | src/dabOutput/dabOutputUdp.cpp | 65 | ||||
-rw-r--r-- | src/dabOutput/edi/Config.h | 9 | ||||
-rw-r--r-- | src/dabOutput/edi/PFT.cpp | 2 | ||||
-rw-r--r-- | src/dabOutput/edi/Transport.cpp | 67 | ||||
-rw-r--r-- | src/dabOutput/edi/Transport.h | 8 | ||||
-rw-r--r-- | src/input/Udp.cpp | 59 | ||||
-rw-r--r-- | src/input/Udp.h | 4 |
21 files changed, 143 insertions, 1699 deletions
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index fb49efc..3142bb3 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -40,6 +40,7 @@ #include "utils.h" #include "DabMux.h" #include "ManagementServer.h" +#include "input/Edi.h" #include "input/Prbs.h" #include "input/Zmq.h" #include "input/File.h" @@ -876,34 +877,46 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan, type = pt.get<string>("type"); } catch (const ptree_error &e) { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << " has no type defined!"; - throw runtime_error(ss.str()); + throw runtime_error("Subchannel with uid " + subchanuid + " has no type defined!"); } - /* Both inputfile and inputuri are supported, and are equivalent. - * inputuri has precedence + /* Up to v2.3.1, both inputfile and inputuri are supported, and are + * equivalent. inputuri has precedence. + * + * After that, either inputfile or the (inputproto, inputuri) pair must be given, but not both. */ string inputUri = pt.get<string>("inputuri", ""); + string proto = pt.get<string>("inputproto", ""); - if (inputUri == "") { + if (inputUri.empty() and proto.empty()) { try { + /* Old approach, derives proto from scheme used in the URL. + * This makes it impossible to distinguish between ZMQ tcp:// and + * EDI tcp:// + */ inputUri = pt.get<string>("inputfile"); + size_t protopos = inputUri.find("://"); + + if (protopos == string::npos) { + proto = "file"; + } + else { + proto = inputUri.substr(0, protopos); + + if (proto == "tcp" or proto == "epgm" or proto == "ipc") { + proto = "zmq"; + } + else if (proto == "sti-rtp") { + proto = "sti"; + } + } } catch (const ptree_error &e) { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << " has no inputUri defined!"; - throw runtime_error(ss.str()); + throw runtime_error("Subchannel with uid " + subchanuid + " has no input defined!"); } } - - string proto; - size_t protopos = inputUri.find("://"); - if (protopos == string::npos) { - proto = "file"; - } - else { - proto = inputUri.substr(0, protopos); + else if (inputUri.empty() or proto.empty()) { + throw runtime_error("Must define both inputuri and inputproto for uid " + subchanuid); } subchan->inputUri = inputUri; @@ -928,7 +941,7 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan, throw logic_error("Incomplete handling of file input"); } } - else if (proto == "tcp" or proto == "epgm" or proto == "ipc") { + else if (proto == "zmq") { auto zmqconfig = setup_zmq_input(pt, subchanuid); if (type == "audio") { @@ -941,15 +954,11 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan, rcs.enrol(inzmq.get()); subchan->input = inzmq; } - - if (proto == "epgm") { - etiLog.level(warn) << "Using untested epgm:// zeromq input"; - } - else if (proto == "ipc") { - etiLog.level(warn) << "Using untested ipc:// zeromq input"; - } } - else if (proto == "sti-rtp") { + else if (proto == "edi") { + subchan->input = make_shared<Inputs::Edi>(); + } + else if (proto == "stp") { subchan->input = make_shared<Inputs::Sti_d_Rtp>(); } else { diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h index 386c23c..d1075a6 100644 --- a/src/DabMultiplexer.h +++ b/src/DabMultiplexer.h @@ -37,8 +37,7 @@ #include "fig/FIGCarousel.h" #include "crc.h" #include "utils.h" -#include "UdpSocket.h" -#include "InetAddress.h" +#include "Socket.h" #include "PcDebug.h" #include "MuxElements.h" #include "RemoteControl.h" diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 51f0310..578fc63 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -99,8 +99,7 @@ typedef DWORD32 uint32_t; #include "dabOutput/dabOutput.h" #include "crc.h" -#include "UdpSocket.h" -#include "InetAddress.h" +#include "Socket.h" #include "PcDebug.h" #include "DabMux.h" #include "MuxElements.h" @@ -305,7 +304,7 @@ int main(int argc, char *argv[]) edi_conf.destinations.push_back(dest); } else if (proto == "tcp") { - auto dest = make_shared<edi::tcp_destination_t>(); + auto dest = make_shared<edi::tcp_server_t>(); dest->listen_port = pt_edi_dest.second.get<unsigned int>("listenport"); dest->max_frames_queued = pt_edi_dest.second.get<size_t>("max_frames_queued", 500); edi_conf.destinations.push_back(dest); diff --git a/src/InetAddress.cpp b/src/InetAddress.cpp deleted file mode 100644 index 7660263..0000000 --- a/src/InetAddress.cpp +++ /dev/null @@ -1,155 +0,0 @@ -/* - Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the - Queen in Right of Canada (Communications Research Center Canada) - - Copyright (C) 2016 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "InetAddress.h" -#include <iostream> -#include <stdio.h> -#include <errno.h> -#include <string.h> - -#ifdef TRACE_ON -# ifndef TRACE_CLASS -# define TRACE_CLASS(clas, func) cout <<"-" <<(clas) <<"\t(" <<this <<")::" <<(func) <<endl -# define TRACE_STATIC(clas, func) cout <<"-" <<(clas) <<"\t(static)::" <<(func) <<endl -# endif -#else -# ifndef TRACE_CLASS -# define TRACE_CLASS(clas, func) -# define TRACE_STATIC(clas, func) -# endif -#endif - - -int inetErrNo = 0; -const char *inetErrMsg = nullptr; -const char *inetErrDesc = nullptr; - - -/** - * Constructs an IP address. - * @param port The port of this address - * @param name The name of this address - */ -InetAddress::InetAddress(int port, const char* name) { - TRACE_CLASS("InetAddress", "InetAddress(int, char)"); - addr.sin_family = PF_INET; - addr.sin_addr.s_addr = htons(INADDR_ANY); - addr.sin_port = htons(port); - if (name) - setAddress(name); -} - - -/// Returns the raw IP address of this InetAddress object. -sockaddr *InetAddress::getAddress() { - TRACE_CLASS("InetAddress", "getAddress()"); - return (sockaddr *)&addr; -} - - -/// Return the port of this address. -int InetAddress::getPort() -{ - TRACE_CLASS("InetAddress", "getPort()"); - return ntohs(addr.sin_port); -} - - -/** - * Returns the IP address string "%d.%d.%d.%d". - * @return IP address - */ -const char *InetAddress::getHostAddress() { - TRACE_CLASS("InetAddress", "getHostAddress()"); - return inet_ntoa(addr.sin_addr); -} - - -/// Returns true if this address is multicast -bool InetAddress::isMulticastAddress() { - TRACE_CLASS("InetAddress", "isMulticastAddress()"); - return IN_MULTICAST(ntohl(addr.sin_addr.s_addr)); // a modifier -} - - -/** - * Set the port number - * @param port The new port number - */ -void InetAddress::setPort(int port) -{ - TRACE_CLASS("InetAddress", "setPort(int)"); - addr.sin_port = htons(port); -} - - -/** - * Set the address - * @param name The new address name - * @return 0 if ok - * -1 if error - */ -int InetAddress::setAddress(const std::string& name) -{ - TRACE_CLASS("InetAddress", "setAddress(string)"); - if (!name.empty()) { - if (atoi(name.c_str())) { // If it start with a number - if ((addr.sin_addr.s_addr = inet_addr(name.c_str())) == INADDR_NONE) { - addr.sin_addr.s_addr = htons(INADDR_ANY); - inetErrNo = 0; - inetErrMsg = "Invalid address"; - inetErrDesc = name.c_str(); - return -1; - } - } - else { // Assume it's a real name - hostent *host = gethostbyname(name.c_str()); - if (host) { - addr.sin_addr = *(in_addr *)(host->h_addr); - } else { - addr.sin_addr.s_addr = htons(INADDR_ANY); - inetErrNo = 0; - inetErrMsg = "Could not find address"; - inetErrDesc = name.c_str(); - return -1; - } - } - } - else { - addr.sin_addr.s_addr = INADDR_ANY; - } - return 0; -} - - -void setInetError(const char* description) -{ - inetErrNo = 0; - inetErrNo = errno; - inetErrMsg = strerror(inetErrNo); - inetErrDesc = description; -} - diff --git a/src/InetAddress.h b/src/InetAddress.h deleted file mode 100644 index e246d4c..0000000 --- a/src/InetAddress.h +++ /dev/null @@ -1,78 +0,0 @@ -/* - Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the - Queen in Right of Canada (Communications Research Center Canada) - - Copyright (C) 2016 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef _InetAddress -#define _InetAddress - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include <stdlib.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <unistd.h> -#include <netdb.h> -#include <arpa/inet.h> -#include <pthread.h> -#include <string> - -#define SOCKET int -#define INVALID_SOCKET -1 -#define INVALID_PORT -1 - - -/// The last error number -extern int inetErrNo; -/// The last error message -extern const char *inetErrMsg; -/// The description of the last error -extern const char *inetErrDesc; -/// Set the number, message and description of the last error -void setInetError(const char* description); - - -/** - * This class represents an Internet Protocol (IP) address. - * @author Pascal Charest pascal.charest@crc.ca - */ -class InetAddress { - public: - InetAddress(int port = 0, const char* name = NULL); - - sockaddr *getAddress(); - const char *getHostAddress(); - int getPort(); - int setAddress(const std::string& name); - void setPort(int port); - bool isMulticastAddress(); - - private: - sockaddr_in addr; -}; - - -#endif diff --git a/src/ReedSolomon.cpp b/src/ReedSolomon.cpp deleted file mode 100644 index 38d8ea8..0000000 --- a/src/ReedSolomon.cpp +++ /dev/null @@ -1,116 +0,0 @@ -/* - Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right - of Canada (Communications Research Center Canada) - - Copyright (C) 2016 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "ReedSolomon.h" -#include <vector> -#include <algorithm> -#include <stdexcept> -#include <sstream> -#include <stdio.h> // For galois.h ... -#include <string.h> // For memcpy - -extern "C" { -#include "fec/fec.h" -} -#include <assert.h> - -#define SYMSIZE 8 - - -ReedSolomon::ReedSolomon(int N, int K, bool reverse, int gfpoly, int firstRoot, int primElem) -{ - setReverse(reverse); - - m_N = N; - m_K = K; - - const int symsize = SYMSIZE; - const int nroots = N - K; // For EDI PFT, this must be 48 - const int pad = ((1 << symsize) - 1) - N; // is 255-N - - rsData = init_rs_char(symsize, gfpoly, firstRoot, primElem, nroots, pad); - - if (rsData == nullptr) { - std::stringstream ss; - ss << "Invalid Reed-Solomon parameters! " << - "N=" << N << " ; K=" << K << " ; pad=" << pad; - throw std::invalid_argument(ss.str()); - } -} - - -ReedSolomon::~ReedSolomon() -{ - free_rs_char(rsData); -} - - -void ReedSolomon::setReverse(bool state) -{ - reverse = state; -} - - -int ReedSolomon::encode(void* data, void* fec, size_t size) -{ - uint8_t* input = reinterpret_cast<uint8_t*>(data); - uint8_t* output = reinterpret_cast<uint8_t*>(fec); - int ret = 0; - - if (reverse) { - std::vector<uint8_t> buffer(m_N); - - memcpy(&buffer[0], input, m_K); - memcpy(&buffer[m_K], output, m_N - m_K); - - ret = decode_rs_char(rsData, &buffer[0], nullptr, 0); - if ((ret != 0) && (ret != -1)) { - memcpy(input, &buffer[0], m_K); - memcpy(output, &buffer[m_K], m_N - m_K); - } - } - else { - encode_rs_char(rsData, input, output); - } - - return ret; -} - - -int ReedSolomon::encode(void* data, size_t size) -{ - uint8_t* input = reinterpret_cast<uint8_t*>(data); - int ret = 0; - - if (reverse) { - ret = decode_rs_char(rsData, input, nullptr, 0); - } - else { - encode_rs_char(rsData, input, &input[m_K]); - } - - return ret; -} diff --git a/src/ReedSolomon.h b/src/ReedSolomon.h deleted file mode 100644 index abcef62..0000000 --- a/src/ReedSolomon.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right - of Canada (Communications Research Center Canada) - - Copyright (C) 2016 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#ifdef HAVE_CONFIG_H -# include <config.h> -#endif - -#include <stdlib.h> - -class ReedSolomon -{ -public: - ReedSolomon(int N, int K, - bool reverse = false, - int gfpoly = 0x11d, int firstRoot = 0, int primElem = 1); - ReedSolomon(const ReedSolomon& other) = delete; - ReedSolomon operator=(const ReedSolomon& other) = delete; - ~ReedSolomon(); - - void setReverse(bool state); - int encode(void* data, void* fec, size_t size); - int encode(void* data, size_t size); - -private: - int m_N; - int m_K; - - void* rsData; - bool reverse; -}; - diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp deleted file mode 100644 index 3ebe73c..0000000 --- a/src/TcpSocket.cpp +++ /dev/null @@ -1,359 +0,0 @@ -/* - Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in - Right of Canada (Communications Research Center Canada) - - Copyright (C) 2019 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "TcpSocket.h" -#include "Log.h" -#include <iostream> -#include <cstdio> -#include <cstring> -#include <cstdint> -#include <signal.h> -#include <errno.h> -#include <poll.h> -#include <thread> - -using namespace std; - -using vec_u8 = std::vector<uint8_t>; - - -TcpSocket::TcpSocket() : - m_sock(INVALID_SOCKET) -{ -} - -TcpSocket::TcpSocket(int port, const string& name) : - m_sock(INVALID_SOCKET) -{ - if (port) { - if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) { - throw std::runtime_error("Can't create socket"); - } - - reuseopt_t reuse = 1; - if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) - == SOCKET_ERROR) { - throw std::runtime_error("Can't reuse address"); - } - -#if defined(HAVE_SO_NOSIGPIPE) - int val = 1; - if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) - == SOCKET_ERROR) { - throw std::runtime_error("Can't set SO_NOSIGPIPE"); - } -#endif - - m_own_address.setAddress(name); - m_own_address.setPort(port); - - if (::bind(m_sock, m_own_address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) { - ::close(m_sock); - m_sock = INVALID_SOCKET; - throw std::runtime_error("Can't bind socket"); - } - } -} - -TcpSocket::TcpSocket(SOCKET sock, InetAddress own, InetAddress remote) : - m_own_address(own), - m_remote_address(remote), - m_sock(sock) { } - -// The move constructors must ensure the moved-from -// TcpSocket won't destroy our socket handle -TcpSocket::TcpSocket(TcpSocket&& other) -{ - m_sock = other.m_sock; - other.m_sock = INVALID_SOCKET; - - m_own_address = other.m_own_address; - m_remote_address = other.m_remote_address; -} - -TcpSocket& TcpSocket::operator=(TcpSocket&& other) -{ - m_sock = other.m_sock; - other.m_sock = INVALID_SOCKET; - - m_own_address = other.m_own_address; - m_remote_address = other.m_remote_address; - return *this; -} - -/** - * Close the underlying socket. - * @return 0 if ok - * -1 if error - */ -int TcpSocket::close() -{ - if (m_sock != INVALID_SOCKET) { - int res = ::close(m_sock); - if (res != 0) { - setInetError("Can't close socket"); - return -1; - } - m_sock = INVALID_SOCKET; - } - return 0; -} - -TcpSocket::~TcpSocket() -{ - close(); -} - -bool TcpSocket::isValid() -{ - return m_sock != INVALID_SOCKET; -} - -ssize_t TcpSocket::recv(void* data, size_t size) -{ - ssize_t ret = ::recv(m_sock, (char*)data, size, 0); - if (ret == SOCKET_ERROR) { - stringstream ss; - ss << "TCP Socket recv error: " << strerror(errno); - throw std::runtime_error(ss.str()); - } - return ret; -} - - -ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms) -{ - if (timeout_ms) { - struct pollfd fds[1]; - fds[0].fd = m_sock; - fds[0].events = POLLOUT; - - const int retval = poll(fds, 1, timeout_ms); - - if (retval == -1) { - stringstream ss; - ss << "TCP Socket send error on poll(): " << strerror(errno); - throw std::runtime_error(ss.str()); - } - else if (retval == 0) { - // Timed out - return 0; - } - } - - /* On Linux, the MSG_NOSIGNAL flag ensures that the process would not - * receive a SIGPIPE and die. - * Other systems have SO_NOSIGPIPE set on the socket for the same effect. */ -#if defined(HAVE_MSG_NOSIGNAL) - const int flags = MSG_NOSIGNAL; -#else - const int flags = 0; -#endif - const ssize_t ret = ::send(m_sock, (const char*)data, size, flags); - - if (ret == SOCKET_ERROR) { - stringstream ss; - ss << "TCP Socket send error: " << strerror(errno); - throw std::runtime_error(ss.str()); - } - return ret; -} - -void TcpSocket::listen() -{ - if (::listen(m_sock, 1) == SOCKET_ERROR) { - stringstream ss; - ss << "TCP Socket listen error: " << strerror(errno); - throw std::runtime_error(ss.str()); - } -} - -TcpSocket TcpSocket::accept() -{ - InetAddress remote_addr; - socklen_t addrLen = sizeof(sockaddr_in); - - SOCKET socket = ::accept(m_sock, remote_addr.getAddress(), &addrLen); - if (socket == SOCKET_ERROR) { - stringstream ss; - ss << "TCP Socket accept error: " << strerror(errno); - throw std::runtime_error(ss.str()); - } - else { - TcpSocket client(socket, m_own_address, remote_addr); - return client; - } -} - -TcpSocket TcpSocket::accept(int timeout_ms) -{ - struct pollfd fds[1]; - fds[0].fd = m_sock; - fds[0].events = POLLIN | POLLOUT; - - int retval = poll(fds, 1, timeout_ms); - - if (retval == -1) { - stringstream ss; - ss << "TCP Socket accept error: " << strerror(errno); - throw std::runtime_error(ss.str()); - } - else if (retval) { - return accept(); - } - else { - TcpSocket invalidsock(0, ""); - return invalidsock; - } -} - - -InetAddress TcpSocket::getOwnAddress() const -{ - return m_own_address; -} - -InetAddress TcpSocket::getRemoteAddress() const -{ - return m_remote_address; -} - - -TCPConnection::TCPConnection(TcpSocket&& sock) : - queue(), - m_running(true), - m_sender_thread(), - m_sock(move(sock)) -{ - auto own_addr = m_sock.getOwnAddress(); - auto addr = m_sock.getRemoteAddress(); - etiLog.level(debug) << "New TCP Connection on port " << - own_addr.getPort() << " from " << - addr.getHostAddress() << ":" << addr.getPort(); - m_sender_thread = std::thread(&TCPConnection::process, this); -} - -TCPConnection::~TCPConnection() -{ - m_running = false; - vec_u8 termination_marker; - queue.push(termination_marker); - m_sender_thread.join(); -} - -void TCPConnection::process() -{ - while (m_running) { - vec_u8 data; - queue.wait_and_pop(data); - - if (data.empty()) { - // empty vector is the termination marker - m_running = false; - break; - } - - try { - ssize_t remaining = data.size(); - const uint8_t *buf = reinterpret_cast<const uint8_t*>(data.data()); - const int timeout_ms = 10; // Less than one ETI frame - - while (m_running and remaining > 0) { - const ssize_t sent = m_sock.send(buf, remaining, timeout_ms); - if (sent < 0 or sent > remaining) { - throw std::logic_error("Invalid TcpSocket::send() return value"); - } - remaining -= sent; - buf += sent; - } - } - catch (const std::runtime_error& e) { - m_running = false; - } - } - - - auto own_addr = m_sock.getOwnAddress(); - auto addr = m_sock.getRemoteAddress(); - etiLog.level(debug) << "Dropping TCP Connection on port " << - own_addr.getPort() << " from " << - addr.getHostAddress() << ":" << addr.getPort(); -} - - -TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) : - m_max_queue_size(max_queue_size) -{ -} - -TCPDataDispatcher::~TCPDataDispatcher() -{ - m_running = false; - m_connections.clear(); - m_listener_socket.close(); - m_listener_thread.join(); -} - -void TCPDataDispatcher::start(int port, const string& address) -{ - TcpSocket sock(port, address); - m_listener_socket = move(sock); - - m_running = true; - m_listener_thread = std::thread(&TCPDataDispatcher::process, this); -} - -void TCPDataDispatcher::write(const vec_u8& data) -{ - for (auto& connection : m_connections) { - connection.queue.push(data); - } - - m_connections.remove_if( - [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); -} - -void TCPDataDispatcher::process() -{ - try { - m_listener_socket.listen(); - - const int timeout_ms = 1000; - - while (m_running) { - // Add a new TCPConnection to the list, constructing it from the client socket - auto sock = m_listener_socket.accept(timeout_ms); - if (sock.isValid()) { - m_connections.emplace(m_connections.begin(), move(sock)); - } - } - } - catch (const std::runtime_error& e) { - etiLog.level(error) << "TCPDataDispatcher caught runtime error: " << e.what(); - m_running = false; - } -} - diff --git a/src/TcpSocket.h b/src/TcpSocket.h deleted file mode 100644 index ec7afd3..0000000 --- a/src/TcpSocket.h +++ /dev/null @@ -1,164 +0,0 @@ -/* - Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in - Right of Canada (Communications Research Center Canada) - - Copyright (C) 2019 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef _TCPSOCKET -#define _TCPSOCKET - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include "InetAddress.h" -#include "ThreadsafeQueue.h" -#include <sys/socket.h> -#include <netinet/in.h> -#include <unistd.h> -#include <netdb.h> -#include <arpa/inet.h> -#include <pthread.h> -#define SOCKET int -#define INVALID_SOCKET -1 -#define SOCKET_ERROR -1 -#define reuseopt_t int - -#include <iostream> -#include <string> -#include <vector> -#include <memory> -#include <atomic> -#include <thread> -#include <list> - -/** - * This class represents a TCP socket. - */ -class TcpSocket -{ - public: - /** Create a new socket that does nothing */ - TcpSocket(); - - /** Create a new socket listening for incoming connections. - * @param port The port number on which the socket will listen. - * @param name The IP address on which the socket will be bound. - * It is used to bind the socket on a specific interface if - * the computer have many NICs. - */ - TcpSocket(int port, const std::string& name); - ~TcpSocket(); - TcpSocket(TcpSocket&& other); - TcpSocket& operator=(TcpSocket&& other); - TcpSocket(const TcpSocket& other) = delete; - TcpSocket& operator=(const TcpSocket& other) = delete; - - bool isValid(void); - - int close(void); - - /** Send data over the TCP connection. - * @param data The buffer that will be sent. - * @param size Number of bytes to send. - * @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout - * return number of bytes sent, 0 on timeout, or throws runtime_error. - */ - ssize_t send(const void* data, size_t size, int timeout_ms=0); - - /** Receive data from the socket. - * @param data The buffer that will receive data. - * @param size The buffer size. - * @return number of bytes received or -1 (SOCKET_ERROR) if error - */ - ssize_t recv(void* data, size_t size); - - void listen(void); - TcpSocket accept(void); - - /* Returns either valid socket if a connection was - * accepted before the timeout expired, or an invalid - * socket otherwise. - */ - TcpSocket accept(int timeout_ms); - - /** Retrieve address this socket is bound to */ - InetAddress getOwnAddress() const; - InetAddress getRemoteAddress() const; - - private: - TcpSocket(SOCKET sock, InetAddress own, InetAddress remote); - - /// The address on which the socket is bound. - InetAddress m_own_address; - InetAddress m_remote_address; - /// The low-level socket used by system functions. - SOCKET m_sock; -}; - -/* Helper class for TCPDataDispatcher, contains a queue of pending data and - * a sender thread. */ -class TCPConnection -{ - public: - TCPConnection(TcpSocket&& sock); - TCPConnection(const TCPConnection&) = delete; - TCPConnection& operator=(const TCPConnection&) = delete; - ~TCPConnection(); - - ThreadsafeQueue<std::vector<uint8_t> > queue; - - private: - std::atomic<bool> m_running; - std::thread m_sender_thread; - TcpSocket m_sock; - - void process(void); -}; - -/* Send a TCP stream to several destinations, and automatically disconnect destinations - * whose buffer overflows. - */ -class TCPDataDispatcher -{ - public: - TCPDataDispatcher(size_t max_queue_size); - ~TCPDataDispatcher(); - TCPDataDispatcher(const TCPDataDispatcher&) = delete; - TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete; - - void start(int port, const std::string& address); - void write(const std::vector<uint8_t>& data); - - private: - void process(void); - - size_t m_max_queue_size; - - std::atomic<bool> m_running; - std::thread m_listener_thread; - TcpSocket m_listener_socket; - std::list<TCPConnection> m_connections; -}; - -#endif // _TCPSOCKET diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h deleted file mode 100644 index ab287b2..0000000 --- a/src/ThreadsafeQueue.h +++ /dev/null @@ -1,178 +0,0 @@ -/* - Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in - Right of Canada (Communications Research Center Canada) - - Copyright (C) 2018 - Matthias P. Braendli, matthias.braendli@mpb.li - - An implementation for a threadsafe queue, depends on C++11 - - When creating a ThreadsafeQueue, one can specify the minimal number - of elements it must contain before it is possible to take one - element out. - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#include <mutex> -#include <condition_variable> -#include <queue> -#include <utility> - -/* This queue is meant to be used by two threads. One producer - * that pushes elements into the queue, and one consumer that - * retrieves the elements. - * - * The queue can make the consumer block until an element - * is available, or a wakeup requested. - */ - -/* Class thrown by blocking pop to tell the consumer - * that there's a wakeup requested. */ -class ThreadsafeQueueWakeup {}; - -template<typename T> -class ThreadsafeQueue -{ -public: - /* Push one element into the queue, and notify another thread that - * might be waiting. - * - * returns the new queue size. - */ - size_t push(T const& val) - { - std::unique_lock<std::mutex> lock(the_mutex); - the_queue.push(val); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - size_t push(T&& val) - { - std::unique_lock<std::mutex> lock(the_mutex); - the_queue.emplace(std::move(val)); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - /* Push one element into the queue, but wait until the - * queue size goes below the threshold. - * - * Notify waiting thread. - * - * returns the new queue size. - */ - size_t push_wait_if_full(T const& val, size_t threshold) - { - std::unique_lock<std::mutex> lock(the_mutex); - while (the_queue.size() >= threshold) { - the_tx_notification.wait(lock); - } - the_queue.push(val); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - /* Trigger a wakeup event on a blocking consumer, which - * will receive a ThreadsafeQueueWakeup exception. - */ - void trigger_wakeup(void) - { - std::unique_lock<std::mutex> lock(the_mutex); - wakeup_requested = true; - lock.unlock(); - the_rx_notification.notify_one(); - } - - /* Send a notification for the receiver thread */ - void notify(void) - { - the_rx_notification.notify_one(); - } - - bool empty() const - { - std::unique_lock<std::mutex> lock(the_mutex); - return the_queue.empty(); - } - - size_t size() const - { - std::unique_lock<std::mutex> lock(the_mutex); - return the_queue.size(); - } - - bool try_pop(T& popped_value) - { - std::unique_lock<std::mutex> lock(the_mutex); - if (the_queue.empty()) { - return false; - } - - popped_value = the_queue.front(); - the_queue.pop(); - - lock.unlock(); - the_tx_notification.notify_one(); - - return true; - } - - void wait_and_pop(T& popped_value, size_t prebuffering = 1) - { - std::unique_lock<std::mutex> lock(the_mutex); - while (the_queue.size() < prebuffering and - not wakeup_requested) { - the_rx_notification.wait(lock); - } - - if (wakeup_requested) { - wakeup_requested = false; - throw ThreadsafeQueueWakeup(); - } - else { - std::swap(popped_value, the_queue.front()); - the_queue.pop(); - - lock.unlock(); - the_tx_notification.notify_one(); - } - } - -private: - std::queue<T> the_queue; - mutable std::mutex the_mutex; - std::condition_variable the_rx_notification; - std::condition_variable the_tx_notification; - bool wakeup_requested = false; -}; - diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp deleted file mode 100644 index 3d015ec..0000000 --- a/src/UdpSocket.cpp +++ /dev/null @@ -1,256 +0,0 @@ -/* - Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the - Queen in Right of Canada (Communications Research Center Canada) - - Copyright (C) 2017 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "UdpSocket.h" - -#include <iostream> -#include <stdio.h> -#include <errno.h> -#include <fcntl.h> -#include <string.h> - -using namespace std; - -UdpSocket::UdpSocket() : - listenSocket(INVALID_SOCKET) -{ - reinit(0, ""); -} - -UdpSocket::UdpSocket(int port) : - listenSocket(INVALID_SOCKET) -{ - reinit(port, ""); -} - -UdpSocket::UdpSocket(int port, const std::string& name) : - listenSocket(INVALID_SOCKET) -{ - reinit(port, name); -} - - -int UdpSocket::setBlocking(bool block) -{ - int res; - if (block) - res = fcntl(listenSocket, F_SETFL, 0); - else - res = fcntl(listenSocket, F_SETFL, O_NONBLOCK); - if (res == SOCKET_ERROR) { - setInetError("Can't change blocking state of socket"); - return -1; - } - return 0; -} - -int UdpSocket::reinit(int port, const std::string& name) -{ - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); - } - - if ((listenSocket = socket(PF_INET, SOCK_DGRAM, 0)) == INVALID_SOCKET) { - setInetError("Can't create socket"); - return -1; - } - reuseopt_t reuse = 1; - if (setsockopt(listenSocket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) - == SOCKET_ERROR) { - setInetError("Can't reuse address"); - return -1; - } - - if (port) { - address.setAddress(name); - address.setPort(port); - - if (::bind(listenSocket, address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) { - setInetError("Can't bind socket"); - ::close(listenSocket); - listenSocket = INVALID_SOCKET; - return -1; - } - } - return 0; -} - -int UdpSocket::close() -{ - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); - } - - listenSocket = INVALID_SOCKET; - - return 0; -} - -UdpSocket::~UdpSocket() -{ - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); - } -} - - -int UdpSocket::receive(UdpPacket& packet) -{ - socklen_t addrSize; - addrSize = sizeof(*packet.getAddress().getAddress()); - ssize_t ret = recvfrom(listenSocket, - packet.getData(), - packet.getSize(), - 0, - packet.getAddress().getAddress(), - &addrSize); - - if (ret == SOCKET_ERROR) { - packet.setSize(0); - if (errno == EAGAIN) { - return 0; - } - setInetError("Can't receive UDP packet"); - return -1; - } - - packet.setSize(ret); - return 0; -} - -int UdpSocket::send(UdpPacket& packet) -{ - int ret = sendto(listenSocket, packet.getData(), packet.getSize(), 0, - packet.getAddress().getAddress(), sizeof(*packet.getAddress().getAddress())); - if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { - setInetError("Can't send UDP packet"); - return -1; - } - return 0; -} - - -int UdpSocket::send(const std::vector<uint8_t>& data, InetAddress destination) -{ - int ret = sendto(listenSocket, &data[0], data.size(), 0, - destination.getAddress(), sizeof(*destination.getAddress())); - if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { - setInetError("Can't send UDP packet"); - return -1; - } - return 0; -} - - -/** - * Must be called to receive data on a multicast address. - * @param groupname The multica -st address to join. - * @return 0 if ok, -1 if error - */ -int UdpSocket::joinGroup(char* groupname) -{ - ip_mreqn group; - if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { - setInetError(groupname); - return -1; - } - if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { - setInetError("Not a multicast address"); - return -1; - } - group.imr_address.s_addr = htons(INADDR_ANY);; - group.imr_ifindex = 0; - if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) - == SOCKET_ERROR) { - setInetError("Can't join multicast group"); - } - return 0; -} - -int UdpSocket::setMulticastTTL(int ttl) -{ - if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) - == SOCKET_ERROR) { - setInetError("Can't set ttl"); - return -1; - } - - return 0; -} - -int UdpSocket::setMulticastSource(const char* source_addr) -{ - struct in_addr addr; - if (inet_aton(source_addr, &addr) == 0) { - setInetError("Can't parse source address"); - return -1; - } - - if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) - == SOCKET_ERROR) { - setInetError("Can't set source address"); - return -1; - } - - return 0; -} - -UdpPacket::UdpPacket() { } - -UdpPacket::UdpPacket(size_t initSize) : - m_buffer(initSize) -{ } - - -void UdpPacket::setSize(size_t newSize) -{ - m_buffer.resize(newSize); -} - - -uint8_t* UdpPacket::getData() -{ - return &m_buffer[0]; -} - - -void UdpPacket::addData(const void *data, size_t size) -{ - uint8_t *d = (uint8_t*)data; - std::copy(d, d + size, std::back_inserter(m_buffer)); -} - -size_t UdpPacket::getSize() -{ - return m_buffer.size(); -} - -InetAddress UdpPacket::getAddress() -{ - return address; -} - diff --git a/src/UdpSocket.h b/src/UdpSocket.h deleted file mode 100644 index f51e87c..0000000 --- a/src/UdpSocket.h +++ /dev/null @@ -1,174 +0,0 @@ -/* - Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the - Queen in Right of Canada (Communications Research Center Canada) - - Copyright (C) 2017 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include "InetAddress.h" -#include <sys/socket.h> -#include <netinet/in.h> -#include <unistd.h> -#include <netdb.h> -#include <arpa/inet.h> -#include <pthread.h> -#define SOCKET int -#define INVALID_SOCKET -1 -#define SOCKET_ERROR -1 -#define reuseopt_t int - -#include <stdlib.h> -#include <iostream> -#include <vector> - -class UdpPacket; - - -/** - * This class represents a socket for sending and receiving UDP packets. - * - * A UDP socket is the sending or receiving point for a packet delivery service. - * Each packet sent or received on a datagram socket is individually - * addressed and routed. Multiple packets sent from one machine to another may - * be routed differently, and may arrive in any order. - */ -class UdpSocket -{ - public: - /** Create a new socket that will not be bound to any port. To be used - * for data output. - */ - UdpSocket(); - /** Create a new socket. - * @param port The port number on which the socket will be bound - */ - UdpSocket(int port); - /** Create a new socket. - * @param port The port number on which the socket will be bound - * @param name The IP address on which the socket will be bound. - * It is used to bind the socket on a specific interface if - * the computer have many NICs. - */ - UdpSocket(int port, const std::string& name); - ~UdpSocket(); - UdpSocket(const UdpSocket& other) = delete; - const UdpSocket& operator=(const UdpSocket& other) = delete; - - /** reinitialise socket. Close the already open socket, and - * create a new one - */ - int reinit(int port, const std::string& name); - - /** Close the socket - */ - int close(void); - - /** Send an UDP packet. - * @param packet The UDP packet to be sent. It includes the data and the - * destination address - * return 0 if ok, -1 if error - */ - int send(UdpPacket& packet); - - /** Send an UDP packet - * - * return 0 if ok, -1 if error - */ - int send(const std::vector<uint8_t>& data, InetAddress destination); - - /** Receive an UDP packet. - * @param packet The packet that will receive the data. The address will be set - * to the source address. - * @return 0 if ok, -1 if error - */ - int receive(UdpPacket& packet); - - int joinGroup(char* groupname); - int setMulticastSource(const char* source_addr); - int setMulticastTTL(int ttl); - - /** Set blocking mode. By default, the socket is blocking. - * @return 0 if ok - * -1 if error - */ - int setBlocking(bool block); - - protected: - - /// The address on which the socket is bound. - InetAddress address; - /// The low-level socket used by system functions. - SOCKET listenSocket; -}; - -/** This class represents a UDP packet. - * - * A UDP packet contains a payload (sequence of bytes) and an address. For - * outgoing packets, the address is the destination address. For incoming - * packets, the address tells the user from what source the packet arrived from. - */ -class UdpPacket -{ - public: - /** Construct an empty UDP packet. - */ - UdpPacket(); - UdpPacket(size_t initSize); - - /** Give the pointer to data. - * @return The pointer - */ - uint8_t* getData(void); - - /** Append some data at the end of data buffer and adjust size. - * @param data Pointer to the data to add - * @param size Size in bytes of new data - */ - void addData(const void *data, size_t size); - - size_t getSize(void); - - /** Changes size of the data buffer size. Keeps data intact unless - * truncated. - */ - void setSize(size_t newSize); - - /** Returns the UDP address of the packet. - */ - InetAddress getAddress(void); - - const std::vector<uint8_t>& getBuffer(void) const { - return m_buffer; - } - - - private: - std::vector<uint8_t> m_buffer; - InetAddress address; -}; - diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h index 9cc18d7..c7e570b 100644 --- a/src/dabOutput/dabOutput.h +++ b/src/dabOutput/dabOutput.h @@ -28,8 +28,7 @@ #pragma once -#include "UdpSocket.h" -#include "TcpSocket.h" +#include "Socket.h" #include "Log.h" #include "string.h" #include <stdexcept> @@ -57,6 +56,8 @@ class DabOutput { return Open(name.c_str()); } + + // Return -1 on failure virtual int Write(void* buffer, int size) = 0; virtual int Close() = 0; @@ -145,15 +146,7 @@ class DabOutputRaw : public DabOutput class DabOutputUdp : public DabOutput { public: - DabOutputUdp() { - packet_ = new UdpPacket(6144); - socket_ = new UdpSocket(); - } - - virtual ~DabOutputUdp() { - delete socket_; - delete packet_; - } + DabOutputUdp(); int Open(const char* name); int Write(void* buffer, int size); @@ -171,8 +164,8 @@ class DabOutputUdp : public DabOutput DabOutputUdp operator=(const DabOutputUdp& other) = delete; std::string uri_; - UdpSocket* socket_; - UdpPacket* packet_; + Socket::UDPSocket socket_; + Socket::UDPPacket packet_; }; // -------------- TCP ------------------ @@ -190,7 +183,7 @@ class DabOutputTcp : public DabOutput private: std::string uri_; - std::shared_ptr<TCPDataDispatcher> dispatcher_; + std::shared_ptr<Socket::TCPDataDispatcher> dispatcher_; }; // -------------- Simul ------------------ diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 87dbfd5..4dc3538 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -94,7 +94,7 @@ int DabOutputTcp::Open(const char* name) uri_ = name; if (success) { - dispatcher_ = make_shared<TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES); + dispatcher_ = make_shared<Socket::TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES); dispatcher_->start(port, address); } else { diff --git a/src/dabOutput/dabOutputUdp.cpp b/src/dabOutput/dabOutputUdp.cpp index c129569..b9c22db 100644 --- a/src/dabOutput/dabOutputUdp.cpp +++ b/src/dabOutput/dabOutputUdp.cpp @@ -38,18 +38,12 @@ #include <cstdio> #include <limits.h> #include "dabOutput.h" -#include "UdpSocket.h" - -#ifdef _WIN32 -# include <fscfg.h> -# include <sdci.h> -#else -# include <netinet/in.h> -# include <sys/types.h> -# include <sys/socket.h> -# include <sys/ioctl.h> -# include <net/if_arp.h> -#endif +#include "Socket.h" + +DabOutputUdp::DabOutputUdp() : + socket_(), + packet_(6144) +{ } int DabOutputUdp::Open(const char* name) { @@ -64,12 +58,6 @@ int DabOutputUdp::Open(const char* name) regex_constants::match_default)) { string address = what[1]; - if (this->packet_->getAddress().setAddress(address.c_str()) == -1) { - etiLog.level(error) << "can't set address " << - address << "(" << inetErrDesc << ": " << inetErrMsg << ")"; - return -1; - } - string port_str = what[2]; long port = std::strtol(port_str.c_str(), nullptr, 0); @@ -79,7 +67,7 @@ int DabOutputUdp::Open(const char* name) return -1; } - this->packet_->getAddress().setPort(port); + packet_.address.resolveUdpDestination(address, port); string query_params = what[3]; smatch query_what; @@ -87,28 +75,25 @@ int DabOutputUdp::Open(const char* name) regex_constants::match_default)) { string src = query_what[1]; - int err = socket_->setMulticastSource(src.c_str()); - if (err) { - etiLog.level(error) << "UDP output socket set source failed!"; - return -1; - } + try { + socket_.setMulticastSource(src.c_str()); - string ttl_str = query_what[2]; + string ttl_str = query_what[2]; - if (not ttl_str.empty()) { - long ttl = std::strtol(ttl_str.c_str(), nullptr, 0); - if ((ttl <= 0) || (ttl >= 255)) { - etiLog.level(error) << "Invalid TTL setting in " << - uri_without_proto; - return -1; - } + if (not ttl_str.empty()) { + long ttl = std::strtol(ttl_str.c_str(), nullptr, 0); + if ((ttl <= 0) || (ttl >= 255)) { + etiLog.level(error) << "Invalid TTL setting in " << + uri_without_proto; + return -1; + } - err = socket_->setMulticastTTL(ttl); - if (err) { - etiLog.level(error) << "UDP output socket set TTL failed!"; - return -1; + socket_.setMulticastTTL(ttl); } } + catch (const std::runtime_error& e) { + etiLog.level(error) << "Failed to set UDP output settings" << e.what(); + } } else if (not query_params.empty()) { etiLog.level(error) << "UDP output: could not parse parameters " << @@ -129,9 +114,11 @@ int DabOutputUdp::Open(const char* name) int DabOutputUdp::Write(void* buffer, int size) { - this->packet_->setSize(0); - this->packet_->addData(buffer, size); - return this->socket_->send(*this->packet_); + const uint8_t *buf = reinterpret_cast<uint8_t*>(buffer); + packet_.buffer.resize(0); + std::copy(buf, buf + size, std::back_inserter(packet_.buffer)); + socket_.send(packet_); + return 0; } #endif // defined(HAVE_OUTPUT_UDP) diff --git a/src/dabOutput/edi/Config.h b/src/dabOutput/edi/Config.h index 55d5f0f..0c7dce8 100644 --- a/src/dabOutput/edi/Config.h +++ b/src/dabOutput/edi/Config.h @@ -50,11 +50,18 @@ struct udp_destination_t : public destination_t { }; // TCP server that can accept multiple connections -struct tcp_destination_t : public destination_t { +struct tcp_server_t : public destination_t { unsigned int listen_port = 0; size_t max_frames_queued = 1024; }; +// TCP client that connects to one endpoint +struct tcp_client_t : public destination_t { + std::string dest_addr; + unsigned int dest_port = 0; + size_t max_frames_queued = 1024; +}; + struct configuration_t { unsigned chunk_len = 207; // RSk, data length of each chunk unsigned fec = 0; // number of fragments that can be recovered diff --git a/src/dabOutput/edi/PFT.cpp b/src/dabOutput/edi/PFT.cpp index 5b93016..63dfa34 100644 --- a/src/dabOutput/edi/PFT.cpp +++ b/src/dabOutput/edi/PFT.cpp @@ -314,7 +314,7 @@ std::vector< PFTFragment > PFT::Assemble(AFPacket af_packet) #if 0 fprintf(stderr, "* PFT pseq %d, findex %d, fcount %d, plen %d\n", - m_pseq, findex, fcount, plen & ~0x8000); + m_pseq, findex, fcount, plen & ~0xC000); #endif } diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp index d99e987..187aabe 100644 --- a/src/dabOutput/edi/Transport.cpp +++ b/src/dabOutput/edi/Transport.cpp @@ -45,12 +45,16 @@ void configuration_t::print() const } etiLog.level(info) << " source port " << udp_dest->source_port; } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) { + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) { etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port; etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued; } + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { + etiLog.level(info) << " TCP client connecting to " << tcp_dest->dest_addr << ":" << tcp_dest->dest_port; + etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued; + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } if (interleaver_enabled()) { @@ -69,28 +73,27 @@ Sender::Sender(const configuration_t& conf) : for (const auto& edi_dest : m_conf.destinations) { if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) { - auto udp_socket = std::make_shared<UdpSocket>(udp_dest->source_port); + auto udp_socket = std::make_shared<Socket::UDPSocket>(udp_dest->source_port); if (not udp_dest->source_addr.empty()) { - int err = udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); - if (err) { - throw runtime_error("EDI socket set source failed!"); - } - err = udp_socket->setMulticastTTL(udp_dest->ttl); - if (err) { - throw runtime_error("EDI socket set TTL failed!"); - } + udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); + udp_socket->setMulticastTTL(udp_dest->ttl); } udp_sockets.emplace(udp_dest.get(), udp_socket); } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) { - auto dispatcher = make_shared<TCPDataDispatcher>(tcp_dest->max_frames_queued); + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) { + auto dispatcher = make_shared<Socket::TCPDataDispatcher>(tcp_dest->max_frames_queued); dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); } + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { + auto tcp_socket = make_shared<Socket::TCPSocket>(); + tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port); + tcp_senders.emplace(tcp_dest.get(), tcp_socket); + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } @@ -117,7 +120,7 @@ void Sender::write(const TagPacket& tagpacket) vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet); if (m_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragment before interleaver %zu", + fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n", edi_fragments.size()); } @@ -129,28 +132,30 @@ void Sender::write(const TagPacket& tagpacket) for (const auto& edi_frag : edi_fragments) { for (auto& dest : m_conf.destinations) { if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { - InetAddress addr; - addr.setAddress(udp_dest->dest_addr.c_str()); - addr.setPort(m_conf.dest_port); + Socket::InetAddress addr; + addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); udp_sockets.at(udp_dest.get())->send(edi_frag, addr); } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) { + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); } + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { + tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size()); + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } if (m_conf.dump) { - std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); + ostream_iterator<uint8_t> debug_iterator(edi_debug_file); + copy(edi_frag.begin(), edi_frag.end(), debug_iterator); } } if (m_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragments %zu", + fprintf(stderr, "EDI number of PFT fragments %zu\n", edi_fragments.size()); } } @@ -158,23 +163,25 @@ void Sender::write(const TagPacket& tagpacket) // Send over ethernet for (auto& dest : m_conf.destinations) { if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { - InetAddress addr; - addr.setAddress(udp_dest->dest_addr.c_str()); - addr.setPort(m_conf.dest_port); + Socket::InetAddress addr; + addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); udp_sockets.at(udp_dest.get())->send(af_packet, addr); } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) { + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { tcp_dispatchers.at(tcp_dest.get())->write(af_packet); } + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { + tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size()); + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } if (m_conf.dump) { - std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - std::copy(af_packet.begin(), af_packet.end(), debug_iterator); + ostream_iterator<uint8_t> debug_iterator(edi_debug_file); + copy(af_packet.begin(), af_packet.end(), debug_iterator); } } } diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h index 7b0a0db..9633275 100644 --- a/src/dabOutput/edi/Transport.h +++ b/src/dabOutput/edi/Transport.h @@ -32,11 +32,12 @@ #include "AFPacket.h" #include "PFT.h" #include "Interleaver.h" +#include "Socket.h" #include <vector> #include <unordered_map> #include <stdexcept> +#include <fstream> #include <cstdint> -#include "dabOutput/dabOutput.h" namespace edi { @@ -61,8 +62,9 @@ class Sender { // To mitigate for burst packet loss, PFT fragments can be sent out-of-order edi::Interleaver edi_interleaver; - std::unordered_map<udp_destination_t*, std::shared_ptr<UdpSocket>> udp_sockets; - std::unordered_map<tcp_destination_t*, std::shared_ptr<TCPDataDispatcher>> tcp_dispatchers; + std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets; + std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers; + std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSocket>> tcp_senders; }; } diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp index 2cb49e7..5d4f964 100644 --- a/src/input/Udp.cpp +++ b/src/input/Udp.cpp @@ -82,17 +82,8 @@ void Udp::openUdpSocket(const std::string& endpoint) throw out_of_range("can't use port number 0 in udp address"); } - if (m_sock.reinit(port, address) == -1) { - stringstream ss; - ss << "Could not init UDP socket: " << inetErrMsg; - throw runtime_error(ss.str()); - } - - if (m_sock.setBlocking(false) == -1) { - stringstream ss; - ss << "Could not set non-blocking UDP socket: " << inetErrMsg; - throw runtime_error(ss.str()); - } + m_sock.reinit(port, address); + m_sock.setBlocking(false); etiLog.level(info) << "Opened UDP port " << address << ":" << port; } @@ -100,17 +91,9 @@ void Udp::openUdpSocket(const std::string& endpoint) int Udp::readFrame(uint8_t* buffer, size_t size) { // Regardless of buffer contents, try receiving data. - UdpPacket packet(32768); - int ret = m_sock.receive(packet); - - if (ret == -1) { - stringstream ss; - ss << "Could not read from UDP socket: " << inetErrMsg; - throw runtime_error(ss.str()); - } + auto packet = m_sock.receive(32768); - std::copy(packet.getData(), packet.getData() + packet.getSize(), - back_inserter(m_buffer)); + std::copy(packet.buffer.cbegin(), packet.buffer.cend(), back_inserter(m_buffer)); // Take data from the buffer if it contains enough data, // in any case write the buffer @@ -136,7 +119,8 @@ int Udp::setBitrate(int bitrate) int Udp::close() { - return m_sock.close(); + m_sock.close(); + return 0; } @@ -167,8 +151,8 @@ static uint16_t unpack2(const uint8_t *buf) int Sti_d_Rtp::open(const std::string& name) { - // Skip the sti-rtp:// part if it is present - const string endpoint = (name.substr(0, 10) == "sti-rtp://") ? + // Skip the rtp:// part if it is present + const string endpoint = (name.substr(0, 10) == "rtp://") ? name.substr(10) : name; // The endpoint should be address:port @@ -176,8 +160,8 @@ int Sti_d_Rtp::open(const std::string& name) if (colon_pos == string::npos) { stringstream ss; ss << "'" << name << - " is an invalid format for sti-rtp address: " - "expected [sti-rtp://]address:port"; + " is an invalid format for rtp address: " + "expected [rtp://]address:port"; throw invalid_argument(ss.str()); } @@ -190,29 +174,22 @@ int Sti_d_Rtp::open(const std::string& name) void Sti_d_Rtp::receive_packet() { - UdpPacket packet(32768); - int ret = m_sock.receive(packet); - - if (ret == -1) { - stringstream ss; - ss << "Could not read from UDP socket: " << inetErrMsg; - throw runtime_error(ss.str()); - } + auto packet = m_sock.receive(32768); - if (packet.getSize() == 0) { + if (packet.buffer.empty()) { // No packet was received return; } const size_t STI_FC_LEN = 8; - if (packet.getSize() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) { + if (packet.buffer.size() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) { etiLog.level(info) << "Received too small RTP packet for " << m_name; return; } - if (not rtpHeaderValid(packet.getData())) { + if (not rtpHeaderValid(packet.buffer.data())) { etiLog.level(info) << "Received invalid RTP header for " << m_name; return; @@ -220,7 +197,7 @@ void Sti_d_Rtp::receive_packet() // STI(PI, X) size_t index = RTP_HEADER_LEN; - const uint8_t *buf = packet.getData(); + const uint8_t *buf = packet.buffer.data(); // SYNC index++; // Advance over STAT @@ -242,7 +219,7 @@ void Sti_d_Rtp::receive_packet() m_name; return; } - if (packet.getSize() < index + DFS) { + if (packet.buffer.size() < index + DFS) { etiLog.level(info) << "Received STI too small for given DFS for " << m_name; return; @@ -270,9 +247,9 @@ void Sti_d_Rtp::receive_packet() uint16_t NST = unpack2(buf+index) & 0x7FF; // 11 bits index += 2; - if (packet.getSize() < index + 4*NST) { + if (packet.buffer.size() < index + 4*NST) { etiLog.level(info) << "Received STI too small to contain NST for " << - m_name << " packet: " << packet.getSize() << " need " << + m_name << " packet: " << packet.buffer.size() << " need " << index + 4*NST; return; } diff --git a/src/input/Udp.h b/src/input/Udp.h index dc01486..dd637c6 100644 --- a/src/input/Udp.h +++ b/src/input/Udp.h @@ -31,7 +31,7 @@ #include <deque> #include <boost/thread.hpp> #include "input/inputs.h" -#include "UdpSocket.h" +#include "Socket.h" namespace Inputs { @@ -46,7 +46,7 @@ class Udp : public InputBase { virtual int close(); protected: - UdpSocket m_sock; + Socket::UDPSocket m_sock; std::string m_name; void openUdpSocket(const std::string& endpoint); |