From 43f4a3a2a695c303bd4fdfbd7fec6def29284f2e Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 28 May 2019 16:56:43 +0200 Subject: Unify Socket abstractions --- src/DabMultiplexer.h | 3 +- src/DabMux.cpp | 3 +- src/InetAddress.cpp | 155 ----------------- src/InetAddress.h | 78 --------- src/TcpSocket.cpp | 359 ---------------------------------------- src/TcpSocket.h | 164 ------------------ src/UdpSocket.cpp | 256 ---------------------------- src/UdpSocket.h | 174 ------------------- src/dabOutput/dabOutput.h | 21 +-- src/dabOutput/dabOutputTcp.cpp | 2 +- src/dabOutput/dabOutputUdp.cpp | 65 +++----- src/dabOutput/edi/Transport.cpp | 24 +-- src/dabOutput/edi/Transport.h | 6 +- src/input/Udp.cpp | 51 ++---- src/input/Udp.h | 4 +- 15 files changed, 63 insertions(+), 1302 deletions(-) delete mode 100644 src/InetAddress.cpp delete mode 100644 src/InetAddress.h delete mode 100644 src/TcpSocket.cpp delete mode 100644 src/TcpSocket.h delete mode 100644 src/UdpSocket.cpp delete mode 100644 src/UdpSocket.h (limited to 'src') diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h index 386c23c..d1075a6 100644 --- a/src/DabMultiplexer.h +++ b/src/DabMultiplexer.h @@ -37,8 +37,7 @@ #include "fig/FIGCarousel.h" #include "crc.h" #include "utils.h" -#include "UdpSocket.h" -#include "InetAddress.h" +#include "Socket.h" #include "PcDebug.h" #include "MuxElements.h" #include "RemoteControl.h" diff --git a/src/DabMux.cpp b/src/DabMux.cpp index b9ee9fd..d749ed3 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -99,8 +99,7 @@ typedef DWORD32 uint32_t; #include "dabOutput/dabOutput.h" #include "crc.h" -#include "UdpSocket.h" -#include "InetAddress.h" +#include "Socket.h" #include "PcDebug.h" #include "DabMux.h" #include "MuxElements.h" diff --git a/src/InetAddress.cpp b/src/InetAddress.cpp deleted file mode 100644 index 7660263..0000000 --- a/src/InetAddress.cpp +++ /dev/null @@ -1,155 +0,0 @@ -/* - Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the - Queen in Right of Canada (Communications Research Center Canada) - - Copyright (C) 2016 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include "InetAddress.h" -#include -#include -#include -#include - -#ifdef TRACE_ON -# ifndef TRACE_CLASS -# define TRACE_CLASS(clas, func) cout <<"-" <<(clas) <<"\t(" <h_addr); - } else { - addr.sin_addr.s_addr = htons(INADDR_ANY); - inetErrNo = 0; - inetErrMsg = "Could not find address"; - inetErrDesc = name.c_str(); - return -1; - } - } - } - else { - addr.sin_addr.s_addr = INADDR_ANY; - } - return 0; -} - - -void setInetError(const char* description) -{ - inetErrNo = 0; - inetErrNo = errno; - inetErrMsg = strerror(inetErrNo); - inetErrDesc = description; -} - diff --git a/src/InetAddress.h b/src/InetAddress.h deleted file mode 100644 index e246d4c..0000000 --- a/src/InetAddress.h +++ /dev/null @@ -1,78 +0,0 @@ -/* - Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the - Queen in Right of Canada (Communications Research Center Canada) - - Copyright (C) 2016 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#ifndef _InetAddress -#define _InetAddress - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include -#include -#include -#include -#include -#include -#include -#include - -#define SOCKET int -#define INVALID_SOCKET -1 -#define INVALID_PORT -1 - - -/// The last error number -extern int inetErrNo; -/// The last error message -extern const char *inetErrMsg; -/// The description of the last error -extern const char *inetErrDesc; -/// Set the number, message and description of the last error -void setInetError(const char* description); - - -/** - * This class represents an Internet Protocol (IP) address. - * @author Pascal Charest pascal.charest@crc.ca - */ -class InetAddress { - public: - InetAddress(int port = 0, const char* name = NULL); - - sockaddr *getAddress(); - const char *getHostAddress(); - int getPort(); - int setAddress(const std::string& name); - void setPort(int port); - bool isMulticastAddress(); - - private: - sockaddr_in addr; -}; - - -#endif diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp deleted file mode 100644 index 3ebe73c..0000000 --- a/src/TcpSocket.cpp +++ /dev/null @@ -1,359 +0,0 @@ -/* - Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in - Right of Canada (Communications Research Center Canada) - - Copyright (C) 2019 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include "TcpSocket.h" -#include "Log.h" -#include -#include -#include -#include -#include -#include -#include -#include - -using namespace std; - -using vec_u8 = std::vector; - - -TcpSocket::TcpSocket() : - m_sock(INVALID_SOCKET) -{ -} - -TcpSocket::TcpSocket(int port, const string& name) : - m_sock(INVALID_SOCKET) -{ - if (port) { - if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) { - throw std::runtime_error("Can't create socket"); - } - - reuseopt_t reuse = 1; - if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) - == SOCKET_ERROR) { - throw std::runtime_error("Can't reuse address"); - } - -#if defined(HAVE_SO_NOSIGPIPE) - int val = 1; - if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) - == SOCKET_ERROR) { - throw std::runtime_error("Can't set SO_NOSIGPIPE"); - } -#endif - - m_own_address.setAddress(name); - m_own_address.setPort(port); - - if (::bind(m_sock, m_own_address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) { - ::close(m_sock); - m_sock = INVALID_SOCKET; - throw std::runtime_error("Can't bind socket"); - } - } -} - -TcpSocket::TcpSocket(SOCKET sock, InetAddress own, InetAddress remote) : - m_own_address(own), - m_remote_address(remote), - m_sock(sock) { } - -// The move constructors must ensure the moved-from -// TcpSocket won't destroy our socket handle -TcpSocket::TcpSocket(TcpSocket&& other) -{ - m_sock = other.m_sock; - other.m_sock = INVALID_SOCKET; - - m_own_address = other.m_own_address; - m_remote_address = other.m_remote_address; -} - -TcpSocket& TcpSocket::operator=(TcpSocket&& other) -{ - m_sock = other.m_sock; - other.m_sock = INVALID_SOCKET; - - m_own_address = other.m_own_address; - m_remote_address = other.m_remote_address; - return *this; -} - -/** - * Close the underlying socket. - * @return 0 if ok - * -1 if error - */ -int TcpSocket::close() -{ - if (m_sock != INVALID_SOCKET) { - int res = ::close(m_sock); - if (res != 0) { - setInetError("Can't close socket"); - return -1; - } - m_sock = INVALID_SOCKET; - } - return 0; -} - -TcpSocket::~TcpSocket() -{ - close(); -} - -bool TcpSocket::isValid() -{ - return m_sock != INVALID_SOCKET; -} - -ssize_t TcpSocket::recv(void* data, size_t size) -{ - ssize_t ret = ::recv(m_sock, (char*)data, size, 0); - if (ret == SOCKET_ERROR) { - stringstream ss; - ss << "TCP Socket recv error: " << strerror(errno); - throw std::runtime_error(ss.str()); - } - return ret; -} - - -ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms) -{ - if (timeout_ms) { - struct pollfd fds[1]; - fds[0].fd = m_sock; - fds[0].events = POLLOUT; - - const int retval = poll(fds, 1, timeout_ms); - - if (retval == -1) { - stringstream ss; - ss << "TCP Socket send error on poll(): " << strerror(errno); - throw std::runtime_error(ss.str()); - } - else if (retval == 0) { - // Timed out - return 0; - } - } - - /* On Linux, the MSG_NOSIGNAL flag ensures that the process would not - * receive a SIGPIPE and die. - * Other systems have SO_NOSIGPIPE set on the socket for the same effect. */ -#if defined(HAVE_MSG_NOSIGNAL) - const int flags = MSG_NOSIGNAL; -#else - const int flags = 0; -#endif - const ssize_t ret = ::send(m_sock, (const char*)data, size, flags); - - if (ret == SOCKET_ERROR) { - stringstream ss; - ss << "TCP Socket send error: " << strerror(errno); - throw std::runtime_error(ss.str()); - } - return ret; -} - -void TcpSocket::listen() -{ - if (::listen(m_sock, 1) == SOCKET_ERROR) { - stringstream ss; - ss << "TCP Socket listen error: " << strerror(errno); - throw std::runtime_error(ss.str()); - } -} - -TcpSocket TcpSocket::accept() -{ - InetAddress remote_addr; - socklen_t addrLen = sizeof(sockaddr_in); - - SOCKET socket = ::accept(m_sock, remote_addr.getAddress(), &addrLen); - if (socket == SOCKET_ERROR) { - stringstream ss; - ss << "TCP Socket accept error: " << strerror(errno); - throw std::runtime_error(ss.str()); - } - else { - TcpSocket client(socket, m_own_address, remote_addr); - return client; - } -} - -TcpSocket TcpSocket::accept(int timeout_ms) -{ - struct pollfd fds[1]; - fds[0].fd = m_sock; - fds[0].events = POLLIN | POLLOUT; - - int retval = poll(fds, 1, timeout_ms); - - if (retval == -1) { - stringstream ss; - ss << "TCP Socket accept error: " << strerror(errno); - throw std::runtime_error(ss.str()); - } - else if (retval) { - return accept(); - } - else { - TcpSocket invalidsock(0, ""); - return invalidsock; - } -} - - -InetAddress TcpSocket::getOwnAddress() const -{ - return m_own_address; -} - -InetAddress TcpSocket::getRemoteAddress() const -{ - return m_remote_address; -} - - -TCPConnection::TCPConnection(TcpSocket&& sock) : - queue(), - m_running(true), - m_sender_thread(), - m_sock(move(sock)) -{ - auto own_addr = m_sock.getOwnAddress(); - auto addr = m_sock.getRemoteAddress(); - etiLog.level(debug) << "New TCP Connection on port " << - own_addr.getPort() << " from " << - addr.getHostAddress() << ":" << addr.getPort(); - m_sender_thread = std::thread(&TCPConnection::process, this); -} - -TCPConnection::~TCPConnection() -{ - m_running = false; - vec_u8 termination_marker; - queue.push(termination_marker); - m_sender_thread.join(); -} - -void TCPConnection::process() -{ - while (m_running) { - vec_u8 data; - queue.wait_and_pop(data); - - if (data.empty()) { - // empty vector is the termination marker - m_running = false; - break; - } - - try { - ssize_t remaining = data.size(); - const uint8_t *buf = reinterpret_cast(data.data()); - const int timeout_ms = 10; // Less than one ETI frame - - while (m_running and remaining > 0) { - const ssize_t sent = m_sock.send(buf, remaining, timeout_ms); - if (sent < 0 or sent > remaining) { - throw std::logic_error("Invalid TcpSocket::send() return value"); - } - remaining -= sent; - buf += sent; - } - } - catch (const std::runtime_error& e) { - m_running = false; - } - } - - - auto own_addr = m_sock.getOwnAddress(); - auto addr = m_sock.getRemoteAddress(); - etiLog.level(debug) << "Dropping TCP Connection on port " << - own_addr.getPort() << " from " << - addr.getHostAddress() << ":" << addr.getPort(); -} - - -TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) : - m_max_queue_size(max_queue_size) -{ -} - -TCPDataDispatcher::~TCPDataDispatcher() -{ - m_running = false; - m_connections.clear(); - m_listener_socket.close(); - m_listener_thread.join(); -} - -void TCPDataDispatcher::start(int port, const string& address) -{ - TcpSocket sock(port, address); - m_listener_socket = move(sock); - - m_running = true; - m_listener_thread = std::thread(&TCPDataDispatcher::process, this); -} - -void TCPDataDispatcher::write(const vec_u8& data) -{ - for (auto& connection : m_connections) { - connection.queue.push(data); - } - - m_connections.remove_if( - [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); -} - -void TCPDataDispatcher::process() -{ - try { - m_listener_socket.listen(); - - const int timeout_ms = 1000; - - while (m_running) { - // Add a new TCPConnection to the list, constructing it from the client socket - auto sock = m_listener_socket.accept(timeout_ms); - if (sock.isValid()) { - m_connections.emplace(m_connections.begin(), move(sock)); - } - } - } - catch (const std::runtime_error& e) { - etiLog.level(error) << "TCPDataDispatcher caught runtime error: " << e.what(); - m_running = false; - } -} - diff --git a/src/TcpSocket.h b/src/TcpSocket.h deleted file mode 100644 index ec7afd3..0000000 --- a/src/TcpSocket.h +++ /dev/null @@ -1,164 +0,0 @@ -/* - Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in - Right of Canada (Communications Research Center Canada) - - Copyright (C) 2019 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#ifndef _TCPSOCKET -#define _TCPSOCKET - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include "InetAddress.h" -#include "ThreadsafeQueue.h" -#include -#include -#include -#include -#include -#include -#define SOCKET int -#define INVALID_SOCKET -1 -#define SOCKET_ERROR -1 -#define reuseopt_t int - -#include -#include -#include -#include -#include -#include -#include - -/** - * This class represents a TCP socket. - */ -class TcpSocket -{ - public: - /** Create a new socket that does nothing */ - TcpSocket(); - - /** Create a new socket listening for incoming connections. - * @param port The port number on which the socket will listen. - * @param name The IP address on which the socket will be bound. - * It is used to bind the socket on a specific interface if - * the computer have many NICs. - */ - TcpSocket(int port, const std::string& name); - ~TcpSocket(); - TcpSocket(TcpSocket&& other); - TcpSocket& operator=(TcpSocket&& other); - TcpSocket(const TcpSocket& other) = delete; - TcpSocket& operator=(const TcpSocket& other) = delete; - - bool isValid(void); - - int close(void); - - /** Send data over the TCP connection. - * @param data The buffer that will be sent. - * @param size Number of bytes to send. - * @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout - * return number of bytes sent, 0 on timeout, or throws runtime_error. - */ - ssize_t send(const void* data, size_t size, int timeout_ms=0); - - /** Receive data from the socket. - * @param data The buffer that will receive data. - * @param size The buffer size. - * @return number of bytes received or -1 (SOCKET_ERROR) if error - */ - ssize_t recv(void* data, size_t size); - - void listen(void); - TcpSocket accept(void); - - /* Returns either valid socket if a connection was - * accepted before the timeout expired, or an invalid - * socket otherwise. - */ - TcpSocket accept(int timeout_ms); - - /** Retrieve address this socket is bound to */ - InetAddress getOwnAddress() const; - InetAddress getRemoteAddress() const; - - private: - TcpSocket(SOCKET sock, InetAddress own, InetAddress remote); - - /// The address on which the socket is bound. - InetAddress m_own_address; - InetAddress m_remote_address; - /// The low-level socket used by system functions. - SOCKET m_sock; -}; - -/* Helper class for TCPDataDispatcher, contains a queue of pending data and - * a sender thread. */ -class TCPConnection -{ - public: - TCPConnection(TcpSocket&& sock); - TCPConnection(const TCPConnection&) = delete; - TCPConnection& operator=(const TCPConnection&) = delete; - ~TCPConnection(); - - ThreadsafeQueue > queue; - - private: - std::atomic m_running; - std::thread m_sender_thread; - TcpSocket m_sock; - - void process(void); -}; - -/* Send a TCP stream to several destinations, and automatically disconnect destinations - * whose buffer overflows. - */ -class TCPDataDispatcher -{ - public: - TCPDataDispatcher(size_t max_queue_size); - ~TCPDataDispatcher(); - TCPDataDispatcher(const TCPDataDispatcher&) = delete; - TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete; - - void start(int port, const std::string& address); - void write(const std::vector& data); - - private: - void process(void); - - size_t m_max_queue_size; - - std::atomic m_running; - std::thread m_listener_thread; - TcpSocket m_listener_socket; - std::list m_connections; -}; - -#endif // _TCPSOCKET diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp deleted file mode 100644 index 3d015ec..0000000 --- a/src/UdpSocket.cpp +++ /dev/null @@ -1,256 +0,0 @@ -/* - Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the - Queen in Right of Canada (Communications Research Center Canada) - - Copyright (C) 2017 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include "UdpSocket.h" - -#include -#include -#include -#include -#include - -using namespace std; - -UdpSocket::UdpSocket() : - listenSocket(INVALID_SOCKET) -{ - reinit(0, ""); -} - -UdpSocket::UdpSocket(int port) : - listenSocket(INVALID_SOCKET) -{ - reinit(port, ""); -} - -UdpSocket::UdpSocket(int port, const std::string& name) : - listenSocket(INVALID_SOCKET) -{ - reinit(port, name); -} - - -int UdpSocket::setBlocking(bool block) -{ - int res; - if (block) - res = fcntl(listenSocket, F_SETFL, 0); - else - res = fcntl(listenSocket, F_SETFL, O_NONBLOCK); - if (res == SOCKET_ERROR) { - setInetError("Can't change blocking state of socket"); - return -1; - } - return 0; -} - -int UdpSocket::reinit(int port, const std::string& name) -{ - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); - } - - if ((listenSocket = socket(PF_INET, SOCK_DGRAM, 0)) == INVALID_SOCKET) { - setInetError("Can't create socket"); - return -1; - } - reuseopt_t reuse = 1; - if (setsockopt(listenSocket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) - == SOCKET_ERROR) { - setInetError("Can't reuse address"); - return -1; - } - - if (port) { - address.setAddress(name); - address.setPort(port); - - if (::bind(listenSocket, address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) { - setInetError("Can't bind socket"); - ::close(listenSocket); - listenSocket = INVALID_SOCKET; - return -1; - } - } - return 0; -} - -int UdpSocket::close() -{ - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); - } - - listenSocket = INVALID_SOCKET; - - return 0; -} - -UdpSocket::~UdpSocket() -{ - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); - } -} - - -int UdpSocket::receive(UdpPacket& packet) -{ - socklen_t addrSize; - addrSize = sizeof(*packet.getAddress().getAddress()); - ssize_t ret = recvfrom(listenSocket, - packet.getData(), - packet.getSize(), - 0, - packet.getAddress().getAddress(), - &addrSize); - - if (ret == SOCKET_ERROR) { - packet.setSize(0); - if (errno == EAGAIN) { - return 0; - } - setInetError("Can't receive UDP packet"); - return -1; - } - - packet.setSize(ret); - return 0; -} - -int UdpSocket::send(UdpPacket& packet) -{ - int ret = sendto(listenSocket, packet.getData(), packet.getSize(), 0, - packet.getAddress().getAddress(), sizeof(*packet.getAddress().getAddress())); - if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { - setInetError("Can't send UDP packet"); - return -1; - } - return 0; -} - - -int UdpSocket::send(const std::vector& data, InetAddress destination) -{ - int ret = sendto(listenSocket, &data[0], data.size(), 0, - destination.getAddress(), sizeof(*destination.getAddress())); - if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { - setInetError("Can't send UDP packet"); - return -1; - } - return 0; -} - - -/** - * Must be called to receive data on a multicast address. - * @param groupname The multica -st address to join. - * @return 0 if ok, -1 if error - */ -int UdpSocket::joinGroup(char* groupname) -{ - ip_mreqn group; - if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { - setInetError(groupname); - return -1; - } - if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { - setInetError("Not a multicast address"); - return -1; - } - group.imr_address.s_addr = htons(INADDR_ANY);; - group.imr_ifindex = 0; - if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) - == SOCKET_ERROR) { - setInetError("Can't join multicast group"); - } - return 0; -} - -int UdpSocket::setMulticastTTL(int ttl) -{ - if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) - == SOCKET_ERROR) { - setInetError("Can't set ttl"); - return -1; - } - - return 0; -} - -int UdpSocket::setMulticastSource(const char* source_addr) -{ - struct in_addr addr; - if (inet_aton(source_addr, &addr) == 0) { - setInetError("Can't parse source address"); - return -1; - } - - if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) - == SOCKET_ERROR) { - setInetError("Can't set source address"); - return -1; - } - - return 0; -} - -UdpPacket::UdpPacket() { } - -UdpPacket::UdpPacket(size_t initSize) : - m_buffer(initSize) -{ } - - -void UdpPacket::setSize(size_t newSize) -{ - m_buffer.resize(newSize); -} - - -uint8_t* UdpPacket::getData() -{ - return &m_buffer[0]; -} - - -void UdpPacket::addData(const void *data, size_t size) -{ - uint8_t *d = (uint8_t*)data; - std::copy(d, d + size, std::back_inserter(m_buffer)); -} - -size_t UdpPacket::getSize() -{ - return m_buffer.size(); -} - -InetAddress UdpPacket::getAddress() -{ - return address; -} - diff --git a/src/UdpSocket.h b/src/UdpSocket.h deleted file mode 100644 index f51e87c..0000000 --- a/src/UdpSocket.h +++ /dev/null @@ -1,174 +0,0 @@ -/* - Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the - Queen in Right of Canada (Communications Research Center Canada) - - Copyright (C) 2017 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#pragma once - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include "InetAddress.h" -#include -#include -#include -#include -#include -#include -#define SOCKET int -#define INVALID_SOCKET -1 -#define SOCKET_ERROR -1 -#define reuseopt_t int - -#include -#include -#include - -class UdpPacket; - - -/** - * This class represents a socket for sending and receiving UDP packets. - * - * A UDP socket is the sending or receiving point for a packet delivery service. - * Each packet sent or received on a datagram socket is individually - * addressed and routed. Multiple packets sent from one machine to another may - * be routed differently, and may arrive in any order. - */ -class UdpSocket -{ - public: - /** Create a new socket that will not be bound to any port. To be used - * for data output. - */ - UdpSocket(); - /** Create a new socket. - * @param port The port number on which the socket will be bound - */ - UdpSocket(int port); - /** Create a new socket. - * @param port The port number on which the socket will be bound - * @param name The IP address on which the socket will be bound. - * It is used to bind the socket on a specific interface if - * the computer have many NICs. - */ - UdpSocket(int port, const std::string& name); - ~UdpSocket(); - UdpSocket(const UdpSocket& other) = delete; - const UdpSocket& operator=(const UdpSocket& other) = delete; - - /** reinitialise socket. Close the already open socket, and - * create a new one - */ - int reinit(int port, const std::string& name); - - /** Close the socket - */ - int close(void); - - /** Send an UDP packet. - * @param packet The UDP packet to be sent. It includes the data and the - * destination address - * return 0 if ok, -1 if error - */ - int send(UdpPacket& packet); - - /** Send an UDP packet - * - * return 0 if ok, -1 if error - */ - int send(const std::vector& data, InetAddress destination); - - /** Receive an UDP packet. - * @param packet The packet that will receive the data. The address will be set - * to the source address. - * @return 0 if ok, -1 if error - */ - int receive(UdpPacket& packet); - - int joinGroup(char* groupname); - int setMulticastSource(const char* source_addr); - int setMulticastTTL(int ttl); - - /** Set blocking mode. By default, the socket is blocking. - * @return 0 if ok - * -1 if error - */ - int setBlocking(bool block); - - protected: - - /// The address on which the socket is bound. - InetAddress address; - /// The low-level socket used by system functions. - SOCKET listenSocket; -}; - -/** This class represents a UDP packet. - * - * A UDP packet contains a payload (sequence of bytes) and an address. For - * outgoing packets, the address is the destination address. For incoming - * packets, the address tells the user from what source the packet arrived from. - */ -class UdpPacket -{ - public: - /** Construct an empty UDP packet. - */ - UdpPacket(); - UdpPacket(size_t initSize); - - /** Give the pointer to data. - * @return The pointer - */ - uint8_t* getData(void); - - /** Append some data at the end of data buffer and adjust size. - * @param data Pointer to the data to add - * @param size Size in bytes of new data - */ - void addData(const void *data, size_t size); - - size_t getSize(void); - - /** Changes size of the data buffer size. Keeps data intact unless - * truncated. - */ - void setSize(size_t newSize); - - /** Returns the UDP address of the packet. - */ - InetAddress getAddress(void); - - const std::vector& getBuffer(void) const { - return m_buffer; - } - - - private: - std::vector m_buffer; - InetAddress address; -}; - diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h index 9cc18d7..c7e570b 100644 --- a/src/dabOutput/dabOutput.h +++ b/src/dabOutput/dabOutput.h @@ -28,8 +28,7 @@ #pragma once -#include "UdpSocket.h" -#include "TcpSocket.h" +#include "Socket.h" #include "Log.h" #include "string.h" #include @@ -57,6 +56,8 @@ class DabOutput { return Open(name.c_str()); } + + // Return -1 on failure virtual int Write(void* buffer, int size) = 0; virtual int Close() = 0; @@ -145,15 +146,7 @@ class DabOutputRaw : public DabOutput class DabOutputUdp : public DabOutput { public: - DabOutputUdp() { - packet_ = new UdpPacket(6144); - socket_ = new UdpSocket(); - } - - virtual ~DabOutputUdp() { - delete socket_; - delete packet_; - } + DabOutputUdp(); int Open(const char* name); int Write(void* buffer, int size); @@ -171,8 +164,8 @@ class DabOutputUdp : public DabOutput DabOutputUdp operator=(const DabOutputUdp& other) = delete; std::string uri_; - UdpSocket* socket_; - UdpPacket* packet_; + Socket::UDPSocket socket_; + Socket::UDPPacket packet_; }; // -------------- TCP ------------------ @@ -190,7 +183,7 @@ class DabOutputTcp : public DabOutput private: std::string uri_; - std::shared_ptr dispatcher_; + std::shared_ptr dispatcher_; }; // -------------- Simul ------------------ diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 87dbfd5..4dc3538 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -94,7 +94,7 @@ int DabOutputTcp::Open(const char* name) uri_ = name; if (success) { - dispatcher_ = make_shared(MAX_QUEUED_ETI_FRAMES); + dispatcher_ = make_shared(MAX_QUEUED_ETI_FRAMES); dispatcher_->start(port, address); } else { diff --git a/src/dabOutput/dabOutputUdp.cpp b/src/dabOutput/dabOutputUdp.cpp index c129569..b9c22db 100644 --- a/src/dabOutput/dabOutputUdp.cpp +++ b/src/dabOutput/dabOutputUdp.cpp @@ -38,18 +38,12 @@ #include #include #include "dabOutput.h" -#include "UdpSocket.h" - -#ifdef _WIN32 -# include -# include -#else -# include -# include -# include -# include -# include -#endif +#include "Socket.h" + +DabOutputUdp::DabOutputUdp() : + socket_(), + packet_(6144) +{ } int DabOutputUdp::Open(const char* name) { @@ -64,12 +58,6 @@ int DabOutputUdp::Open(const char* name) regex_constants::match_default)) { string address = what[1]; - if (this->packet_->getAddress().setAddress(address.c_str()) == -1) { - etiLog.level(error) << "can't set address " << - address << "(" << inetErrDesc << ": " << inetErrMsg << ")"; - return -1; - } - string port_str = what[2]; long port = std::strtol(port_str.c_str(), nullptr, 0); @@ -79,7 +67,7 @@ int DabOutputUdp::Open(const char* name) return -1; } - this->packet_->getAddress().setPort(port); + packet_.address.resolveUdpDestination(address, port); string query_params = what[3]; smatch query_what; @@ -87,28 +75,25 @@ int DabOutputUdp::Open(const char* name) regex_constants::match_default)) { string src = query_what[1]; - int err = socket_->setMulticastSource(src.c_str()); - if (err) { - etiLog.level(error) << "UDP output socket set source failed!"; - return -1; - } + try { + socket_.setMulticastSource(src.c_str()); - string ttl_str = query_what[2]; + string ttl_str = query_what[2]; - if (not ttl_str.empty()) { - long ttl = std::strtol(ttl_str.c_str(), nullptr, 0); - if ((ttl <= 0) || (ttl >= 255)) { - etiLog.level(error) << "Invalid TTL setting in " << - uri_without_proto; - return -1; - } + if (not ttl_str.empty()) { + long ttl = std::strtol(ttl_str.c_str(), nullptr, 0); + if ((ttl <= 0) || (ttl >= 255)) { + etiLog.level(error) << "Invalid TTL setting in " << + uri_without_proto; + return -1; + } - err = socket_->setMulticastTTL(ttl); - if (err) { - etiLog.level(error) << "UDP output socket set TTL failed!"; - return -1; + socket_.setMulticastTTL(ttl); } } + catch (const std::runtime_error& e) { + etiLog.level(error) << "Failed to set UDP output settings" << e.what(); + } } else if (not query_params.empty()) { etiLog.level(error) << "UDP output: could not parse parameters " << @@ -129,9 +114,11 @@ int DabOutputUdp::Open(const char* name) int DabOutputUdp::Write(void* buffer, int size) { - this->packet_->setSize(0); - this->packet_->addData(buffer, size); - return this->socket_->send(*this->packet_); + const uint8_t *buf = reinterpret_cast(buffer); + packet_.buffer.resize(0); + std::copy(buf, buf + size, std::back_inserter(packet_.buffer)); + socket_.send(packet_); + return 0; } #endif // defined(HAVE_OUTPUT_UDP) diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp index d99e987..6d3950f 100644 --- a/src/dabOutput/edi/Transport.cpp +++ b/src/dabOutput/edi/Transport.cpp @@ -69,23 +69,17 @@ Sender::Sender(const configuration_t& conf) : for (const auto& edi_dest : m_conf.destinations) { if (const auto udp_dest = dynamic_pointer_cast(edi_dest)) { - auto udp_socket = std::make_shared(udp_dest->source_port); + auto udp_socket = std::make_shared(udp_dest->source_port); if (not udp_dest->source_addr.empty()) { - int err = udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); - if (err) { - throw runtime_error("EDI socket set source failed!"); - } - err = udp_socket->setMulticastTTL(udp_dest->ttl); - if (err) { - throw runtime_error("EDI socket set TTL failed!"); - } + udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); + udp_socket->setMulticastTTL(udp_dest->ttl); } udp_sockets.emplace(udp_dest.get(), udp_socket); } else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { - auto dispatcher = make_shared(tcp_dest->max_frames_queued); + auto dispatcher = make_shared(tcp_dest->max_frames_queued); dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); } @@ -129,9 +123,8 @@ void Sender::write(const TagPacket& tagpacket) for (const auto& edi_frag : edi_fragments) { for (auto& dest : m_conf.destinations) { if (const auto& udp_dest = dynamic_pointer_cast(dest)) { - InetAddress addr; - addr.setAddress(udp_dest->dest_addr.c_str()); - addr.setPort(m_conf.dest_port); + Socket::InetAddress addr; + addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); udp_sockets.at(udp_dest.get())->send(edi_frag, addr); } @@ -158,9 +151,8 @@ void Sender::write(const TagPacket& tagpacket) // Send over ethernet for (auto& dest : m_conf.destinations) { if (const auto& udp_dest = dynamic_pointer_cast(dest)) { - InetAddress addr; - addr.setAddress(udp_dest->dest_addr.c_str()); - addr.setPort(m_conf.dest_port); + Socket::InetAddress addr; + addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); udp_sockets.at(udp_dest.get())->send(af_packet, addr); } diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h index 7b0a0db..74126d1 100644 --- a/src/dabOutput/edi/Transport.h +++ b/src/dabOutput/edi/Transport.h @@ -32,11 +32,11 @@ #include "AFPacket.h" #include "PFT.h" #include "Interleaver.h" +#include "Socket.h" #include #include #include #include -#include "dabOutput/dabOutput.h" namespace edi { @@ -61,8 +61,8 @@ class Sender { // To mitigate for burst packet loss, PFT fragments can be sent out-of-order edi::Interleaver edi_interleaver; - std::unordered_map> udp_sockets; - std::unordered_map> tcp_dispatchers; + std::unordered_map> udp_sockets; + std::unordered_map> tcp_dispatchers; }; } diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp index 2cb49e7..b4cced0 100644 --- a/src/input/Udp.cpp +++ b/src/input/Udp.cpp @@ -82,17 +82,8 @@ void Udp::openUdpSocket(const std::string& endpoint) throw out_of_range("can't use port number 0 in udp address"); } - if (m_sock.reinit(port, address) == -1) { - stringstream ss; - ss << "Could not init UDP socket: " << inetErrMsg; - throw runtime_error(ss.str()); - } - - if (m_sock.setBlocking(false) == -1) { - stringstream ss; - ss << "Could not set non-blocking UDP socket: " << inetErrMsg; - throw runtime_error(ss.str()); - } + m_sock.reinit(port, address); + m_sock.setBlocking(false); etiLog.level(info) << "Opened UDP port " << address << ":" << port; } @@ -100,17 +91,9 @@ void Udp::openUdpSocket(const std::string& endpoint) int Udp::readFrame(uint8_t* buffer, size_t size) { // Regardless of buffer contents, try receiving data. - UdpPacket packet(32768); - int ret = m_sock.receive(packet); - - if (ret == -1) { - stringstream ss; - ss << "Could not read from UDP socket: " << inetErrMsg; - throw runtime_error(ss.str()); - } + auto packet = m_sock.receive(32768); - std::copy(packet.getData(), packet.getData() + packet.getSize(), - back_inserter(m_buffer)); + std::copy(packet.buffer.cbegin(), packet.buffer.cend(), back_inserter(m_buffer)); // Take data from the buffer if it contains enough data, // in any case write the buffer @@ -136,7 +119,8 @@ int Udp::setBitrate(int bitrate) int Udp::close() { - return m_sock.close(); + m_sock.close(); + return 0; } @@ -190,29 +174,22 @@ int Sti_d_Rtp::open(const std::string& name) void Sti_d_Rtp::receive_packet() { - UdpPacket packet(32768); - int ret = m_sock.receive(packet); - - if (ret == -1) { - stringstream ss; - ss << "Could not read from UDP socket: " << inetErrMsg; - throw runtime_error(ss.str()); - } + auto packet = m_sock.receive(32768); - if (packet.getSize() == 0) { + if (packet.buffer.empty()) { // No packet was received return; } const size_t STI_FC_LEN = 8; - if (packet.getSize() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) { + if (packet.buffer.size() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) { etiLog.level(info) << "Received too small RTP packet for " << m_name; return; } - if (not rtpHeaderValid(packet.getData())) { + if (not rtpHeaderValid(packet.buffer.data())) { etiLog.level(info) << "Received invalid RTP header for " << m_name; return; @@ -220,7 +197,7 @@ void Sti_d_Rtp::receive_packet() // STI(PI, X) size_t index = RTP_HEADER_LEN; - const uint8_t *buf = packet.getData(); + const uint8_t *buf = packet.buffer.data(); // SYNC index++; // Advance over STAT @@ -242,7 +219,7 @@ void Sti_d_Rtp::receive_packet() m_name; return; } - if (packet.getSize() < index + DFS) { + if (packet.buffer.size() < index + DFS) { etiLog.level(info) << "Received STI too small for given DFS for " << m_name; return; @@ -270,9 +247,9 @@ void Sti_d_Rtp::receive_packet() uint16_t NST = unpack2(buf+index) & 0x7FF; // 11 bits index += 2; - if (packet.getSize() < index + 4*NST) { + if (packet.buffer.size() < index + 4*NST) { etiLog.level(info) << "Received STI too small to contain NST for " << - m_name << " packet: " << packet.getSize() << " need " << + m_name << " packet: " << packet.buffer.size() << " need " << index + 4*NST; return; } diff --git a/src/input/Udp.h b/src/input/Udp.h index dc01486..dd637c6 100644 --- a/src/input/Udp.h +++ b/src/input/Udp.h @@ -31,7 +31,7 @@ #include #include #include "input/inputs.h" -#include "UdpSocket.h" +#include "Socket.h" namespace Inputs { @@ -46,7 +46,7 @@ class Udp : public InputBase { virtual int close(); protected: - UdpSocket m_sock; + Socket::UDPSocket m_sock; std::string m_name; void openUdpSocket(const std::string& endpoint); -- cgit v1.2.3