diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2016-09-11 22:15:35 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2016-09-11 22:24:33 +0200 |
commit | 9248c1d7976ba1c37e3df147a1eb3115fe72c8d0 (patch) | |
tree | e331a2fc4600fe80ca4e2b404d4379989a95d127 /src | |
parent | 8750493994d574001e466fef21ded86730359640 (diff) | |
download | dabmux-9248c1d7976ba1c37e3df147a1eb3115fe72c8d0.tar.gz dabmux-9248c1d7976ba1c37e3df147a1eb3115fe72c8d0.tar.bz2 dabmux-9248c1d7976ba1c37e3df147a1eb3115fe72c8d0.zip |
Drop SLIP, Refactor sockets, improve TCP output
Quite a large refactoring of the sockets, TCP and UDP, in order
to improve the ETI-over-TCP output. This can now accept several
simultaneous connections, and requires a throttle.
The SLIP input is gone. The UDP inputs are currently broken.
Diffstat (limited to 'src')
-rw-r--r-- | src/ConfigParser.cpp | 5 | ||||
-rw-r--r-- | src/DabMultiplexer.cpp | 10 | ||||
-rw-r--r-- | src/DabMux.cpp | 3 | ||||
-rw-r--r-- | src/InetAddress.cpp | 127 | ||||
-rw-r--r-- | src/InetAddress.h | 43 | ||||
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/ParserCmdline.cpp | 4 | ||||
-rw-r--r-- | src/TcpServer.cpp | 243 | ||||
-rw-r--r-- | src/TcpServer.h | 84 | ||||
-rw-r--r-- | src/TcpSocket.cpp | 385 | ||||
-rw-r--r-- | src/TcpSocket.h | 127 | ||||
-rw-r--r-- | src/UdpSocket.cpp | 504 | ||||
-rw-r--r-- | src/UdpSocket.h | 197 | ||||
-rw-r--r-- | src/dabInputBridgeUdp.cpp | 6 | ||||
-rw-r--r-- | src/dabInputDmbFile.cpp | 3 | ||||
-rw-r--r-- | src/dabInputDmbUdp.cpp | 1 | ||||
-rw-r--r-- | src/dabInputSlip.cpp | 412 | ||||
-rw-r--r-- | src/dabInputSlip.h | 52 | ||||
-rw-r--r-- | src/dabOutput/dabOutput.h | 84 | ||||
-rw-r--r-- | src/dabOutput/dabOutputTcp.cpp | 250 | ||||
-rw-r--r-- | src/dabOutput/dabOutputUdp.cpp | 14 | ||||
-rw-r--r-- | src/utils.cpp | 3 |
22 files changed, 615 insertions, 1944 deletions
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index b6d1482..733b5df 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -61,7 +61,6 @@ #include "dabInputEnhancedFifo.h" #include "dabInputUdp.h" #include "dabInputBridgeUdp.h" -#include "dabInputSlip.h" #include "dabInputTest.h" #include "dabInputPrbs.h" #include "dabInputRawFile.h" @@ -744,10 +743,6 @@ void setup_subchannel_from_ptree(DabSubchannel* subchan, } else if (proto == "udp") { operations = dabInputBridgeUdpOperations; #endif // defined(HAVE_INPUT_UDP) -#if defined(HAVE_INPUT_SLIP) - } else if (proto == "slip") { - operations = dabInputSlipOperations; -#endif // defined(HAVE_INPUT_SLIP) #endif // defined(HAVE_FORMAT_BRIDGE) } } else if (type == "data") { diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp index 12876aa..3fe3078 100644 --- a/src/DabMultiplexer.cpp +++ b/src/DabMultiplexer.cpp @@ -98,16 +98,10 @@ void DabMultiplexer::set_edi_config(const edi_configuration_t& new_edi_conf) if (edi_conf.enabled()) { for (auto& edi_destination : edi_conf.destinations) { - auto edi_output = std::make_shared<UdpSocket>(); - int err = edi_output->create(edi_destination.source_port); - - if (err) { - etiLog.level(error) << "EDI socket creation failed!"; - throw MuxInitException(); - } + auto edi_output = std::make_shared<UdpSocket>(edi_destination.source_port); if (not edi_destination.source_addr.empty()) { - err = edi_output->setMulticastSource(edi_destination.source_addr.c_str()); + int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str()); if (err) { etiLog.level(error) << "EDI socket set source failed!"; throw MuxInitException(); diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 0420e34..9699ea6 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -105,7 +105,6 @@ typedef DWORD32 uint32_t; #include "dabInputEnhancedFifo.h" #include "dabInputUdp.h" #include "dabInputBridgeUdp.h" -#include "dabInputSlip.h" #include "dabInputTest.h" #include "dabInputPrbs.h" #include "dabInputRawFile.h" @@ -500,8 +499,6 @@ int main(int argc, char *argv[]) outputs.clear(); - UdpSocket::clean(); - if (returnCode != 0) { etiLog.log(emerg, "...aborting\n"); } else { diff --git a/src/InetAddress.cpp b/src/InetAddress.cpp index 3fc33ad..90cdd06 100644 --- a/src/InetAddress.cpp +++ b/src/InetAddress.cpp @@ -1,6 +1,11 @@ /* Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org */ /* This file is part of ODR-DabMux. @@ -22,12 +27,8 @@ #include "InetAddress.h" #include <iostream> #include <stdio.h> - -#ifdef _WIN32 -#else -# include <errno.h> -# include <string.h> -#endif +#include <errno.h> +#include <string.h> #ifdef TRACE_ON # ifndef TRACE_CLASS @@ -127,31 +128,33 @@ void InetAddress::setPort(int port) * @return 0 if ok * -1 if error */ -int InetAddress::setAddress(const char *name) +int InetAddress::setAddress(const std::string& name) { - TRACE_CLASS("InetAddress", "setAddress(char*)"); - if (name) { - if (atoi(name)) { // If it start with a number - if ((addr.sin_addr.s_addr = inet_addr(name)) == INADDR_NONE) { + TRACE_CLASS("InetAddress", "setAddress(string)"); + if (!name.empty()) { + if (atoi(name.c_str())) { // If it start with a number + if ((addr.sin_addr.s_addr = inet_addr(name.c_str())) == INADDR_NONE) { addr.sin_addr.s_addr = htons(INADDR_ANY); inetErrNo = 0; inetErrMsg = "Invalid address"; - inetErrDesc = name; + inetErrDesc = name.c_str(); return -1; } - } else { // Assume it's a real name - hostent *host = gethostbyname(name); + } + else { // Assume it's a real name + hostent *host = gethostbyname(name.c_str()); if (host) { addr.sin_addr = *(in_addr *)(host->h_addr); } else { addr.sin_addr.s_addr = htons(INADDR_ANY); inetErrNo = 0; inetErrMsg = "Could not find address"; - inetErrDesc = name; + inetErrDesc = name.c_str(); return -1; } } - } else { + } + else { addr.sin_addr.s_addr = INADDR_ANY; } return 0; @@ -161,100 +164,8 @@ int InetAddress::setAddress(const char *name) void setInetError(const char* description) { inetErrNo = 0; -#ifdef _WIN32 - inetErrNo = WSAGetLastError(); - switch (inetErrNo) { - case WSANOTINITIALISED: - inetErrMsg = "WSANOTINITIALISED A successful WSAStartup must occur before using this function."; - break; - case WSAENETDOWN: - inetErrMsg = "WSAENETDOWN The network subsystem has failed."; - break; - case WSAEFAULT: - inetErrMsg = "WSAEFAULT The buf or from parameters are not part of the user address space, or the fromlen parameter is too small to accommodate the peer address."; - break; - case WSAEINTR: - inetErrMsg = "WSAEINTR The (blocking) call was canceled through WSACancelBlockingCall."; - break; - case WSAEINPROGRESS: - inetErrMsg = "WSAEINPROGRESS A blocking Windows Sockets 1.1 call is in progress, or the service provider is still processing a callback function."; - break; - case WSAEINVAL: - inetErrMsg = "WSAEINVAL The socket has not been bound with bind, or an unknown flag was specified, or MSG_OOB was specified for a socket with SO_OOBINLINE enabled, or (for byte stream-style sockets only) len was zero or negative."; - break; - case WSAEISCONN: - inetErrMsg = "WSAEISCONN The socket is connected. This function is not permitted with a connected socket, whether the socket is connection-oriented or connectionless."; - break; - case WSAENETRESET: - inetErrMsg = "WSAENETRESET The connection has been broken due to the \"keep-alive\" activity detecting a failure while the operation was in progress."; - break; - case WSAENOTSOCK: - inetErrMsg = "WSAENOTSOCK The descriptor is not a socket."; - break; - case WSAEOPNOTSUPP: - inetErrMsg = "WSAEOPNOTSUPP MSG_OOB was specified, but the socket is not stream-style such as type SOCK_STREAM, out-of-band data is not supported in the communication domain associated with this socket, or the socket is unidirectional and supports only send operations."; - break; - case WSAESHUTDOWN: - inetErrMsg = "WSAESHUTDOWN The socket has been shut down; it is not possible to recvfrom on a socket after shutdown has been invoked with how set to SD_RECEIVE or SD_BOTH."; - break; - case WSAEWOULDBLOCK: - inetErrMsg = "WSAEWOULDBLOCK The socket is marked as nonblocking and the recvfrom operation would block."; - break; - case WSAEMSGSIZE: - inetErrMsg = "WSAEMSGSIZE The message was too large to fit into the specified buffer and was truncated."; - break; - case WSAETIMEDOUT: - inetErrMsg = "WSAETIMEDOUT The connection has been dropped, because of a network failure or because the system on the other end went down without notice."; - break; - case WSAECONNRESET: - inetErrMsg = "WSAECONNRESET"; - break; - case WSAEACCES: - inetErrMsg = "WSAEACCES The requested address is a broadcast address, but the appropriate flag was not set. Call setsockopt with the SO_BROADCAST parameter to allow the use of the broadcast address."; - break; - case WSAENOBUFS: - inetErrMsg = "WSAENOBUFS No buffer space is available."; - break; - case WSAENOTCONN: - inetErrMsg = "WSAENOTCONN The socket is not connected (connection-oriented sockets only)"; - break; - case WSAEHOSTUNREACH: - inetErrMsg = "WSAEHOSTUNREACH The remote host cannot be reached from this host at this time."; - break; - case WSAECONNABORTED: - inetErrMsg = "WSAECONNABORTED The virtual circuit was terminated due to a time-out or other failure. The application should close the socket as it is no longer usable."; - break; - case WSAEADDRNOTAVAIL: - inetErrMsg = "WSAEADDRNOTAVAIL The remote address is not a valid address, for example, ADDR_ANY."; - break; - case WSAEAFNOSUPPORT: - inetErrMsg = "WSAEAFNOSUPPORT Addresses in the specified family cannot be used with this socket."; - break; - case WSAEDESTADDRREQ: - inetErrMsg = "WSAEDESTADDRREQ A destination address is required."; - break; - case WSAENETUNREACH: - inetErrMsg = "WSAENETUNREACH The network cannot be reached from this host at this time."; - break; - case WSAEMFILE: - inetErrMsg = "No more socket descriptors are available."; - break; - case WSAEPROTONOSUPPORT: - inetErrMsg = "The specified protocol is not supported."; - break; - case WSAEPROTOTYPE: - inetErrMsg = "The specified protocol is the wrong type for this socket."; - break; - case WSAESOCKTNOSUPPORT: - inetErrMsg = "The specified socket type is not supported in this address family."; - break; - default: - inetErrMsg = "Unknown"; - }; -#else inetErrNo = errno; inetErrMsg = strerror(inetErrNo); -#endif inetErrDesc = description; } diff --git a/src/InetAddress.h b/src/InetAddress.h index 266b1fd..0ccc70b 100644 --- a/src/InetAddress.h +++ b/src/InetAddress.h @@ -1,6 +1,11 @@ /* Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org */ /* This file is part of ODR-DabMux. @@ -26,33 +31,17 @@ # include "config.h" #endif -// General libraries #include <stdlib.h> -// Linux librairies -#ifndef _WIN32 -// # include <sys/types.h> -# include <sys/socket.h> -# include <netinet/in.h> -# include <unistd.h> -# include <netdb.h> -# include <arpa/inet.h> -# include <pthread.h> -# define SOCKET int -# define INVALID_SOCKET -1 -# define closesocket ::close -// Windows librairies -#else -# include <winsock.h> -# ifdef _MSC_VER -# pragma comment(lib, "wsock32.lib") -# elif defined(__BORLANDC__) -# pragma(lib, "mswsock.lib") -# endif -# ifndef IN_MULTICAST -# define IN_MULTICAST(a) ((((unsigned long) (a)) & 0xf0000000) == 0xe0000000) -# endif -#endif -// General definitions +#include <sys/socket.h> +#include <netinet/in.h> +#include <unistd.h> +#include <netdb.h> +#include <arpa/inet.h> +#include <pthread.h> +#include <string> + +#define SOCKET int +#define INVALID_SOCKET -1 #define INVALID_PORT -1 @@ -79,7 +68,7 @@ class InetAddress { sockaddr *getAddress(); const char *getHostAddress(); int getPort(); - int setAddress(const char *name); + int setAddress(const std::string& name); void setPort(int port); bool isMulticastAddress(); diff --git a/src/Makefile.am b/src/Makefile.am index c572ef3..ede6649 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -65,7 +65,6 @@ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \ dabInputPrbs.h dabInputPrbs.cpp \ dabInputRawFile.h dabInputRawFile.cpp \ dabInputRawFifo.h dabInputRawFifo.cpp \ - dabInputSlip.h dabInputSlip.cpp \ dabInputTest.h dabInputTest.cpp \ dabInputUdp.h dabInputUdp.cpp \ dabInputZmq.h dabInputZmq.cpp \ @@ -94,7 +93,6 @@ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \ PcDebug.h \ ReedSolomon.h ReedSolomon.cpp \ RemoteControl.cpp RemoteControl.h \ - TcpServer.h TcpServer.cpp \ TcpSocket.h TcpSocket.cpp \ UdpSocket.h UdpSocket.cpp \ bridge.h bridge.c \ diff --git a/src/ParserCmdline.cpp b/src/ParserCmdline.cpp index 723efd6..8ce6f7a 100644 --- a/src/ParserCmdline.cpp +++ b/src/ParserCmdline.cpp @@ -284,10 +284,6 @@ bool parse_cmdline(char **argv, } else if (strcmp((*subchannel)->inputProto, "udp") == 0) { operations = dabInputBridgeUdpOperations; #endif // defined(HAVE_INPUT_UDP) -#if defined(HAVE_INPUT_SLIP) - } else if (strcmp((*subchannel)->inputProto, "slip") == 0) { - operations = dabInputSlipOperations; -#endif // defined(HAVE_INPUT_SLIP) #endif // defined(HAVE_FORMAT_BRIDGE) } } else if (c == 'D') { diff --git a/src/TcpServer.cpp b/src/TcpServer.cpp deleted file mode 100644 index 20ecffb..0000000 --- a/src/TcpServer.cpp +++ /dev/null @@ -1,243 +0,0 @@ -/* - Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in - Right of Canada (Communications Research Center Canada) - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "TcpServer.h" -#include <iostream> -#include <stdio.h> -#include <errno.h> - -#ifdef _WIN32 -#else -# include <sys/socket.h> -#endif - -#ifdef TRACE_ON -# ifndef TRACE_CLASS -# define TRACE_CLASS(class, func) cout <<"-" <<(class) <<"\t(" <<this <<")::" <<(func) <<endl -# define TRACE_STATIC(class, func) cout <<"-" <<(class) <<"\t(static)::" <<(func) <<endl -# endif -#else -# ifndef TRACE_CLASS -# define TRACE_CLASS(class, func) -# define TRACE_STATIC(class, func) -# endif -#endif - - -/// Must be call once before doing any operation on sockets -int TcpServer::init() -{ -#ifdef _WIN32 - WSADATA wsaData; - WORD wVersionRequested = wVersionRequested = MAKEWORD( 2, 2 ); - - int res = WSAStartup( wVersionRequested, &wsaData ); - if (res) { - setInetError("Can't initialize winsock"); - return -1; - } -#endif - return 0; -} - - -/// Must be call once before leaving application -int TcpServer::clean() -{ -#ifdef _WIN32 - int res = WSACleanup(); - if (res) { - setInetError("Can't initialize winsock"); - return -1; - } -#endif - return 0; -} - - -/** - * Two step constructor. Create must be called prior to use this - * socket. - */ -TcpServer::TcpServer() : - listenSocket(INVALID_SOCKET) -{ - TRACE_CLASS("TcpServer", "TcpServer()"); -} - - -/** - * One step constructor. - * @param port The port number on which the socket will be bind - * @param name The IP address on which the socket will be bind. - * It is used to bind the socket on a specific interface if - * the computer have many NICs. - */ -TcpServer::TcpServer(int port, const char *name) : - listenSocket(INVALID_SOCKET) -{ - TRACE_CLASS("TcpServer", "TcpServer(int, char*)"); - create(port, name); -} - - -/** - * Two step initializer. This function must be called after the constructor - * without argument as been called. - * @param port The port number on which the socket will be bind - * @param name The IP address on which the socket will be bind. - * It is used to bind the socket on a specific interface if - * the computer have many NICs. - * @return 0 if ok - * -1 if error - */ -int TcpServer::create(int port, const char *name) -{ - TRACE_CLASS("TcpServer", "create(int, char*)"); - if (listenSocket != INVALID_SOCKET) - closesocket(listenSocket); - address.setAddress(name); - address.setPort(port); - if ((listenSocket = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) { - setInetError("Can't create socket"); - return -1; - } - reuseopt_t reuse = 1; - if (setsockopt(listenSocket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) - == SOCKET_ERROR) { - setInetError("Can't reuse address"); - return -1; - } - if (bind(listenSocket, address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) { - setInetError("Can't bind socket"); - closesocket(listenSocket); - listenSocket = INVALID_SOCKET; - return -1; - } - return 0; -} - - -/** - * Close the underlying socket. - * @return 0 if ok - * -1 if error - */ -int TcpServer::close() -{ - TRACE_CLASS("TcpServer", "close()"); - if (listenSocket != INVALID_SOCKET) { - int res = closesocket(listenSocket); - if (res != 0) { - setInetError("Can't close socket"); - return -1; - } - listenSocket = INVALID_SOCKET; - } - return 0; -} - - -/// Destructor -TcpServer::~TcpServer() -{ - TRACE_CLASS("TcpServer", "~TcpServer()"); - close(); -} - - -int TcpServer::listen() -{ - TRACE_CLASS("TcpServer", "listen()"); - if (::listen(listenSocket, 1) == SOCKET_ERROR) { - setInetError("Can't listen on socket"); - return -1; - } - return 0; -} - - -TcpSocket* TcpServer::accept() -{ - SOCKET socket; - TcpSocket* client = NULL; - InetAddress addr; - socklen_t addrLen = sizeof(sockaddr_in); - - socket = ::accept(listenSocket, addr.getAddress(), &addrLen); - if (socket == SOCKET_ERROR) { - setInetError("Can't accept connection on socket"); - } else { - client = new TcpSocket(); - client->setSocket(socket, addr); - } - return client; -} - -/* -WSAEINTR -WSAEBADF -WSAEACCES -WSAEFAULT -WSAEINVAL -WSAEMFILE -WSAEWOULDBLOCK -WSAEINPROGRESS -WSAEALREADY -WSAENOTSOCK -WSAEDESTADDRREQ -WSAEMSGSIZE -WSAEPROTOTYPE -WSAENOPROTOOPT -WSAEPROTONOSUPPORT -WSAESOCKTNOSUPPORT -WSAEOPNOTSUPP -WSAEPFNOSUPPORT -WSAEAFNOSUPPORT -WSAEADDRINUSE -WSAEADDRNOTAVAIL -WSAENETDOWN -WSAENETUNREACH -WSAENETRESET -WSAECONNABORTED -WSAECONNRESET -WSAENOBUFS -WSAEISCONN -WSAENOTCONN -WSAESHUTDOWN -WSAETOOMANYREFS -WSAETIMEDOUT -WSAECONNREFUSED -WSAELOOP -WSAENAMETOOLONG -WSAEHOSTDOWN -WSAEHOSTUNREACH -WSAENOTEMPTY -WSAEPROCLIM -WSAEUSERS -WSAEDQUOT -WSAESTALE -WSAEREMOTE -WSAEDISCON -WSASYSNOTREADY -WSAVERNOTSUPPORTED -WSANOTINITIALISED -*/ diff --git a/src/TcpServer.h b/src/TcpServer.h deleted file mode 100644 index bff7e2e..0000000 --- a/src/TcpServer.h +++ /dev/null @@ -1,84 +0,0 @@ -/* - Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in - Right of Canada (Communications Research Center Canada) - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef _TCPSERVER -#define _TCPSERVER - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include "InetAddress.h" -#ifdef _WIN32 -# include <winsock.h> -# define socklen_t int -# define reuseopt_t char -#else -# include <sys/socket.h> -# include <netinet/in.h> -# include <unistd.h> -# include <netdb.h> -# include <arpa/inet.h> -# include <pthread.h> -# define SOCKET int -# define INVALID_SOCKET -1 -# define SOCKET_ERROR -1 -# define reuseopt_t int -#endif -//#define INVALID_PORT -1 - -#include <iostream> -#include "TcpSocket.h" -//#include "SocketSelector.h" - -/** - * This class represents a socket for sending and receiving UDP packets. - * - * A UDP socket is the sending or receiving point for a packet delivery service. - * Each packet sent or received on a datagram socket is individually - * addressed and routed. Multiple packets sent from one machine to another may - * be routed differently, and may arrive in any order. - * @author Pascal Charest pascal.charest@crc.ca - */ -class TcpServer { - friend class SocketSelector; -public: - TcpServer(); - TcpServer(int port, const char *name = NULL); - ~TcpServer(); - - static int init(); - static int clean(); - - int create(int port, const char *name = NULL); - int close(); - int listen(); - TcpSocket* accept(); - - - protected: - /// The address on which the socket is binded. - InetAddress address; - /// The low-level socket used by system functions. - SOCKET listenSocket; -}; - -#endif // _TCPSERVER diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index 75b320f..13efece 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -1,6 +1,11 @@ /* Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org */ /* This file is part of ODR-DabMux. @@ -20,86 +25,72 @@ */ #include "TcpSocket.h" +#include "Log.h" #include <iostream> #include <stdio.h> #include <errno.h> #include <string.h> #include <signal.h> +#include <stdint.h> -#ifdef _WIN32 -#else -# include <unistd.h> -#endif - -#ifdef TRACE_ON -# ifndef TRACE_CLASS -# define TRACE_CLASS(class, func) cout <<"-" <<(class) <<"\t(" <<this <<")::" <<(func) <<endl -# define TRACE_STATIC(class, func) cout <<"-" <<(class) <<"\t(static)::" <<(func) <<endl -# endif -#else -# ifndef TRACE_CLASS -# define TRACE_CLASS(class, func) -# define TRACE_STATIC(class, func) -# endif -#endif - +using namespace std; -/// Must be call once before doing any operation on sockets -int TcpSocket::init() +TcpSocket::TcpSocket() : + m_sock(INVALID_SOCKET) { -#ifdef _WIN32 - WSADATA wsaData; - WORD wVersionRequested = wVersionRequested = MAKEWORD( 2, 2 ); - - int res = WSAStartup( wVersionRequested, &wsaData ); - if (res) { - setInetError("Can't initialize winsock"); - return -1; - } -#endif - return 0; + if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) { + throw std::runtime_error("Can't create socket"); + } } - -/// Must be call once before leaving application -int TcpSocket::clean() +TcpSocket::TcpSocket(int port, const string& name) : + m_sock(INVALID_SOCKET) { -#ifdef _WIN32 - int res = WSACleanup(); - if (res) { - setInetError("Can't initialize winsock"); - return -1; - } -#endif - return 0; -} + if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) { + throw std::runtime_error("Can't create socket"); + } + reuseopt_t reuse = 1; + if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) + == SOCKET_ERROR) { + throw std::runtime_error("Can't reuse address"); + } -/** - * Two step constructor. Create must be called prior to use this - * socket. - */ -TcpSocket::TcpSocket() : - listenSocket(INVALID_SOCKET) -{ - TRACE_CLASS("TcpSocket", "TcpSocket()"); + m_own_address.setAddress(name); + m_own_address.setPort(port); + + if (bind(m_sock, m_own_address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) { + ::close(m_sock); + m_sock = INVALID_SOCKET; + throw std::runtime_error("Can't bind socket"); + } } +TcpSocket::TcpSocket(SOCKET sock, InetAddress own, InetAddress remote) : + m_own_address(own), + m_remote_address(remote), + m_sock(sock) { } -/** - * One step constructor. - * @param port The port number on which the socket will be bind - * @param name The IP address on which the socket will be bind. - * It is used to bind the socket on a specific interface if - * the computer have many NICs. - */ -TcpSocket::TcpSocket(int port, char *name) : - listenSocket(INVALID_SOCKET) +// The move constructors must ensure the moved-from +// TcpSocket won't destroy our socket handle +TcpSocket::TcpSocket(TcpSocket&& other) { - TRACE_CLASS("TcpSocket", "TcpSocket(int, char*)"); - create(port, name); + m_sock = other.m_sock; + other.m_sock = INVALID_SOCKET; + + m_own_address = other.m_own_address; + m_remote_address = other.m_remote_address; } +TcpSocket& TcpSocket::operator=(TcpSocket&& other) +{ + m_sock = other.m_sock; + other.m_sock = INVALID_SOCKET; + + m_own_address = other.m_own_address; + m_remote_address = other.m_remote_address; + return *this; +} /** * Close the underlying socket. @@ -108,259 +99,79 @@ TcpSocket::TcpSocket(int port, char *name) : */ int TcpSocket::close() { - TRACE_CLASS("TcpSocket", "close()"); - if (listenSocket != INVALID_SOCKET) { - int res = closesocket(listenSocket); + if (m_sock != INVALID_SOCKET) { + int res = ::close(m_sock); if (res != 0) { setInetError("Can't close socket"); return -1; } - listenSocket = INVALID_SOCKET; + m_sock = INVALID_SOCKET; } - return 0; -} - - -/** - * Two step initializer. This function must be called after the constructor - * without argument as been called. - * @param port The port number on which the socket will be bind - * @param name The IP address on which the socket will be bind. - * It is used to bind the socket on a specific interface if - * the computer have many NICs. - * @return 0 if ok - * -1 if error - */ -int TcpSocket::create(int port, char *name) -{ - TRACE_CLASS("TcpSocket", "create(int, char*)"); - if (listenSocket != INVALID_SOCKET) - closesocket(listenSocket); - address.setAddress(name); - address.setPort(port); - if ((listenSocket = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) { - setInetError("Can't create socket"); - return -1; - } - reuseopt_t reuse = 1; - if (setsockopt(listenSocket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) - == SOCKET_ERROR) { - setInetError("Can't reuse address"); - return -1; - } - if (bind(listenSocket, address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) { - setInetError("Can't bind socket"); - closesocket(listenSocket); - listenSocket = INVALID_SOCKET; - return -1; - } - return 0; -} - - -/// Destructor -TcpSocket::~TcpSocket() { - TRACE_CLASS("TcpSocket", "~TcpSocket()"); - close(); -} - - -/** - * Receive an telnet packet, i.e a TCP stream ending with carriage return. - * @param data The buffer that will receive data. - * @param size The buffer size. - * @return > 0 if ok, -1 (SOCKET_ERROR) if error - */ -int TcpSocket::telnetRead(void* data, int size) -{ - TRACE_CLASS("TcpSocket", "read(void*, size)"); - int ret; - - printf("selectCall\n"); - - printf("readread 1\n"); - char *line=GetLine(listenSocket); - ret=strlen(line); - printf("readread 2\n"); - if (ret <= size) - { - strcpy((char*)data, line); - } - else - { -// size_t n = size; - strcpy((char*)data, line); - ret = size; - } - printf("TELNET READ returned %d\n", ret); - return ret; + return 0; } -/** - * Receive a TCP stream. - * @param data The buffer that will receive data. - * @param size The buffer size. - * @return > 0 if ok, -1 (SOCKET_ERROR) if error - */ -int TcpSocket::read(void* data, int size) +TcpSocket::~TcpSocket() { - TRACE_CLASS("TcpSocket", "read(void*, size)"); - - int ret = recv(listenSocket, (char*)data, size, 0); - if (ret == SOCKET_ERROR) { - setInetError("Can't receive TCP packet"); - return -1; - } - return ret; + close(); } - -#define MAX 512 -char* TcpSocket::GetLine(int fd) +ssize_t TcpSocket::recv(void* data, size_t size) { - static char line[MAX]; - static char netread[MAX] = ""; - int n, len; - char *p; - - len = strlen(netread); - - /* look for \r\n in netread buffer */ - p = strstr(netread, "\r\n"); - if (p == NULL) { - /* fill buff - no \r\n found */ - //n = ::read(fd, (void*)(netread+len), (size_t)(MAX-len)); - n = recv(fd, (netread+len), MAX-len, 0); - if (n == SOCKET_ERROR) { - setInetError("Can't receive TCP packet"); - return NULL; + ssize_t ret = ::recv(m_sock, (char*)data, size, 0); + if (ret == SOCKET_ERROR) { + stringstream ss; + ss << "TCP Socket recv error: " << strerror(errno); + throw std::runtime_error(ss.str()); } - len += n; - netread[len] = '\0'; - if (n>0) - return GetLine(fd); - } - if (p!=NULL) - { - *p = '\0'; - strcpy(line, netread); - /* copy rest of buf down */ - memmove(netread, p+2, strlen(p+2)+1); - } - return line; + return ret; } -/** - * Send an TCP packet. - * @param data The buffer taht will be sent. - * @param size Number of bytes to send. - * return 0 if ok, -1 if error - */ -int TcpSocket::write(const void* data, int size) +ssize_t TcpSocket::send(const void* data, size_t size) { -#ifdef DUMP - TRACE_CLASS("TcpSocket", "write(const void*, int)"); -#endif + /* Without MSG_NOSIGNAL the process would receive a SIGPIPE and die */ + ssize_t ret = ::send(m_sock, (const char*)data, size, MSG_NOSIGNAL); - // ignore BROKENPIPE signal (we handle it instead) -// void* old_sigpipe = signal ( SIGPIPE, SIG_IGN ); - // try to send data - int ret = send(listenSocket, (const char*)data, size, 0 /*MSG_NOSIGNAL*/ ); - // restore the BROKENPIPE handling -// signal ( SIGPIPE, (__sighandler_t)old_sigpipe ); - if (ret == SOCKET_ERROR) { - setInetError("Can't send TCP packet"); - return -1; - } - return ret; + if (ret == SOCKET_ERROR) { + stringstream ss; + ss << "TCP Socket send error: " << strerror(errno); + throw std::runtime_error(ss.str()); + } + return ret; } - -int TcpSocket::setDestination(InetAddress &addr) +void TcpSocket::listen() { - address = addr; - int ret = connect(listenSocket, addr.getAddress(), sizeof(*addr.getAddress())); - // A etre verifier: code de retour differend entre Linux et Windows - return ret; + if (::listen(m_sock, 1) == SOCKET_ERROR) { + stringstream ss; + ss << "TCP Socket listen error: " << strerror(errno); + throw std::runtime_error(ss.str()); + } } - -void TcpSocket::setSocket(SOCKET socket, InetAddress &addr) +TcpSocket TcpSocket::accept() { - if (listenSocket != INVALID_SOCKET) - closesocket(listenSocket); - listenSocket = socket; - address = addr; + InetAddress remote_addr; + socklen_t addrLen = sizeof(sockaddr_in); + + SOCKET socket = ::accept(m_sock, remote_addr.getAddress(), &addrLen); + if (socket == SOCKET_ERROR) { + stringstream ss; + ss << "TCP Socket accept error: " << strerror(errno); + throw std::runtime_error(ss.str()); + } + else { + TcpSocket client(socket, m_own_address, remote_addr); + return client; + } } - -InetAddress TcpSocket::getAddress() +InetAddress TcpSocket::getOwnAddress() const { - return address; + return m_own_address; } - -int TcpSocket::PeekCount() +InetAddress TcpSocket::getRemoteAddress() const { - int count; - char tempBuffer[3]; - int size=3; - - count = recv(listenSocket, tempBuffer, size, MSG_PEEK); - if (count == -1) - { - printf("ERROR WHEN PEEKING SOCKET\n"); - } - return count; + return m_remote_address; } - -/* -WSAEINTR -WSAEBADF -WSAEACCES -WSAEFAULT -WSAEINVAL -WSAEMFILE -WSAEWOULDBLOCK -WSAEINPROGRESS -WSAEALREADY -WSAENOTSOCK -WSAEDESTADDRREQ -WSAEMSGSIZE -WSAEPROTOTYPE -WSAENOPROTOOPT -WSAEPROTONOSUPPORT -WSAESOCKTNOSUPPORT -WSAEOPNOTSUPP -WSAEPFNOSUPPORT -WSAEAFNOSUPPORT -WSAEADDRINUSE -WSAEADDRNOTAVAIL -WSAENETDOWN -WSAENETUNREACH -WSAENETRESET -WSAECONNABORTED -WSAECONNRESET -WSAENOBUFS -WSAEISCONN -WSAENOTCONN -WSAESHUTDOWN -WSAETOOMANYREFS -WSAETIMEDOUT -WSAECONNREFUSED -WSAELOOP -WSAENAMETOOLONG -WSAEHOSTDOWN -WSAEHOSTUNREACH -WSAENOTEMPTY -WSAEPROCLIM -WSAEUSERS -WSAEDQUOT -WSAESTALE -WSAEREMOTE -WSAEDISCON -WSASYSNOTREADY -WSAVERNOTSUPPORTED -WSANOTINITIALISED -*/ diff --git a/src/TcpSocket.h b/src/TcpSocket.h index c667cfd..f1354a7 100644 --- a/src/TcpSocket.h +++ b/src/TcpSocket.h @@ -1,6 +1,11 @@ /* Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org */ /* This file is part of ODR-DabMux. @@ -27,71 +32,73 @@ #endif #include "InetAddress.h" -#ifdef _WIN32 -# include <winsock.h> -# define socklen_t int -# define reuseopt_t char -#else -# include <sys/socket.h> -# include <netinet/in.h> -# include <unistd.h> -# include <netdb.h> -# include <arpa/inet.h> -# include <pthread.h> -# define SOCKET int -# define INVALID_SOCKET -1 -# define SOCKET_ERROR -1 -# define reuseopt_t int -#endif -//#define INVALID_PORT -1 - -//# include "SocketSelector.h" +#include <sys/socket.h> +#include <netinet/in.h> +#include <unistd.h> +#include <netdb.h> +#include <arpa/inet.h> +#include <pthread.h> +#define SOCKET int +#define INVALID_SOCKET -1 +#define SOCKET_ERROR -1 +#define reuseopt_t int #include <iostream> +#include <string> /** - * This class represents a socket for sending and receiving UDP packets. - * - * A UDP socket is the sending or receiving point for a packet delivery service. - * Each packet sent or received on a datagram socket is individually - * addressed and routed. Multiple packets sent from one machine to another may - * be routed differently, and may arrive in any order. - * @author Pascal Charest pascal.charest@crc.ca + * This class represents a TCP socket. */ -class TcpSocket { - friend class SocketSelector; - public: - TcpSocket(); - TcpSocket(int port, char *name = NULL); - ~TcpSocket(); - - static int init(); - static int clean(); - - int create(int port = 0, char *name = NULL); - int close(); - - int write(const void* data, int size); - int read(void* data, int size); - int telnetRead(void* data, int size); - /** - * Connects the socket on a specific address. Only data from this address - * will be received. - * @param addr The address to connect the socket - * @warning Not implemented yet. - */ - int setDestination(InetAddress &addr); - InetAddress getAddress(); - void setSocket(SOCKET socket, InetAddress &addr); - char* GetLine(int fd); - - int PeekCount(); - - protected: - /// The address on which the socket is binded. - InetAddress address; - /// The low-level socket used by system functions. - SOCKET listenSocket; +class TcpSocket +{ + public: + /** Create a new socket that does nothing */ + TcpSocket(); + + /** Create a new socket listening for incoming connections. + * @param port The port number on which the socket will listen. + * @param name The IP address on which the socket will be bound. + * It is used to bind the socket on a specific interface if + * the computer have many NICs. + */ + TcpSocket(int port, const std::string& name); + ~TcpSocket(); + TcpSocket(TcpSocket&& other); + TcpSocket& operator=(TcpSocket&& other); + + int close(); + + /** Send data over the TCP connection. + * @param data The buffer that will be sent. + * @param size Number of bytes to send. + * return number of bytes sent or -1 if error + */ + ssize_t send(const void* data, size_t size); + + /** Receive data from the socket. + * @param data The buffer that will receive data. + * @param size The buffer size. + * @return number of bytes received or -1 (SOCKET_ERROR) if error + */ + ssize_t recv(void* data, size_t size); + + void listen(void); + TcpSocket accept(void); + + /** Retrieve address this socket is bound to */ + InetAddress getOwnAddress() const; + InetAddress getRemoteAddress() const; + + private: + TcpSocket(SOCKET sock, InetAddress own, InetAddress remote); + TcpSocket(const TcpSocket& other) = delete; + TcpSocket& operator=(const TcpSocket& other) = delete; + + /// The address on which the socket is bound. + InetAddress m_own_address; + InetAddress m_remote_address; + /// The low-level socket used by system functions. + SOCKET m_sock; }; #endif // _TCPSOCKET diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp index 8ac3706..020e3f5 100644 --- a/src/UdpSocket.cpp +++ b/src/UdpSocket.cpp @@ -2,7 +2,9 @@ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2015 Matthias P. Braendli + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + http://www.opendigitalradio.org */ /* @@ -30,228 +32,126 @@ #include <fcntl.h> #include <string.h> -#ifdef TRACE_ON -# ifndef TRACE_CLASS -# define TRACE_CLASS(class, func) cout <<"-" <<(class) <<"\t(" <<this <<")::" <<(func) <<endl -# define TRACE_STATIC(class, func) cout <<"-" <<(class) <<"\t(static)::" <<(func) <<endl -# endif -#else -# ifndef TRACE_CLASS -# define TRACE_CLASS(class, func) -# define TRACE_STATIC(class, func) -# endif -#endif - - -/// Must be call once before doing any operation on sockets -int UdpSocket::init() -{ -#ifdef _WIN32 - WSADATA wsaData; - WORD wVersionRequested = wVersionRequested = MAKEWORD( 2, 2 ); - - int res = WSAStartup( wVersionRequested, &wsaData ); - if (res) { - setInetError("Can't initialize winsock"); - return -1; - } -#endif - return 0; -} - +using namespace std; -/// Must be call once before leaving application -int UdpSocket::clean() +UdpSocket::UdpSocket() : + listenSocket(INVALID_SOCKET) { -#ifdef _WIN32 - int res = WSACleanup(); - if (res) { - setInetError("Can't initialize winsock"); - return -1; - } -#endif - return 0; + init_sock(0, ""); } - -/** - * Two step constructor. Create must be called prior to use this - * socket. - */ -UdpSocket::UdpSocket() : - listenSocket(INVALID_SOCKET) +UdpSocket::UdpSocket(int port) : + listenSocket(INVALID_SOCKET) { - TRACE_CLASS("UdpSocket", "UdpSocket()"); + init_sock(port, ""); } - -/** - * One step constructor. - * @param port The port number on which the socket will be bind - * @param name The IP address on which the socket will be bind. - * It is used to bind the socket on a specific interface if - * the computer have many NICs. - */ -UdpSocket::UdpSocket(int port, char *name) : - listenSocket(INVALID_SOCKET) +UdpSocket::UdpSocket(int port, const std::string& name) : + listenSocket(INVALID_SOCKET) { - TRACE_CLASS("UdpSocket", "UdpSocket(int, char*)"); - create(port, name); + init_sock(port, name); } -/** - * This functin set blocking mode. The socket can be blocking or not, - * depending of the parametre. By default, the socket is blocking. - * @param block If true, set the socket blocking, otherwise set non-blocking - * @return 0 if ok - * -1 if error - */ int UdpSocket::setBlocking(bool block) { -#ifdef _WIN32 - unsigned long res = block ? 0 : 1; - if (ioctlsocket(listenSocket, FIONBIO, &res) != 0) { + int res; + if (block) + res = fcntl(listenSocket, F_SETFL, 0); + else + res = fcntl(listenSocket, F_SETFL, O_NONBLOCK); + if (res == SOCKET_ERROR) { setInetError("Can't change blocking state of socket"); return -1; - } + } return 0; -#else - int res; - if (block) - res = fcntl(listenSocket, F_SETFL, 0); - else - res = fcntl(listenSocket, F_SETFL, O_NONBLOCK); - if (res == SOCKET_ERROR) { - setInetError("Can't change blocking state of socket"); - return -1; - } - return 0; -#endif } - -/** - * Two step initializer. This function must be called after the constructor - * without argument as been called. - * @param port The port number on which the socket will be bind - * @param name The IP address on which the socket will be bind. - * It is used to bind the socket on a specific interface if - * the computer have many NICs. - * @return 0 if ok - * -1 if error - */ -int UdpSocket::create(int port, char *name) +int UdpSocket::init_sock(int port, const std::string& name) { - TRACE_CLASS("UdpSocket", "create(int, char*)"); - if (listenSocket != INVALID_SOCKET) - closesocket(listenSocket); - address.setAddress(name); - address.setPort(port); - if ((listenSocket = socket(PF_INET, SOCK_DGRAM, 0)) == INVALID_SOCKET) { - setInetError("Can't create socket"); - return -1; - } - reuseopt_t reuse = 1; - if (setsockopt(listenSocket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) - == SOCKET_ERROR) { - setInetError("Can't reuse address"); - return -1; - } - - if (bind(listenSocket, address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) { - setInetError("Can't bind socket"); - closesocket(listenSocket); - listenSocket = INVALID_SOCKET; - return -1; - } - return 0; + if (listenSocket != INVALID_SOCKET) { + ::close(listenSocket); + } + + if ((listenSocket = socket(PF_INET, SOCK_DGRAM, 0)) == INVALID_SOCKET) { + setInetError("Can't create socket"); + return -1; + } + reuseopt_t reuse = 1; + if (setsockopt(listenSocket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) + == SOCKET_ERROR) { + setInetError("Can't reuse address"); + return -1; + } + + if (port) { + address.setAddress(name); + address.setPort(port); + + if (bind(listenSocket, address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) { + setInetError("Can't bind socket"); + ::close(listenSocket); + listenSocket = INVALID_SOCKET; + return -1; + } + } + return 0; } -/// Destructor -UdpSocket::~UdpSocket() { - TRACE_CLASS("UdpSocket", "~UdpSocket()"); - if (listenSocket != INVALID_SOCKET) - closesocket(listenSocket); +UdpSocket::~UdpSocket() +{ + if (listenSocket != INVALID_SOCKET) { + ::close(listenSocket); + } } -/** - * Receive an UDP packet. - * @param packet The packet that will receive data. The address will be set - * to the source address. - * @return 0 if ok, -1 if error - */ -int UdpSocket::receive(UdpPacket &packet) +int UdpSocket::receive(UdpPacket& packet) { - TRACE_CLASS("UdpSocket", "receive(UdpPacket)"); - socklen_t addrSize; - addrSize = sizeof(*packet.getAddress().getAddress()); - int ret = recvfrom(listenSocket, packet.getData(), packet.getSize() - packet.getOffset(), 0, - packet.getAddress().getAddress(), &addrSize); - if (ret == SOCKET_ERROR) { - packet.setLength(0); -#ifndef _WIN32 - if (errno == EAGAIN) + socklen_t addrSize; + addrSize = sizeof(*packet.getAddress().getAddress()); + ssize_t ret = recvfrom(listenSocket, + packet.getData(), + packet.getSize(), + 0, + packet.getAddress().getAddress(), + &addrSize); + + if (ret == SOCKET_ERROR) { + packet.setSize(0); + if (errno == EAGAIN) { + return 0; + } + setInetError("Can't receive UDP packet"); + return -1; + } + + packet.setSize(ret); return 0; -#endif - setInetError("Can't receive UDP packet"); - return -1; - } - packet.setLength(ret); - if (ret == (long)packet.getSize()) { - packet.setSize(packet.getSize() << 1); - } - return 0; } -/** - * Send an UDP packet. - * @param packet The UDP packet to be sent. It includes the data and the - * destination address - * return 0 if ok, -1 if error - */ -int UdpSocket::send(UdpPacket &packet) +int UdpSocket::send(UdpPacket& packet) { -#ifdef DUMP - TRACE_CLASS("UdpSocket", "send(UdpPacket)"); -#endif - int ret = sendto(listenSocket, packet.getData(), packet.getLength(), 0, - packet.getAddress().getAddress(), sizeof(*packet.getAddress().getAddress())); - if (ret == SOCKET_ERROR -#ifndef _WIN32 - && errno != ECONNREFUSED -#endif - ) { - setInetError("Can't send UDP packet"); - return -1; - } - return 0; + int ret = sendto(listenSocket, packet.getData(), packet.getSize(), 0, + packet.getAddress().getAddress(), sizeof(*packet.getAddress().getAddress())); + if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { + setInetError("Can't send UDP packet"); + return -1; + } + return 0; } -/** - * Send an UDP packet - * - * return 0 if ok, -1 if error - */ -int UdpSocket::send(std::vector<uint8_t> data, InetAddress destination) +int UdpSocket::send(const std::vector<uint8_t>& data, InetAddress destination) { -#ifdef DUMP - TRACE_CLASS("UdpSocket", "send(vector<uint8_t>)"); -#endif - int ret = sendto(listenSocket, &data[0], data.size(), 0, - destination.getAddress(), sizeof(*destination.getAddress())); - if (ret == SOCKET_ERROR -#ifndef _WIN32 - && errno != ECONNREFUSED -#endif - ) { - setInetError("Can't send UDP packet"); - return -1; - } - return 0; + int ret = sendto(listenSocket, &data[0], data.size(), 0, + destination.getAddress(), sizeof(*destination.getAddress())); + if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { + setInetError("Can't send UDP packet"); + return -1; + } + return 0; } @@ -263,36 +163,22 @@ st address to join. */ int UdpSocket::joinGroup(char* groupname) { - TRACE_CLASS("UdpSocket", "joinGroup(char*)"); -#ifdef _WIN32 - ip_mreq group; -#else - ip_mreqn group; -#endif - if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { - setInetError(groupname); - return -1; - } - if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { - setInetError("Not a multicast address"); - return -1; - } -#ifdef _WIN32 - group.imr_interface.s_addr = 0; - if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*)&group, sizeof(group)) - == SOCKET_ERROR) { - setInetError("Can't join multicast group"); - return -1; - } -#else - group.imr_address.s_addr = htons(INADDR_ANY);; - group.imr_ifindex = 0; - if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) - == SOCKET_ERROR) { - setInetError("Can't join multicast group"); - } -#endif - return 0; + ip_mreqn group; + if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { + setInetError(groupname); + return -1; + } + if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { + setInetError("Not a multicast address"); + return -1; + } + group.imr_address.s_addr = htons(INADDR_ANY);; + group.imr_ifindex = 0; + if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) + == SOCKET_ERROR) { + setInetError("Can't join multicast group"); + } + return 0; } int UdpSocket::setMulticastTTL(int ttl) @@ -323,188 +209,38 @@ int UdpSocket::setMulticastSource(const char* source_addr) return 0; } +UdpPacket::UdpPacket() { } -/** - * Constructs an UDP packet. - * @param initSize The initial size of the data buffer - */ -UdpPacket::UdpPacket(unsigned int initSize) : - dataBuf(new char[initSize]), - length(0), - size(initSize), - offset(0) -{ - TRACE_CLASS("UdpPacket", "UdpPacket(unsigned int)"); - if (dataBuf == NULL) - size = 0; -} +UdpPacket::UdpPacket(size_t initSize) : + m_buffer(initSize) +{ } -/// Destructor -UdpPacket::~UdpPacket() +void UdpPacket::setSize(size_t newSize) { - TRACE_CLASS("UdpPacket", "~UdpPacket()"); - if (dataBuf != NULL) { - delete []dataBuf; - dataBuf = NULL; - } + m_buffer.resize(newSize); } - -/** - * Changes size of the data buffer size. \a Length + \a offset data will be copied - * in the new buffer. - * @warning The pointer to data will be changed - * @param newSize The new data buffer size - */ -void UdpPacket::setSize(unsigned newSize) -{ - TRACE_CLASS("UdpPacket", "setSize(unsigned)"); - char *tmp = new char[newSize]; - if (length > newSize) - length = newSize; - if (tmp) { - memcpy(tmp, dataBuf, length); - delete []dataBuf; - dataBuf = tmp; - size = newSize; - } -} - -/** - * Give the pointer to data. It is ajusted with the \a offset. - * @warning This pointer change. when the \a size of the buffer and the \a offset change. - * @return The pointer - */ -char *UdpPacket::getData() +uint8_t* UdpPacket::getData() { - return dataBuf + offset; + return &m_buffer[0]; } -/** - * Add some data at the end of data buffer and adjust size. - * @param data Pointer to the data to add - * @param size Size in bytes of new data - */ -void UdpPacket::addData(const void *data, unsigned size) +void UdpPacket::addData(const void *data, size_t size) { - if (length + size > this->size) { - setSize(this->size << 1); - } - memcpy(dataBuf + length, data, size); - length += size; + uint8_t *d = (uint8_t*)data; + std::copy(d, d + size, std::back_inserter(m_buffer)); } - -/** - * Returns the length of useful data. Data before the \a offset are ignored. - * @return The data length - */ -unsigned long UdpPacket::getLength() +size_t UdpPacket::getSize() { - return length - offset; + return m_buffer.size(); } - -/** - * Returns the size of the data buffer. - * @return The data buffer size - */ -unsigned long UdpPacket::getSize() +InetAddress UdpPacket::getAddress() { - return size; + return address; } - -/** - * Returns the offset value. - * @return The offset value - */ -unsigned long UdpPacket::getOffset() -{ - return offset; -} - - -/** - * Sets the data length value. Data before the \a offset are ignored. - * @param len The new length of data - */ -void UdpPacket::setLength(unsigned long len) -{ - length = len + offset; -} - - -/** - * Sets the data offset. Data length is ajusted to ignore data before the \a offset. - * @param val The new data offset. - */ -void UdpPacket::setOffset(unsigned long val) -{ - offset = val; - if (offset > length) - length = offset; -} - - -/** - * Returns the UDP address of the data. - * @return The UDP address - */ -InetAddress &UdpPacket::getAddress() -{ - return address; -} - -/* -WSAEINTR -WSAEBADF -WSAEACCES -WSAEFAULT -WSAEINVAL -WSAEMFILE -WSAEWOULDBLOCK -WSAEINPROGRESS -WSAEALREADY -WSAENOTSOCK -WSAEDESTADDRREQ -WSAEMSGSIZE -WSAEPROTOTYPE -WSAENOPROTOOPT -WSAEPROTONOSUPPORT -WSAESOCKTNOSUPPORT -WSAEOPNOTSUPP -WSAEPFNOSUPPORT -WSAEAFNOSUPPORT -WSAEADDRINUSE -WSAEADDRNOTAVAIL -WSAENETDOWN -WSAENETUNREACH -WSAENETRESET -WSAECONNABORTED -WSAECONNRESET -WSAENOBUFS -WSAEISCONN -WSAENOTCONN -WSAESHUTDOWN -WSAETOOMANYREFS -WSAETIMEDOUT -WSAECONNREFUSED -WSAELOOP -WSAENAMETOOLONG -WSAEHOSTDOWN -WSAEHOSTUNREACH -WSAENOTEMPTY -WSAEPROCLIM -WSAEUSERS -WSAEDQUOT -WSAESTALE -WSAEREMOTE -WSAEDISCON -WSASYSNOTREADY -WSAVERNOTSUPPORTED -WSANOTINITIALISED -*/ diff --git a/src/UdpSocket.h b/src/UdpSocket.h index 07e9f0e..535499e 100644 --- a/src/UdpSocket.h +++ b/src/UdpSocket.h @@ -2,7 +2,9 @@ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2015 Matthias P. Braendli + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + http://www.opendigitalradio.org */ /* @@ -30,23 +32,16 @@ #endif #include "InetAddress.h" -#ifdef _WIN32 -# include <winsock.h> -# define socklen_t int -# define reuseopt_t char -#else -# include <sys/socket.h> -# include <netinet/in.h> -# include <unistd.h> -# include <netdb.h> -# include <arpa/inet.h> -# include <pthread.h> -# define SOCKET int -# define INVALID_SOCKET -1 -# define SOCKET_ERROR -1 -# define reuseopt_t int -#endif -//#define INVALID_PORT -1 +#include <sys/socket.h> +#include <netinet/in.h> +#include <unistd.h> +#include <netdb.h> +#include <arpa/inet.h> +#include <pthread.h> +#define SOCKET int +#define INVALID_SOCKET -1 +#define SOCKET_ERROR -1 +#define reuseopt_t int #include <stdlib.h> #include <iostream> @@ -62,77 +57,111 @@ class UdpPacket; * Each packet sent or received on a datagram socket is individually * addressed and routed. Multiple packets sent from one machine to another may * be routed differently, and may arrive in any order. - * @author Pascal Charest pascal.charest@crc.ca */ -class UdpSocket { - public: - UdpSocket(); - UdpSocket(int port, char *name = NULL); - ~UdpSocket(); - UdpSocket(const UdpSocket& other) = delete; - const UdpSocket& operator=(const UdpSocket& other) = delete; - - static int init(); - static int clean(); - - int create(int port = 0, char *name = NULL); - - int send(UdpPacket &packet); - int send(const std::vector<uint8_t> data); - int send(std::vector<uint8_t> data, InetAddress destination); - int receive(UdpPacket &packet); - int joinGroup(char* groupname); - int setMulticastSource(const char* source_addr); - int setMulticastTTL(int ttl); - /** - * Connects the socket on a specific address. Only data from this address - * will be received. - * @param addr The address to connect the socket - * @warning Not implemented yet. - */ - void connect(InetAddress &addr); - int setBlocking(bool block); - - protected: - /// The address on which the socket is binded. - InetAddress address; - /// The low-level socket used by system functions. - SOCKET listenSocket; +class UdpSocket +{ + public: + /** Create a new socket that will not be bound to any port. To be used + * for data output. + */ + UdpSocket(); + /** Create a new socket. + * @param port The port number on which the socket will be bound + */ + UdpSocket(int port); + /** Create a new socket. + * @param port The port number on which the socket will be bound + * @param name The IP address on which the socket will be bound. + * It is used to bind the socket on a specific interface if + * the computer have many NICs. + */ + UdpSocket(int port, const std::string& name); + ~UdpSocket(); + UdpSocket(const UdpSocket& other) = delete; + const UdpSocket& operator=(const UdpSocket& other) = delete; + + /** Send an UDP packet. + * @param packet The UDP packet to be sent. It includes the data and the + * destination address + * return 0 if ok, -1 if error + */ + int send(UdpPacket& packet); + + /** Send an UDP packet + * + * return 0 if ok, -1 if error + */ + int send(const std::vector<uint8_t>& data, InetAddress destination); + + /** Receive an UDP packet. + * @param packet The packet that will receive the data. The address will be set + * to the source address. + * @return 0 if ok, -1 if error + */ + int receive(UdpPacket& packet); + + int joinGroup(char* groupname); + int setMulticastSource(const char* source_addr); + int setMulticastTTL(int ttl); + + /** Set blocking mode. By default, the socket is blocking. + * @return 0 if ok + * -1 if error + */ + int setBlocking(bool block); + + protected: + int init_sock(int port, const std::string& name); + + /// The address on which the socket is bound. + InetAddress address; + /// The low-level socket used by system functions. + SOCKET listenSocket; }; -/** - * This class represents a UDP packet. +/** This class represents a UDP packet. * - * UDP packets are used to implement a connectionless packet delivery service. - * Each message is routed from one machine to another based solely on - * information contained within that packet. Multiple packets sent from one - * machine to another might be routed differently, and might arrive in any order. - * @author Pascal Charest pascal.charest@crc.ca + * A UDP packet contains a payload (sequence of bytes) and an address. For + * outgoing packets, the address is the destination address. For incoming + * packets, the address tells the user from what source the packet arrived from. */ -class UdpPacket { - public: - UdpPacket(unsigned int initSize = 1024); - UdpPacket(const UdpPacket& packet) = delete; - const UdpPacket& operator=(const UdpPacket&) = delete; - UdpPacket(const UdpPacket&& packet) = delete; - const UdpPacket& operator=(const UdpPacket&&) = delete; - ~UdpPacket(); - - char *getData(); - void addData(const void *data, unsigned size); - unsigned long getLength(); - unsigned long getSize(); - unsigned long getOffset(); - void setLength(unsigned long len); - void setOffset(unsigned long val); - void setSize(unsigned newSize); - InetAddress &getAddress(); - - private: - char *dataBuf; - unsigned long length, size, offset; - InetAddress address; +class UdpPacket +{ + public: + /** Construct an empty UDP packet. + */ + UdpPacket(); + UdpPacket(size_t initSize); + UdpPacket(const UdpPacket& packet) = delete; + const UdpPacket& operator=(const UdpPacket&) = delete; + UdpPacket(const UdpPacket&& packet) = delete; + const UdpPacket& operator=(const UdpPacket&&) = delete; + + /** Give the pointer to data. + * @return The pointer + */ + uint8_t* getData(void); + + /** Append some data at the end of data buffer and adjust size. + * @param data Pointer to the data to add + * @param size Size in bytes of new data + */ + void addData(const void *data, size_t size); + + size_t getSize(void); + + /** Changes size of the data buffer size. Keeps data intact unless + * truncated. + */ + void setSize(size_t newSize); + + /** Returns the UDP address of the packet. + */ + InetAddress getAddress(void); + + private: + std::vector<uint8_t> m_buffer; + InetAddress address; }; #endif // _UDPSOCKET - diff --git a/src/dabInputBridgeUdp.cpp b/src/dabInputBridgeUdp.cpp index 95ba487..fdf3d1f 100644 --- a/src/dabInputBridgeUdp.cpp +++ b/src/dabInputBridgeUdp.cpp @@ -76,18 +76,18 @@ int dabInputBridgeUdpRead(dabInputOperations* ops, void* args, void* buffer, int stats->frameRecords[stats->frameCount].curSize = 0; stats->frameRecords[stats->frameCount].maxSize = size; - if (input->udpData->packet->getLength() == 0) { + if (input->udpData->packet->getSize() == 0) { input->udpData->socket->receive(*input->udpData->packet); } while ((nbBytes = writePacket(input->udpData->packet->getData(), - input->udpData->packet->getLength(), buffer, size, + input->udpData->packet->getSize(), buffer, size, input->info)) != 0) { stats->frameRecords[stats->frameCount].curSize = nbBytes; input->udpData->socket->receive(*input->udpData->packet); } - if (input->udpData->packet->getLength() != 0) { + if (input->udpData->packet->getSize() != 0) { stats->frameRecords[stats->frameCount].curSize = size; } diff --git a/src/dabInputDmbFile.cpp b/src/dabInputDmbFile.cpp index 9262e6c..423d644 100644 --- a/src/dabInputDmbFile.cpp +++ b/src/dabInputDmbFile.cpp @@ -27,6 +27,7 @@ #include <string.h> #include <stdio.h> +#ifdef HAVE_FORMAT_DMB struct dabInputDmbFileData { FILE* file; @@ -62,7 +63,6 @@ int dabInputDmbFileInit(void** args) input->dmb = new Dmb(); *args = input; - UdpSocket::init(); return 0; } @@ -159,3 +159,4 @@ int dabInputDmbFileClean(void** args) } +#endif //HAVE_FORMAT_DMB diff --git a/src/dabInputDmbUdp.cpp b/src/dabInputDmbUdp.cpp index 3609de2..bd1bde7 100644 --- a/src/dabInputDmbUdp.cpp +++ b/src/dabInputDmbUdp.cpp @@ -65,7 +65,6 @@ int dabInputDmbUdpInit(void** args) input->dmb = new Dmb(); *args = input; - UdpSocket::init(); return 0; } diff --git a/src/dabInputSlip.cpp b/src/dabInputSlip.cpp deleted file mode 100644 index 0063cca..0000000 --- a/src/dabInputSlip.cpp +++ /dev/null @@ -1,412 +0,0 @@ -/* - Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "dabInputSlip.h" -#include "dabInputFifo.h" -#include "TcpServer.h" -#include "UdpSocket.h" -#include "bridge.h" - -#include <signal.h> -#include <string.h> -#include <limits.h> - -#ifdef HAVE_FORMAT_BRIDGE -# ifdef HAVE_INPUT_SLIP - -#ifdef _WIN32 -# include <io.h> - -# define sem_t HANDLE -# define O_NONBLOCK 0 -#else -# include <semaphore.h> -# define O_BINARY 0 -#endif - - -struct dabInputSlipData { - TcpServer* server; - UdpPacket** packets; - UdpPacket* buffer; - bridgeInfo* info; - dabInputFifoStats stats; - pthread_t thread; - sem_t semWrite; - sem_t semQueue; - bool reading; - volatile int nbPackets; - volatile int queueSize; - volatile int packetSize; -}; - - -struct dabInputOperations dabInputSlipOperations = { - dabInputSlipInit, - dabInputSlipOpen, - dabInputSlipSetbuf, - dabInputSlipRead, - NULL, - NULL, - dabInputSlipReadFrame, - dabInputSetbitrate, - dabInputSlipClose, - dabInputSlipClean, - NULL -}; - - -int dabInputSlipInit(void** args) -{ - dabInputSlipData* data = new dabInputSlipData; - memset(&data->stats, 0, sizeof(data->stats)); - data->stats.id = dabInputFifoData::nb++; - data->server = new TcpServer(); - data->packetSize = 1500; - data->queueSize = 10; - data->packets = new UdpPacket*[data->queueSize]; - for (int i = 0; i < data->queueSize; ++i) { - data->packets[i] = new UdpPacket(data->packetSize); - } - data->buffer = new UdpPacket(data->packetSize); - data->nbPackets = 0; - data->info = new bridgeInfo; - data->thread = (pthread_t)NULL; - bridgeInitInfo(data->info); - -#ifdef _WIN32 - char semName[32]; - sprintf(semName, "semWrite%i", data->stats.id); - data->semWrite = CreateSemaphore(NULL, 1, 1, semName); - if (data->semWrite == NULL) { - fprintf(stderr, "Can't init SLIP data write semaphore %s\n", semName); - return -1; - } - sprintf(semName, "semQueue%i", data->stats.id); - data->semQueue = CreateSemaphore(NULL, 1, 1, semName); - if (data->semQueue == NULL) { - fprintf(stderr, "Can't init SLIP data index semaphore %s\n", semName); - return -1; - } -#else - if (sem_init(&data->semWrite, 0, data->queueSize) == -1) { - perror("Can't init SLIP data write semaphore"); - return -1; - } - if (sem_init(&data->semQueue, 0, 1) == -1) { - perror("Can't init SLIP data index semaphore"); - return -1; - } -#endif - data->reading = false; - - *args = data; - return 0; -} - - -void* dabInputSlipThread(void* args) -{ - dabInputSlipData* data = (dabInputSlipData*)args; - TcpSocket* client; - - while ((client = data->server->accept()) != NULL) { - int size = 0; - etiLog.log(info, "SLIP server got a new client.\n"); - -#ifdef _WIN32 - WaitForSingleObject(data->semWrite, INFINITE); - WaitForSingleObject(data->semQueue, INFINITE); -#else - sem_wait(&data->semWrite); - sem_wait(&data->semQueue); -#endif - UdpPacket* packet = data->packets[data->nbPackets]; -#ifdef _WIN32 - ReleaseSemaphore(data->semQueue, 1, NULL); -#else - sem_post(&data->semQueue); -#endif - - while ((size = client->read(packet->getData(), packet->getSize())) - > 0) { - packet->setLength(size); -#ifdef _WIN32 - WaitForSingleObject(data->semQueue, INFINITE); -#else - sem_wait(&data->semQueue); -#endif - data->nbPackets++; -#ifdef _WIN32 - ReleaseSemaphore(data->semQueue, 1, NULL); -#else - sem_post(&data->semQueue); -#endif - -#ifdef _WIN32 - WaitForSingleObject(data->semWrite, INFINITE); - WaitForSingleObject(data->semQueue, INFINITE); -#else - sem_wait(&data->semWrite); - sem_wait(&data->semQueue); -#endif - packet = data->packets[data->nbPackets]; -#ifdef _WIN32 - ReleaseSemaphore(data->semQueue, 1, NULL); -#else - sem_post(&data->semQueue); -#endif - } - etiLog.log(info, "SLIP server client deconnected.\n"); - client->close(); - } - etiLog.log(error, "SLIP thread can't accept new client (%s)\n", - inetErrDesc, inetErrMsg); - - return NULL; -} - - -int dabInputSlipOpen(void* args, const char* inputName) -{ - const char* address; - long port; - address = strchr(inputName, ':'); - if (address == NULL) { - etiLog.log(error, "\"%s\" SLIP address format is invalid: " - "should be [address]:port - > aborting\n", inputName); - return -1; - } - ++address; - port = strtol(address, (char **)NULL, 10); - if ((port == LONG_MIN) || (port == LONG_MAX)) { - etiLog.log(error, "can't convert port number in SLIP address %s\n", - address); - return -1; - } - if (port == 0) { - etiLog.log(error, "can't use port number 0 in SLIP address\n"); - return -1; - } - dabInputSlipData* data = (dabInputSlipData*)args; - if (data->server->create(port) == -1) { - etiLog.log(error, "can't set port %i on SLIP input (%s: %s)\n", - port, inetErrDesc, inetErrMsg); - return -1; - } - - if (data->server->listen() == -1) { - etiLog.log(error, "can't listen on SLIP socket(%s: %s)\n", - inetErrDesc, inetErrMsg); - return -1; - } -#ifdef _WIN32 - data->thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)dabInputSlipThread, data, 0, NULL); - if (data->thread == NULL) { - fprintf(stderr, "Can't create SLIP child"); - return -1; - } -#else - if (pthread_create(&data->thread, NULL, dabInputSlipThread, data)) { - perror("Can't create SLIP child"); - return -1; - } -#endif - - etiLog.log(debug, "check return code of create\n"); - return 0; -} - - -int dabInputSlipSetbuf(void* args, int size) -{ - dabInputSlipData* data = (dabInputSlipData*)args; - - if (size <= 10) { - return -1; - } - - data->packetSize = size - 10; - if (data->packets == NULL) { - data->packets = new UdpPacket*[data->queueSize]; - } - for (int i = 0; i < data->queueSize; ++i) { - if (data->packets[i] == NULL) { - data->packets[i] = new UdpPacket(data->packetSize); - } else { - data->packets[i]->setSize(data->packetSize); - } - } - if (data->buffer == NULL) { - data->buffer = new UdpPacket(data->packetSize); - } else { - data->buffer->setSize(data->packetSize); - } - - return 0; -} - - -int dabInputSlipRead(void* args, void* buffer, int size) -{ - dabInputSlipData* data = (dabInputSlipData*)args; - - if (data->nbPackets > 0) { // data ready - UdpPacket* temp; - temp = data->buffer; - data->buffer = data->packets[0]; - -#ifdef _WIN32 - WaitForSingleObject(data->semQueue, INFINITE); -#else - sem_wait(&data->semQueue); -#endif - for (int i = 1; i < data->queueSize; ++i) { - data->packets[i - 1] = data->packets[i]; - } - data->packets[data->queueSize - 1] = temp; - --data->nbPackets; -#ifdef _WIN32 - ReleaseSemaphore(data->semQueue, 1, NULL); - ReleaseSemaphore(data->semWrite, 1, NULL); -#else - sem_post(&data->semQueue); - sem_post(&data->semWrite); -#endif - } else { - data->buffer->setLength(0); - } - - return data->buffer->getLength(); -} - - -int dabInputSlipReadFrame(dabInputOperations* ops, void* args, void* buffer, int size) -{ - int nbBytes = 0; - dabInputSlipData* data = (dabInputSlipData*)args; - dabInputFifoStats* stats = (dabInputFifoStats*)&data->stats; - -#ifdef _WIN32 - WaitForSingleObject(data->semQueue, INFINITE); -#else - sem_wait(&data->semQueue); -#endif - stats->bufferRecords[stats->bufferCount].curSize = data->nbPackets; - stats->bufferRecords[stats->bufferCount].maxSize = data->queueSize; -#ifdef _WIN32 - ReleaseSemaphore(data->semQueue, 1, NULL); -#else - sem_post(&data->semQueue); -#endif - if (++stats->bufferCount == NB_RECORDS) { - etiLog.log(info, "SLIP buffer state: (%i)", stats->id); - for (int i = 0; i < stats->bufferCount; ++i) { - etiLog.log(info, " %i/%i", - stats->bufferRecords[i].curSize, - stats->bufferRecords[i].maxSize); - } - etiLog.log(info, "\n"); - - stats->bufferCount = 0; - } - - data->stats.frameRecords[data->stats.frameCount].curSize = 0; - data->stats.frameRecords[data->stats.frameCount].maxSize = size; - - if (data->buffer->getLength() == 0) { - ops->read(args, NULL, 0); - } - while ((nbBytes = writePacket(data->buffer->getData(), - data->buffer->getLength(), buffer, size, data->info)) - != 0) { - data->stats.frameRecords[data->stats.frameCount].curSize = nbBytes; - ops->read(args, NULL, 0); - } - - if (data->buffer->getLength() != 0) { - data->stats.frameRecords[data->stats.frameCount].curSize = size; - } - - if (++stats->frameCount == NB_RECORDS) { - etiLog.log(info, "Data subchannel usage: (%i)", - stats->id); - for (int i = 0; i < stats->frameCount; ++i) { - etiLog.log(info, " %i/%i", - stats->frameRecords[i].curSize, - stats->frameRecords[i].maxSize); - } - etiLog.log(info, "\n"); - stats->frameCount = 0; - } - return size; -} - - -int dabInputSlipClose(void* args) -{ - dabInputSlipData* data = (dabInputSlipData*)args; - data->server->close(); -#ifdef WIN32 - DWORD status; - for (int i = 0; i < 5; ++i) { - if (GetExitCodeThread(data->thread, &status)) { - break; - } - Sleep(100); - } - TerminateThread(data->thread, 1); -#else - if (data->thread != (pthread_t)NULL) { - pthread_kill(data->thread, SIGPIPE); - } -#endif - return 0; -} - - -int dabInputSlipClean(void** args) -{ - dabInputSlipData* data = (dabInputSlipData*)(*args); -#ifdef _WIN32 - CloseHandle(data->thread); - CloseHandle(data->semWrite); - CloseHandle(data->semQueue); -#else - sem_destroy(&data->semWrite); - sem_destroy(&data->semQueue); -#endif - for (int i = 0; i < data->queueSize; ++i) { - if (data->packets[i] != NULL) { - delete data->packets[i]; - } - } - delete []data->packets; - delete data->buffer; - delete data->server; - delete data->info; - delete data; - return 0; -} - -# endif // HAVE_INPUT_SLIP -#endif // HAVE_FORMAT_BRIDGE - diff --git a/src/dabInputSlip.h b/src/dabInputSlip.h deleted file mode 100644 index 2b8a782..0000000 --- a/src/dabInputSlip.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef DAB_INPUT_SLIP_H -#define DAB_INPUT_SLIP_H - - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif -#include "dabInput.h" - - -#ifdef HAVE_FORMAT_BRIDGE -# ifdef HAVE_INPUT_SLIP - - -extern struct dabInputOperations dabInputSlipOperations; - -int dabInputSlipInit(void** args); -void* dabInputSlipThread(void* args); -int dabInputSlipOpen(void* args, const char* inputName); -int dabInputSlipSetbuf(void* args, int size); -int dabInputSlipRead(void* args, void* buffer, int size); -int dabInputSlipReadFrame(dabInputOperations* ops, void* args, void* buffer, int size); -int dabInputSlipClose(void* args); -int dabInputSlipClean(void** args); - - -# endif -#endif - - -#endif // DAB_INPUT_SLIP_H diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h index d68cd9c..4a528bd 100644 --- a/src/dabOutput/dabOutput.h +++ b/src/dabOutput/dabOutput.h @@ -2,13 +2,13 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 Matthias P. Braendli - http://mpb.li + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li - http://opendigitalradio.org + http://www.opendigitalradio.org An object-oriented version of the output channels. - */ +*/ /* This file is part of ODR-DabMux. @@ -30,7 +30,6 @@ #define __DAB_OUTPUT_H #include "UdpSocket.h" -#include "TcpServer.h" #include "Log.h" #include "string.h" #include <stdexcept> @@ -39,20 +38,11 @@ #include <chrono> #include <memory> -#ifdef _WIN32 -# include <io.h> -# ifdef __MINGW32__ -# define FS_DECLARE_CFG_ARRAYS -# include <winioctl.h> -# endif -# include <sdci.h> -#else -# include <unistd.h> -# include <sys/time.h> -# ifndef O_BINARY -# define O_BINARY 0 -# endif // O_BINARY -#endif +#include <unistd.h> +#include <sys/time.h> +#ifndef O_BINARY +# define O_BINARY 0 +#endif // O_BINARY #ifdef HAVE_OUTPUT_ZEROMQ # include "zmq.hpp" #endif @@ -168,21 +158,15 @@ class DabOutputRaw : public DabOutput public: DabOutputRaw() { -#ifdef _WIN32 - socket_ = INVALID_HANDLE_VALUE; -#else socket_ = -1; isCyclades_ = false; -#endif buffer_ = new unsigned char[6144]; } DabOutputRaw(const DabOutputRaw& other) { socket_ = other.socket_; -#ifndef _WIN32 isCyclades_ = other.isCyclades_; -#endif buffer_ = new unsigned char[6144]; memcpy(buffer_, other.buffer_, 6144); } @@ -202,12 +186,8 @@ class DabOutputRaw : public DabOutput } private: std::string filename_; -#ifdef _WIN32 - HANDLE socket_; -#else int socket_; bool isCyclades_; -#endif unsigned char* buffer_; }; @@ -216,17 +196,10 @@ class DabOutputUdp : public DabOutput { public: DabOutputUdp() { - UdpSocket::init(); packet_ = new UdpPacket(6144); socket_ = new UdpSocket(); } - // make sure we don't copy this output around - // the UdpPacket and UdpSocket do not support - // copying either - DabOutputUdp(const DabOutputUdp& other); - DabOutputUdp operator=(const DabOutputUdp& other); - ~DabOutputUdp() { delete socket_; delete packet_; @@ -240,34 +213,26 @@ class DabOutputUdp : public DabOutput return "udp://" + uri_; } private: + // make sure we don't copy this output around + // the UdpPacket and UdpSocket do not support + // copying either + DabOutputUdp(const DabOutputUdp& other) = delete; + DabOutputUdp operator=(const DabOutputUdp& other) = delete; + std::string uri_; UdpSocket* socket_; UdpPacket* packet_; }; // -------------- TCP ------------------ +class TCPDataDispatcher; class DabOutputTcp : public DabOutput { public: - DabOutputTcp() - { - TcpSocket::init(); - server = new TcpServer(); - client = NULL; - } - - DabOutputTcp(const DabOutputTcp& other); - DabOutputTcp operator=(const DabOutputTcp& other); - - ~DabOutputTcp() { - -#ifdef _WIN32 - CloseHandle(this->thread_); -#endif - - delete this->server; - delete this->client; - } + DabOutputTcp() {} + DabOutputTcp(const DabOutputTcp& other) = delete; + const DabOutputTcp& operator=(const DabOutputTcp& other) = delete; + ~DabOutputTcp(); int Open(const char* name); int Write(void* buffer, int size); @@ -277,11 +242,10 @@ class DabOutputTcp : public DabOutput return "tcp://" + uri_; } - TcpServer* server; - TcpSocket* client; private: std::string uri_; - pthread_t thread_; + + TCPDataDispatcher* dispatcher_; }; // -------------- Simul ------------------ @@ -306,11 +270,7 @@ class DabOutputSimul : public DabOutput } private: std::string name_; -#ifdef _WIN32 - DWORD startTime_; -#else std::chrono::steady_clock::time_point startTime_; -#endif }; #if defined(HAVE_OUTPUT_ZEROMQ) diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 5f1943d..a8ae1bc 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -2,8 +2,10 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013 Matthias P. Braendli - http://mpb.li + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org TCP output */ @@ -28,48 +30,129 @@ #include <signal.h> #include <limits.h> #include "dabOutput.h" -#include "TcpServer.h" - -#ifdef _WIN32 -# include <io.h> -# ifdef __MINGW32__ -# define FS_DECLARE_CFG_ARRAYS -# include <winioctl.h> -# endif -# include <sdci.h> -#else -# include <unistd.h> -# include <sys/time.h> -# ifndef O_BINARY -# define O_BINARY 0 -# endif // O_BINARY -#endif - -void* tcpThread(void* param) +#include <unistd.h> +#include <sys/time.h> +#include <list> +#include <vector> +#include <atomic> +#include <boost/thread.hpp> +#include "ThreadsafeQueue.h" +#include "TcpSocket.h" + +using namespace std; + +using vec_u8 = std::vector<uint8_t>; + +// In ETI one element would be an ETI frame of 6144 bytes. +// 250 frames correspond to 6 seconds. This is mostly here +// to ensure we do not accumulate data for faulty sockets, delay +// management has to be done on the receiver end. +const size_t MAX_QUEUED_ELEMS = 250; + +class TCPConnection { - TcpSocket* client; + public: + TCPConnection(TcpSocket&& sock) : + queue(), + m_running(true), + m_sender_thread(), + m_sock(move(sock)) { + auto addr = m_sock.getRemoteAddress(); + etiLog.level(debug) << "New TCP Connection from " << + addr.getHostAddress() << ":" << addr.getPort(); + m_sender_thread = boost::thread(&TCPConnection::process, this, 0); + } + + ~TCPConnection() { + m_running = false; + queue.notify(); + m_sender_thread.join(); + } - DabOutputTcp* tcp = (DabOutputTcp*)param; + ThreadsafeQueue<vec_u8> queue; - while ((client = tcp->server->accept()) != NULL) { - etiLog.log(info, "TCP server got a new client.\n"); - if (tcp->client != NULL) { - delete tcp->client; + bool is_overloaded(void) const { + return queue.size() > MAX_QUEUED_ELEMS; } - tcp->client = client; - } - etiLog.log(error, "TCP thread can't accept new client (%s)\n", - inetErrDesc, inetErrMsg); - return NULL; + private: + TCPConnection(const TCPConnection& other) = delete; + TCPConnection& operator=(const TCPConnection& other) = delete; + + atomic<bool> m_running; + boost::thread m_sender_thread; + TcpSocket m_sock; + + void process(long) { + while (m_running) { + vec_u8 data; + queue.wait_and_pop(data); + + try { + m_sock.send(&data[0], data.size()); + } + catch (std::runtime_error& e) { + m_running = false; + } + } + } +}; + +class TCPDataDispatcher +{ + public: + ~TCPDataDispatcher() { + m_running = false; + m_connections.clear(); + m_listener_socket.close(); + m_listener_thread.join(); + } + + void start(int port, const string& address) { + TcpSocket sock(port, address); + m_listener_socket = move(sock); + + m_running = true; + m_listener_thread = boost::thread(&TCPDataDispatcher::process, this, 0); + } + + void Write(const vec_u8& data) { + for (auto& connection : m_connections) { + connection.queue.push(data); + } + + m_connections.remove_if([](TCPConnection& conn){ return conn.is_overloaded(); }); + } + + private: + void process(long) { + m_listener_socket.listen(); + + while (m_running) { + // Add a new TCPConnection to the list, constructing it from the client socket + m_connections.emplace(m_connections.begin(), m_listener_socket.accept()); + } + } + + atomic<bool> m_running; + boost::thread m_listener_thread; + TcpSocket m_listener_socket; + std::list<TCPConnection> m_connections; +}; + +DabOutputTcp::~DabOutputTcp() +{ + if (dispatcher_) { + delete dispatcher_; + dispatcher_ = nullptr; + } } -int DabOutputTcp::Open(const char* name) +static bool parse_uri(const char *uri, long *port, string& addr) { - char* hostport = strdup(name); // the name is actually an tuple host:port + char* const hostport = strdup(uri); // the uri is actually an tuple host:port char* address; - long port; address = strchr((char*)hostport, ':'); if (address == NULL) { etiLog.log(error, @@ -82,98 +165,61 @@ int DabOutputTcp::Open(const char* name) // terminate string hostport after the host, and advance address to the port number *(address++) = 0; - port = strtol(address, (char **)NULL, 10); - if ((port == LONG_MIN) || (port == LONG_MAX)) { + *port = strtol(address, (char **)NULL, 10); + if ((*port == LONG_MIN) || (*port == LONG_MAX)) { etiLog.log(error, "can't convert port number in tcp address %s\n", address); goto tcp_open_fail; } - if (port == 0) { + if (*port == 0) { etiLog.log(error, "can't use port number 0 in tcp address\n"); goto tcp_open_fail; } - address = hostport; - if (strlen(address) > 0) { - if (this->server->create(port, address) == -1) { - etiLog.log(error, "Can't create Tcp server on %s:%i " - "(%s: %s) -> aborting\n", - address, port, inetErrDesc, inetErrMsg); - goto tcp_open_fail; - } - } else { - if (this->server->create(port) == -1) { - etiLog.log(error, "Can't create Tcp server on :%i " - "(%s: %s) -> aborting\n", - port, inetErrDesc, inetErrMsg); - goto tcp_open_fail; - } - } + addr = hostport; + free(hostport); + return true; - //sprintf(name, "%s:%i", this->packet_->getAddress().getHostAddress(), - // this->packet_->getAddress().getPort()); +tcp_open_fail: + free(hostport); + return false; +} - if (this->server->listen() == -1) { - etiLog.log(error, "Can't listen on Tcp socket (%s: %s)\n", - inetErrDesc, inetErrMsg); - goto tcp_open_fail; - } -#ifdef _WIN32 - this->thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)tcpThread, this, 0, NULL); - if (this->thread_ == NULL) { - fprintf(stderr, "Can't create TCP child"); - goto tcp_open_fail; +int DabOutputTcp::Open(const char* name) +{ + long port = 0; + string address; + bool success = parse_uri(name, &port, address); + + if (success) { + dispatcher_ = new TCPDataDispatcher(); + dispatcher_->start(port, address); } -#else - if (pthread_create(&this->thread_, NULL, tcpThread, this)) { - perror("Can't create TCP child"); - goto tcp_open_fail; + else { + stringstream ss; + ss << "Could not parse TCP output address " << name; + throw std::runtime_error(ss.str()); } -#endif - return 0; - -tcp_open_fail: - free(hostport); - return -1; } int DabOutputTcp::Write(void* buffer, int size) { + vec_u8 data(6144); + uint8_t* buffer_u8 = (uint8_t*)buffer; - if (this->client != NULL) { - if (this->client->write(&size, 2) == 2) { - if (this->client->write(buffer, size) != size) { - return size; - } - } - else { - etiLog.log(info, "TCP server client disconnected.\n"); - delete this->client; - this->client = NULL; - } - } + std::copy(buffer_u8, buffer_u8 + size, data.begin()); + + // Pad to 6144 bytes + std::fill(data.begin() + size, data.end(), 0x55); + + dispatcher_->Write(data); return size; } int DabOutputTcp::Close() { - this->server->close(); - if( this->client != NULL ) - this->client->close(); -#ifdef WIN32 - DWORD status; - for (int i = 0; i < 5; ++i) { - if (GetExitCodeThread(this->thread_, &status)) { - break; - } - Sleep(100); - } - TerminateThread(this->thread_, 1); -#else - pthread_kill(this->thread_, SIGPIPE); -#endif return 0; } diff --git a/src/dabOutput/dabOutputUdp.cpp b/src/dabOutput/dabOutputUdp.cpp index 433ef8f..9d3ea84 100644 --- a/src/dabOutput/dabOutputUdp.cpp +++ b/src/dabOutput/dabOutputUdp.cpp @@ -2,8 +2,10 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013 Matthias P. Braendli - http://mpb.li + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org UDP output */ @@ -79,12 +81,6 @@ int DabOutputUdp::Open(const char* name) this->packet_->getAddress().setPort(port); - if (this->socket_->create() == -1) { - etiLog.level(error) << "can't create UDP socket (" << - inetErrDesc << ": " << inetErrMsg << ")"; - return -1; - } - string query_params = what[3]; smatch query_what; if (regex_match(query_params, query_what, re_query, match_default)) { @@ -132,7 +128,7 @@ int DabOutputUdp::Open(const char* name) int DabOutputUdp::Write(void* buffer, int size) { - this->packet_->setLength(0); + this->packet_->setSize(0); this->packet_->addData(buffer, size); return this->socket_->send(*this->packet_); } diff --git a/src/utils.cpp b/src/utils.cpp index 7a20c43..54f19b3 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -107,9 +107,6 @@ void header_message() #if defined(HAVE_INPUT_TEST) " test" << #endif -#if defined(HAVE_INPUT_SLIP) - " slip" << -#endif #if defined(HAVE_INPUT_UDP) " udp" << #endif |