From 9248c1d7976ba1c37e3df147a1eb3115fe72c8d0 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 11 Sep 2016 22:15:35 +0200 Subject: Drop SLIP, Refactor sockets, improve TCP output Quite a large refactoring of the sockets, TCP and UDP, in order to improve the ETI-over-TCP output. This can now accept several simultaneous connections, and requires a throttle. The SLIP input is gone. The UDP inputs are currently broken. --- .gitignore | 1 + TODO | 1 + configure.ac | 8 +- doc/example.mux | 13 +- src/ConfigParser.cpp | 5 - src/DabMultiplexer.cpp | 10 +- src/DabMux.cpp | 3 - src/InetAddress.cpp | 127 ++--------- src/InetAddress.h | 43 ++-- src/Makefile.am | 2 - src/ParserCmdline.cpp | 4 - src/TcpServer.cpp | 243 -------------------- src/TcpServer.h | 84 ------- src/TcpSocket.cpp | 385 ++++++++----------------------- src/TcpSocket.h | 127 ++++++----- src/UdpSocket.cpp | 504 ++++++++++------------------------------- src/UdpSocket.h | 197 +++++++++------- src/dabInputBridgeUdp.cpp | 6 +- src/dabInputDmbFile.cpp | 3 +- src/dabInputDmbUdp.cpp | 1 - src/dabInputSlip.cpp | 412 --------------------------------- src/dabInputSlip.h | 52 ----- src/dabOutput/dabOutput.h | 84 ++----- src/dabOutput/dabOutputTcp.cpp | 250 +++++++++++--------- src/dabOutput/dabOutputUdp.cpp | 14 +- src/utils.cpp | 3 - 26 files changed, 629 insertions(+), 1953 deletions(-) delete mode 100644 src/TcpServer.cpp delete mode 100644 src/TcpServer.h delete mode 100644 src/dabInputSlip.cpp delete mode 100644 src/dabInputSlip.h diff --git a/.gitignore b/.gitignore index 54dc3e2..0492be0 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,7 @@ config.h.in~ config.h.in .deps .dirstamp +*.plist cscope.out ctags diff --git a/TODO b/TODO index a3a7fee..5364e54 100644 --- a/TODO +++ b/TODO @@ -46,6 +46,7 @@ pointers that do dynamic dispatch. Refactoring this to proper classes and documenting it properly will simplify the addition of new input formats, facilitate runtime configurability and clarify the usages of the inputs. +Also, all inputs using UDP are now broken. Communicate Leap Seconds ------------------------ diff --git a/configure.ac b/configure.ac index 03c34d8..15b902c 100644 --- a/configure.ac +++ b/configure.ac @@ -131,12 +131,6 @@ AC_ARG_ENABLE([input_test], [], [enable_input_test=no]) AS_IF([test "x$enable_input_test" = "xyes"], [AC_DEFINE(HAVE_INPUT_TEST, [1], [Define if TEST input is enabled])]) -# SLIP -AC_ARG_ENABLE([input_slip], - [AS_HELP_STRING([--enable-input-slip], [Enable SLIP input])], - [], [enable_input_slip=no]) -AS_IF([test "x$enable_input_slip" = "xyes"], - [AC_DEFINE(HAVE_INPUT_SLIP, [1], [Define if SLIP input is enabled])]) # UDP AC_ARG_ENABLE([input_udp], [AS_HELP_STRING([--enable-input-udp], [Enable UDP input])], @@ -267,7 +261,7 @@ echo echo "Inputs:" enabled="" disabled="" -for output in prbs test slip udp fifo file +for output in prbs test udp fifo file do eval var=\$enable_input_$output AS_IF([test "x$var" = "xyes"], diff --git a/doc/example.mux b/doc/example.mux index c7f1f2d..5a65829 100644 --- a/doc/example.mux +++ b/doc/example.mux @@ -153,10 +153,18 @@ outputs { ; Output RAW ETI NI to standard output stdout "fifo:///dev/stdout?type=raw" - ; ZeroMQ output example + ; ZeroMQ output example + ; This output does not back-pressure the multiplexer. ; Listen on all interfaces, on port 9100 ;zmq "zmq+tcp://*:9100" + ; Output ETI-over-TCP. This is like piping a RAW ETI NI data stream + ; into a TCP socket, except that the output can handle simultaneous + ; connections. + ; 0.0.0.0 means "listen on all interfaces" + ; This output does not back-pressure the multiplexer. + ;tcp "tcp://0.0.0.0:9200" + ; Throttle output to real-time (one ETI frame every 24ms) ;throttle "simul://" @@ -167,6 +175,7 @@ outputs { ; For an output to a pipe, the data consumer at the other end of the pipe ; will dictate the multiplexing rate to ODR-DabMux. ; - ; If you use the zmq output, you must also enable a simul:// output! + ; If you use the zmq+tcp:// or the tcp:// output, + ; you must also enable a simul:// output! } diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index b6d1482..733b5df 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -61,7 +61,6 @@ #include "dabInputEnhancedFifo.h" #include "dabInputUdp.h" #include "dabInputBridgeUdp.h" -#include "dabInputSlip.h" #include "dabInputTest.h" #include "dabInputPrbs.h" #include "dabInputRawFile.h" @@ -744,10 +743,6 @@ void setup_subchannel_from_ptree(DabSubchannel* subchan, } else if (proto == "udp") { operations = dabInputBridgeUdpOperations; #endif // defined(HAVE_INPUT_UDP) -#if defined(HAVE_INPUT_SLIP) - } else if (proto == "slip") { - operations = dabInputSlipOperations; -#endif // defined(HAVE_INPUT_SLIP) #endif // defined(HAVE_FORMAT_BRIDGE) } } else if (type == "data") { diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp index 12876aa..3fe3078 100644 --- a/src/DabMultiplexer.cpp +++ b/src/DabMultiplexer.cpp @@ -98,16 +98,10 @@ void DabMultiplexer::set_edi_config(const edi_configuration_t& new_edi_conf) if (edi_conf.enabled()) { for (auto& edi_destination : edi_conf.destinations) { - auto edi_output = std::make_shared(); - int err = edi_output->create(edi_destination.source_port); - - if (err) { - etiLog.level(error) << "EDI socket creation failed!"; - throw MuxInitException(); - } + auto edi_output = std::make_shared(edi_destination.source_port); if (not edi_destination.source_addr.empty()) { - err = edi_output->setMulticastSource(edi_destination.source_addr.c_str()); + int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str()); if (err) { etiLog.level(error) << "EDI socket set source failed!"; throw MuxInitException(); diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 0420e34..9699ea6 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -105,7 +105,6 @@ typedef DWORD32 uint32_t; #include "dabInputEnhancedFifo.h" #include "dabInputUdp.h" #include "dabInputBridgeUdp.h" -#include "dabInputSlip.h" #include "dabInputTest.h" #include "dabInputPrbs.h" #include "dabInputRawFile.h" @@ -500,8 +499,6 @@ int main(int argc, char *argv[]) outputs.clear(); - UdpSocket::clean(); - if (returnCode != 0) { etiLog.log(emerg, "...aborting\n"); } else { diff --git a/src/InetAddress.cpp b/src/InetAddress.cpp index 3fc33ad..90cdd06 100644 --- a/src/InetAddress.cpp +++ b/src/InetAddress.cpp @@ -1,6 +1,11 @@ /* Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org */ /* This file is part of ODR-DabMux. @@ -22,12 +27,8 @@ #include "InetAddress.h" #include #include - -#ifdef _WIN32 -#else -# include -# include -#endif +#include +#include #ifdef TRACE_ON # ifndef TRACE_CLASS @@ -127,31 +128,33 @@ void InetAddress::setPort(int port) * @return 0 if ok * -1 if error */ -int InetAddress::setAddress(const char *name) +int InetAddress::setAddress(const std::string& name) { - TRACE_CLASS("InetAddress", "setAddress(char*)"); - if (name) { - if (atoi(name)) { // If it start with a number - if ((addr.sin_addr.s_addr = inet_addr(name)) == INADDR_NONE) { + TRACE_CLASS("InetAddress", "setAddress(string)"); + if (!name.empty()) { + if (atoi(name.c_str())) { // If it start with a number + if ((addr.sin_addr.s_addr = inet_addr(name.c_str())) == INADDR_NONE) { addr.sin_addr.s_addr = htons(INADDR_ANY); inetErrNo = 0; inetErrMsg = "Invalid address"; - inetErrDesc = name; + inetErrDesc = name.c_str(); return -1; } - } else { // Assume it's a real name - hostent *host = gethostbyname(name); + } + else { // Assume it's a real name + hostent *host = gethostbyname(name.c_str()); if (host) { addr.sin_addr = *(in_addr *)(host->h_addr); } else { addr.sin_addr.s_addr = htons(INADDR_ANY); inetErrNo = 0; inetErrMsg = "Could not find address"; - inetErrDesc = name; + inetErrDesc = name.c_str(); return -1; } } - } else { + } + else { addr.sin_addr.s_addr = INADDR_ANY; } return 0; @@ -161,100 +164,8 @@ int InetAddress::setAddress(const char *name) void setInetError(const char* description) { inetErrNo = 0; -#ifdef _WIN32 - inetErrNo = WSAGetLastError(); - switch (inetErrNo) { - case WSANOTINITIALISED: - inetErrMsg = "WSANOTINITIALISED A successful WSAStartup must occur before using this function."; - break; - case WSAENETDOWN: - inetErrMsg = "WSAENETDOWN The network subsystem has failed."; - break; - case WSAEFAULT: - inetErrMsg = "WSAEFAULT The buf or from parameters are not part of the user address space, or the fromlen parameter is too small to accommodate the peer address."; - break; - case WSAEINTR: - inetErrMsg = "WSAEINTR The (blocking) call was canceled through WSACancelBlockingCall."; - break; - case WSAEINPROGRESS: - inetErrMsg = "WSAEINPROGRESS A blocking Windows Sockets 1.1 call is in progress, or the service provider is still processing a callback function."; - break; - case WSAEINVAL: - inetErrMsg = "WSAEINVAL The socket has not been bound with bind, or an unknown flag was specified, or MSG_OOB was specified for a socket with SO_OOBINLINE enabled, or (for byte stream-style sockets only) len was zero or negative."; - break; - case WSAEISCONN: - inetErrMsg = "WSAEISCONN The socket is connected. This function is not permitted with a connected socket, whether the socket is connection-oriented or connectionless."; - break; - case WSAENETRESET: - inetErrMsg = "WSAENETRESET The connection has been broken due to the \"keep-alive\" activity detecting a failure while the operation was in progress."; - break; - case WSAENOTSOCK: - inetErrMsg = "WSAENOTSOCK The descriptor is not a socket."; - break; - case WSAEOPNOTSUPP: - inetErrMsg = "WSAEOPNOTSUPP MSG_OOB was specified, but the socket is not stream-style such as type SOCK_STREAM, out-of-band data is not supported in the communication domain associated with this socket, or the socket is unidirectional and supports only send operations."; - break; - case WSAESHUTDOWN: - inetErrMsg = "WSAESHUTDOWN The socket has been shut down; it is not possible to recvfrom on a socket after shutdown has been invoked with how set to SD_RECEIVE or SD_BOTH."; - break; - case WSAEWOULDBLOCK: - inetErrMsg = "WSAEWOULDBLOCK The socket is marked as nonblocking and the recvfrom operation would block."; - break; - case WSAEMSGSIZE: - inetErrMsg = "WSAEMSGSIZE The message was too large to fit into the specified buffer and was truncated."; - break; - case WSAETIMEDOUT: - inetErrMsg = "WSAETIMEDOUT The connection has been dropped, because of a network failure or because the system on the other end went down without notice."; - break; - case WSAECONNRESET: - inetErrMsg = "WSAECONNRESET"; - break; - case WSAEACCES: - inetErrMsg = "WSAEACCES The requested address is a broadcast address, but the appropriate flag was not set. Call setsockopt with the SO_BROADCAST parameter to allow the use of the broadcast address."; - break; - case WSAENOBUFS: - inetErrMsg = "WSAENOBUFS No buffer space is available."; - break; - case WSAENOTCONN: - inetErrMsg = "WSAENOTCONN The socket is not connected (connection-oriented sockets only)"; - break; - case WSAEHOSTUNREACH: - inetErrMsg = "WSAEHOSTUNREACH The remote host cannot be reached from this host at this time."; - break; - case WSAECONNABORTED: - inetErrMsg = "WSAECONNABORTED The virtual circuit was terminated due to a time-out or other failure. The application should close the socket as it is no longer usable."; - break; - case WSAEADDRNOTAVAIL: - inetErrMsg = "WSAEADDRNOTAVAIL The remote address is not a valid address, for example, ADDR_ANY."; - break; - case WSAEAFNOSUPPORT: - inetErrMsg = "WSAEAFNOSUPPORT Addresses in the specified family cannot be used with this socket."; - break; - case WSAEDESTADDRREQ: - inetErrMsg = "WSAEDESTADDRREQ A destination address is required."; - break; - case WSAENETUNREACH: - inetErrMsg = "WSAENETUNREACH The network cannot be reached from this host at this time."; - break; - case WSAEMFILE: - inetErrMsg = "No more socket descriptors are available."; - break; - case WSAEPROTONOSUPPORT: - inetErrMsg = "The specified protocol is not supported."; - break; - case WSAEPROTOTYPE: - inetErrMsg = "The specified protocol is the wrong type for this socket."; - break; - case WSAESOCKTNOSUPPORT: - inetErrMsg = "The specified socket type is not supported in this address family."; - break; - default: - inetErrMsg = "Unknown"; - }; -#else inetErrNo = errno; inetErrMsg = strerror(inetErrNo); -#endif inetErrDesc = description; } diff --git a/src/InetAddress.h b/src/InetAddress.h index 266b1fd..0ccc70b 100644 --- a/src/InetAddress.h +++ b/src/InetAddress.h @@ -1,6 +1,11 @@ /* Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org */ /* This file is part of ODR-DabMux. @@ -26,33 +31,17 @@ # include "config.h" #endif -// General libraries #include -// Linux librairies -#ifndef _WIN32 -// # include -# include -# include -# include -# include -# include -# include -# define SOCKET int -# define INVALID_SOCKET -1 -# define closesocket ::close -// Windows librairies -#else -# include -# ifdef _MSC_VER -# pragma comment(lib, "wsock32.lib") -# elif defined(__BORLANDC__) -# pragma(lib, "mswsock.lib") -# endif -# ifndef IN_MULTICAST -# define IN_MULTICAST(a) ((((unsigned long) (a)) & 0xf0000000) == 0xe0000000) -# endif -#endif -// General definitions +#include +#include +#include +#include +#include +#include +#include + +#define SOCKET int +#define INVALID_SOCKET -1 #define INVALID_PORT -1 @@ -79,7 +68,7 @@ class InetAddress { sockaddr *getAddress(); const char *getHostAddress(); int getPort(); - int setAddress(const char *name); + int setAddress(const std::string& name); void setPort(int port); bool isMulticastAddress(); diff --git a/src/Makefile.am b/src/Makefile.am index c572ef3..ede6649 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -65,7 +65,6 @@ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \ dabInputPrbs.h dabInputPrbs.cpp \ dabInputRawFile.h dabInputRawFile.cpp \ dabInputRawFifo.h dabInputRawFifo.cpp \ - dabInputSlip.h dabInputSlip.cpp \ dabInputTest.h dabInputTest.cpp \ dabInputUdp.h dabInputUdp.cpp \ dabInputZmq.h dabInputZmq.cpp \ @@ -94,7 +93,6 @@ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \ PcDebug.h \ ReedSolomon.h ReedSolomon.cpp \ RemoteControl.cpp RemoteControl.h \ - TcpServer.h TcpServer.cpp \ TcpSocket.h TcpSocket.cpp \ UdpSocket.h UdpSocket.cpp \ bridge.h bridge.c \ diff --git a/src/ParserCmdline.cpp b/src/ParserCmdline.cpp index 723efd6..8ce6f7a 100644 --- a/src/ParserCmdline.cpp +++ b/src/ParserCmdline.cpp @@ -284,10 +284,6 @@ bool parse_cmdline(char **argv, } else if (strcmp((*subchannel)->inputProto, "udp") == 0) { operations = dabInputBridgeUdpOperations; #endif // defined(HAVE_INPUT_UDP) -#if defined(HAVE_INPUT_SLIP) - } else if (strcmp((*subchannel)->inputProto, "slip") == 0) { - operations = dabInputSlipOperations; -#endif // defined(HAVE_INPUT_SLIP) #endif // defined(HAVE_FORMAT_BRIDGE) } } else if (c == 'D') { diff --git a/src/TcpServer.cpp b/src/TcpServer.cpp deleted file mode 100644 index 20ecffb..0000000 --- a/src/TcpServer.cpp +++ /dev/null @@ -1,243 +0,0 @@ -/* - Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in - Right of Canada (Communications Research Center Canada) - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include "TcpServer.h" -#include -#include -#include - -#ifdef _WIN32 -#else -# include -#endif - -#ifdef TRACE_ON -# ifndef TRACE_CLASS -# define TRACE_CLASS(class, func) cout <<"-" <<(class) <<"\t(" <setSocket(socket, addr); - } - return client; -} - -/* -WSAEINTR -WSAEBADF -WSAEACCES -WSAEFAULT -WSAEINVAL -WSAEMFILE -WSAEWOULDBLOCK -WSAEINPROGRESS -WSAEALREADY -WSAENOTSOCK -WSAEDESTADDRREQ -WSAEMSGSIZE -WSAEPROTOTYPE -WSAENOPROTOOPT -WSAEPROTONOSUPPORT -WSAESOCKTNOSUPPORT -WSAEOPNOTSUPP -WSAEPFNOSUPPORT -WSAEAFNOSUPPORT -WSAEADDRINUSE -WSAEADDRNOTAVAIL -WSAENETDOWN -WSAENETUNREACH -WSAENETRESET -WSAECONNABORTED -WSAECONNRESET -WSAENOBUFS -WSAEISCONN -WSAENOTCONN -WSAESHUTDOWN -WSAETOOMANYREFS -WSAETIMEDOUT -WSAECONNREFUSED -WSAELOOP -WSAENAMETOOLONG -WSAEHOSTDOWN -WSAEHOSTUNREACH -WSAENOTEMPTY -WSAEPROCLIM -WSAEUSERS -WSAEDQUOT -WSAESTALE -WSAEREMOTE -WSAEDISCON -WSASYSNOTREADY -WSAVERNOTSUPPORTED -WSANOTINITIALISED -*/ diff --git a/src/TcpServer.h b/src/TcpServer.h deleted file mode 100644 index bff7e2e..0000000 --- a/src/TcpServer.h +++ /dev/null @@ -1,84 +0,0 @@ -/* - Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in - Right of Canada (Communications Research Center Canada) - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#ifndef _TCPSERVER -#define _TCPSERVER - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include "InetAddress.h" -#ifdef _WIN32 -# include -# define socklen_t int -# define reuseopt_t char -#else -# include -# include -# include -# include -# include -# include -# define SOCKET int -# define INVALID_SOCKET -1 -# define SOCKET_ERROR -1 -# define reuseopt_t int -#endif -//#define INVALID_PORT -1 - -#include -#include "TcpSocket.h" -//#include "SocketSelector.h" - -/** - * This class represents a socket for sending and receiving UDP packets. - * - * A UDP socket is the sending or receiving point for a packet delivery service. - * Each packet sent or received on a datagram socket is individually - * addressed and routed. Multiple packets sent from one machine to another may - * be routed differently, and may arrive in any order. - * @author Pascal Charest pascal.charest@crc.ca - */ -class TcpServer { - friend class SocketSelector; -public: - TcpServer(); - TcpServer(int port, const char *name = NULL); - ~TcpServer(); - - static int init(); - static int clean(); - - int create(int port, const char *name = NULL); - int close(); - int listen(); - TcpSocket* accept(); - - - protected: - /// The address on which the socket is binded. - InetAddress address; - /// The low-level socket used by system functions. - SOCKET listenSocket; -}; - -#endif // _TCPSERVER diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index 75b320f..13efece 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -1,6 +1,11 @@ /* Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org */ /* This file is part of ODR-DabMux. @@ -20,86 +25,72 @@ */ #include "TcpSocket.h" +#include "Log.h" #include #include #include #include #include +#include -#ifdef _WIN32 -#else -# include -#endif - -#ifdef TRACE_ON -# ifndef TRACE_CLASS -# define TRACE_CLASS(class, func) cout <<"-" <<(class) <<"\t(" < 0 if ok, -1 (SOCKET_ERROR) if error - */ -int TcpSocket::telnetRead(void* data, int size) -{ - TRACE_CLASS("TcpSocket", "read(void*, size)"); - int ret; - - printf("selectCall\n"); - - printf("readread 1\n"); - char *line=GetLine(listenSocket); - ret=strlen(line); - printf("readread 2\n"); - if (ret <= size) - { - strcpy((char*)data, line); - } - else - { -// size_t n = size; - strcpy((char*)data, line); - ret = size; - } - printf("TELNET READ returned %d\n", ret); - return ret; + return 0; } -/** - * Receive a TCP stream. - * @param data The buffer that will receive data. - * @param size The buffer size. - * @return > 0 if ok, -1 (SOCKET_ERROR) if error - */ -int TcpSocket::read(void* data, int size) +TcpSocket::~TcpSocket() { - TRACE_CLASS("TcpSocket", "read(void*, size)"); - - int ret = recv(listenSocket, (char*)data, size, 0); - if (ret == SOCKET_ERROR) { - setInetError("Can't receive TCP packet"); - return -1; - } - return ret; + close(); } - -#define MAX 512 -char* TcpSocket::GetLine(int fd) +ssize_t TcpSocket::recv(void* data, size_t size) { - static char line[MAX]; - static char netread[MAX] = ""; - int n, len; - char *p; - - len = strlen(netread); - - /* look for \r\n in netread buffer */ - p = strstr(netread, "\r\n"); - if (p == NULL) { - /* fill buff - no \r\n found */ - //n = ::read(fd, (void*)(netread+len), (size_t)(MAX-len)); - n = recv(fd, (netread+len), MAX-len, 0); - if (n == SOCKET_ERROR) { - setInetError("Can't receive TCP packet"); - return NULL; + ssize_t ret = ::recv(m_sock, (char*)data, size, 0); + if (ret == SOCKET_ERROR) { + stringstream ss; + ss << "TCP Socket recv error: " << strerror(errno); + throw std::runtime_error(ss.str()); } - len += n; - netread[len] = '\0'; - if (n>0) - return GetLine(fd); - } - if (p!=NULL) - { - *p = '\0'; - strcpy(line, netread); - /* copy rest of buf down */ - memmove(netread, p+2, strlen(p+2)+1); - } - return line; + return ret; } -/** - * Send an TCP packet. - * @param data The buffer taht will be sent. - * @param size Number of bytes to send. - * return 0 if ok, -1 if error - */ -int TcpSocket::write(const void* data, int size) +ssize_t TcpSocket::send(const void* data, size_t size) { -#ifdef DUMP - TRACE_CLASS("TcpSocket", "write(const void*, int)"); -#endif + /* Without MSG_NOSIGNAL the process would receive a SIGPIPE and die */ + ssize_t ret = ::send(m_sock, (const char*)data, size, MSG_NOSIGNAL); - // ignore BROKENPIPE signal (we handle it instead) -// void* old_sigpipe = signal ( SIGPIPE, SIG_IGN ); - // try to send data - int ret = send(listenSocket, (const char*)data, size, 0 /*MSG_NOSIGNAL*/ ); - // restore the BROKENPIPE handling -// signal ( SIGPIPE, (__sighandler_t)old_sigpipe ); - if (ret == SOCKET_ERROR) { - setInetError("Can't send TCP packet"); - return -1; - } - return ret; + if (ret == SOCKET_ERROR) { + stringstream ss; + ss << "TCP Socket send error: " << strerror(errno); + throw std::runtime_error(ss.str()); + } + return ret; } - -int TcpSocket::setDestination(InetAddress &addr) +void TcpSocket::listen() { - address = addr; - int ret = connect(listenSocket, addr.getAddress(), sizeof(*addr.getAddress())); - // A etre verifier: code de retour differend entre Linux et Windows - return ret; + if (::listen(m_sock, 1) == SOCKET_ERROR) { + stringstream ss; + ss << "TCP Socket listen error: " << strerror(errno); + throw std::runtime_error(ss.str()); + } } - -void TcpSocket::setSocket(SOCKET socket, InetAddress &addr) +TcpSocket TcpSocket::accept() { - if (listenSocket != INVALID_SOCKET) - closesocket(listenSocket); - listenSocket = socket; - address = addr; + InetAddress remote_addr; + socklen_t addrLen = sizeof(sockaddr_in); + + SOCKET socket = ::accept(m_sock, remote_addr.getAddress(), &addrLen); + if (socket == SOCKET_ERROR) { + stringstream ss; + ss << "TCP Socket accept error: " << strerror(errno); + throw std::runtime_error(ss.str()); + } + else { + TcpSocket client(socket, m_own_address, remote_addr); + return client; + } } - -InetAddress TcpSocket::getAddress() +InetAddress TcpSocket::getOwnAddress() const { - return address; + return m_own_address; } - -int TcpSocket::PeekCount() +InetAddress TcpSocket::getRemoteAddress() const { - int count; - char tempBuffer[3]; - int size=3; - - count = recv(listenSocket, tempBuffer, size, MSG_PEEK); - if (count == -1) - { - printf("ERROR WHEN PEEKING SOCKET\n"); - } - return count; + return m_remote_address; } - -/* -WSAEINTR -WSAEBADF -WSAEACCES -WSAEFAULT -WSAEINVAL -WSAEMFILE -WSAEWOULDBLOCK -WSAEINPROGRESS -WSAEALREADY -WSAENOTSOCK -WSAEDESTADDRREQ -WSAEMSGSIZE -WSAEPROTOTYPE -WSAENOPROTOOPT -WSAEPROTONOSUPPORT -WSAESOCKTNOSUPPORT -WSAEOPNOTSUPP -WSAEPFNOSUPPORT -WSAEAFNOSUPPORT -WSAEADDRINUSE -WSAEADDRNOTAVAIL -WSAENETDOWN -WSAENETUNREACH -WSAENETRESET -WSAECONNABORTED -WSAECONNRESET -WSAENOBUFS -WSAEISCONN -WSAENOTCONN -WSAESHUTDOWN -WSAETOOMANYREFS -WSAETIMEDOUT -WSAECONNREFUSED -WSAELOOP -WSAENAMETOOLONG -WSAEHOSTDOWN -WSAEHOSTUNREACH -WSAENOTEMPTY -WSAEPROCLIM -WSAEUSERS -WSAEDQUOT -WSAESTALE -WSAEREMOTE -WSAEDISCON -WSASYSNOTREADY -WSAVERNOTSUPPORTED -WSANOTINITIALISED -*/ diff --git a/src/TcpSocket.h b/src/TcpSocket.h index c667cfd..f1354a7 100644 --- a/src/TcpSocket.h +++ b/src/TcpSocket.h @@ -1,6 +1,11 @@ /* Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org */ /* This file is part of ODR-DabMux. @@ -27,71 +32,73 @@ #endif #include "InetAddress.h" -#ifdef _WIN32 -# include -# define socklen_t int -# define reuseopt_t char -#else -# include -# include -# include -# include -# include -# include -# define SOCKET int -# define INVALID_SOCKET -1 -# define SOCKET_ERROR -1 -# define reuseopt_t int -#endif -//#define INVALID_PORT -1 - -//# include "SocketSelector.h" +#include +#include +#include +#include +#include +#include +#define SOCKET int +#define INVALID_SOCKET -1 +#define SOCKET_ERROR -1 +#define reuseopt_t int #include +#include /** - * This class represents a socket for sending and receiving UDP packets. - * - * A UDP socket is the sending or receiving point for a packet delivery service. - * Each packet sent or received on a datagram socket is individually - * addressed and routed. Multiple packets sent from one machine to another may - * be routed differently, and may arrive in any order. - * @author Pascal Charest pascal.charest@crc.ca + * This class represents a TCP socket. */ -class TcpSocket { - friend class SocketSelector; - public: - TcpSocket(); - TcpSocket(int port, char *name = NULL); - ~TcpSocket(); - - static int init(); - static int clean(); - - int create(int port = 0, char *name = NULL); - int close(); - - int write(const void* data, int size); - int read(void* data, int size); - int telnetRead(void* data, int size); - /** - * Connects the socket on a specific address. Only data from this address - * will be received. - * @param addr The address to connect the socket - * @warning Not implemented yet. - */ - int setDestination(InetAddress &addr); - InetAddress getAddress(); - void setSocket(SOCKET socket, InetAddress &addr); - char* GetLine(int fd); - - int PeekCount(); - - protected: - /// The address on which the socket is binded. - InetAddress address; - /// The low-level socket used by system functions. - SOCKET listenSocket; +class TcpSocket +{ + public: + /** Create a new socket that does nothing */ + TcpSocket(); + + /** Create a new socket listening for incoming connections. + * @param port The port number on which the socket will listen. + * @param name The IP address on which the socket will be bound. + * It is used to bind the socket on a specific interface if + * the computer have many NICs. + */ + TcpSocket(int port, const std::string& name); + ~TcpSocket(); + TcpSocket(TcpSocket&& other); + TcpSocket& operator=(TcpSocket&& other); + + int close(); + + /** Send data over the TCP connection. + * @param data The buffer that will be sent. + * @param size Number of bytes to send. + * return number of bytes sent or -1 if error + */ + ssize_t send(const void* data, size_t size); + + /** Receive data from the socket. + * @param data The buffer that will receive data. + * @param size The buffer size. + * @return number of bytes received or -1 (SOCKET_ERROR) if error + */ + ssize_t recv(void* data, size_t size); + + void listen(void); + TcpSocket accept(void); + + /** Retrieve address this socket is bound to */ + InetAddress getOwnAddress() const; + InetAddress getRemoteAddress() const; + + private: + TcpSocket(SOCKET sock, InetAddress own, InetAddress remote); + TcpSocket(const TcpSocket& other) = delete; + TcpSocket& operator=(const TcpSocket& other) = delete; + + /// The address on which the socket is bound. + InetAddress m_own_address; + InetAddress m_remote_address; + /// The low-level socket used by system functions. + SOCKET m_sock; }; #endif // _TCPSOCKET diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp index 8ac3706..020e3f5 100644 --- a/src/UdpSocket.cpp +++ b/src/UdpSocket.cpp @@ -2,7 +2,9 @@ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2015 Matthias P. Braendli + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + http://www.opendigitalradio.org */ /* @@ -30,228 +32,126 @@ #include #include -#ifdef TRACE_ON -# ifndef TRACE_CLASS -# define TRACE_CLASS(class, func) cout <<"-" <<(class) <<"\t(" < data, InetAddress destination) +int UdpSocket::send(const std::vector& data, InetAddress destination) { -#ifdef DUMP - TRACE_CLASS("UdpSocket", "send(vector)"); -#endif - int ret = sendto(listenSocket, &data[0], data.size(), 0, - destination.getAddress(), sizeof(*destination.getAddress())); - if (ret == SOCKET_ERROR -#ifndef _WIN32 - && errno != ECONNREFUSED -#endif - ) { - setInetError("Can't send UDP packet"); - return -1; - } - return 0; + int ret = sendto(listenSocket, &data[0], data.size(), 0, + destination.getAddress(), sizeof(*destination.getAddress())); + if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { + setInetError("Can't send UDP packet"); + return -1; + } + return 0; } @@ -263,36 +163,22 @@ st address to join. */ int UdpSocket::joinGroup(char* groupname) { - TRACE_CLASS("UdpSocket", "joinGroup(char*)"); -#ifdef _WIN32 - ip_mreq group; -#else - ip_mreqn group; -#endif - if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { - setInetError(groupname); - return -1; - } - if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { - setInetError("Not a multicast address"); - return -1; - } -#ifdef _WIN32 - group.imr_interface.s_addr = 0; - if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*)&group, sizeof(group)) - == SOCKET_ERROR) { - setInetError("Can't join multicast group"); - return -1; - } -#else - group.imr_address.s_addr = htons(INADDR_ANY);; - group.imr_ifindex = 0; - if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) - == SOCKET_ERROR) { - setInetError("Can't join multicast group"); - } -#endif - return 0; + ip_mreqn group; + if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { + setInetError(groupname); + return -1; + } + if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { + setInetError("Not a multicast address"); + return -1; + } + group.imr_address.s_addr = htons(INADDR_ANY);; + group.imr_ifindex = 0; + if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) + == SOCKET_ERROR) { + setInetError("Can't join multicast group"); + } + return 0; } int UdpSocket::setMulticastTTL(int ttl) @@ -323,188 +209,38 @@ int UdpSocket::setMulticastSource(const char* source_addr) return 0; } +UdpPacket::UdpPacket() { } -/** - * Constructs an UDP packet. - * @param initSize The initial size of the data buffer - */ -UdpPacket::UdpPacket(unsigned int initSize) : - dataBuf(new char[initSize]), - length(0), - size(initSize), - offset(0) -{ - TRACE_CLASS("UdpPacket", "UdpPacket(unsigned int)"); - if (dataBuf == NULL) - size = 0; -} +UdpPacket::UdpPacket(size_t initSize) : + m_buffer(initSize) +{ } -/// Destructor -UdpPacket::~UdpPacket() +void UdpPacket::setSize(size_t newSize) { - TRACE_CLASS("UdpPacket", "~UdpPacket()"); - if (dataBuf != NULL) { - delete []dataBuf; - dataBuf = NULL; - } + m_buffer.resize(newSize); } - -/** - * Changes size of the data buffer size. \a Length + \a offset data will be copied - * in the new buffer. - * @warning The pointer to data will be changed - * @param newSize The new data buffer size - */ -void UdpPacket::setSize(unsigned newSize) -{ - TRACE_CLASS("UdpPacket", "setSize(unsigned)"); - char *tmp = new char[newSize]; - if (length > newSize) - length = newSize; - if (tmp) { - memcpy(tmp, dataBuf, length); - delete []dataBuf; - dataBuf = tmp; - size = newSize; - } -} - -/** - * Give the pointer to data. It is ajusted with the \a offset. - * @warning This pointer change. when the \a size of the buffer and the \a offset change. - * @return The pointer - */ -char *UdpPacket::getData() +uint8_t* UdpPacket::getData() { - return dataBuf + offset; + return &m_buffer[0]; } -/** - * Add some data at the end of data buffer and adjust size. - * @param data Pointer to the data to add - * @param size Size in bytes of new data - */ -void UdpPacket::addData(const void *data, unsigned size) +void UdpPacket::addData(const void *data, size_t size) { - if (length + size > this->size) { - setSize(this->size << 1); - } - memcpy(dataBuf + length, data, size); - length += size; + uint8_t *d = (uint8_t*)data; + std::copy(d, d + size, std::back_inserter(m_buffer)); } - -/** - * Returns the length of useful data. Data before the \a offset are ignored. - * @return The data length - */ -unsigned long UdpPacket::getLength() +size_t UdpPacket::getSize() { - return length - offset; + return m_buffer.size(); } - -/** - * Returns the size of the data buffer. - * @return The data buffer size - */ -unsigned long UdpPacket::getSize() +InetAddress UdpPacket::getAddress() { - return size; + return address; } - -/** - * Returns the offset value. - * @return The offset value - */ -unsigned long UdpPacket::getOffset() -{ - return offset; -} - - -/** - * Sets the data length value. Data before the \a offset are ignored. - * @param len The new length of data - */ -void UdpPacket::setLength(unsigned long len) -{ - length = len + offset; -} - - -/** - * Sets the data offset. Data length is ajusted to ignore data before the \a offset. - * @param val The new data offset. - */ -void UdpPacket::setOffset(unsigned long val) -{ - offset = val; - if (offset > length) - length = offset; -} - - -/** - * Returns the UDP address of the data. - * @return The UDP address - */ -InetAddress &UdpPacket::getAddress() -{ - return address; -} - -/* -WSAEINTR -WSAEBADF -WSAEACCES -WSAEFAULT -WSAEINVAL -WSAEMFILE -WSAEWOULDBLOCK -WSAEINPROGRESS -WSAEALREADY -WSAENOTSOCK -WSAEDESTADDRREQ -WSAEMSGSIZE -WSAEPROTOTYPE -WSAENOPROTOOPT -WSAEPROTONOSUPPORT -WSAESOCKTNOSUPPORT -WSAEOPNOTSUPP -WSAEPFNOSUPPORT -WSAEAFNOSUPPORT -WSAEADDRINUSE -WSAEADDRNOTAVAIL -WSAENETDOWN -WSAENETUNREACH -WSAENETRESET -WSAECONNABORTED -WSAECONNRESET -WSAENOBUFS -WSAEISCONN -WSAENOTCONN -WSAESHUTDOWN -WSAETOOMANYREFS -WSAETIMEDOUT -WSAECONNREFUSED -WSAELOOP -WSAENAMETOOLONG -WSAEHOSTDOWN -WSAEHOSTUNREACH -WSAENOTEMPTY -WSAEPROCLIM -WSAEUSERS -WSAEDQUOT -WSAESTALE -WSAEREMOTE -WSAEDISCON -WSASYSNOTREADY -WSAVERNOTSUPPORTED -WSANOTINITIALISED -*/ diff --git a/src/UdpSocket.h b/src/UdpSocket.h index 07e9f0e..535499e 100644 --- a/src/UdpSocket.h +++ b/src/UdpSocket.h @@ -2,7 +2,9 @@ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2015 Matthias P. Braendli + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + http://www.opendigitalradio.org */ /* @@ -30,23 +32,16 @@ #endif #include "InetAddress.h" -#ifdef _WIN32 -# include -# define socklen_t int -# define reuseopt_t char -#else -# include -# include -# include -# include -# include -# include -# define SOCKET int -# define INVALID_SOCKET -1 -# define SOCKET_ERROR -1 -# define reuseopt_t int -#endif -//#define INVALID_PORT -1 +#include +#include +#include +#include +#include +#include +#define SOCKET int +#define INVALID_SOCKET -1 +#define SOCKET_ERROR -1 +#define reuseopt_t int #include #include @@ -62,77 +57,111 @@ class UdpPacket; * Each packet sent or received on a datagram socket is individually * addressed and routed. Multiple packets sent from one machine to another may * be routed differently, and may arrive in any order. - * @author Pascal Charest pascal.charest@crc.ca */ -class UdpSocket { - public: - UdpSocket(); - UdpSocket(int port, char *name = NULL); - ~UdpSocket(); - UdpSocket(const UdpSocket& other) = delete; - const UdpSocket& operator=(const UdpSocket& other) = delete; - - static int init(); - static int clean(); - - int create(int port = 0, char *name = NULL); - - int send(UdpPacket &packet); - int send(const std::vector data); - int send(std::vector data, InetAddress destination); - int receive(UdpPacket &packet); - int joinGroup(char* groupname); - int setMulticastSource(const char* source_addr); - int setMulticastTTL(int ttl); - /** - * Connects the socket on a specific address. Only data from this address - * will be received. - * @param addr The address to connect the socket - * @warning Not implemented yet. - */ - void connect(InetAddress &addr); - int setBlocking(bool block); - - protected: - /// The address on which the socket is binded. - InetAddress address; - /// The low-level socket used by system functions. - SOCKET listenSocket; +class UdpSocket +{ + public: + /** Create a new socket that will not be bound to any port. To be used + * for data output. + */ + UdpSocket(); + /** Create a new socket. + * @param port The port number on which the socket will be bound + */ + UdpSocket(int port); + /** Create a new socket. + * @param port The port number on which the socket will be bound + * @param name The IP address on which the socket will be bound. + * It is used to bind the socket on a specific interface if + * the computer have many NICs. + */ + UdpSocket(int port, const std::string& name); + ~UdpSocket(); + UdpSocket(const UdpSocket& other) = delete; + const UdpSocket& operator=(const UdpSocket& other) = delete; + + /** Send an UDP packet. + * @param packet The UDP packet to be sent. It includes the data and the + * destination address + * return 0 if ok, -1 if error + */ + int send(UdpPacket& packet); + + /** Send an UDP packet + * + * return 0 if ok, -1 if error + */ + int send(const std::vector& data, InetAddress destination); + + /** Receive an UDP packet. + * @param packet The packet that will receive the data. The address will be set + * to the source address. + * @return 0 if ok, -1 if error + */ + int receive(UdpPacket& packet); + + int joinGroup(char* groupname); + int setMulticastSource(const char* source_addr); + int setMulticastTTL(int ttl); + + /** Set blocking mode. By default, the socket is blocking. + * @return 0 if ok + * -1 if error + */ + int setBlocking(bool block); + + protected: + int init_sock(int port, const std::string& name); + + /// The address on which the socket is bound. + InetAddress address; + /// The low-level socket used by system functions. + SOCKET listenSocket; }; -/** - * This class represents a UDP packet. +/** This class represents a UDP packet. * - * UDP packets are used to implement a connectionless packet delivery service. - * Each message is routed from one machine to another based solely on - * information contained within that packet. Multiple packets sent from one - * machine to another might be routed differently, and might arrive in any order. - * @author Pascal Charest pascal.charest@crc.ca + * A UDP packet contains a payload (sequence of bytes) and an address. For + * outgoing packets, the address is the destination address. For incoming + * packets, the address tells the user from what source the packet arrived from. */ -class UdpPacket { - public: - UdpPacket(unsigned int initSize = 1024); - UdpPacket(const UdpPacket& packet) = delete; - const UdpPacket& operator=(const UdpPacket&) = delete; - UdpPacket(const UdpPacket&& packet) = delete; - const UdpPacket& operator=(const UdpPacket&&) = delete; - ~UdpPacket(); - - char *getData(); - void addData(const void *data, unsigned size); - unsigned long getLength(); - unsigned long getSize(); - unsigned long getOffset(); - void setLength(unsigned long len); - void setOffset(unsigned long val); - void setSize(unsigned newSize); - InetAddress &getAddress(); - - private: - char *dataBuf; - unsigned long length, size, offset; - InetAddress address; +class UdpPacket +{ + public: + /** Construct an empty UDP packet. + */ + UdpPacket(); + UdpPacket(size_t initSize); + UdpPacket(const UdpPacket& packet) = delete; + const UdpPacket& operator=(const UdpPacket&) = delete; + UdpPacket(const UdpPacket&& packet) = delete; + const UdpPacket& operator=(const UdpPacket&&) = delete; + + /** Give the pointer to data. + * @return The pointer + */ + uint8_t* getData(void); + + /** Append some data at the end of data buffer and adjust size. + * @param data Pointer to the data to add + * @param size Size in bytes of new data + */ + void addData(const void *data, size_t size); + + size_t getSize(void); + + /** Changes size of the data buffer size. Keeps data intact unless + * truncated. + */ + void setSize(size_t newSize); + + /** Returns the UDP address of the packet. + */ + InetAddress getAddress(void); + + private: + std::vector m_buffer; + InetAddress address; }; #endif // _UDPSOCKET - diff --git a/src/dabInputBridgeUdp.cpp b/src/dabInputBridgeUdp.cpp index 95ba487..fdf3d1f 100644 --- a/src/dabInputBridgeUdp.cpp +++ b/src/dabInputBridgeUdp.cpp @@ -76,18 +76,18 @@ int dabInputBridgeUdpRead(dabInputOperations* ops, void* args, void* buffer, int stats->frameRecords[stats->frameCount].curSize = 0; stats->frameRecords[stats->frameCount].maxSize = size; - if (input->udpData->packet->getLength() == 0) { + if (input->udpData->packet->getSize() == 0) { input->udpData->socket->receive(*input->udpData->packet); } while ((nbBytes = writePacket(input->udpData->packet->getData(), - input->udpData->packet->getLength(), buffer, size, + input->udpData->packet->getSize(), buffer, size, input->info)) != 0) { stats->frameRecords[stats->frameCount].curSize = nbBytes; input->udpData->socket->receive(*input->udpData->packet); } - if (input->udpData->packet->getLength() != 0) { + if (input->udpData->packet->getSize() != 0) { stats->frameRecords[stats->frameCount].curSize = size; } diff --git a/src/dabInputDmbFile.cpp b/src/dabInputDmbFile.cpp index 9262e6c..423d644 100644 --- a/src/dabInputDmbFile.cpp +++ b/src/dabInputDmbFile.cpp @@ -27,6 +27,7 @@ #include #include +#ifdef HAVE_FORMAT_DMB struct dabInputDmbFileData { FILE* file; @@ -62,7 +63,6 @@ int dabInputDmbFileInit(void** args) input->dmb = new Dmb(); *args = input; - UdpSocket::init(); return 0; } @@ -159,3 +159,4 @@ int dabInputDmbFileClean(void** args) } +#endif //HAVE_FORMAT_DMB diff --git a/src/dabInputDmbUdp.cpp b/src/dabInputDmbUdp.cpp index 3609de2..bd1bde7 100644 --- a/src/dabInputDmbUdp.cpp +++ b/src/dabInputDmbUdp.cpp @@ -65,7 +65,6 @@ int dabInputDmbUdpInit(void** args) input->dmb = new Dmb(); *args = input; - UdpSocket::init(); return 0; } diff --git a/src/dabInputSlip.cpp b/src/dabInputSlip.cpp deleted file mode 100644 index 0063cca..0000000 --- a/src/dabInputSlip.cpp +++ /dev/null @@ -1,412 +0,0 @@ -/* - Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include "dabInputSlip.h" -#include "dabInputFifo.h" -#include "TcpServer.h" -#include "UdpSocket.h" -#include "bridge.h" - -#include -#include -#include - -#ifdef HAVE_FORMAT_BRIDGE -# ifdef HAVE_INPUT_SLIP - -#ifdef _WIN32 -# include - -# define sem_t HANDLE -# define O_NONBLOCK 0 -#else -# include -# define O_BINARY 0 -#endif - - -struct dabInputSlipData { - TcpServer* server; - UdpPacket** packets; - UdpPacket* buffer; - bridgeInfo* info; - dabInputFifoStats stats; - pthread_t thread; - sem_t semWrite; - sem_t semQueue; - bool reading; - volatile int nbPackets; - volatile int queueSize; - volatile int packetSize; -}; - - -struct dabInputOperations dabInputSlipOperations = { - dabInputSlipInit, - dabInputSlipOpen, - dabInputSlipSetbuf, - dabInputSlipRead, - NULL, - NULL, - dabInputSlipReadFrame, - dabInputSetbitrate, - dabInputSlipClose, - dabInputSlipClean, - NULL -}; - - -int dabInputSlipInit(void** args) -{ - dabInputSlipData* data = new dabInputSlipData; - memset(&data->stats, 0, sizeof(data->stats)); - data->stats.id = dabInputFifoData::nb++; - data->server = new TcpServer(); - data->packetSize = 1500; - data->queueSize = 10; - data->packets = new UdpPacket*[data->queueSize]; - for (int i = 0; i < data->queueSize; ++i) { - data->packets[i] = new UdpPacket(data->packetSize); - } - data->buffer = new UdpPacket(data->packetSize); - data->nbPackets = 0; - data->info = new bridgeInfo; - data->thread = (pthread_t)NULL; - bridgeInitInfo(data->info); - -#ifdef _WIN32 - char semName[32]; - sprintf(semName, "semWrite%i", data->stats.id); - data->semWrite = CreateSemaphore(NULL, 1, 1, semName); - if (data->semWrite == NULL) { - fprintf(stderr, "Can't init SLIP data write semaphore %s\n", semName); - return -1; - } - sprintf(semName, "semQueue%i", data->stats.id); - data->semQueue = CreateSemaphore(NULL, 1, 1, semName); - if (data->semQueue == NULL) { - fprintf(stderr, "Can't init SLIP data index semaphore %s\n", semName); - return -1; - } -#else - if (sem_init(&data->semWrite, 0, data->queueSize) == -1) { - perror("Can't init SLIP data write semaphore"); - return -1; - } - if (sem_init(&data->semQueue, 0, 1) == -1) { - perror("Can't init SLIP data index semaphore"); - return -1; - } -#endif - data->reading = false; - - *args = data; - return 0; -} - - -void* dabInputSlipThread(void* args) -{ - dabInputSlipData* data = (dabInputSlipData*)args; - TcpSocket* client; - - while ((client = data->server->accept()) != NULL) { - int size = 0; - etiLog.log(info, "SLIP server got a new client.\n"); - -#ifdef _WIN32 - WaitForSingleObject(data->semWrite, INFINITE); - WaitForSingleObject(data->semQueue, INFINITE); -#else - sem_wait(&data->semWrite); - sem_wait(&data->semQueue); -#endif - UdpPacket* packet = data->packets[data->nbPackets]; -#ifdef _WIN32 - ReleaseSemaphore(data->semQueue, 1, NULL); -#else - sem_post(&data->semQueue); -#endif - - while ((size = client->read(packet->getData(), packet->getSize())) - > 0) { - packet->setLength(size); -#ifdef _WIN32 - WaitForSingleObject(data->semQueue, INFINITE); -#else - sem_wait(&data->semQueue); -#endif - data->nbPackets++; -#ifdef _WIN32 - ReleaseSemaphore(data->semQueue, 1, NULL); -#else - sem_post(&data->semQueue); -#endif - -#ifdef _WIN32 - WaitForSingleObject(data->semWrite, INFINITE); - WaitForSingleObject(data->semQueue, INFINITE); -#else - sem_wait(&data->semWrite); - sem_wait(&data->semQueue); -#endif - packet = data->packets[data->nbPackets]; -#ifdef _WIN32 - ReleaseSemaphore(data->semQueue, 1, NULL); -#else - sem_post(&data->semQueue); -#endif - } - etiLog.log(info, "SLIP server client deconnected.\n"); - client->close(); - } - etiLog.log(error, "SLIP thread can't accept new client (%s)\n", - inetErrDesc, inetErrMsg); - - return NULL; -} - - -int dabInputSlipOpen(void* args, const char* inputName) -{ - const char* address; - long port; - address = strchr(inputName, ':'); - if (address == NULL) { - etiLog.log(error, "\"%s\" SLIP address format is invalid: " - "should be [address]:port - > aborting\n", inputName); - return -1; - } - ++address; - port = strtol(address, (char **)NULL, 10); - if ((port == LONG_MIN) || (port == LONG_MAX)) { - etiLog.log(error, "can't convert port number in SLIP address %s\n", - address); - return -1; - } - if (port == 0) { - etiLog.log(error, "can't use port number 0 in SLIP address\n"); - return -1; - } - dabInputSlipData* data = (dabInputSlipData*)args; - if (data->server->create(port) == -1) { - etiLog.log(error, "can't set port %i on SLIP input (%s: %s)\n", - port, inetErrDesc, inetErrMsg); - return -1; - } - - if (data->server->listen() == -1) { - etiLog.log(error, "can't listen on SLIP socket(%s: %s)\n", - inetErrDesc, inetErrMsg); - return -1; - } -#ifdef _WIN32 - data->thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)dabInputSlipThread, data, 0, NULL); - if (data->thread == NULL) { - fprintf(stderr, "Can't create SLIP child"); - return -1; - } -#else - if (pthread_create(&data->thread, NULL, dabInputSlipThread, data)) { - perror("Can't create SLIP child"); - return -1; - } -#endif - - etiLog.log(debug, "check return code of create\n"); - return 0; -} - - -int dabInputSlipSetbuf(void* args, int size) -{ - dabInputSlipData* data = (dabInputSlipData*)args; - - if (size <= 10) { - return -1; - } - - data->packetSize = size - 10; - if (data->packets == NULL) { - data->packets = new UdpPacket*[data->queueSize]; - } - for (int i = 0; i < data->queueSize; ++i) { - if (data->packets[i] == NULL) { - data->packets[i] = new UdpPacket(data->packetSize); - } else { - data->packets[i]->setSize(data->packetSize); - } - } - if (data->buffer == NULL) { - data->buffer = new UdpPacket(data->packetSize); - } else { - data->buffer->setSize(data->packetSize); - } - - return 0; -} - - -int dabInputSlipRead(void* args, void* buffer, int size) -{ - dabInputSlipData* data = (dabInputSlipData*)args; - - if (data->nbPackets > 0) { // data ready - UdpPacket* temp; - temp = data->buffer; - data->buffer = data->packets[0]; - -#ifdef _WIN32 - WaitForSingleObject(data->semQueue, INFINITE); -#else - sem_wait(&data->semQueue); -#endif - for (int i = 1; i < data->queueSize; ++i) { - data->packets[i - 1] = data->packets[i]; - } - data->packets[data->queueSize - 1] = temp; - --data->nbPackets; -#ifdef _WIN32 - ReleaseSemaphore(data->semQueue, 1, NULL); - ReleaseSemaphore(data->semWrite, 1, NULL); -#else - sem_post(&data->semQueue); - sem_post(&data->semWrite); -#endif - } else { - data->buffer->setLength(0); - } - - return data->buffer->getLength(); -} - - -int dabInputSlipReadFrame(dabInputOperations* ops, void* args, void* buffer, int size) -{ - int nbBytes = 0; - dabInputSlipData* data = (dabInputSlipData*)args; - dabInputFifoStats* stats = (dabInputFifoStats*)&data->stats; - -#ifdef _WIN32 - WaitForSingleObject(data->semQueue, INFINITE); -#else - sem_wait(&data->semQueue); -#endif - stats->bufferRecords[stats->bufferCount].curSize = data->nbPackets; - stats->bufferRecords[stats->bufferCount].maxSize = data->queueSize; -#ifdef _WIN32 - ReleaseSemaphore(data->semQueue, 1, NULL); -#else - sem_post(&data->semQueue); -#endif - if (++stats->bufferCount == NB_RECORDS) { - etiLog.log(info, "SLIP buffer state: (%i)", stats->id); - for (int i = 0; i < stats->bufferCount; ++i) { - etiLog.log(info, " %i/%i", - stats->bufferRecords[i].curSize, - stats->bufferRecords[i].maxSize); - } - etiLog.log(info, "\n"); - - stats->bufferCount = 0; - } - - data->stats.frameRecords[data->stats.frameCount].curSize = 0; - data->stats.frameRecords[data->stats.frameCount].maxSize = size; - - if (data->buffer->getLength() == 0) { - ops->read(args, NULL, 0); - } - while ((nbBytes = writePacket(data->buffer->getData(), - data->buffer->getLength(), buffer, size, data->info)) - != 0) { - data->stats.frameRecords[data->stats.frameCount].curSize = nbBytes; - ops->read(args, NULL, 0); - } - - if (data->buffer->getLength() != 0) { - data->stats.frameRecords[data->stats.frameCount].curSize = size; - } - - if (++stats->frameCount == NB_RECORDS) { - etiLog.log(info, "Data subchannel usage: (%i)", - stats->id); - for (int i = 0; i < stats->frameCount; ++i) { - etiLog.log(info, " %i/%i", - stats->frameRecords[i].curSize, - stats->frameRecords[i].maxSize); - } - etiLog.log(info, "\n"); - stats->frameCount = 0; - } - return size; -} - - -int dabInputSlipClose(void* args) -{ - dabInputSlipData* data = (dabInputSlipData*)args; - data->server->close(); -#ifdef WIN32 - DWORD status; - for (int i = 0; i < 5; ++i) { - if (GetExitCodeThread(data->thread, &status)) { - break; - } - Sleep(100); - } - TerminateThread(data->thread, 1); -#else - if (data->thread != (pthread_t)NULL) { - pthread_kill(data->thread, SIGPIPE); - } -#endif - return 0; -} - - -int dabInputSlipClean(void** args) -{ - dabInputSlipData* data = (dabInputSlipData*)(*args); -#ifdef _WIN32 - CloseHandle(data->thread); - CloseHandle(data->semWrite); - CloseHandle(data->semQueue); -#else - sem_destroy(&data->semWrite); - sem_destroy(&data->semQueue); -#endif - for (int i = 0; i < data->queueSize; ++i) { - if (data->packets[i] != NULL) { - delete data->packets[i]; - } - } - delete []data->packets; - delete data->buffer; - delete data->server; - delete data->info; - delete data; - return 0; -} - -# endif // HAVE_INPUT_SLIP -#endif // HAVE_FORMAT_BRIDGE - diff --git a/src/dabInputSlip.h b/src/dabInputSlip.h deleted file mode 100644 index 2b8a782..0000000 --- a/src/dabInputSlip.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#ifndef DAB_INPUT_SLIP_H -#define DAB_INPUT_SLIP_H - - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif -#include "dabInput.h" - - -#ifdef HAVE_FORMAT_BRIDGE -# ifdef HAVE_INPUT_SLIP - - -extern struct dabInputOperations dabInputSlipOperations; - -int dabInputSlipInit(void** args); -void* dabInputSlipThread(void* args); -int dabInputSlipOpen(void* args, const char* inputName); -int dabInputSlipSetbuf(void* args, int size); -int dabInputSlipRead(void* args, void* buffer, int size); -int dabInputSlipReadFrame(dabInputOperations* ops, void* args, void* buffer, int size); -int dabInputSlipClose(void* args); -int dabInputSlipClean(void** args); - - -# endif -#endif - - -#endif // DAB_INPUT_SLIP_H diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h index d68cd9c..4a528bd 100644 --- a/src/dabOutput/dabOutput.h +++ b/src/dabOutput/dabOutput.h @@ -2,13 +2,13 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 Matthias P. Braendli - http://mpb.li + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li - http://opendigitalradio.org + http://www.opendigitalradio.org An object-oriented version of the output channels. - */ +*/ /* This file is part of ODR-DabMux. @@ -30,7 +30,6 @@ #define __DAB_OUTPUT_H #include "UdpSocket.h" -#include "TcpServer.h" #include "Log.h" #include "string.h" #include @@ -39,20 +38,11 @@ #include #include -#ifdef _WIN32 -# include -# ifdef __MINGW32__ -# define FS_DECLARE_CFG_ARRAYS -# include -# endif -# include -#else -# include -# include -# ifndef O_BINARY -# define O_BINARY 0 -# endif // O_BINARY -#endif +#include +#include +#ifndef O_BINARY +# define O_BINARY 0 +#endif // O_BINARY #ifdef HAVE_OUTPUT_ZEROMQ # include "zmq.hpp" #endif @@ -168,21 +158,15 @@ class DabOutputRaw : public DabOutput public: DabOutputRaw() { -#ifdef _WIN32 - socket_ = INVALID_HANDLE_VALUE; -#else socket_ = -1; isCyclades_ = false; -#endif buffer_ = new unsigned char[6144]; } DabOutputRaw(const DabOutputRaw& other) { socket_ = other.socket_; -#ifndef _WIN32 isCyclades_ = other.isCyclades_; -#endif buffer_ = new unsigned char[6144]; memcpy(buffer_, other.buffer_, 6144); } @@ -202,12 +186,8 @@ class DabOutputRaw : public DabOutput } private: std::string filename_; -#ifdef _WIN32 - HANDLE socket_; -#else int socket_; bool isCyclades_; -#endif unsigned char* buffer_; }; @@ -216,17 +196,10 @@ class DabOutputUdp : public DabOutput { public: DabOutputUdp() { - UdpSocket::init(); packet_ = new UdpPacket(6144); socket_ = new UdpSocket(); } - // make sure we don't copy this output around - // the UdpPacket and UdpSocket do not support - // copying either - DabOutputUdp(const DabOutputUdp& other); - DabOutputUdp operator=(const DabOutputUdp& other); - ~DabOutputUdp() { delete socket_; delete packet_; @@ -240,34 +213,26 @@ class DabOutputUdp : public DabOutput return "udp://" + uri_; } private: + // make sure we don't copy this output around + // the UdpPacket and UdpSocket do not support + // copying either + DabOutputUdp(const DabOutputUdp& other) = delete; + DabOutputUdp operator=(const DabOutputUdp& other) = delete; + std::string uri_; UdpSocket* socket_; UdpPacket* packet_; }; // -------------- TCP ------------------ +class TCPDataDispatcher; class DabOutputTcp : public DabOutput { public: - DabOutputTcp() - { - TcpSocket::init(); - server = new TcpServer(); - client = NULL; - } - - DabOutputTcp(const DabOutputTcp& other); - DabOutputTcp operator=(const DabOutputTcp& other); - - ~DabOutputTcp() { - -#ifdef _WIN32 - CloseHandle(this->thread_); -#endif - - delete this->server; - delete this->client; - } + DabOutputTcp() {} + DabOutputTcp(const DabOutputTcp& other) = delete; + const DabOutputTcp& operator=(const DabOutputTcp& other) = delete; + ~DabOutputTcp(); int Open(const char* name); int Write(void* buffer, int size); @@ -277,11 +242,10 @@ class DabOutputTcp : public DabOutput return "tcp://" + uri_; } - TcpServer* server; - TcpSocket* client; private: std::string uri_; - pthread_t thread_; + + TCPDataDispatcher* dispatcher_; }; // -------------- Simul ------------------ @@ -306,11 +270,7 @@ class DabOutputSimul : public DabOutput } private: std::string name_; -#ifdef _WIN32 - DWORD startTime_; -#else std::chrono::steady_clock::time_point startTime_; -#endif }; #if defined(HAVE_OUTPUT_ZEROMQ) diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 5f1943d..a8ae1bc 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -2,8 +2,10 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013 Matthias P. Braendli - http://mpb.li + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org TCP output */ @@ -28,48 +30,129 @@ #include #include #include "dabOutput.h" -#include "TcpServer.h" - -#ifdef _WIN32 -# include -# ifdef __MINGW32__ -# define FS_DECLARE_CFG_ARRAYS -# include -# endif -# include -#else -# include -# include -# ifndef O_BINARY -# define O_BINARY 0 -# endif // O_BINARY -#endif - -void* tcpThread(void* param) +#include +#include +#include +#include +#include +#include +#include "ThreadsafeQueue.h" +#include "TcpSocket.h" + +using namespace std; + +using vec_u8 = std::vector; + +// In ETI one element would be an ETI frame of 6144 bytes. +// 250 frames correspond to 6 seconds. This is mostly here +// to ensure we do not accumulate data for faulty sockets, delay +// management has to be done on the receiver end. +const size_t MAX_QUEUED_ELEMS = 250; + +class TCPConnection { - TcpSocket* client; + public: + TCPConnection(TcpSocket&& sock) : + queue(), + m_running(true), + m_sender_thread(), + m_sock(move(sock)) { + auto addr = m_sock.getRemoteAddress(); + etiLog.level(debug) << "New TCP Connection from " << + addr.getHostAddress() << ":" << addr.getPort(); + m_sender_thread = boost::thread(&TCPConnection::process, this, 0); + } + + ~TCPConnection() { + m_running = false; + queue.notify(); + m_sender_thread.join(); + } - DabOutputTcp* tcp = (DabOutputTcp*)param; + ThreadsafeQueue queue; - while ((client = tcp->server->accept()) != NULL) { - etiLog.log(info, "TCP server got a new client.\n"); - if (tcp->client != NULL) { - delete tcp->client; + bool is_overloaded(void) const { + return queue.size() > MAX_QUEUED_ELEMS; } - tcp->client = client; - } - etiLog.log(error, "TCP thread can't accept new client (%s)\n", - inetErrDesc, inetErrMsg); - return NULL; + private: + TCPConnection(const TCPConnection& other) = delete; + TCPConnection& operator=(const TCPConnection& other) = delete; + + atomic m_running; + boost::thread m_sender_thread; + TcpSocket m_sock; + + void process(long) { + while (m_running) { + vec_u8 data; + queue.wait_and_pop(data); + + try { + m_sock.send(&data[0], data.size()); + } + catch (std::runtime_error& e) { + m_running = false; + } + } + } +}; + +class TCPDataDispatcher +{ + public: + ~TCPDataDispatcher() { + m_running = false; + m_connections.clear(); + m_listener_socket.close(); + m_listener_thread.join(); + } + + void start(int port, const string& address) { + TcpSocket sock(port, address); + m_listener_socket = move(sock); + + m_running = true; + m_listener_thread = boost::thread(&TCPDataDispatcher::process, this, 0); + } + + void Write(const vec_u8& data) { + for (auto& connection : m_connections) { + connection.queue.push(data); + } + + m_connections.remove_if([](TCPConnection& conn){ return conn.is_overloaded(); }); + } + + private: + void process(long) { + m_listener_socket.listen(); + + while (m_running) { + // Add a new TCPConnection to the list, constructing it from the client socket + m_connections.emplace(m_connections.begin(), m_listener_socket.accept()); + } + } + + atomic m_running; + boost::thread m_listener_thread; + TcpSocket m_listener_socket; + std::list m_connections; +}; + +DabOutputTcp::~DabOutputTcp() +{ + if (dispatcher_) { + delete dispatcher_; + dispatcher_ = nullptr; + } } -int DabOutputTcp::Open(const char* name) +static bool parse_uri(const char *uri, long *port, string& addr) { - char* hostport = strdup(name); // the name is actually an tuple host:port + char* const hostport = strdup(uri); // the uri is actually an tuple host:port char* address; - long port; address = strchr((char*)hostport, ':'); if (address == NULL) { etiLog.log(error, @@ -82,98 +165,61 @@ int DabOutputTcp::Open(const char* name) // terminate string hostport after the host, and advance address to the port number *(address++) = 0; - port = strtol(address, (char **)NULL, 10); - if ((port == LONG_MIN) || (port == LONG_MAX)) { + *port = strtol(address, (char **)NULL, 10); + if ((*port == LONG_MIN) || (*port == LONG_MAX)) { etiLog.log(error, "can't convert port number in tcp address %s\n", address); goto tcp_open_fail; } - if (port == 0) { + if (*port == 0) { etiLog.log(error, "can't use port number 0 in tcp address\n"); goto tcp_open_fail; } - address = hostport; - if (strlen(address) > 0) { - if (this->server->create(port, address) == -1) { - etiLog.log(error, "Can't create Tcp server on %s:%i " - "(%s: %s) -> aborting\n", - address, port, inetErrDesc, inetErrMsg); - goto tcp_open_fail; - } - } else { - if (this->server->create(port) == -1) { - etiLog.log(error, "Can't create Tcp server on :%i " - "(%s: %s) -> aborting\n", - port, inetErrDesc, inetErrMsg); - goto tcp_open_fail; - } - } + addr = hostport; + free(hostport); + return true; - //sprintf(name, "%s:%i", this->packet_->getAddress().getHostAddress(), - // this->packet_->getAddress().getPort()); +tcp_open_fail: + free(hostport); + return false; +} - if (this->server->listen() == -1) { - etiLog.log(error, "Can't listen on Tcp socket (%s: %s)\n", - inetErrDesc, inetErrMsg); - goto tcp_open_fail; - } -#ifdef _WIN32 - this->thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)tcpThread, this, 0, NULL); - if (this->thread_ == NULL) { - fprintf(stderr, "Can't create TCP child"); - goto tcp_open_fail; +int DabOutputTcp::Open(const char* name) +{ + long port = 0; + string address; + bool success = parse_uri(name, &port, address); + + if (success) { + dispatcher_ = new TCPDataDispatcher(); + dispatcher_->start(port, address); } -#else - if (pthread_create(&this->thread_, NULL, tcpThread, this)) { - perror("Can't create TCP child"); - goto tcp_open_fail; + else { + stringstream ss; + ss << "Could not parse TCP output address " << name; + throw std::runtime_error(ss.str()); } -#endif - return 0; - -tcp_open_fail: - free(hostport); - return -1; } int DabOutputTcp::Write(void* buffer, int size) { + vec_u8 data(6144); + uint8_t* buffer_u8 = (uint8_t*)buffer; - if (this->client != NULL) { - if (this->client->write(&size, 2) == 2) { - if (this->client->write(buffer, size) != size) { - return size; - } - } - else { - etiLog.log(info, "TCP server client disconnected.\n"); - delete this->client; - this->client = NULL; - } - } + std::copy(buffer_u8, buffer_u8 + size, data.begin()); + + // Pad to 6144 bytes + std::fill(data.begin() + size, data.end(), 0x55); + + dispatcher_->Write(data); return size; } int DabOutputTcp::Close() { - this->server->close(); - if( this->client != NULL ) - this->client->close(); -#ifdef WIN32 - DWORD status; - for (int i = 0; i < 5; ++i) { - if (GetExitCodeThread(this->thread_, &status)) { - break; - } - Sleep(100); - } - TerminateThread(this->thread_, 1); -#else - pthread_kill(this->thread_, SIGPIPE); -#endif return 0; } diff --git a/src/dabOutput/dabOutputUdp.cpp b/src/dabOutput/dabOutputUdp.cpp index 433ef8f..9d3ea84 100644 --- a/src/dabOutput/dabOutputUdp.cpp +++ b/src/dabOutput/dabOutputUdp.cpp @@ -2,8 +2,10 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013 Matthias P. Braendli - http://mpb.li + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org UDP output */ @@ -79,12 +81,6 @@ int DabOutputUdp::Open(const char* name) this->packet_->getAddress().setPort(port); - if (this->socket_->create() == -1) { - etiLog.level(error) << "can't create UDP socket (" << - inetErrDesc << ": " << inetErrMsg << ")"; - return -1; - } - string query_params = what[3]; smatch query_what; if (regex_match(query_params, query_what, re_query, match_default)) { @@ -132,7 +128,7 @@ int DabOutputUdp::Open(const char* name) int DabOutputUdp::Write(void* buffer, int size) { - this->packet_->setLength(0); + this->packet_->setSize(0); this->packet_->addData(buffer, size); return this->socket_->send(*this->packet_); } diff --git a/src/utils.cpp b/src/utils.cpp index 7a20c43..54f19b3 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -107,9 +107,6 @@ void header_message() #if defined(HAVE_INPUT_TEST) " test" << #endif -#if defined(HAVE_INPUT_SLIP) - " slip" << -#endif #if defined(HAVE_INPUT_UDP) " udp" << #endif -- cgit v1.2.3