From 8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 7 Jun 2019 10:14:51 +0200 Subject: Work on STI-D/EDI input --- lib/Socket.h | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) (limited to 'lib/Socket.h') diff --git a/lib/Socket.h b/lib/Socket.h index 82ff5ad..2393584 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -104,13 +104,14 @@ class UDPSocket const UDPSocket& operator=(const UDPSocket& other) = delete; /** Close the already open socket, and create a new one. Throws a runtime_error on error. */ + void reinit(int port); void reinit(int port, const std::string& name); void close(void); void send(UDPPacket& packet); void send(const std::vector& data, InetAddress destination); UDPPacket receive(size_t max_size); - void joinGroup(char* groupname); + void joinGroup(const char* groupname, const char* if_addr = nullptr); void setMulticastSource(const char* source_addr); void setMulticastTTL(int ttl); @@ -120,9 +121,36 @@ class UDPSocket void setBlocking(bool block); protected: - SOCKET listenSocket; + SOCKET m_sock; }; +/* Threaded UDP receiver */ +class UDPReceiver { + public: + UDPReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {} + ~UDPReceiver(); + UDPReceiver(const UDPReceiver&) = delete; + UDPReceiver operator=(const UDPReceiver&) = delete; + + // Start the receiver in a separate thread + void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued); + + // Get the data contained in a UDP packet, blocks if none available + // In case of error, throws a runtime_error + std::vector get_packet_buffer(void); + + private: + void m_run(void); + + int m_port; + std::string m_bindto; + std::string m_mcastaddr; + size_t m_max_packets_queued; + std::thread m_thread; + std::atomic m_stop; + ThreadsafeQueue m_packets; + UDPSocket m_sock; +}; class TCPSocket { public: -- cgit v1.2.3