summaryrefslogtreecommitdiffstats
path: root/lib/Socket.h
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-07-08 12:08:44 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-07-08 12:08:44 +0200
commitc8c9d5abf8238c3531ecae8fb272fdf5bbffd336 (patch)
tree30049a585baf782b3a0d74a319768efbdf66542e /lib/Socket.h
parent3a7202306c6aca5be2dad604f62063d605fd0982 (diff)
downloaddabmod-c8c9d5abf8238c3531ecae8fb272fdf5bbffd336.tar.gz
dabmod-c8c9d5abf8238c3531ecae8fb272fdf5bbffd336.tar.bz2
dabmod-c8c9d5abf8238c3531ecae8fb272fdf5bbffd336.zip
Unify Socket library with other mmbTools
Diffstat (limited to 'lib/Socket.h')
-rw-r--r--lib/Socket.h294
1 files changed, 294 insertions, 0 deletions
diff --git a/lib/Socket.h b/lib/Socket.h
new file mode 100644
index 0000000..8bb7fe1
--- /dev/null
+++ b/lib/Socket.h
@@ -0,0 +1,294 @@
+/*
+ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include "ThreadsafeQueue.h"
+#include <cstdlib>
+#include <iostream>
+#include <vector>
+#include <atomic>
+#include <thread>
+#include <list>
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <unistd.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <pthread.h>
+#define SOCKET int
+#define INVALID_SOCKET -1
+#define SOCKET_ERROR -1
+
+
+namespace Socket {
+
+struct InetAddress {
+ struct sockaddr_storage addr;
+
+ struct sockaddr *as_sockaddr() { return reinterpret_cast<sockaddr*>(&addr); };
+
+ void resolveUdpDestination(const std::string& destination, int port);
+};
+
+/** This class represents a UDP packet.
+ *
+ * A UDP packet contains a payload (sequence of bytes) and an address. For
+ * outgoing packets, the address is the destination address. For incoming
+ * packets, the address tells the user from what source the packet arrived from.
+ */
+class UDPPacket
+{
+ public:
+ UDPPacket();
+ UDPPacket(size_t initSize);
+
+ std::vector<uint8_t> buffer;
+ InetAddress address;
+};
+
+/**
+ * This class represents a socket for sending and receiving UDP packets.
+ *
+ * A UDP socket is the sending or receiving point for a packet delivery service.
+ * Each packet sent or received on a datagram socket is individually
+ * addressed and routed. Multiple packets sent from one machine to another may
+ * be routed differently, and may arrive in any order.
+ */
+class UDPSocket
+{
+ public:
+ /** Create a new socket that will not be bound to any port. To be used
+ * for data output.
+ */
+ UDPSocket();
+ /** Create a new socket.
+ * @param port The port number on which the socket will be bound
+ */
+ UDPSocket(int port);
+ /** Create a new socket.
+ * @param port The port number on which the socket will be bound
+ * @param name The IP address on which the socket will be bound.
+ * It is used to bind the socket on a specific interface if
+ * the computer have many NICs.
+ */
+ UDPSocket(int port, const std::string& name);
+ ~UDPSocket();
+ UDPSocket(const UDPSocket& other) = delete;
+ const UDPSocket& operator=(const UDPSocket& other) = delete;
+
+ /** Close the already open socket, and create a new one. Throws a runtime_error on error. */
+ void reinit(int port);
+ void reinit(int port, const std::string& name);
+
+ void close(void);
+ void send(UDPPacket& packet);
+ void send(const std::vector<uint8_t>& data, InetAddress destination);
+ UDPPacket receive(size_t max_size);
+ void joinGroup(const char* groupname, const char* if_addr = nullptr);
+ void setMulticastSource(const char* source_addr);
+ void setMulticastTTL(int ttl);
+
+ /** Set blocking mode. By default, the socket is blocking.
+ * throws a runtime_error on error.
+ */
+ void setBlocking(bool block);
+
+ protected:
+ SOCKET m_sock;
+};
+
+/* Threaded UDP receiver */
+class UDPReceiver {
+ public:
+ UDPReceiver();
+ ~UDPReceiver();
+ UDPReceiver(const UDPReceiver&) = delete;
+ UDPReceiver operator=(const UDPReceiver&) = delete;
+
+ // Start the receiver in a separate thread
+ void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued);
+
+ // Get the data contained in a UDP packet, blocks if none available
+ // In case of error, throws a runtime_error
+ std::vector<uint8_t> get_packet_buffer(void);
+
+ private:
+ void m_run(void);
+
+ int m_port = 0;
+ std::string m_bindto;
+ std::string m_mcastaddr;
+ size_t m_max_packets_queued = 1;
+ std::thread m_thread;
+ std::atomic<bool> m_stop = ATOMIC_VAR_INIT(false);
+ ThreadsafeQueue<UDPPacket> m_packets;
+ UDPSocket m_sock;
+};
+
+class TCPSocket {
+ public:
+ TCPSocket();
+ ~TCPSocket();
+ TCPSocket(const TCPSocket& other) = delete;
+ TCPSocket& operator=(const TCPSocket& other) = delete;
+ TCPSocket(TCPSocket&& other);
+ TCPSocket& operator=(TCPSocket&& other);
+
+ bool valid(void) const;
+ void connect(const std::string& hostname, int port);
+ void listen(int port, const std::string& name);
+ void close(void);
+
+ /* throws a runtime_error on failure, an invalid socket on timeout */
+ TCPSocket accept(int timeout_ms);
+
+ /* returns -1 on error, doesn't work on nonblocking sockets */
+ ssize_t sendall(const void *buffer, size_t buflen);
+
+ /** Send data over the TCP connection.
+ * @param data The buffer that will be sent.
+ * @param size Number of bytes to send.
+ * @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout
+ * return number of bytes sent, 0 on timeout, or throws runtime_error.
+ */
+ ssize_t send(const void* data, size_t size, int timeout_ms=0);
+
+ /* Returns number of bytes read, 0 on disconnect. Throws a
+ * runtime_error on error */
+ ssize_t recv(void *buffer, size_t length, int flags);
+
+ class Timeout {};
+ class Interrupted {};
+ /* Returns number of bytes read, 0 on disconnect or refused connection.
+ * Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error
+ * on error
+ */
+ ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
+
+ private:
+ explicit TCPSocket(int sockfd);
+ explicit TCPSocket(int sockfd, InetAddress remote_address);
+ SOCKET m_sock = -1;
+
+ InetAddress m_remote_address;
+
+ friend class TCPClient;
+};
+
+/* Implements a TCP receiver that auto-reconnects on errors */
+class TCPClient {
+ public:
+ void connect(const std::string& hostname, int port);
+
+ /* Returns numer of bytes read, 0 on auto-reconnect, -1
+ * on interruption.
+ * Throws a runtime_error on error */
+ ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
+
+ private:
+ void reconnect(void);
+ TCPSocket m_sock;
+ std::string m_hostname;
+ int m_port;
+};
+
+/* Helper class for TCPDataDispatcher, contains a queue of pending data and
+ * a sender thread. */
+class TCPConnection
+{
+ public:
+ TCPConnection(TCPSocket&& sock);
+ TCPConnection(const TCPConnection&) = delete;
+ TCPConnection& operator=(const TCPConnection&) = delete;
+ ~TCPConnection();
+
+ ThreadsafeQueue<std::vector<uint8_t> > queue;
+
+ private:
+ std::atomic<bool> m_running;
+ std::thread m_sender_thread;
+ TCPSocket m_sock;
+
+ void process(void);
+};
+
+/* Send a TCP stream to several destinations, and automatically disconnect destinations
+ * whose buffer overflows.
+ */
+class TCPDataDispatcher
+{
+ public:
+ TCPDataDispatcher(size_t max_queue_size);
+ ~TCPDataDispatcher();
+ TCPDataDispatcher(const TCPDataDispatcher&) = delete;
+ TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;
+
+ void start(int port, const std::string& address);
+ void write(const std::vector<uint8_t>& data);
+
+ private:
+ void process();
+
+ size_t m_max_queue_size;
+
+ std::atomic<bool> m_running;
+ std::string m_exception_data;
+ std::thread m_listener_thread;
+ TCPSocket m_listener_socket;
+ std::list<TCPConnection> m_connections;
+};
+
+/* A TCP Server to receive data, which abstracts the handling of connects and disconnects.
+ */
+class TCPReceiveServer {
+ public:
+ TCPReceiveServer(size_t blocksize);
+ ~TCPReceiveServer();
+ TCPReceiveServer(const TCPReceiveServer&) = delete;
+ TCPReceiveServer& operator=(const TCPReceiveServer&) = delete;
+
+ void start(int listen_port, const std::string& address);
+
+ // Return a vector that contains up to blocksize bytes of data, or
+ // and empty vector if no data is available.
+ std::vector<uint8_t> receive();
+
+ private:
+ void process();
+
+ size_t m_blocksize = 0;
+ ThreadsafeQueue<std::vector<uint8_t> > m_queue;
+ std::atomic<bool> m_running;
+ std::string m_exception_data;
+ std::thread m_listener_thread;
+ TCPSocket m_listener_socket;
+};
+
+}