summaryrefslogtreecommitdiffstats
path: root/lib/Socket.h
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2021-01-16 08:06:09 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2021-01-16 08:06:09 +0100
commitd14814a92377084177753c7a60d83a9307ad0672 (patch)
tree3b901eed7eaacc07341d16dbcd0db0d60951a5e0 /lib/Socket.h
parent0efb3830dcd441ffdb53ebe69f2dc2886614fb8b (diff)
downloaddabmod-d14814a92377084177753c7a60d83a9307ad0672.tar.gz
dabmod-d14814a92377084177753c7a60d83a9307ad0672.tar.bz2
dabmod-d14814a92377084177753c7a60d83a9307ad0672.zip
Update common code to latest, update zmq.hpp and adapt
Diffstat (limited to 'lib/Socket.h')
-rw-r--r--lib/Socket.h65
1 files changed, 39 insertions, 26 deletions
diff --git a/lib/Socket.h b/lib/Socket.h
index 8881be3..33cdc05 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -30,11 +30,12 @@
#include "ThreadsafeQueue.h"
#include <cstdlib>
-#include <iostream>
-#include <vector>
#include <atomic>
-#include <thread>
+#include <iostream>
#include <list>
+#include <memory>
+#include <thread>
+#include <vector>
#include <sys/socket.h>
#include <netinet/in.h>
@@ -55,6 +56,8 @@ struct InetAddress {
struct sockaddr *as_sockaddr() { return reinterpret_cast<sockaddr*>(&addr); };
void resolveUdpDestination(const std::string& destination, int port);
+
+ std::string to_string() const;
};
/** This class represents a UDP packet.
@@ -102,6 +105,8 @@ class UDPSocket
~UDPSocket();
UDPSocket(const UDPSocket& other) = delete;
const UDPSocket& operator=(const UDPSocket& other) = delete;
+ UDPSocket(UDPSocket&& other);
+ const UDPSocket& operator=(UDPSocket&& other);
/** Close the already open socket, and create a new one. Throws a runtime_error on error. */
void reinit(int port);
@@ -120,36 +125,36 @@ class UDPSocket
*/
void setBlocking(bool block);
+ SOCKET getNativeSocket() const;
+ int getPort() const;
+
protected:
- SOCKET m_sock;
+ SOCKET m_sock = INVALID_SOCKET;
+ int m_port = 0;
};
-/* Threaded UDP receiver */
+/* UDP packet receiver supporting receiving from several ports at once */
class UDPReceiver {
public:
- UDPReceiver();
- ~UDPReceiver();
- UDPReceiver(const UDPReceiver&) = delete;
- UDPReceiver operator=(const UDPReceiver&) = delete;
+ void add_receive_port(int port, const std::string& bindto, const std::string& mcastaddr);
- // Start the receiver in a separate thread
- void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued);
+ struct ReceivedPacket {
+ std::vector<uint8_t> packetdata;
+ InetAddress received_from;
+ int port_received_on;
+ };
- // Get the data contained in a UDP packet, blocks if none available
- // In case of error, throws a runtime_error
- std::vector<uint8_t> get_packet_buffer(void);
+ class Interrupted {};
+ class Timeout {};
+ /* Returns one or several packets,
+ * throws a Timeout on timeout, Interrupted on EINTR, a runtime_error
+ * on error. */
+ std::vector<ReceivedPacket> receive(int timeout_ms);
private:
void m_run(void);
- int m_port = 0;
- std::string m_bindto;
- std::string m_mcastaddr;
- size_t m_max_packets_queued = 1;
- std::thread m_thread;
- std::atomic<bool> m_stop = ATOMIC_VAR_INIT(false);
- ThreadsafeQueue<UDPPacket> m_packets;
- UDPSocket m_sock;
+ std::vector<UDPSocket> m_sockets;
};
class TCPSocket {
@@ -265,6 +270,14 @@ class TCPDataDispatcher
std::list<TCPConnection> m_connections;
};
+struct TCPReceiveMessage { virtual ~TCPReceiveMessage() {}; };
+struct TCPReceiveMessageDisconnected : public TCPReceiveMessage { };
+struct TCPReceiveMessageEmpty : public TCPReceiveMessage { };
+struct TCPReceiveMessageData : public TCPReceiveMessage {
+ TCPReceiveMessageData(std::vector<uint8_t> d) : data(d) {};
+ std::vector<uint8_t> data;
+};
+
/* A TCP Server to receive data, which abstracts the handling of connects and disconnects.
*/
class TCPReceiveServer {
@@ -276,15 +289,15 @@ class TCPReceiveServer {
void start(int listen_port, const std::string& address);
- // Return a vector that contains up to blocksize bytes of data, or
- // and empty vector if no data is available.
- std::vector<uint8_t> receive();
+ // Return an instance of a subclass of TCPReceiveMessage that contains up to blocksize
+ // bytes of data, or TCPReceiveMessageEmpty if no data is available.
+ std::shared_ptr<TCPReceiveMessage> receive();
private:
void process();
size_t m_blocksize = 0;
- ThreadsafeQueue<std::vector<uint8_t> > m_queue;
+ ThreadsafeQueue<std::shared_ptr<TCPReceiveMessage> > m_queue;
std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
std::string m_exception_data;
std::thread m_listener_thread;