/*
Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
Copyright (C) 2024
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
*/
/*
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "ThreadsafeQueue.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
/* Socket handle type used by the classes in this header (UDPSocket and
 * TCPSocket both store their descriptor in a SOCKET).
 * NOTE(review): the names match the WinSock identifiers; presumably kept so
 * the code reads the same on every platform. Confirm. */
#define SOCKET int
/* Default value of UDPSocket::m_sock, i.e. a handle that has not been
 * assigned a socket yet. */
#define INVALID_SOCKET -1
/* NOTE(review): not referenced anywhere in this header; presumably compared
 * against the return value of socket calls in the implementation. Confirm. */
#define SOCKET_ERROR -1
namespace Socket {
/* Storage for a socket address, large enough for any address family
 * (it wraps a sockaddr_storage), plus helpers to fill and print it. */
struct InetAddress {
    /* Zero-initialised, so the address family is 0 until something fills it. */
    struct sockaddr_storage addr = {};

    /* View the storage through the generic sockaddr type that the socket
     * calls expect. The returned pointer aliases `addr` and is valid for as
     * long as this object lives. */
    struct sockaddr *as_sockaddr() { return reinterpret_cast<struct sockaddr *>(&addr); }

    /* NOTE(review): defined out of line; presumably resolves
     * destination/port and stores the result in `addr`. Confirm. */
    void resolveUdpDestination(const std::string& destination, int port);

    /* NOTE(review): defined out of line; presumably renders `addr` in a
     * human-readable form. The exact format is not visible here. */
    std::string to_string() const;
};
/** This class represents a UDP packet.
*
* A UDP packet contains a payload (sequence of bytes) and an address. For
* outgoing packets, the address is the destination address. For incoming
* packets, the address tells the user from what source the packet arrived.
*/
class UDPPacket
{
public:
/** Construct a packet. Defined out of line; the initial state of `buffer`
* is not visible from this header. */
UDPPacket();
/** Construct a packet from an initial size.
* NOTE(review): presumably pre-sizes `buffer` to initSize bytes. Confirm in
* the implementation. Not marked explicit, so a size_t converts implicitly. */
UDPPacket(size_t initSize);
/** Payload of the packet.
* NOTE(review): no element type is visible on this std::vector; presumably a
* byte type such as uint8_t. Confirm and restore the template argument. */
std::vector buffer;
/** Destination (outgoing) or source (incoming) address of the packet. */
InetAddress address;
};
/**
* This class represents a socket for sending and receiving UDP packets.
*
* A UDP socket is the sending or receiving point for a packet delivery service.
* Each packet sent or received on a datagram socket is individually
* addressed and routed. Multiple packets sent from one machine to another may
* be routed differently, and may arrive in any order.
*
* Objects cannot be copied (the copy operations are deleted) but can be moved.
*/
class UDPSocket
{
public:
/** Create a new socket that will not be bound to any port. To be used
* for data output.
*/
UDPSocket();
/** Create a new socket.
* @param port The port number on which the socket will be bound
*/
UDPSocket(int port);
/** Create a new socket.
* @param port The port number on which the socket will be bound
* @param name The IP address on which the socket will be bound.
* It is used to bind the socket on a specific interface if
* the computer has many NICs.
*/
UDPSocket(int port, const std::string& name);
~UDPSocket();
/* Copying is disabled. */
UDPSocket(const UDPSocket& other) = delete;
const UDPSocket& operator=(const UDPSocket& other) = delete;
/* Move operations, defined out of line.
* NOTE(review): move assignment returns a const reference here, whereas
* TCPSocket's returns a non-const one. */
UDPSocket(UDPSocket&& other);
const UDPSocket& operator=(UDPSocket&& other);
/** Close the already open socket, and create a new one. Throws a runtime_error on error. */
void reinit(int port);
void reinit(int port, const std::string& name);
/** NOTE(review): presumably sets the socket up to receive on `port` from
* multicast group `mcastaddr` via the interface `local_if_addr`. Confirm in
* the implementation. */
void init_receive_multicast(int port, const std::string& local_if_addr, const std::string& mcastaddr);
/** NOTE(review): presumably closes the underlying socket. Confirm. */
void close(void);
/** Send a packet; per the UDPPacket documentation its `address` is the
* destination for outgoing packets. */
void send(UDPPacket& packet);
/** Send `data` to `destination`.
* NOTE(review): the element type of this std::vector is not visible;
* presumably bytes. Confirm. */
void send(const std::vector& data, InetAddress destination);
/** Send the contents of a string to `destination`. */
void send(const std::string& data, InetAddress destination);
/** Receive a packet.
* NOTE(review): max_size presumably caps the payload size of the returned
* packet; behaviour when nothing is available is not visible here. */
UDPPacket receive(size_t max_size);
/** NOTE(review): presumably records the source address used for multicast
* (see m_multicast_source). Confirm. */
void setMulticastSource(const char* source_addr);
/** NOTE(review): presumably sets the TTL for outgoing multicast packets. Confirm. */
void setMulticastTTL(int ttl);
/** Set blocking mode. By default, the socket is blocking.
* throws a runtime_error on error.
*/
void setBlocking(bool block);
/** NOTE(review): presumably returns m_sock. Confirm. */
SOCKET getNativeSocket() const;
/** NOTE(review): presumably returns m_port. Confirm. */
int getPort() const;
private:
/* Internal helpers, defined out of line.
* NOTE(review): join_group presumably joins the multicast group `groupname`,
* optionally on the interface `if_addr`. Confirm. */
void join_group(const char* groupname, const char* if_addr = nullptr);
void post_init();
protected:
/* Socket handle; INVALID_SOCKET until a socket has been assigned. */
SOCKET m_sock = INVALID_SOCKET;
int m_port = 0;
std::string m_multicast_source = "";
};
/* UDP packet receiver supporting receiving from several ports at once */
class UDPReceiver {
public:
/* Register one more port to receive on.
* NOTE(review): bindto presumably names the local address to bind to and
* mcastaddr an optional multicast group to join. Confirm in the
* implementation. */
void add_receive_port(int port, const std::string& bindto, const std::string& mcastaddr);
/* One received datagram together with its origin. */
struct ReceivedPacket {
/* NOTE(review): element type of this std::vector is not visible;
* presumably bytes. Confirm. */
std::vector packetdata;
InetAddress received_from;
/* No default member initializer: the value is whatever the creator assigns. */
int port_received_on;
};
/* Exception types thrown by receive(). */
class Interrupted {};
class Timeout {};
/* Returns one or several packets,
* throws a Timeout on timeout, Interrupted on EINTR, a runtime_error
* on error.
* NOTE(review): the element type of the returned std::vector is not visible;
* presumably ReceivedPacket. Confirm. */
std::vector receive(int timeout_ms);
private:
/* NOTE(review): purpose not evident from this header. */
void m_run(void);
/* NOTE(review): element type not visible; presumably one UDPSocket per
* registered port. Confirm. */
std::vector m_sockets;
};
/* Move-only wrapper around a TCP socket descriptor. */
class TCPSocket {
    public:
        /* Thrown by recv() when the call is interrupted (EINTR). */
        class Interrupted {};
        /* Thrown by the timed recv() when the timeout elapses. */
        class Timeout {};

