aboutsummaryrefslogtreecommitdiffstats
path: root/lib/Socket.h
blob: d8242e20d2e444a1465198f6312fb940d825298c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
/*
   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
   Queen in Right of Canada (Communications Research Center Canada)

   Copyright (C) 2022
   Matthias P. Braendli, matthias.braendli@mpb.li

    http://www.opendigitalradio.org
   */
/*
   This program is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation, either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <https://www.gnu.org/licenses/>.
*/

#pragma once

#ifdef HAVE_CONFIG_H
#   include "config.h"
#endif

#include "ThreadsafeQueue.h"
#include <cstdlib>
#include <atomic>
#include <iostream>
#include <list>
#include <memory>
#include <thread>
#include <vector>

#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <pthread.h>
#define SOCKET           int
#define INVALID_SOCKET   -1
#define SOCKET_ERROR     -1


namespace Socket {

struct InetAddress {
    struct sockaddr_storage addr = {};

    struct sockaddr *as_sockaddr() { return reinterpret_cast<sockaddr*>(&addr); };

    void resolveUdpDestination(const std::string& destination, int port);

    std::string to_string() const;
};

/** This class represents a UDP packet.
 *
 *  A UDP packet contains a payload (sequence of bytes) and an address. For
 *  outgoing packets, the address is the destination address. For incoming
 *  packets, the address tells the user from what source the packet arrived from.
 */
class UDPPacket
{
    public:
        UDPPacket();
        UDPPacket(size_t initSize);

        std::vector<uint8_t> buffer;
        InetAddress address;
};

/**
 *  This class represents a socket for sending and receiving UDP packets.
 *
 *  A UDP socket is the sending or receiving point for a packet delivery service.
 *  Each packet sent or received on a datagram socket is individually
 *  addressed and routed. Multiple packets sent from one machine to another may
 *  be routed differently, and may arrive in any order.
 */
class UDPSocket
{
    public:
        /** Create a new socket that will not be bound to any port. To be used
         * for data output.
         */
        UDPSocket();
        /** Create a new socket.
         *  @param port The port number on which the socket will be bound
         */
        UDPSocket(int port);
        /** Create a new socket.
         *  @param port The port number on which the socket will be bound
         *  @param name The IP address on which the socket will be bound.
         *              It is used to bind the socket on a specific interface if
         *              the computer have many NICs.
         */
        UDPSocket(int port, const std::string& name);
        ~UDPSocket();
        UDPSocket(const UDPSocket& other) = delete;
        const UDPSocket& operator=(const UDPSocket& other) = delete;
        UDPSocket(UDPSocket&& other);
        const UDPSocket& operator=(UDPSocket&& other);

        /** Close the already open socket, and create a new one. Throws a runtime_error on error.  */
        void reinit(int port);
        void reinit(int port, const std::string& name);

        void close(void);
        void send(UDPPacket& packet);
        void send(const std::vector<uint8_t>& data, InetAddress destination);
        void send(const std::string& data, InetAddress destination);
        UDPPacket receive(size_t max_size);
        void joinGroup(const char* groupname, const char* if_addr = nullptr);
        void setMulticastSource(const char* source_addr);
        void setMulticastTTL(int ttl);

        /** Set blocking mode. By default, the socket is blocking.
         * throws a runtime_error on error.
         */
        void setBlocking(bool block);

        SOCKET getNativeSocket() const;
        int getPort() const;

    protected:
        SOCKET m_sock = INVALID_SOCKET;
        int m_port = 0;
};

/* UDP packet receiver supporting receiving from several ports at once */
class UDPReceiver {
    public:
        void add_receive_port(int port, const std::string& bindto, const std::string& mcastaddr);

        struct ReceivedPacket {
            std::vector<uint8_t> packetdata;
            InetAddress received_from;
            int port_received_on;
        };

        class Interrupted {};
        class Timeout {};
        /* Returns one or several packets,
         * throws a Timeout on timeout, Interrupted on EINTR, a runtime_error
         * on error. */
        std::vector<ReceivedPacket> receive(int timeout_ms);

    private:
        void m_run(void);

        std::vector<UDPSocket> m_sockets;
};

class TCPSocket {
    public:
        TCPSocket();
        ~TCPSocket();
        TCPSocket(const TCPSocket& other) = delete;
        TCPSocket& operator=(const TCPSocket& other) = delete;
        TCPSocket(TCPSocket&& other);
        TCPSocket& operator=(TCPSocket&& other);

        bool valid(void) const;
        void connect(const std::string& hostname, int port, bool nonblock = false);
        void connect(const std::string& hostname, int port, int timeout_ms);
        void listen(int port, const std::string& name);
        void close(void);

        /* Enable TCP keepalive. See
         * https://tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html
         */
        void enable_keepalive(int time, int intvl, int probes);

        /* throws a runtime_error on failure, an invalid socket on timeout */
        TCPSocket accept(int timeout_ms);

