diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-05-06 11:45:08 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-05-06 11:45:08 +0200 |
commit | b74b895dd44f5a76d581b8dec65dbf76dd5cece9 (patch) | |
tree | d0420ab64fc3b89ad0a3bd2057df417fa5a987fa /src/TcpSocket.h | |
parent | 4b77ed3e2ae60ac5577a850b4c9c7803f01069f2 (diff) | |
download | dabmux-b74b895dd44f5a76d581b8dec65dbf76dd5cece9.tar.gz dabmux-b74b895dd44f5a76d581b8dec65dbf76dd5cece9.tar.bz2 dabmux-b74b895dd44f5a76d581b8dec65dbf76dd5cece9.zip |
Move TCPDataDispatcher into TcpSocket
Diffstat (limited to 'src/TcpSocket.h')
-rw-r--r-- | src/TcpSocket.h | 57 |
1 files changed, 53 insertions, 4 deletions
diff --git a/src/TcpSocket.h b/src/TcpSocket.h index 9ff09e5..ec7afd3 100644 --- a/src/TcpSocket.h +++ b/src/TcpSocket.h @@ -2,7 +2,7 @@ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2017 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -32,6 +32,7 @@ #endif #include "InetAddress.h" +#include "ThreadsafeQueue.h" #include <sys/socket.h> #include <netinet/in.h> #include <unistd.h> @@ -45,8 +46,11 @@ #include <iostream> #include <string> - +#include <vector> #include <memory> +#include <atomic> +#include <thread> +#include <list> /** * This class represents a TCP socket. @@ -77,8 +81,8 @@ class TcpSocket /** Send data over the TCP connection. * @param data The buffer that will be sent. * @param size Number of bytes to send. - * @param timeout_ms number of milliseconds before timeout - * return number of bytes sent or -1 if error + * @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout + * return number of bytes sent, 0 on timeout, or throws runtime_error. */ ssize_t send(const void* data, size_t size, int timeout_ms=0); @@ -112,4 +116,49 @@ class TcpSocket SOCKET m_sock; }; +/* Helper class for TCPDataDispatcher, contains a queue of pending data and + * a sender thread. */ +class TCPConnection +{ + public: + TCPConnection(TcpSocket&& sock); + TCPConnection(const TCPConnection&) = delete; + TCPConnection& operator=(const TCPConnection&) = delete; + ~TCPConnection(); + + ThreadsafeQueue<std::vector<uint8_t> > queue; + + private: + std::atomic<bool> m_running; + std::thread m_sender_thread; + TcpSocket m_sock; + + void process(void); +}; + +/* Send a TCP stream to several destinations, and automatically disconnect destinations + * whose buffer overflows. + */ +class TCPDataDispatcher +{ + public: + TCPDataDispatcher(size_t max_queue_size); + ~TCPDataDispatcher(); + TCPDataDispatcher(const TCPDataDispatcher&) = delete; + TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete; + + void start(int port, const std::string& address); + void write(const std::vector<uint8_t>& data); + + private: + void process(void); + + size_t m_max_queue_size; + + std::atomic<bool> m_running; + std::thread m_listener_thread; + TcpSocket m_listener_socket; + std::list<TCPConnection> m_connections; +}; + #endif // _TCPSOCKET |