Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
Copyright (C) 2024
Matthias P. Braendli, matthias.braendli@mpb.li
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see .
#pragma once
# include "config.h"
#include "ThreadsafeQueue.h"
#define SOCKET int
#define SOCKET_ERROR -1
namespace Socket {
struct InetAddress {
struct sockaddr_storage addr = {};
struct sockaddr *as_sockaddr() { return reinterpret_cast(&addr); };
void resolveUdpDestination(const std::string& destination, int port);
std::string to_string() const;
/** This class represents a UDP packet.
* A UDP packet contains a payload (sequence of bytes) and an address. For
* outgoing packets, the address is the destination address. For incoming
* packets, the address tells the user from what source the packet arrived from.
class UDPPacket
UDPPacket(size_t initSize);
std::vector buffer;
InetAddress address;
* This class represents a socket for sending and receiving UDP packets.
* A UDP socket is the sending or receiving point for a packet delivery service.
* Each packet sent or received on a datagram socket is individually
* addressed and routed. Multiple packets sent from one machine to another may
* be routed differently, and may arrive in any order.
class UDPSocket
/** Create a new socket that will not be bound to any port. To be used
* for data output.
/** Create a new socket.
* @param port The port number on which the socket will be bound
UDPSocket(int port);
/** Create a new socket.
* @param port The port number on which the socket will be bound
* @param name The IP address on which the socket will be bound.
* It is used to bind the socket on a specific interface if
* the computer have many NICs.
UDPSocket(int port, const std::string& name);
UDPSocket(const UDPSocket& other) = delete;
const UDPSocket& operator=(const UDPSocket& other) = delete;
UDPSocket(UDPSocket&& other);
const UDPSocket& operator=(UDPSocket&& other);
/** Close the already open socket, and create a new one. Throws a runtime_error on error. */
void reinit(int port);
void reinit(int port, const std::string& name);
void init_receive_multicast(int port, const std::string& local_if_addr, const std::string& mcastaddr);
void close(void);
void send(UDPPacket& packet);
void send(const std::vector& data, InetAddress destination);
void send(const std::string& data, InetAddress destination);
UDPPacket receive(size_t max_size);
void setMulticastSource(const char* source_addr);
void setMulticastTTL(int ttl);
/** Set blocking mode. By default, the socket is blocking.
* throws a runtime_error on error.
void setBlocking(bool block);
SOCKET getNativeSocket() const;
int getPort() const;
void join_group(const char* groupname, const char* if_addr = nullptr);
void post_init();
int m_port = 0;
std::string m_multicast_source = "";
/* UDP packet receiver supporting receiving from several ports at once */
class UDPReceiver {
void add_receive_port(int port, const std::string& bindto, const std::string& mcastaddr);
struct ReceivedPacket {
std::vector packetdata;
InetAddress received_from;
int port_received_on;
class Interrupted {};
class Timeout {};
/* Returns one or several packets,
* throws a Timeout on timeout, Interrupted on EINTR, a runtime_error
* on error. */
std::vector receive(int timeout_ms);
void m_run(void);
std::vector m_sockets;
class TCPSocket {
TCPSocket(const TCPSocket& other) = delete;
TCPSocket& operator=(const TCPSocket& other) = delete;
TCPSocket(TCPSocket&& other);
TCPSocket& operator=(TCPSocket&& other);
bool valid(void) const;
void connect(const std::string& hostname, int port, bool nonblock = false);
void connect(const std::string& hostname, int port, int timeout_ms);
void listen(int port, const std::string& name);
void close(void);
/* Enable TCP keepalive. See
* https://tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html
void enable_keepalive(int time, int intvl, int probes);
/* throws a runtime_error on failure, an invalid socket on timeout */
TCPSocket accept(int timeout_ms);
/* returns -1 on error, doesn't work on nonblocking sockets */
ssize_t sendall(const void *buffer, size_t buflen);
/** Send data over the TCP connection.
* @param data The buffer that will be sent.
* @param size Number of bytes to send.
* @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout
* return number of bytes sent, 0 on timeout, or throws runtime_error.
ssize_t send(const void* data, size_t size, int timeout_ms=0);
class Interrupted {};
/* Returns number of bytes read, 0 on disconnect.
* Throws Interrupted on EINTR, runtime_error on error */
ssize_t recv(void *buffer, size_t length, int flags);
class Timeout {};
/* Returns number of bytes read, 0 on disconnect or refused connection.
* Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error
* on error
ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
SOCKET get_sockfd() const { return m_sock; }
InetAddress get_remote_address() const { return m_remote_address; }
explicit TCPSocket(int sockfd);
explicit TCPSocket(int sockfd, InetAddress remote_address);
SOCKET m_sock = -1;
InetAddress m_remote_address;
friend class TCPClient;
/* Implements a TCP receiver that auto-reconnects on errors */
class TCPClient {
void connect(const std::string& hostname, int port);
/* Returns numer of bytes read, 0 on auto-reconnect, -1
* on interruption.
* Throws a runtime_error on error */
ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
void reconnect(void);
TCPSocket m_sock;
std::string m_hostname;
int m_port;
std::optional m_last_received_packet_ts;
/* Helper class for TCPDataDispatcher, contains a queue of pending data and
* a sender thread. */
class TCPConnection
TCPConnection(TCPSocket&& sock);
TCPConnection(const TCPConnection&) = delete;
TCPConnection& operator=(const TCPConnection&) = delete;
ThreadsafeQueue > queue;
struct stats_t {
size_t buffer_fullness = 0;
InetAddress remote_address;
stats_t get_stats() const;
std::atomic m_running;
std::thread m_sender_thread;
TCPSocket m_sock;
void process(void);
/* Send a TCP stream to several destinations, and automatically disconnect destinations
* whose buffer overflows.
class TCPDataDispatcher
TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll);
TCPDataDispatcher(const TCPDataDispatcher&) = delete;
TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;
void start(int port, const std::string& address);
void write(const std::vector& data);
std::vector get_stats() const;
void process();
size_t m_max_queue_size;
size_t m_buffers_to_preroll;
std::atomic m_running = ATOMIC_VAR_INIT(false);
std::string m_exception_data;
std::thread m_listener_thread;
TCPSocket m_listener_socket;
std::mutex m_mutex;
std::deque > m_preroll_queue;
std::list m_connections;
struct TCPReceiveMessage { virtual ~TCPReceiveMessage() {}; };
struct TCPReceiveMessageDisconnected : public TCPReceiveMessage { };
struct TCPReceiveMessageEmpty : public TCPReceiveMessage { };
struct TCPReceiveMessageData : public TCPReceiveMessage {
TCPReceiveMessageData(std::vector d) : data(d) {};
std::vector data;
/* A TCP Server to receive data, which abstracts the handling of connects and disconnects.
class TCPReceiveServer {
TCPReceiveServer(size_t blocksize);
TCPReceiveServer(const TCPReceiveServer&) = delete;
TCPReceiveServer& operator=(const TCPReceiveServer&) = delete;
void start(int listen_port, const std::string& address);
// Return an instance of a subclass of TCPReceiveMessage that contains up to blocksize
// bytes of data, or TCPReceiveMessageEmpty if no data is available.
std::shared_ptr receive();
void process();
size_t m_blocksize = 0;
ThreadsafeQueue > m_queue;
std::atomic m_running = ATOMIC_VAR_INIT(false);
std::string m_exception_data;
std::thread m_listener_thread;
TCPSocket m_listener_socket;
/* A TCP client that abstracts the handling of connects and disconnects.
class TCPSendClient {
TCPSendClient(const std::string& hostname, int port);
TCPSendClient(const TCPSendClient&) = delete;
TCPSendClient& operator=(const TCPSendClient&) = delete;
struct ErrorStats {
std::string last_error = "";
size_t num_reconnects = 0;
bool has_seen_new_errors = false;
/* Throws a runtime_error when the process thread isn't running */
ErrorStats sendall(const std::vector& buffer);
void process();
std::string m_hostname;
int m_port;
bool m_is_connected = false;
TCPSocket m_sock;
static constexpr size_t MAX_QUEUE_SIZE = 512;
ThreadsafeQueue > m_queue;
std::atomic m_running;
std::string m_exception_data;
std::thread m_sender_thread;
TCPSocket m_listener_socket;
std::atomic m_num_reconnects = ATOMIC_VAR_INIT(0);
size_t m_num_reconnects_prev = 0;
std::mutex m_error_mutex;
std::string m_last_error = "";