        TCPSocket();
        ~TCPSocket();

        /* Movable ... */
        TCPSocket(TCPSocket&& other);
        TCPSocket& operator=(TCPSocket&& other);
        /* ... but not copyable. */
        TCPSocket(const TCPSocket& other) = delete;
        TCPSocket& operator=(const TCPSocket& other) = delete;

        bool valid() const;
        SOCKET get_sockfd() const { return m_sock; }

        void connect(const std::string& hostname, int port, bool nonblock = false);
        void connect(const std::string& hostname, int port, int timeout_ms);
        void listen(int port, const std::string& name);
        void close();

        /* Turn on TCP keepalive, see
         * https://tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html
         */
        void enable_keepalive(int time, int intvl, int probes);

        /* On timeout the returned socket is invalid; failures are reported
         * by throwing a runtime_error. */
        TCPSocket accept(int timeout_ms);

        /* Not usable on nonblocking sockets. Returns -1 on error. */
        ssize_t sendall(const void *buffer, size_t buflen);

        /** Send data over the TCP connection.
         * @param data        Buffer holding the bytes to send.
         * @param size        How many bytes to send.
         * @param timeout_ms  Timeout in milliseconds; 0 means wait forever.
         * @return Number of bytes sent, or 0 if the timeout expired.
         * Throws runtime_error on error.
         */
        ssize_t send(const void* data, size_t size, int timeout_ms=0);

        /* Returns how many bytes were read; 0 means the peer disconnected.
         * Throws Interrupted on EINTR and runtime_error on error. */
        ssize_t recv(void *buffer, size_t length, int flags);

