diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-05-06 11:45:08 +0200 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-05-06 11:45:08 +0200 | 
| commit | b74b895dd44f5a76d581b8dec65dbf76dd5cece9 (patch) | |
| tree | d0420ab64fc3b89ad0a3bd2057df417fa5a987fa /src/TcpSocket.cpp | |
| parent | 4b77ed3e2ae60ac5577a850b4c9c7803f01069f2 (diff) | |
| download | dabmux-b74b895dd44f5a76d581b8dec65dbf76dd5cece9.tar.gz dabmux-b74b895dd44f5a76d581b8dec65dbf76dd5cece9.tar.bz2 dabmux-b74b895dd44f5a76d581b8dec65dbf76dd5cece9.zip | |
Move TCPDataDispatcher into TcpSocket
Diffstat (limited to 'src/TcpSocket.cpp')
| -rw-r--r-- | src/TcpSocket.cpp | 130 | 
1 files changed, 123 insertions, 7 deletions
| diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index 76e9c2e..c05eace 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -2,7 +2,7 @@     Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in     Right of Canada (Communications Research Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2019     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -27,15 +27,19 @@  #include "TcpSocket.h"  #include "Log.h"  #include <iostream> -#include <stdio.h> -#include <errno.h> -#include <string.h> +#include <cstdio> +#include <cstring> +#include <cstdint>  #include <signal.h> -#include <stdint.h> +#include <errno.h>  #include <poll.h> +#include <thread>  using namespace std; +using vec_u8 = std::vector<uint8_t>; + +  TcpSocket::TcpSocket() :      m_sock(INVALID_SOCKET)  { @@ -147,7 +151,7 @@ ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms)          fds[0].fd = m_sock;          fds[0].events = POLLOUT; -        int retval = poll(fds, 1, timeout_ms); +        const int retval = poll(fds, 1, timeout_ms);          if (retval == -1) {              stringstream ss; @@ -168,7 +172,7 @@ ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms)  #else      const int flags = 0;  #endif -    ssize_t ret = ::send(m_sock, (const char*)data, size, flags); +    const ssize_t ret = ::send(m_sock, (const char*)data, size, flags);      if (ret == SOCKET_ERROR) {          stringstream ss; @@ -236,3 +240,115 @@ InetAddress TcpSocket::getRemoteAddress() const  {      return m_remote_address;  } + + +TCPConnection::TCPConnection(TcpSocket&& sock) : +            queue(), +            m_running(true), +            m_sender_thread(), +            m_sock(move(sock)) +{ +    auto addr = m_sock.getRemoteAddress(); +    etiLog.level(debug) << "New TCP Connection from " << +        addr.getHostAddress() << ":" << addr.getPort(); +    m_sender_thread = std::thread(&TCPConnection::process, this); +} + +TCPConnection::~TCPConnection() +{ +    m_running = false; +    vec_u8 termination_marker; +    queue.push(termination_marker); +    m_sender_thread.join(); +} + +void TCPConnection::process() +{ +    while (m_running) { +        vec_u8 data; +        queue.wait_and_pop(data); + +        if (data.empty()) { +            // empty vector is the termination marker +            m_running = false; +            break; +        } + +        try { +            ssize_t remaining = data.size(); +            const uint8_t *buf = reinterpret_cast<const uint8_t*>(data.data()); +            const int timeout_ms = 10; // Less than one ETI frame + +            while (m_running and remaining > 0) { +                const ssize_t sent = m_sock.send(buf, remaining, timeout_ms); +                if (sent < 0 or sent > remaining) { +                    throw std::logic_error("Invalid TcpSocket::send() return value"); +                } +                remaining -= sent; +                buf += sent; +            } +        } +        catch (const std::runtime_error& e) { +            m_running = false; +        } +    } + +    auto addr = m_sock.getRemoteAddress(); +    etiLog.level(debug) << "Dropping TCP Connection from " << +        addr.getHostAddress() << ":" << addr.getPort(); +} + + +TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) : +    m_max_queue_size(max_queue_size) +{ +} + +TCPDataDispatcher::~TCPDataDispatcher() +{ +    m_running = false; +    m_connections.clear(); +    m_listener_socket.close(); +    m_listener_thread.join(); +} + +void TCPDataDispatcher::start(int port, const string& address) +{ +    TcpSocket sock(port, address); +    m_listener_socket = move(sock); + +    m_running = true; +    m_listener_thread = std::thread(&TCPDataDispatcher::process, this); +} + +void TCPDataDispatcher::write(const vec_u8& data) +{ +    for (auto& connection : m_connections) { +        connection.queue.push(data); +    } + +    m_connections.remove_if( +            [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); +} + +void TCPDataDispatcher::process() +{ +    try { +        m_listener_socket.listen(); + +        const int timeout_ms = 1000; + +        while (m_running) { +            // Add a new TCPConnection to the list, constructing it from the client socket +            auto sock = m_listener_socket.accept(timeout_ms); +            if (sock.isValid()) { +                m_connections.emplace(m_connections.begin(), move(sock)); +            } +        } +    } +    catch (const std::runtime_error& e) { +        etiLog.level(error) << "TCPDataDispatcher caught runtime error: " << e.what(); +        m_running = false; +    } +} + | 
