diff options
-rw-r--r-- | src/TcpSocket.cpp | 130 | ||||
-rw-r--r-- | src/TcpSocket.h | 57 | ||||
-rw-r--r-- | src/dabOutput/dabOutput.h | 2 | ||||
-rw-r--r-- | src/dabOutput/dabOutputTcp.cpp | 129 |
4 files changed, 180 insertions, 138 deletions
diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index 76e9c2e..c05eace 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -2,7 +2,7 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2017 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -27,15 +27,19 @@ #include "TcpSocket.h" #include "Log.h" #include <iostream> -#include <stdio.h> -#include <errno.h> -#include <string.h> +#include <cstdio> +#include <cstring> +#include <cstdint> #include <signal.h> -#include <stdint.h> +#include <errno.h> #include <poll.h> +#include <thread> using namespace std; +using vec_u8 = std::vector<uint8_t>; + + TcpSocket::TcpSocket() : m_sock(INVALID_SOCKET) { @@ -147,7 +151,7 @@ ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms) fds[0].fd = m_sock; fds[0].events = POLLOUT; - int retval = poll(fds, 1, timeout_ms); + const int retval = poll(fds, 1, timeout_ms); if (retval == -1) { stringstream ss; @@ -168,7 +172,7 @@ ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms) #else const int flags = 0; #endif - ssize_t ret = ::send(m_sock, (const char*)data, size, flags); + const ssize_t ret = ::send(m_sock, (const char*)data, size, flags); if (ret == SOCKET_ERROR) { stringstream ss; @@ -236,3 +240,115 @@ InetAddress TcpSocket::getRemoteAddress() const { return m_remote_address; } + + +TCPConnection::TCPConnection(TcpSocket&& sock) : + queue(), + m_running(true), + m_sender_thread(), + m_sock(move(sock)) +{ + auto addr = m_sock.getRemoteAddress(); + etiLog.level(debug) << "New TCP Connection from " << + addr.getHostAddress() << ":" << addr.getPort(); + m_sender_thread = std::thread(&TCPConnection::process, this); +} + +TCPConnection::~TCPConnection() +{ + m_running = false; + vec_u8 termination_marker; + queue.push(termination_marker); + m_sender_thread.join(); +} + +void TCPConnection::process() +{ + while (m_running) { + vec_u8 data; + queue.wait_and_pop(data); + + if (data.empty()) { + // empty vector is the termination marker + m_running = false; + break; + } + + try { + ssize_t remaining = data.size(); + const uint8_t *buf = reinterpret_cast<const uint8_t*>(data.data()); + const int timeout_ms = 10; // Less than one ETI frame + + while (m_running and remaining > 0) { + const ssize_t sent = m_sock.send(buf, remaining, timeout_ms); + if (sent < 0 or sent > remaining) { + throw std::logic_error("Invalid TcpSocket::send() return value"); + } + remaining -= sent; + buf += sent; + } + } + catch (const std::runtime_error& e) { + m_running = false; + } + } + + auto addr = m_sock.getRemoteAddress(); + etiLog.level(debug) << "Dropping TCP Connection from " << + addr.getHostAddress() << ":" << addr.getPort(); +} + + +TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) : + m_max_queue_size(max_queue_size) +{ +} + +TCPDataDispatcher::~TCPDataDispatcher() +{ + m_running = false; + m_connections.clear(); + m_listener_socket.close(); + m_listener_thread.join(); +} + +void TCPDataDispatcher::start(int port, const string& address) +{ + TcpSocket sock(port, address); + m_listener_socket = move(sock); + + m_running = true; + m_listener_thread = std::thread(&TCPDataDispatcher::process, this); +} + +void TCPDataDispatcher::write(const vec_u8& data) +{ + for (auto& connection : m_connections) { + connection.queue.push(data); + } + + m_connections.remove_if( + [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); +} + +void TCPDataDispatcher::process() +{ + try { + m_listener_socket.listen(); + + const int timeout_ms = 1000; + + while (m_running) { + // Add a new TCPConnection to the list, constructing it from the client socket + auto sock = m_listener_socket.accept(timeout_ms); + if (sock.isValid()) { + m_connections.emplace(m_connections.begin(), move(sock)); + } + } + } + catch (const std::runtime_error& e) { + etiLog.level(error) << "TCPDataDispatcher caught runtime error: " << e.what(); + m_running = false; + } +} + diff --git a/src/TcpSocket.h b/src/TcpSocket.h index 9ff09e5..ec7afd3 100644 --- a/src/TcpSocket.h +++ b/src/TcpSocket.h @@ -2,7 +2,7 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2017 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -32,6 +32,7 @@ #endif #include "InetAddress.h" +#include "ThreadsafeQueue.h" #include <sys/socket.h> #include <netinet/in.h> #include <unistd.h> @@ -45,8 +46,11 @@ #include <iostream> #include <string> - +#include <vector> #include <memory> +#include <atomic> +#include <thread> +#include <list> /** * This class represents a TCP socket. @@ -77,8 +81,8 @@ class TcpSocket /** Send data over the TCP connection. * @param data The buffer that will be sent. * @param size Number of bytes to send. - * @param timeout_ms number of milliseconds before timeout - * return number of bytes sent or -1 if error + * @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout + * return number of bytes sent, 0 on timeout, or throws runtime_error. */ ssize_t send(const void* data, size_t size, int timeout_ms=0); @@ -112,4 +116,49 @@ class TcpSocket SOCKET m_sock; }; +/* Helper class for TCPDataDispatcher, contains a queue of pending data and + * a sender thread. */ +class TCPConnection +{ + public: + TCPConnection(TcpSocket&& sock); + TCPConnection(const TCPConnection&) = delete; + TCPConnection& operator=(const TCPConnection&) = delete; + ~TCPConnection(); + + ThreadsafeQueue<std::vector<uint8_t> > queue; + + private: + std::atomic<bool> m_running; + std::thread m_sender_thread; + TcpSocket m_sock; + + void process(void); +}; + +/* Send a TCP stream to several destinations, and automatically disconnect destinations + * whose buffer overflows. + */ +class TCPDataDispatcher +{ + public: + TCPDataDispatcher(size_t max_queue_size); + ~TCPDataDispatcher(); + TCPDataDispatcher(const TCPDataDispatcher&) = delete; + TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete; + + void start(int port, const std::string& address); + void write(const std::vector<uint8_t>& data); + + private: + void process(void); + + size_t m_max_queue_size; + + std::atomic<bool> m_running; + std::thread m_listener_thread; + TcpSocket m_listener_socket; + std::list<TCPConnection> m_connections; +}; + #endif // _TCPSOCKET diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h index 677fffc..e5a8a94 100644 --- a/src/dabOutput/dabOutput.h +++ b/src/dabOutput/dabOutput.h @@ -29,6 +29,7 @@ #pragma once #include "UdpSocket.h" +#include "TcpSocket.h" #include "Log.h" #include "string.h" #include <stdexcept> @@ -203,7 +204,6 @@ class DabOutputUdp : public DabOutput }; // -------------- TCP ------------------ -class TCPDataDispatcher; class DabOutputTcp : public DabOutput { public: diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 7fb17ca..87dbfd5 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -37,7 +37,6 @@ #include <atomic> #include <thread> #include "ThreadsafeQueue.h" -#include "TcpSocket.h" using namespace std; @@ -47,129 +46,7 @@ using vec_u8 = std::vector<uint8_t>; // 250 frames correspond to 6 seconds. This is mostly here // to ensure we do not accumulate data for faulty sockets, delay // management has to be done on the receiver end. -const size_t MAX_QUEUED_ELEMS = 250; - -class TCPConnection -{ - public: - TCPConnection(TcpSocket&& sock) : - queue(), - m_running(true), - m_sender_thread(), - m_sock(move(sock)) { - auto addr = m_sock.getRemoteAddress(); - etiLog.level(debug) << "New TCP Connection from " << - addr.getHostAddress() << ":" << addr.getPort(); - m_sender_thread = std::thread(&TCPConnection::process, this, 0); - } - - ~TCPConnection() { - m_running = false; - vec_u8 termination_marker; - queue.push(termination_marker); - m_sender_thread.join(); - } - - ThreadsafeQueue<vec_u8> queue; - - bool is_overloaded(void) const { - return queue.size() > MAX_QUEUED_ELEMS; - } - - private: - TCPConnection(const TCPConnection& other) = delete; - TCPConnection& operator=(const TCPConnection& other) = delete; - - atomic<bool> m_running; - std::thread m_sender_thread; - TcpSocket m_sock; - - void process(long) { - while (m_running) { - vec_u8 data; - queue.wait_and_pop(data); - - if (data.empty()) { - // empty vector is the termination marker - break; - } - - try { - ssize_t sent = 0; - do { - const int timeout_ms = 10; // Less than one ETI frame - sent = m_sock.send(&data[0], data.size(), timeout_ms); - - if (is_overloaded()) { - m_running = false; - break; - } - } - while (sent == 0); - } - catch (const std::runtime_error& e) { - m_running = false; - } - } - - auto addr = m_sock.getRemoteAddress(); - etiLog.level(debug) << "Dropping TCP Connection from " << - addr.getHostAddress() << ":" << addr.getPort(); - } -}; - -class TCPDataDispatcher -{ - public: - ~TCPDataDispatcher() { - m_running = false; - m_connections.clear(); - m_listener_socket.close(); - m_listener_thread.join(); - } - - void start(int port, const string& address) { - TcpSocket sock(port, address); - m_listener_socket = move(sock); - - m_running = true; - m_listener_thread = std::thread(&TCPDataDispatcher::process, this, 0); - } - - void Write(const vec_u8& data) { - for (auto& connection : m_connections) { - connection.queue.push(data); - } - - m_connections.remove_if([](const TCPConnection& conn){ return conn.is_overloaded(); }); - } - - private: - void process(long) { - try { - m_listener_socket.listen(); - - const int timeout_ms = 1000; - - while (m_running) { - // Add a new TCPConnection to the list, constructing it from the client socket - auto sock = m_listener_socket.accept(timeout_ms); - if (sock.isValid()) { - m_connections.emplace(m_connections.begin(), move(sock)); - } - } - } - catch (const std::runtime_error& e) { - etiLog.level(error) << "TCPDataDispatcher caught runtime error: " << e.what(); - m_running = false; - } - } - - atomic<bool> m_running; - std::thread m_listener_thread; - TcpSocket m_listener_socket; - std::list<TCPConnection> m_connections; -}; +const size_t MAX_QUEUED_ETI_FRAMES = 250; static bool parse_uri(const char *uri, long *port, string& addr) { @@ -217,7 +94,7 @@ int DabOutputTcp::Open(const char* name) uri_ = name; if (success) { - dispatcher_ = make_shared<TCPDataDispatcher>(); + dispatcher_ = make_shared<TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES); dispatcher_->start(port, address); } else { @@ -237,7 +114,7 @@ int DabOutputTcp::Write(void* buffer, int size) // Pad to 6144 bytes std::fill(data.begin() + size, data.end(), 0x55); - dispatcher_->Write(data); + dispatcher_->write(data); return size; } |