        /* returns -1 on error, doesn't work on nonblocking sockets */
        ssize_t sendall(const void *buffer, size_t buflen);

        /** Send data over the TCP connection.
         *  @param data The buffer that will be sent.
         *  @param size Number of bytes to send.
         *  @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout
         *  return number of bytes sent, 0 on timeout, or throws runtime_error.
         */
        ssize_t send(const void* data, size_t size, int timeout_ms=0);

        class Interrupted {};
        /* Returns number of bytes read, 0 on disconnect.
         * Throws Interrupted on EINTR, runtime_error on error */
        ssize_t recv(void *buffer, size_t length, int flags);

        class Timeout {};
        /* Returns number of bytes read, 0 on disconnect or refused connection.
         * Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error
         * on error
         */
        ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);

        SOCKET get_sockfd() const { return m_sock; }

    private:
        explicit TCPSocket(int sockfd);
        explicit TCPSocket(int sockfd, InetAddress remote_address);
        SOCKET m_sock = -1;

        InetAddress m_remote_address;

        friend class TCPClient;
};

/* Implements a TCP receiver that auto-reconnects on errors */
class TCPClient {
    public:
        void connect(const std::string& hostname, int port);

        /* Returns numer of bytes read, 0 on auto-reconnect, -1
         * on interruption.
         * Throws a runtime_error on error */
        ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);

    private:
        void reconnect(void);
        TCPSocket m_sock;
        std::string m_hostname;
        int m_port;
};

/* Helper class for TCPDataDispatcher, contains a queue of pending data and
 * a sender thread. */
class TCPConnection
{
    public:
        TCPConnection(TCPSocket&& sock);
        TCPConnection(const TCPConnection&) = delete;
        TCPConnection& operator=(const TCPConnection&) = delete;
        ~TCPConnection();

        ThreadsafeQueue<std::vector<uint8_t> > queue;

    private:
        std::atomic<bool> m_running;
        std::thread m_sender_thread;
        TCPSocket m_sock;

        void process(void);
};

/* Send a TCP stream to several destinations, and automatically disconnect destinations
 * whose buffer overflows.
 */
class TCPDataDispatcher
{
    public:
        TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll);
        ~TCPDataDispatcher();
        TCPDataDispatcher(const TCPDataDispatcher&) = delete;
        TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;

        void start(int port, const std::string& address);
        void write(const std::vector<uint8_t>& data);

    private:
        void process();

        size_t m_max_queue_size;
        size_t m_buffers_to_preroll;


        std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
        std::string m_exception_data;
        std::thread m_listener_thread;
        TCPSocket m_listener_socket;

        std::mutex m_mutex;
        std::deque<std::vector<uint8_t> > m_preroll_queue;
        std::list<TCPConnection> m_connections;
};

struct TCPReceiveMessage { virtual ~TCPReceiveMessage() {}; };
struct TCPReceiveMessageDisconnected : public TCPReceiveMessage { };
struct TCPReceiveMessageEmpty : public TCPReceiveMessage { };
struct TCPReceiveMessageData : public TCPReceiveMessage {
    TCPReceiveMessageData(std::vector<uint8_t> d) : data(d) {};
    std::vector<uint8_t> data;
};

/* A TCP Server to receive data, which abstracts the handling of connects and disconnects.
 */
class TCPReceiveServer {
    public:
        TCPReceiveServer(size_t blocksize);
        ~TCPReceiveServer();
        TCPReceiveServer(const TCPReceiveServer&) = delete;
        TCPReceiveServer& operator=(const TCPReceiveServer&) = delete;

        void start(int listen_port, const std::string& address);

        // Return an instance of a subclass of TCPReceiveMessage that contains up to blocksize
        // bytes of data, or TCPReceiveMessageEmpty if no data is available.
        std::shared_ptr<TCPReceiveMessage> receive();

    private:
        void process();

        size_t m_blocksize = 0;
        ThreadsafeQueue<std::shared_ptr<TCPReceiveMessage> > m_queue;
        std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
        std::string m_exception_data;
        std::thread m_listener_thread;
        TCPSocket m_listener_socket;
};

/* A TCP client that abstracts the handling of connects and disconnects.
 */
class TCPSendClient {
    public:
        TCPSendClient(const std::string& hostname, int port);
        ~TCPSendClient();

        /* Throws a runtime_error on error
         */
        void sendall(const std::vector<uint8_t>& buffer);

    private:
        void process();

        std::string m_hostname;
        int m_port;

        bool m_is_connected = false;

        TCPSocket m_sock;
        static constexpr size_t MAX_QUEUE_SIZE = 512;
        ThreadsafeQueue<std::vector<uint8_t> > m_queue;
        std::atomic<bool> m_running;
        std::string m_exception_data;
        std::thread m_sender_thread;
        TCPSocket m_listener_socket;
};

}