diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-05-06 11:45:08 +0200 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-05-06 11:45:08 +0200 | 
| commit | b74b895dd44f5a76d581b8dec65dbf76dd5cece9 (patch) | |
| tree | d0420ab64fc3b89ad0a3bd2057df417fa5a987fa | |
| parent | 4b77ed3e2ae60ac5577a850b4c9c7803f01069f2 (diff) | |
| download | dabmux-b74b895dd44f5a76d581b8dec65dbf76dd5cece9.tar.gz dabmux-b74b895dd44f5a76d581b8dec65dbf76dd5cece9.tar.bz2 dabmux-b74b895dd44f5a76d581b8dec65dbf76dd5cece9.zip  | |
Move TCPDataDispatcher into TcpSocket
| -rw-r--r-- | src/TcpSocket.cpp | 130 | ||||
| -rw-r--r-- | src/TcpSocket.h | 57 | ||||
| -rw-r--r-- | src/dabOutput/dabOutput.h | 2 | ||||
| -rw-r--r-- | src/dabOutput/dabOutputTcp.cpp | 129 | 
4 files changed, 180 insertions, 138 deletions
diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index 76e9c2e..c05eace 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -2,7 +2,7 @@     Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in     Right of Canada (Communications Research Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2019     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -27,15 +27,19 @@  #include "TcpSocket.h"  #include "Log.h"  #include <iostream> -#include <stdio.h> -#include <errno.h> -#include <string.h> +#include <cstdio> +#include <cstring> +#include <cstdint>  #include <signal.h> -#include <stdint.h> +#include <errno.h>  #include <poll.h> +#include <thread>  using namespace std; +using vec_u8 = std::vector<uint8_t>; + +  TcpSocket::TcpSocket() :      m_sock(INVALID_SOCKET)  { @@ -147,7 +151,7 @@ ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms)          fds[0].fd = m_sock;          fds[0].events = POLLOUT; -        int retval = poll(fds, 1, timeout_ms); +        const int retval = poll(fds, 1, timeout_ms);          if (retval == -1) {              stringstream ss; @@ -168,7 +172,7 @@ ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms)  #else      const int flags = 0;  #endif -    ssize_t ret = ::send(m_sock, (const char*)data, size, flags); +    const ssize_t ret = ::send(m_sock, (const char*)data, size, flags);      if (ret == SOCKET_ERROR) {          stringstream ss; @@ -236,3 +240,115 @@ InetAddress TcpSocket::getRemoteAddress() const  {      return m_remote_address;  } + + +TCPConnection::TCPConnection(TcpSocket&& sock) : +            queue(), +            m_running(true), +            m_sender_thread(), +            m_sock(move(sock)) +{ +    auto addr = m_sock.getRemoteAddress(); +    etiLog.level(debug) << "New TCP Connection from " << +        addr.getHostAddress() << ":" << addr.getPort(); +    m_sender_thread = std::thread(&TCPConnection::process, this); +} + +TCPConnection::~TCPConnection() +{ +    m_running = false; +    vec_u8 termination_marker; +    queue.push(termination_marker); +    m_sender_thread.join(); +} + +void TCPConnection::process() +{ +    while (m_running) { +        vec_u8 data; +        queue.wait_and_pop(data); + +        if (data.empty()) { +            // empty vector is the termination marker +            m_running = false; +            break; +        } + +        try { +            ssize_t remaining = data.size(); +            const uint8_t *buf = reinterpret_cast<const uint8_t*>(data.data()); +            const int timeout_ms = 10; // Less than one ETI frame + +            while (m_running and remaining > 0) { +                const ssize_t sent = m_sock.send(buf, remaining, timeout_ms); +                if (sent < 0 or sent > remaining) { +                    throw std::logic_error("Invalid TcpSocket::send() return value"); +                } +                remaining -= sent; +                buf += sent; +            } +        } +        catch (const std::runtime_error& e) { +            m_running = false; +        } +    } + +    auto addr = m_sock.getRemoteAddress(); +    etiLog.level(debug) << "Dropping TCP Connection from " << +        addr.getHostAddress() << ":" << addr.getPort(); +} + + +TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) : +    m_max_queue_size(max_queue_size) +{ +} + +TCPDataDispatcher::~TCPDataDispatcher() +{ +    m_running = false; +    m_connections.clear(); +    m_listener_socket.close(); +    m_listener_thread.join(); +} + +void TCPDataDispatcher::start(int port, const string& address) +{ +    TcpSocket sock(port, address); +    m_listener_socket = move(sock); + +    m_running = true; +    m_listener_thread = std::thread(&TCPDataDispatcher::process, this); +} + +void TCPDataDispatcher::write(const vec_u8& data) +{ +    for (auto& connection : m_connections) { +        connection.queue.push(data); +    } + +    m_connections.remove_if( +            [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); +} + +void TCPDataDispatcher::process() +{ +    try { +        m_listener_socket.listen(); + +        const int timeout_ms = 1000; + +        while (m_running) { +            // Add a new TCPConnection to the list, constructing it from the client socket +            auto sock = m_listener_socket.accept(timeout_ms); +            if (sock.isValid()) { +                m_connections.emplace(m_connections.begin(), move(sock)); +            } +        } +    } +    catch (const std::runtime_error& e) { +        etiLog.level(error) << "TCPDataDispatcher caught runtime error: " << e.what(); +        m_running = false; +    } +} + diff --git a/src/TcpSocket.h b/src/TcpSocket.h index 9ff09e5..ec7afd3 100644 --- a/src/TcpSocket.h +++ b/src/TcpSocket.h @@ -2,7 +2,7 @@     Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in     Right of Canada (Communications Research Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2019     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -32,6 +32,7 @@  #endif  #include "InetAddress.h" +#include "ThreadsafeQueue.h"  #include <sys/socket.h>  #include <netinet/in.h>  #include <unistd.h> @@ -45,8 +46,11 @@  #include <iostream>  #include <string> - +#include <vector>  #include <memory> +#include <atomic> +#include <thread> +#include <list>  /**   *  This class represents a TCP socket. @@ -77,8 +81,8 @@ class TcpSocket          /** Send data over the TCP connection.           *  @param data The buffer that will be sent.           *  @param size Number of bytes to send. -         *  @param timeout_ms number of milliseconds before timeout -         *  return number of bytes sent or -1 if error +         *  @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout +         *  return number of bytes sent, 0 on timeout, or throws runtime_error.           */          ssize_t send(const void* data, size_t size, int timeout_ms=0); @@ -112,4 +116,49 @@ class TcpSocket          SOCKET m_sock;  }; +/* Helper class for TCPDataDispatcher, contains a queue of pending data and + * a sender thread. */ +class TCPConnection +{ +    public: +        TCPConnection(TcpSocket&& sock); +        TCPConnection(const TCPConnection&) = delete; +        TCPConnection& operator=(const TCPConnection&) = delete; +        ~TCPConnection(); + +        ThreadsafeQueue<std::vector<uint8_t> > queue; + +    private: +        std::atomic<bool> m_running; +        std::thread m_sender_thread; +        TcpSocket m_sock; + +        void process(void); +}; + +/* Send a TCP stream to several destinations, and automatically disconnect destinations + * whose buffer overflows. + */ +class TCPDataDispatcher +{ +    public: +        TCPDataDispatcher(size_t max_queue_size); +        ~TCPDataDispatcher(); +        TCPDataDispatcher(const TCPDataDispatcher&) = delete; +        TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete; + +        void start(int port, const std::string& address); +        void write(const std::vector<uint8_t>& data); + +    private: +        void process(void); + +        size_t m_max_queue_size; + +        std::atomic<bool> m_running; +        std::thread m_listener_thread; +        TcpSocket m_listener_socket; +        std::list<TCPConnection> m_connections; +}; +  #endif // _TCPSOCKET diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h index 677fffc..e5a8a94 100644 --- a/src/dabOutput/dabOutput.h +++ b/src/dabOutput/dabOutput.h @@ -29,6 +29,7 @@  #pragma once  #include "UdpSocket.h" +#include "TcpSocket.h"  #include "Log.h"  #include "string.h"  #include <stdexcept> @@ -203,7 +204,6 @@ class DabOutputUdp : public DabOutput  };  // -------------- TCP ------------------ -class TCPDataDispatcher;  class DabOutputTcp : public DabOutput  {      public: diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 7fb17ca..87dbfd5 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -37,7 +37,6 @@  #include <atomic>  #include <thread>  #include "ThreadsafeQueue.h" -#include "TcpSocket.h"  using namespace std; @@ -47,129 +46,7 @@ using vec_u8 = std::vector<uint8_t>;  // 250 frames correspond to 6 seconds. This is mostly here  // to ensure we do not accumulate data for faulty sockets, delay  // management has to be done on the receiver end. -const size_t MAX_QUEUED_ELEMS = 250; - -class TCPConnection -{ -    public: -        TCPConnection(TcpSocket&& sock) : -            queue(), -            m_running(true), -            m_sender_thread(), -            m_sock(move(sock)) { -                auto addr = m_sock.getRemoteAddress(); -                etiLog.level(debug) << "New TCP Connection from " << -                    addr.getHostAddress() << ":" << addr.getPort(); -                m_sender_thread = std::thread(&TCPConnection::process, this, 0); -            } - -        ~TCPConnection() { -            m_running = false; -            vec_u8 termination_marker; -            queue.push(termination_marker); -            m_sender_thread.join(); -        } - -        ThreadsafeQueue<vec_u8> queue; - -        bool is_overloaded(void) const { -            return queue.size() > MAX_QUEUED_ELEMS; -        } - -    private: -        TCPConnection(const TCPConnection& other) = delete; -        TCPConnection& operator=(const TCPConnection& other) = delete; - -        atomic<bool> m_running; -        std::thread m_sender_thread; -        TcpSocket m_sock; - -        void process(long) { -            while (m_running) { -                vec_u8 data; -                queue.wait_and_pop(data); - -                if (data.empty()) { -                    // empty vector is the termination marker -                    break; -                } - -                try { -                    ssize_t sent = 0; -                    do { -                        const int timeout_ms = 10; // Less than one ETI frame -                        sent = m_sock.send(&data[0], data.size(), timeout_ms); - -                        if (is_overloaded()) { -                            m_running = false; -                            break; -                        } -                    } -                    while (sent == 0); -                } -                catch (const std::runtime_error& e) { -                    m_running = false; -                } -            } - -            auto addr = m_sock.getRemoteAddress(); -            etiLog.level(debug) << "Dropping TCP Connection from " << -                addr.getHostAddress() << ":" << addr.getPort(); -        } -}; - -class TCPDataDispatcher -{ -    public: -        ~TCPDataDispatcher() { -            m_running = false; -            m_connections.clear(); -            m_listener_socket.close(); -            m_listener_thread.join(); -        } - -        void start(int port, const string& address) { -            TcpSocket sock(port, address); -            m_listener_socket = move(sock); - -            m_running = true; -            m_listener_thread = std::thread(&TCPDataDispatcher::process, this, 0); -        } - -        void Write(const vec_u8& data) { -            for (auto& connection : m_connections) { -                connection.queue.push(data); -            } - -            m_connections.remove_if([](const TCPConnection& conn){ return conn.is_overloaded(); }); -        } - -    private: -        void process(long) { -            try { -                m_listener_socket.listen(); - -                const int timeout_ms = 1000; - -                while (m_running) { -                    // Add a new TCPConnection to the list, constructing it from the client socket -                    auto sock = m_listener_socket.accept(timeout_ms); -                    if (sock.isValid()) { -                        m_connections.emplace(m_connections.begin(), move(sock)); -                    } -                } -            } -            catch (const std::runtime_error& e) { -                etiLog.level(error) << "TCPDataDispatcher caught runtime error: " << e.what(); -                m_running = false; -            } -        } - -        atomic<bool> m_running; -        std::thread m_listener_thread; -        TcpSocket m_listener_socket; -        std::list<TCPConnection> m_connections; -}; +const size_t MAX_QUEUED_ETI_FRAMES = 250;  static bool parse_uri(const char *uri, long *port, string& addr)  { @@ -217,7 +94,7 @@ int DabOutputTcp::Open(const char* name)      uri_ = name;      if (success) { -        dispatcher_ = make_shared<TCPDataDispatcher>(); +        dispatcher_ = make_shared<TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES);          dispatcher_->start(port, address);      }      else { @@ -237,7 +114,7 @@ int DabOutputTcp::Write(void* buffer, int size)      // Pad to 6144 bytes      std::fill(data.begin() + size, data.end(), 0x55); -    dispatcher_->Write(data); +    dispatcher_->write(data);      return size;  }  | 
