From d14814a92377084177753c7a60d83a9307ad0672 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sat, 16 Jan 2021 08:06:09 +0100 Subject: Update common code to latest, update zmq.hpp and adapt --- lib/Socket.h | 65 ++++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 39 insertions(+), 26 deletions(-) (limited to 'lib/Socket.h') diff --git a/lib/Socket.h b/lib/Socket.h index 8881be3..33cdc05 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -30,11 +30,12 @@ #include "ThreadsafeQueue.h" #include -#include -#include #include -#include +#include #include +#include +#include +#include #include #include @@ -55,6 +56,8 @@ struct InetAddress { struct sockaddr *as_sockaddr() { return reinterpret_cast(&addr); }; void resolveUdpDestination(const std::string& destination, int port); + + std::string to_string() const; }; /** This class represents a UDP packet. @@ -102,6 +105,8 @@ class UDPSocket ~UDPSocket(); UDPSocket(const UDPSocket& other) = delete; const UDPSocket& operator=(const UDPSocket& other) = delete; + UDPSocket(UDPSocket&& other); + const UDPSocket& operator=(UDPSocket&& other); /** Close the already open socket, and create a new one. Throws a runtime_error on error. */ void reinit(int port); @@ -120,36 +125,36 @@ class UDPSocket */ void setBlocking(bool block); + SOCKET getNativeSocket() const; + int getPort() const; + protected: - SOCKET m_sock; + SOCKET m_sock = INVALID_SOCKET; + int m_port = 0; }; -/* Threaded UDP receiver */ +/* UDP packet receiver supporting receiving from several ports at once */ class UDPReceiver { public: - UDPReceiver(); - ~UDPReceiver(); - UDPReceiver(const UDPReceiver&) = delete; - UDPReceiver operator=(const UDPReceiver&) = delete; + void add_receive_port(int port, const std::string& bindto, const std::string& mcastaddr); - // Start the receiver in a separate thread - void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued); + struct ReceivedPacket { + std::vector packetdata; + InetAddress received_from; + int port_received_on; + }; - // Get the data contained in a UDP packet, blocks if none available - // In case of error, throws a runtime_error - std::vector get_packet_buffer(void); + class Interrupted {}; + class Timeout {}; + /* Returns one or several packets, + * throws a Timeout on timeout, Interrupted on EINTR, a runtime_error + * on error. */ + std::vector receive(int timeout_ms); private: void m_run(void); - int m_port = 0; - std::string m_bindto; - std::string m_mcastaddr; - size_t m_max_packets_queued = 1; - std::thread m_thread; - std::atomic m_stop = ATOMIC_VAR_INIT(false); - ThreadsafeQueue m_packets; - UDPSocket m_sock; + std::vector m_sockets; }; class TCPSocket { @@ -265,6 +270,14 @@ class TCPDataDispatcher std::list m_connections; }; +struct TCPReceiveMessage { virtual ~TCPReceiveMessage() {}; }; +struct TCPReceiveMessageDisconnected : public TCPReceiveMessage { }; +struct TCPReceiveMessageEmpty : public TCPReceiveMessage { }; +struct TCPReceiveMessageData : public TCPReceiveMessage { + TCPReceiveMessageData(std::vector d) : data(d) {}; + std::vector data; +}; + /* A TCP Server to receive data, which abstracts the handling of connects and disconnects. */ class TCPReceiveServer { @@ -276,15 +289,15 @@ class TCPReceiveServer { void start(int listen_port, const std::string& address); - // Return a vector that contains up to blocksize bytes of data, or - // and empty vector if no data is available. - std::vector receive(); + // Return an instance of a subclass of TCPReceiveMessage that contains up to blocksize + // bytes of data, or TCPReceiveMessageEmpty if no data is available. + std::shared_ptr receive(); private: void process(); size_t m_blocksize = 0; - ThreadsafeQueue > m_queue; + ThreadsafeQueue > m_queue; std::atomic m_running = ATOMIC_VAR_INIT(false); std::string m_exception_data; std::thread m_listener_thread; -- cgit v1.2.3