From 9248c1d7976ba1c37e3df147a1eb3115fe72c8d0 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 11 Sep 2016 22:15:35 +0200 Subject: Drop SLIP, Refactor sockets, improve TCP output Quite a large refactoring of the sockets, TCP and UDP, in order to improve the ETI-over-TCP output. This can now accept several simultaneous connections, and requires a throttle. The SLIP input is gone. The UDP inputs are currently broken. --- src/dabOutput/dabOutput.h | 84 ++++---------- src/dabOutput/dabOutputTcp.cpp | 250 ++++++++++++++++++++++++----------------- src/dabOutput/dabOutputUdp.cpp | 14 +-- 3 files changed, 175 insertions(+), 173 deletions(-) (limited to 'src/dabOutput') diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h index d68cd9c..4a528bd 100644 --- a/src/dabOutput/dabOutput.h +++ b/src/dabOutput/dabOutput.h @@ -2,13 +2,13 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 Matthias P. Braendli - http://mpb.li + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li - http://opendigitalradio.org + http://www.opendigitalradio.org An object-oriented version of the output channels. - */ +*/ /* This file is part of ODR-DabMux. @@ -30,7 +30,6 @@ #define __DAB_OUTPUT_H #include "UdpSocket.h" -#include "TcpServer.h" #include "Log.h" #include "string.h" #include @@ -39,20 +38,11 @@ #include #include -#ifdef _WIN32 -# include -# ifdef __MINGW32__ -# define FS_DECLARE_CFG_ARRAYS -# include -# endif -# include -#else -# include -# include -# ifndef O_BINARY -# define O_BINARY 0 -# endif // O_BINARY -#endif +#include +#include +#ifndef O_BINARY +# define O_BINARY 0 +#endif // O_BINARY #ifdef HAVE_OUTPUT_ZEROMQ # include "zmq.hpp" #endif @@ -168,21 +158,15 @@ class DabOutputRaw : public DabOutput public: DabOutputRaw() { -#ifdef _WIN32 - socket_ = INVALID_HANDLE_VALUE; -#else socket_ = -1; isCyclades_ = false; -#endif buffer_ = new unsigned char[6144]; } DabOutputRaw(const DabOutputRaw& other) { socket_ = other.socket_; -#ifndef _WIN32 isCyclades_ = other.isCyclades_; -#endif buffer_ = new unsigned char[6144]; memcpy(buffer_, other.buffer_, 6144); } @@ -202,12 +186,8 @@ class DabOutputRaw : public DabOutput } private: std::string filename_; -#ifdef _WIN32 - HANDLE socket_; -#else int socket_; bool isCyclades_; -#endif unsigned char* buffer_; }; @@ -216,17 +196,10 @@ class DabOutputUdp : public DabOutput { public: DabOutputUdp() { - UdpSocket::init(); packet_ = new UdpPacket(6144); socket_ = new UdpSocket(); } - // make sure we don't copy this output around - // the UdpPacket and UdpSocket do not support - // copying either - DabOutputUdp(const DabOutputUdp& other); - DabOutputUdp operator=(const DabOutputUdp& other); - ~DabOutputUdp() { delete socket_; delete packet_; @@ -240,34 +213,26 @@ class DabOutputUdp : public DabOutput return "udp://" + uri_; } private: + // make sure we don't copy this output around + // the UdpPacket and UdpSocket do not support + // copying either + DabOutputUdp(const DabOutputUdp& other) = delete; + DabOutputUdp operator=(const DabOutputUdp& other) = delete; + std::string uri_; UdpSocket* socket_; UdpPacket* packet_; }; // -------------- TCP ------------------ +class TCPDataDispatcher; class DabOutputTcp : public DabOutput { public: - DabOutputTcp() - { - TcpSocket::init(); - server = new TcpServer(); - client = NULL; - } - - DabOutputTcp(const DabOutputTcp& other); - DabOutputTcp operator=(const DabOutputTcp& other); - - ~DabOutputTcp() { - -#ifdef _WIN32 - CloseHandle(this->thread_); -#endif - - delete this->server; - delete this->client; - } + DabOutputTcp() {} + DabOutputTcp(const DabOutputTcp& other) = delete; + const DabOutputTcp& operator=(const DabOutputTcp& other) = delete; + ~DabOutputTcp(); int Open(const char* name); int Write(void* buffer, int size); @@ -277,11 +242,10 @@ class DabOutputTcp : public DabOutput return "tcp://" + uri_; } - TcpServer* server; - TcpSocket* client; private: std::string uri_; - pthread_t thread_; + + TCPDataDispatcher* dispatcher_; }; // -------------- Simul ------------------ @@ -306,11 +270,7 @@ class DabOutputSimul : public DabOutput } private: std::string name_; -#ifdef _WIN32 - DWORD startTime_; -#else std::chrono::steady_clock::time_point startTime_; -#endif }; #if defined(HAVE_OUTPUT_ZEROMQ) diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 5f1943d..a8ae1bc 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -2,8 +2,10 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013 Matthias P. Braendli - http://mpb.li + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org TCP output */ @@ -28,48 +30,129 @@ #include #include #include "dabOutput.h" -#include "TcpServer.h" - -#ifdef _WIN32 -# include -# ifdef __MINGW32__ -# define FS_DECLARE_CFG_ARRAYS -# include -# endif -# include -#else -# include -# include -# ifndef O_BINARY -# define O_BINARY 0 -# endif // O_BINARY -#endif - -void* tcpThread(void* param) +#include +#include +#include +#include +#include +#include +#include "ThreadsafeQueue.h" +#include "TcpSocket.h" + +using namespace std; + +using vec_u8 = std::vector; + +// In ETI one element would be an ETI frame of 6144 bytes. +// 250 frames correspond to 6 seconds. This is mostly here +// to ensure we do not accumulate data for faulty sockets, delay +// management has to be done on the receiver end. +const size_t MAX_QUEUED_ELEMS = 250; + +class TCPConnection { - TcpSocket* client; + public: + TCPConnection(TcpSocket&& sock) : + queue(), + m_running(true), + m_sender_thread(), + m_sock(move(sock)) { + auto addr = m_sock.getRemoteAddress(); + etiLog.level(debug) << "New TCP Connection from " << + addr.getHostAddress() << ":" << addr.getPort(); + m_sender_thread = boost::thread(&TCPConnection::process, this, 0); + } + + ~TCPConnection() { + m_running = false; + queue.notify(); + m_sender_thread.join(); + } - DabOutputTcp* tcp = (DabOutputTcp*)param; + ThreadsafeQueue queue; - while ((client = tcp->server->accept()) != NULL) { - etiLog.log(info, "TCP server got a new client.\n"); - if (tcp->client != NULL) { - delete tcp->client; + bool is_overloaded(void) const { + return queue.size() > MAX_QUEUED_ELEMS; } - tcp->client = client; - } - etiLog.log(error, "TCP thread can't accept new client (%s)\n", - inetErrDesc, inetErrMsg); - return NULL; + private: + TCPConnection(const TCPConnection& other) = delete; + TCPConnection& operator=(const TCPConnection& other) = delete; + + atomic m_running; + boost::thread m_sender_thread; + TcpSocket m_sock; + + void process(long) { + while (m_running) { + vec_u8 data; + queue.wait_and_pop(data); + + try { + m_sock.send(&data[0], data.size()); + } + catch (std::runtime_error& e) { + m_running = false; + } + } + } +}; + +class TCPDataDispatcher +{ + public: + ~TCPDataDispatcher() { + m_running = false; + m_connections.clear(); + m_listener_socket.close(); + m_listener_thread.join(); + } + + void start(int port, const string& address) { + TcpSocket sock(port, address); + m_listener_socket = move(sock); + + m_running = true; + m_listener_thread = boost::thread(&TCPDataDispatcher::process, this, 0); + } + + void Write(const vec_u8& data) { + for (auto& connection : m_connections) { + connection.queue.push(data); + } + + m_connections.remove_if([](TCPConnection& conn){ return conn.is_overloaded(); }); + } + + private: + void process(long) { + m_listener_socket.listen(); + + while (m_running) { + // Add a new TCPConnection to the list, constructing it from the client socket + m_connections.emplace(m_connections.begin(), m_listener_socket.accept()); + } + } + + atomic m_running; + boost::thread m_listener_thread; + TcpSocket m_listener_socket; + std::list m_connections; +}; + +DabOutputTcp::~DabOutputTcp() +{ + if (dispatcher_) { + delete dispatcher_; + dispatcher_ = nullptr; + } } -int DabOutputTcp::Open(const char* name) +static bool parse_uri(const char *uri, long *port, string& addr) { - char* hostport = strdup(name); // the name is actually an tuple host:port + char* const hostport = strdup(uri); // the uri is actually an tuple host:port char* address; - long port; address = strchr((char*)hostport, ':'); if (address == NULL) { etiLog.log(error, @@ -82,98 +165,61 @@ int DabOutputTcp::Open(const char* name) // terminate string hostport after the host, and advance address to the port number *(address++) = 0; - port = strtol(address, (char **)NULL, 10); - if ((port == LONG_MIN) || (port == LONG_MAX)) { + *port = strtol(address, (char **)NULL, 10); + if ((*port == LONG_MIN) || (*port == LONG_MAX)) { etiLog.log(error, "can't convert port number in tcp address %s\n", address); goto tcp_open_fail; } - if (port == 0) { + if (*port == 0) { etiLog.log(error, "can't use port number 0 in tcp address\n"); goto tcp_open_fail; } - address = hostport; - if (strlen(address) > 0) { - if (this->server->create(port, address) == -1) { - etiLog.log(error, "Can't create Tcp server on %s:%i " - "(%s: %s) -> aborting\n", - address, port, inetErrDesc, inetErrMsg); - goto tcp_open_fail; - } - } else { - if (this->server->create(port) == -1) { - etiLog.log(error, "Can't create Tcp server on :%i " - "(%s: %s) -> aborting\n", - port, inetErrDesc, inetErrMsg); - goto tcp_open_fail; - } - } + addr = hostport; + free(hostport); + return true; - //sprintf(name, "%s:%i", this->packet_->getAddress().getHostAddress(), - // this->packet_->getAddress().getPort()); +tcp_open_fail: + free(hostport); + return false; +} - if (this->server->listen() == -1) { - etiLog.log(error, "Can't listen on Tcp socket (%s: %s)\n", - inetErrDesc, inetErrMsg); - goto tcp_open_fail; - } -#ifdef _WIN32 - this->thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)tcpThread, this, 0, NULL); - if (this->thread_ == NULL) { - fprintf(stderr, "Can't create TCP child"); - goto tcp_open_fail; +int DabOutputTcp::Open(const char* name) +{ + long port = 0; + string address; + bool success = parse_uri(name, &port, address); + + if (success) { + dispatcher_ = new TCPDataDispatcher(); + dispatcher_->start(port, address); } -#else - if (pthread_create(&this->thread_, NULL, tcpThread, this)) { - perror("Can't create TCP child"); - goto tcp_open_fail; + else { + stringstream ss; + ss << "Could not parse TCP output address " << name; + throw std::runtime_error(ss.str()); } -#endif - return 0; - -tcp_open_fail: - free(hostport); - return -1; } int DabOutputTcp::Write(void* buffer, int size) { + vec_u8 data(6144); + uint8_t* buffer_u8 = (uint8_t*)buffer; - if (this->client != NULL) { - if (this->client->write(&size, 2) == 2) { - if (this->client->write(buffer, size) != size) { - return size; - } - } - else { - etiLog.log(info, "TCP server client disconnected.\n"); - delete this->client; - this->client = NULL; - } - } + std::copy(buffer_u8, buffer_u8 + size, data.begin()); + + // Pad to 6144 bytes + std::fill(data.begin() + size, data.end(), 0x55); + + dispatcher_->Write(data); return size; } int DabOutputTcp::Close() { - this->server->close(); - if( this->client != NULL ) - this->client->close(); -#ifdef WIN32 - DWORD status; - for (int i = 0; i < 5; ++i) { - if (GetExitCodeThread(this->thread_, &status)) { - break; - } - Sleep(100); - } - TerminateThread(this->thread_, 1); -#else - pthread_kill(this->thread_, SIGPIPE); -#endif return 0; } diff --git a/src/dabOutput/dabOutputUdp.cpp b/src/dabOutput/dabOutputUdp.cpp index 433ef8f..9d3ea84 100644 --- a/src/dabOutput/dabOutputUdp.cpp +++ b/src/dabOutput/dabOutputUdp.cpp @@ -2,8 +2,10 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013 Matthias P. Braendli - http://mpb.li + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org UDP output */ @@ -79,12 +81,6 @@ int DabOutputUdp::Open(const char* name) this->packet_->getAddress().setPort(port); - if (this->socket_->create() == -1) { - etiLog.level(error) << "can't create UDP socket (" << - inetErrDesc << ": " << inetErrMsg << ")"; - return -1; - } - string query_params = what[3]; smatch query_what; if (regex_match(query_params, query_what, re_query, match_default)) { @@ -132,7 +128,7 @@ int DabOutputUdp::Open(const char* name) int DabOutputUdp::Write(void* buffer, int size) { - this->packet_->setLength(0); + this->packet_->setSize(0); this->packet_->addData(buffer, size); return this->socket_->send(*this->packet_); } -- cgit v1.2.3