summaryrefslogtreecommitdiffstats
path: root/src/TcpSocket.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/TcpSocket.h')
-rw-r--r--src/TcpSocket.h57
1 files changed, 53 insertions, 4 deletions
diff --git a/src/TcpSocket.h b/src/TcpSocket.h
index 9ff09e5..ec7afd3 100644
--- a/src/TcpSocket.h
+++ b/src/TcpSocket.h
@@ -2,7 +2,7 @@
Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -32,6 +32,7 @@
#endif
#include "InetAddress.h"
+#include "ThreadsafeQueue.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
@@ -45,8 +46,11 @@
#include <iostream>
#include <string>
-
+#include <vector>
#include <memory>
+#include <atomic>
+#include <thread>
+#include <list>
/**
* This class represents a TCP socket.
@@ -77,8 +81,8 @@ class TcpSocket
/** Send data over the TCP connection.
* @param data The buffer that will be sent.
* @param size Number of bytes to send.
- * @param timeout_ms number of milliseconds before timeout
- * return number of bytes sent or -1 if error
+ * @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout
+ * return number of bytes sent, 0 on timeout, or throws runtime_error.
*/
ssize_t send(const void* data, size_t size, int timeout_ms=0);
@@ -112,4 +116,49 @@ class TcpSocket
SOCKET m_sock;
};
+/* Helper class for TCPDataDispatcher, contains a queue of pending data and
+ * a sender thread. */
+class TCPConnection
+{
+ public:
+ TCPConnection(TcpSocket&& sock);
+ TCPConnection(const TCPConnection&) = delete;
+ TCPConnection& operator=(const TCPConnection&) = delete;
+ ~TCPConnection();
+
+ ThreadsafeQueue<std::vector<uint8_t> > queue;
+
+ private:
+ std::atomic<bool> m_running;
+ std::thread m_sender_thread;
+ TcpSocket m_sock;
+
+ void process(void);
+};
+
+/* Send a TCP stream to several destinations, and automatically disconnect destinations
+ * whose buffer overflows.
+ */
+class TCPDataDispatcher
+{
+ public:
+ TCPDataDispatcher(size_t max_queue_size);
+ ~TCPDataDispatcher();
+ TCPDataDispatcher(const TCPDataDispatcher&) = delete;
+ TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;
+
+ void start(int port, const std::string& address);
+ void write(const std::vector<uint8_t>& data);
+
+ private:
+ void process(void);
+
+ size_t m_max_queue_size;
+
+ std::atomic<bool> m_running;
+ std::thread m_listener_thread;
+ TcpSocket m_listener_socket;
+ std::list<TCPConnection> m_connections;
+};
+
#endif // _TCPSOCKET