        /* Timed variant. Returns how many bytes were read; 0 means
         * disconnect or refused connection.
         * Throws Timeout on timeout, Interrupted on EINTR and runtime_error
         * on error.
         */
        ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);

    private:
        /* Construct from an existing descriptor; only reachable from inside
         * the class and from the TCPClient friend. */
        explicit TCPSocket(int sockfd);
        explicit TCPSocket(int sockfd, InetAddress remote_address);

        SOCKET m_sock = -1;
        InetAddress m_remote_address;

        friend class TCPClient;
};
/* Implements a TCP receiver that auto-reconnects on errors */
class TCPClient {
    public:
        /* Connect to hostname:port.
         * NOTE(review): presumably also remembers the target in
         * m_hostname/m_port so that reconnect() can reuse it. Confirm. */
        void connect(const std::string& hostname, int port);

        /* Returns number of bytes read, 0 on auto-reconnect, -1
         * on interruption.
         * Throws a runtime_error on error */
        ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);

    private:
        void reconnect(void);

        TCPSocket m_sock;
        std::string m_hostname;
        /* The class declares no constructor, so without an initializer a
         * default-constructed client would carry an indeterminate port until
         * connect() is called. */
        int m_port = 0;
        /* NOTE(review): the template argument of this std::optional is not
         * visible; presumably a timestamp type. Confirm and restore it. */
        std::optional m_last_received_packet_ts;
};
/* Helper class for TCPDataDispatcher, contains a queue of pending data and
* a sender thread. Not copyable. */
class TCPConnection
{
public:
/* Build a connection from an accepted socket.
* NOTE(review): the socket is taken by rvalue reference and presumably moved
* into m_sock; whether the sender thread starts here is not visible. */
TCPConnection(TCPSocket&& sock);
TCPConnection(const TCPConnection&) = delete;
TCPConnection& operator=(const TCPConnection&) = delete;
~TCPConnection();
/* Queue of data pending transmission (public so the dispatcher can fill it).
* NOTE(review): the template argument is garbled in this text; presumably a
* queue of byte vectors. Confirm and restore it. */
ThreadsafeQueue > queue;
private:
/* NOTE(review): template argument not visible (presumably bool) and no
* default initializer; presumably set by the constructor. Confirm. */
std::atomic m_running;
std::thread m_sender_thread;
TCPSocket m_sock;
/* NOTE(review): presumably the body of m_sender_thread. Confirm. */
void process(void);
};
/* Send a TCP stream to several destinations, and automatically disconnect destinations
* whose buffer overflows. Not copyable.
*/
class TCPDataDispatcher
{
public:
/* NOTE(review): max_queue_size presumably bounds each destination's pending
* queue, and buffers_to_preroll the number of recent buffers kept in
* m_preroll_queue for newly connected destinations. Confirm. */
TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll);
~TCPDataDispatcher();
TCPDataDispatcher(const TCPDataDispatcher&) = delete;
TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;
/* NOTE(review): presumably starts listening on address:port and launches
* m_listener_thread. Confirm in the implementation. */
void start(int port, const std::string& address);
/* Hand one buffer of stream data to the dispatcher.
* NOTE(review): element type of this std::vector is not visible; presumably
* bytes. Confirm. */
void write(const std::vector& data);
private:
/* NOTE(review): presumably the body of m_listener_thread. Confirm. */
void process();
size_t m_max_queue_size;
size_t m_buffers_to_preroll;
/* NOTE(review): template argument not visible (presumably bool).
* ATOMIC_VAR_INIT is deprecated since C++20; brace initialisation is the
* replacement. */
std::atomic m_running = ATOMIC_VAR_INIT(false);
std::string m_exception_data;
std::thread m_listener_thread;
TCPSocket m_listener_socket;
/* NOTE(review): which members m_mutex protects is not visible from this header. */
std::mutex m_mutex;
/* NOTE(review): template arguments garbled; presumably a deque of byte vectors. */
std::deque > m_preroll_queue;
/* NOTE(review): element type not visible; presumably TCPConnection. */
std::list m_connections;
};
/* Base type of the messages handed out by TCPReceiveServer::receive().
 * The virtual destructor makes destruction through a base pointer safe and
 * makes the hierarchy polymorphic, so the concrete kind can be found with
 * dynamic_cast. */
struct TCPReceiveMessage { virtual ~TCPReceiveMessage() = default; };

