From b74b895dd44f5a76d581b8dec65dbf76dd5cece9 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 6 May 2019 11:45:08 +0200 Subject: Move TCPDataDispatcher into TcpSocket --- src/TcpSocket.cpp | 130 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 123 insertions(+), 7 deletions(-) (limited to 'src/TcpSocket.cpp') diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index 76e9c2e..c05eace 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -2,7 +2,7 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2017 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -27,15 +27,19 @@ #include "TcpSocket.h" #include "Log.h" #include -#include -#include -#include +#include +#include +#include #include -#include +#include #include +#include using namespace std; +using vec_u8 = std::vector; + + TcpSocket::TcpSocket() : m_sock(INVALID_SOCKET) { @@ -147,7 +151,7 @@ ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms) fds[0].fd = m_sock; fds[0].events = POLLOUT; - int retval = poll(fds, 1, timeout_ms); + const int retval = poll(fds, 1, timeout_ms); if (retval == -1) { stringstream ss; @@ -168,7 +172,7 @@ ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms) #else const int flags = 0; #endif - ssize_t ret = ::send(m_sock, (const char*)data, size, flags); + const ssize_t ret = ::send(m_sock, (const char*)data, size, flags); if (ret == SOCKET_ERROR) { stringstream ss; @@ -236,3 +240,115 @@ InetAddress TcpSocket::getRemoteAddress() const { return m_remote_address; } + + +TCPConnection::TCPConnection(TcpSocket&& sock) : + queue(), + m_running(true), + m_sender_thread(), + m_sock(move(sock)) +{ + auto addr = m_sock.getRemoteAddress(); + etiLog.level(debug) << "New TCP Connection from " << + addr.getHostAddress() << ":" << addr.getPort(); + m_sender_thread = std::thread(&TCPConnection::process, this); +} + +TCPConnection::~TCPConnection() +{ + m_running = false; + vec_u8 termination_marker; + queue.push(termination_marker); + m_sender_thread.join(); +} + +void TCPConnection::process() +{ + while (m_running) { + vec_u8 data; + queue.wait_and_pop(data); + + if (data.empty()) { + // empty vector is the termination marker + m_running = false; + break; + } + + try { + ssize_t remaining = data.size(); + const uint8_t *buf = reinterpret_cast(data.data()); + const int timeout_ms = 10; // Less than one ETI frame + + while (m_running and remaining > 0) { + const ssize_t sent = m_sock.send(buf, remaining, timeout_ms); + if (sent < 0 or sent > remaining) { + throw std::logic_error("Invalid TcpSocket::send() return value"); + } + remaining -= sent; + buf += sent; + } + } + catch (const std::runtime_error& e) { + m_running = false; + } + } + + auto addr = m_sock.getRemoteAddress(); + etiLog.level(debug) << "Dropping TCP Connection from " << + addr.getHostAddress() << ":" << addr.getPort(); +} + + +TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) : + m_max_queue_size(max_queue_size) +{ +} + +TCPDataDispatcher::~TCPDataDispatcher() +{ + m_running = false; + m_connections.clear(); + m_listener_socket.close(); + m_listener_thread.join(); +} + +void TCPDataDispatcher::start(int port, const string& address) +{ + TcpSocket sock(port, address); + m_listener_socket = move(sock); + + m_running = true; + m_listener_thread = std::thread(&TCPDataDispatcher::process, this); +} + +void TCPDataDispatcher::write(const vec_u8& data) +{ + for (auto& connection : m_connections) { + connection.queue.push(data); + } + + m_connections.remove_if( + [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); +} + +void TCPDataDispatcher::process() +{ + try { + m_listener_socket.listen(); + + const int timeout_ms = 1000; + + while (m_running) { + // Add a new TCPConnection to the list, constructing it from the client socket + auto sock = m_listener_socket.accept(timeout_ms); + if (sock.isValid()) { + m_connections.emplace(m_connections.begin(), move(sock)); + } + } + } + catch (const std::runtime_error& e) { + etiLog.level(error) << "TCPDataDispatcher caught runtime error: " << e.what(); + m_running = false; + } +} + -- cgit v1.2.3