1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
|
/*
Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
*/
/*
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "ThreadsafeQueue.h"
#include <cstdlib>
#include <iostream>
#include <vector>
#include <atomic>
#include <thread>
#include <list>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <pthread.h>
#define SOCKET int
#define INVALID_SOCKET -1
#define SOCKET_ERROR -1
namespace Socket {
struct InetAddress {
struct sockaddr_storage addr = {};
struct sockaddr *as_sockaddr() { return reinterpret_cast<sockaddr*>(&addr); };
void resolveUdpDestination(const std::string& destination, int port);
};
/** This class represents a UDP packet.
*
* A UDP packet contains a payload (sequence of bytes) and an address. For
* outgoing packets, the address is the destination address. For incoming
* packets, the address tells the user from what source the packet arrived from.
*/
class UDPPacket
{
public:
UDPPacket();
UDPPacket(size_t initSize);
std::vector<uint8_t> buffer;
InetAddress address;
};
/**
* This class represents a socket for sending and receiving UDP packets.
*
* A UDP socket is the sending or receiving point for a packet delivery service.
* Each packet sent or received on a datagram socket is individually
* addressed and routed. Multiple packets sent from one machine to another may
* be routed differently, and may arrive in any order.
*/
class UDPSocket
{
public:
/** Create a new socket that will not be bound to any port. To be used
* for data output.
*/
UDPSocket();
/** Create a new socket.
* @param port The port number on which the socket will be bound
*/
UDPSocket(int port);
/** Create a new socket.
* @param port The port number on which the socket will be bound
* @param name The IP address on which the socket will be bound.
* It is used to bind the socket on a specific interface if
* the computer have many NICs.
*/
UDPSocket(int port, const std::string& name);
~UDPSocket();
UDPSocket(const UDPSocket& other) = delete;
const UDPSocket& operator=(const UDPSocket& other) = delete;
/** Close the already open socket, and create a new one. Throws a runtime_error on error. */
void reinit(int port);
void reinit(int port, const std::string& name);
void close(void);
void send(UDPPacket& packet);
void send(const std::vector<uint8_t>& data, InetAddress destination);
UDPPacket receive(size_t max_size);
void joinGroup(const char* groupname, const char* if_addr = nullptr);
void setMulticastSource(const char* source_addr);
void setMulticastTTL(int ttl);
/** Set blocking mode. By default, the socket is blocking.
* throws a runtime_error on error.
*/
void setBlocking(bool block);
protected:
SOCKET m_sock;
};
/* Threaded UDP receiver */
class UDPReceiver {
public:
UDPReceiver();
~UDPReceiver();
UDPReceiver(const UDPReceiver&) = delete;
UDPReceiver operator=(const UDPReceiver&) = delete;
// Start the receiver in a separate thread
void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued);
// Get the data contained in a UDP packet, blocks if none available
// In case of error, throws a runtime_error
std::vector<uint8_t> get_packet_buffer(void);
private:
void m_run(void);
int m_port = 0;
std::string m_bindto;
std::string m_mcastaddr;
size_t m_max_packets_queued = 1;
std::thread m_thread;
std::atomic<bool> m_stop = ATOMIC_VAR_INIT(false);
ThreadsafeQueue<UDPPacket> m_packets;
UDPSocket m_sock;
};
class TCPSocket {
public:
TCPSocket();
~TCPSocket();
TCPSocket(const TCPSocket& other) = delete;
TCPSocket& operator=(const TCPSocket& other) = delete;
TCPSocket(TCPSocket&& other);
TCPSocket& operator=(TCPSocket&& other);
bool valid(void) const;
void connect(const std::string& hostname, int port, bool nonblock = false);
void listen(int port, const std::string& name);
void close(void);
/* throws a runtime_error on failure, an invalid socket on timeout */
TCPSocket accept(int timeout_ms);
/* returns -1 on error, doesn't work on nonblocking sockets */
ssize_t sendall(const void *buffer, size_t buflen);
/** Send data over the TCP connection.
* @param data The buffer that will be sent.
* @param size Number of bytes to send.
* @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout
* return number of bytes sent, 0 on timeout, or throws runtime_error.
*/
ssize_t send(const void* data, size_t size, int timeout_ms=0);
/* Returns number of bytes read, 0 on disconnect. Throws a
* runtime_error on error */
ssize_t recv(void *buffer, size_t length, int flags);
class Timeout {};
class Interrupted {};
/* Returns number of bytes read, 0 on disconnect or refused connection.
* Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error
* on error
*/
ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
private:
explicit TCPSocket(int sockfd);
explicit TCPSocket(int sockfd, InetAddress remote_address);
SOCKET m_sock = -1;
InetAddress m_remote_address;
friend class TCPClient;
};
/* Implements a TCP receiver that auto-reconnects on errors */
class TCPClient {
public:
void connect(const std::string& hostname, int port);
/* Returns numer of bytes read, 0 on auto-reconnect, -1
* on interruption.
* Throws a runtime_error on error */
ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
private:
void reconnect(void);
TCPSocket m_sock;
std::string m_hostname;
int m_port;
};
/* Helper class for TCPDataDispatcher, contains a queue of pending data and
* a sender thread. */
class TCPConnection
{
public:
TCPConnection(TCPSocket&& sock);
TCPConnection(const TCPConnection&) = delete;
TCPConnection& operator=(const TCPConnection&) = delete;
~TCPConnection();
ThreadsafeQueue<std::vector<uint8_t> > queue;
private:
std::atomic<bool> m_running;
std::thread m_sender_thread;
TCPSocket m_sock;
void process(void);
};
/* Send a TCP stream to several destinations, and automatically disconnect destinations
* whose buffer overflows.
*/
class TCPDataDispatcher
{
public:
TCPDataDispatcher(size_t max_queue_size);
~TCPDataDispatcher();
TCPDataDispatcher(const TCPDataDispatcher&) = delete;
TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;
void start(int port, const std::string& address);
void write(const std::vector<uint8_t>& data);
private:
void process();
size_t m_max_queue_size;
std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
std::string m_exception_data;
std::thread m_listener_thread;
TCPSocket m_listener_socket;
std::list<TCPConnection> m_connections;
};
/* A TCP Server to receive data, which abstracts the handling of connects and disconnects.
*/
class TCPReceiveServer {
public:
TCPReceiveServer(size_t blocksize);
~TCPReceiveServer();
TCPReceiveServer(const TCPReceiveServer&) = delete;
TCPReceiveServer& operator=(const TCPReceiveServer&) = delete;
void start(int listen_port, const std::string& address);
// Return a vector that contains up to blocksize bytes of data, or
// and empty vector if no data is available.
std::vector<uint8_t> receive();
private:
void process();
size_t m_blocksize = 0;
ThreadsafeQueue<std::vector<uint8_t> > m_queue;
std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
std::string m_exception_data;
std::thread m_listener_thread;
TCPSocket m_listener_socket;
};
/* A TCP client that abstracts the handling of connects and disconnects.
*/
class TCPSendClient {
public:
TCPSendClient(const std::string& hostname, int port);
~TCPSendClient();
/* Throws a runtime_error on error
*/
void sendall(std::vector<uint8_t>&& buffer);
private:
void process();
std::string m_hostname;
int m_port;
bool m_is_connected = false;
TCPSocket m_sock;
static constexpr size_t MAX_QUEUE_SIZE = 1024;
ThreadsafeQueue<std::vector<uint8_t> > m_queue;
std::atomic<bool> m_running;
std::string m_exception_data;
std::thread m_sender_thread;
TCPSocket m_listener_socket;
};
}
|