diff options
Diffstat (limited to 'src/dabOutput/dabOutputTcp.cpp')
-rw-r--r-- | src/dabOutput/dabOutputTcp.cpp | 257 |
1 files changed, 155 insertions, 102 deletions
diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 5f1943d..9a84937 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -2,8 +2,10 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013 Matthias P. Braendli - http://mpb.li + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org TCP output */ @@ -28,48 +30,129 @@ #include <signal.h> #include <limits.h> #include "dabOutput.h" -#include "TcpServer.h" - -#ifdef _WIN32 -# include <io.h> -# ifdef __MINGW32__ -# define FS_DECLARE_CFG_ARRAYS -# include <winioctl.h> -# endif -# include <sdci.h> -#else -# include <unistd.h> -# include <sys/time.h> -# ifndef O_BINARY -# define O_BINARY 0 -# endif // O_BINARY -#endif - -void* tcpThread(void* param) +#include <unistd.h> +#include <sys/time.h> +#include <list> +#include <vector> +#include <atomic> +#include <boost/thread.hpp> +#include "ThreadsafeQueue.h" +#include "TcpSocket.h" + +using namespace std; + +using vec_u8 = std::vector<uint8_t>; + +// In ETI one element would be an ETI frame of 6144 bytes. +// 250 frames correspond to 6 seconds. This is mostly here +// to ensure we do not accumulate data for faulty sockets, delay +// management has to be done on the receiver end. +const size_t MAX_QUEUED_ELEMS = 250; + +class TCPConnection { - TcpSocket* client; + public: + TCPConnection(TcpSocket&& sock) : + queue(), + m_running(true), + m_sender_thread(), + m_sock(move(sock)) { + auto addr = m_sock.getRemoteAddress(); + etiLog.level(debug) << "New TCP Connection from " << + addr.getHostAddress() << ":" << addr.getPort(); + m_sender_thread = boost::thread(&TCPConnection::process, this, 0); + } + + ~TCPConnection() { + m_running = false; + queue.notify(); + m_sender_thread.join(); + } - DabOutputTcp* tcp = (DabOutputTcp*)param; + ThreadsafeQueue<vec_u8> queue; - while ((client = tcp->server->accept()) != NULL) { - etiLog.log(info, "TCP server got a new client.\n"); - if (tcp->client != NULL) { - delete tcp->client; + bool is_overloaded(void) const { + return queue.size() > MAX_QUEUED_ELEMS; + } + + private: + TCPConnection(const TCPConnection& other) = delete; + TCPConnection& operator=(const TCPConnection& other) = delete; + + atomic<bool> m_running; + boost::thread m_sender_thread; + TcpSocket m_sock; + + void process(long) { + while (m_running) { + vec_u8 data; + queue.wait_and_pop(data); + + try { + m_sock.send(&data[0], data.size()); + } + catch (std::runtime_error& e) { + m_running = false; + } + } + } +}; + +class TCPDataDispatcher +{ + public: + ~TCPDataDispatcher() { + m_running = false; + m_connections.clear(); + m_listener_socket.close(); + m_listener_thread.join(); + } + + void start(int port, const string& address) { + TcpSocket sock(port, address); + m_listener_socket = move(sock); + + m_running = true; + m_listener_thread = boost::thread(&TCPDataDispatcher::process, this, 0); } - tcp->client = client; - } - etiLog.log(error, "TCP thread can't accept new client (%s)\n", - inetErrDesc, inetErrMsg); - return NULL; + void Write(const vec_u8& data) { + for (auto& connection : m_connections) { + connection.queue.push(data); + } + + m_connections.remove_if([](TCPConnection& conn){ return conn.is_overloaded(); }); + } + + private: + void process(long) { + m_listener_socket.listen(); + + while (m_running) { + // Add a new TCPConnection to the list, constructing it from the client socket + m_connections.emplace(m_connections.begin(), m_listener_socket.accept()); + } + } + + atomic<bool> m_running; + boost::thread m_listener_thread; + TcpSocket m_listener_socket; + std::list<TCPConnection> m_connections; +}; + +DabOutputTcp::~DabOutputTcp() +{ + if (dispatcher_) { + delete dispatcher_; + dispatcher_ = nullptr; + } } -int DabOutputTcp::Open(const char* name) +static bool parse_uri(const char *uri, long *port, string& addr) { - char* hostport = strdup(name); // the name is actually an tuple host:port + char* const hostport = strdup(uri); // the uri is actually an tuple host:port char* address; - long port; address = strchr((char*)hostport, ':'); if (address == NULL) { etiLog.log(error, @@ -82,98 +165,68 @@ int DabOutputTcp::Open(const char* name) // terminate string hostport after the host, and advance address to the port number *(address++) = 0; - port = strtol(address, (char **)NULL, 10); - if ((port == LONG_MIN) || (port == LONG_MAX)) { + *port = strtol(address, (char **)NULL, 10); + if ((*port == LONG_MIN) || (*port == LONG_MAX)) { etiLog.log(error, "can't convert port number in tcp address %s\n", address); goto tcp_open_fail; } - if (port == 0) { + if (*port == 0) { etiLog.log(error, "can't use port number 0 in tcp address\n"); goto tcp_open_fail; } - address = hostport; - if (strlen(address) > 0) { - if (this->server->create(port, address) == -1) { - etiLog.log(error, "Can't create Tcp server on %s:%i " - "(%s: %s) -> aborting\n", - address, port, inetErrDesc, inetErrMsg); - goto tcp_open_fail; - } - } else { - if (this->server->create(port) == -1) { - etiLog.log(error, "Can't create Tcp server on :%i " - "(%s: %s) -> aborting\n", - port, inetErrDesc, inetErrMsg); - goto tcp_open_fail; - } - } + addr = hostport; + free(hostport); + return true; - //sprintf(name, "%s:%i", this->packet_->getAddress().getHostAddress(), - // this->packet_->getAddress().getPort()); +tcp_open_fail: + free(hostport); + return false; +} - if (this->server->listen() == -1) { - etiLog.log(error, "Can't listen on Tcp socket (%s: %s)\n", - inetErrDesc, inetErrMsg); - goto tcp_open_fail; - } -#ifdef _WIN32 - this->thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)tcpThread, this, 0, NULL); - if (this->thread_ == NULL) { - fprintf(stderr, "Can't create TCP child"); - goto tcp_open_fail; +int DabOutputTcp::Open(const char* name) +{ + long port = 0; + string address; + bool success = parse_uri(name, &port, address); + + if (success) { + dispatcher_ = new TCPDataDispatcher(); + try { + dispatcher_->start(port, address); + } + catch (std::runtime_error& e) { + stringstream ss; + ss << "Caught error during socket open of TCP output " << name; + throw e; + } } -#else - if (pthread_create(&this->thread_, NULL, tcpThread, this)) { - perror("Can't create TCP child"); - goto tcp_open_fail; + else { + stringstream ss; + ss << "Could not parse TCP output address " << name; + throw std::runtime_error(ss.str()); } -#endif - return 0; - -tcp_open_fail: - free(hostport); - return -1; } int DabOutputTcp::Write(void* buffer, int size) { + vec_u8 data(6144); + uint8_t* buffer_u8 = (uint8_t*)buffer; - if (this->client != NULL) { - if (this->client->write(&size, 2) == 2) { - if (this->client->write(buffer, size) != size) { - return size; - } - } - else { - etiLog.log(info, "TCP server client disconnected.\n"); - delete this->client; - this->client = NULL; - } - } + std::copy(buffer_u8, buffer_u8 + size, data.begin()); + + // Pad to 6144 bytes + std::fill(data.begin() + size, data.end(), 0x55); + + dispatcher_->Write(data); return size; } int DabOutputTcp::Close() { - this->server->close(); - if( this->client != NULL ) - this->client->close(); -#ifdef WIN32 - DWORD status; - for (int i = 0; i < 5; ++i) { - if (GetExitCodeThread(this->thread_, &status)) { - break; - } - Sleep(100); - } - TerminateThread(this->thread_, 1); -#else - pthread_kill(this->thread_, SIGPIPE); -#endif return 0; } |