/* NOTE(review): presumably reports that the peer has disconnected. Confirm. */
struct TCPReceiveMessageDisconnected : public TCPReceiveMessage { };

/* Returned when no data is available. */
struct TCPReceiveMessageEmpty : public TCPReceiveMessage { };

/* Carries a block of received data.
 * NOTE(review): the vector's element type was not visible in the source text
 * and is restored here as uint8_t. Confirm against the callers. */
struct TCPReceiveMessageData : public TCPReceiveMessage {
    /* `d` is taken by value; swapping takes over its storage instead of
     * copying the payload a second time. */
    TCPReceiveMessageData(std::vector<uint8_t> d) { data.swap(d); }
    std::vector<uint8_t> data;
};
/* A TCP Server to receive data, which abstracts the handling of connects and disconnects.
* Not copyable.
*/
class TCPReceiveServer {
public:
/* blocksize is the upper bound on the bytes carried by one message returned
* from receive().
* NOTE(review): not marked explicit, so a size_t converts implicitly. */
TCPReceiveServer(size_t blocksize);
~TCPReceiveServer();
TCPReceiveServer(const TCPReceiveServer&) = delete;
TCPReceiveServer& operator=(const TCPReceiveServer&) = delete;
/* NOTE(review): presumably starts listening on address:listen_port and
* launches m_listener_thread. Confirm in the implementation. */
void start(int listen_port, const std::string& address);
// Return an instance of a subclass of TCPReceiveMessage that contains up to blocksize
// bytes of data, or TCPReceiveMessageEmpty if no data is available.
// NOTE(review): the template argument of the returned std::shared_ptr is not
// visible; presumably TCPReceiveMessage. Confirm.
std::shared_ptr receive();
private:
/* NOTE(review): presumably the body of m_listener_thread. Confirm. */
void process();
size_t m_blocksize = 0;
/* NOTE(review): template argument garbled; presumably a queue of
* shared_ptr to TCPReceiveMessage, matching receive(). Confirm. */
ThreadsafeQueue > m_queue;
/* NOTE(review): template argument not visible (presumably bool).
* ATOMIC_VAR_INIT is deprecated since C++20. */
std::atomic m_running = ATOMIC_VAR_INIT(false);
std::string m_exception_data;
std::thread m_listener_thread;
TCPSocket m_listener_socket;
};
/* A TCP client that abstracts the handling of connects and disconnects.
* Not copyable.
*/
class TCPSendClient {
public:
/* NOTE(review): hostname/port presumably name the remote endpoint and are
* stored in m_hostname/m_port; whether the sender thread starts here is not
* visible. */
TCPSendClient(const std::string& hostname, int port);
~TCPSendClient();
TCPSendClient(const TCPSendClient&) = delete;
TCPSendClient& operator=(const TCPSendClient&) = delete;
/* Snapshot of the client's error state, returned by sendall(). */
struct ErrorStats {
std::string last_error = "";
size_t num_reconnects = 0;
bool has_seen_new_errors = false;
};
/* Throws a runtime_error when the process thread isn't running.
* NOTE(review): the element type of `buffer` is not visible; presumably
* bytes. Confirm. */
ErrorStats sendall(const std::vector& buffer);
private:
/* NOTE(review): presumably the body of m_sender_thread. Confirm. */
void process();
std::string m_hostname;
/* No default initializer; presumably set by the constructor. */
int m_port;
bool m_is_connected = false;
TCPSocket m_sock;
/* NOTE(review): presumably the cap on pending buffers in m_queue. Confirm. */
static constexpr size_t MAX_QUEUE_SIZE = 512;
/* NOTE(review): template argument garbled; presumably a queue of byte vectors. */
ThreadsafeQueue > m_queue;
/* NOTE(review): template argument not visible (presumably bool) and no
* default initializer; presumably set by the constructor. Confirm. */
std::atomic m_running;
std::string m_exception_data;
std::thread m_sender_thread;
/* NOTE(review): a listener socket is unexpected in a sending client. Confirm it is used. */
TCPSocket m_listener_socket;
/* NOTE(review): template argument not visible; presumably size_t to match
* m_num_reconnects_prev. ATOMIC_VAR_INIT is deprecated since C++20. */
std::atomic m_num_reconnects = ATOMIC_VAR_INIT(0);
size_t m_num_reconnects_prev = 0;
/* NOTE(review): presumably guards m_last_error. Confirm. */
std::mutex m_error_mutex;
std::string m_last_error = "";
};
}