diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-06-25 10:50:23 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-06-25 10:50:23 +0200 |
commit | 03967733d70220e2de7af3cdad320aec5c82ede1 (patch) | |
tree | 4a1bd7adfb8825c95cfc1fa0c69f857aef234561 /lib/Socket.h | |
parent | 15d7ad8ac5bb187ac323da7dc30b9724b18c7df7 (diff) | |
download | dabmux-03967733d70220e2de7af3cdad320aec5c82ede1.tar.gz dabmux-03967733d70220e2de7af3cdad320aec5c82ede1.tar.bz2 dabmux-03967733d70220e2de7af3cdad320aec5c82ede1.zip |
Add more EDI input improvements
Diffstat (limited to 'lib/Socket.h')
-rw-r--r-- | lib/Socket.h | 36 |
1 files changed, 31 insertions, 5 deletions
diff --git a/lib/Socket.h b/lib/Socket.h index 2393584..8bb7fe1 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -127,7 +127,7 @@ class UDPSocket /* Threaded UDP receiver */ class UDPReceiver { public: - UDPReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {} + UDPReceiver(); ~UDPReceiver(); UDPReceiver(const UDPReceiver&) = delete; UDPReceiver operator=(const UDPReceiver&) = delete; @@ -142,12 +142,12 @@ class UDPReceiver { private: void m_run(void); - int m_port; + int m_port = 0; std::string m_bindto; std::string m_mcastaddr; - size_t m_max_packets_queued; + size_t m_max_packets_queued = 1; std::thread m_thread; - std::atomic<bool> m_stop; + std::atomic<bool> m_stop = ATOMIC_VAR_INIT(false); ThreadsafeQueue<UDPPacket> m_packets; UDPSocket m_sock; }; @@ -254,7 +254,7 @@ class TCPDataDispatcher void write(const std::vector<uint8_t>& data); private: - void process(void); + void process(); size_t m_max_queue_size; @@ -265,4 +265,30 @@ class TCPDataDispatcher std::list<TCPConnection> m_connections; }; +/* A TCP Server to receive data, which abstracts the handling of connects and disconnects. + */ +class TCPReceiveServer { + public: + TCPReceiveServer(size_t blocksize); + ~TCPReceiveServer(); + TCPReceiveServer(const TCPReceiveServer&) = delete; + TCPReceiveServer& operator=(const TCPReceiveServer&) = delete; + + void start(int listen_port, const std::string& address); + + // Return a vector that contains up to blocksize bytes of data, or + // and empty vector if no data is available. + std::vector<uint8_t> receive(); + + private: + void process(); + + size_t m_blocksize = 0; + ThreadsafeQueue<std::vector<uint8_t> > m_queue; + std::atomic<bool> m_running; + std::string m_exception_data; + std::thread m_listener_thread; + TCPSocket m_listener_socket; +}; + } |