1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
|
/*
Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
*/
/*
This file is part of ODR-DabMux.
ODR-DabMux is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
ODR-DabMux is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TCPSOCKET
#define _TCPSOCKET
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "InetAddress.h"
#include "ThreadsafeQueue.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <pthread.h>
// Type used for socket descriptors in this header (see TcpSocket::m_sock).
// NOTE(review): the names below look like the Winsock ones, presumably kept
// so that socket code reads the same on POSIX -- confirm.
#define SOCKET int
// Descriptor value standing for "no socket".
#define INVALID_SOCKET -1
// Error return value; TcpSocket::recv documents returning it on failure.
#define SOCKET_ERROR -1
// Not used in this header. NOTE(review): presumably the type of the option
// value handed to setsockopt() for address reuse -- confirm in the .cpp.
#define reuseopt_t int
#include <iostream>
#include <string>
#include <vector>
#include <memory>
#include <atomic>
#include <thread>
#include <list>
/**
* This class represents a TCP socket.
*/
class TcpSocket
{
public:
    /** Create a socket object that does nothing. */
    TcpSocket();

    /** Create a new socket listening for incoming connections.
     *
     * @param port Port number on which the socket will listen.
     * @param name IP address the socket will be bound to. Use it to pick
     *             one specific interface when the computer has several
     *             NICs.
     */
    TcpSocket(int port, const std::string& name);

    ~TcpSocket();

    /* Copying is disabled; a TcpSocket can only be moved. */
    TcpSocket(const TcpSocket& other) = delete;
    TcpSocket& operator=(const TcpSocket& other) = delete;
    TcpSocket(TcpSocket&& other);
    TcpSocket& operator=(TcpSocket&& other);

    /** NOTE(review): presumably tells whether m_sock currently holds a
     * usable descriptor (i.e. is not INVALID_SOCKET) -- confirm in the
     * implementation.
     */
    bool isValid();

    /** NOTE(review): presumably closes the underlying descriptor and
     * returns the status of that operation -- confirm in the
     * implementation.
     */
    int close();

    /** Send data over the TCP connection.
     *
     * @param data       Buffer holding the bytes to send.
     * @param size       Number of bytes to send.
     * @param timeout_ms Timeout in milliseconds; 0 means wait forever.
     * @return Number of bytes sent, or 0 if the timeout expired.
     * @throws runtime_error on failure.
     */
    ssize_t send(const void* data, size_t size, int timeout_ms=0);

    /** Receive data from the socket.
     *
     * @param data Buffer that receives the data.
     * @param size Size of that buffer.
     * @return Number of bytes received, or -1 (SOCKET_ERROR) on error.
     */
    ssize_t recv(void* data, size_t size);

    /** NOTE(review): presumably puts the socket into listening state --
     * confirm in the implementation.
     */
    void listen();

    /** Variant of accept without a timeout argument.
     * NOTE(review): presumably waits until a connection arrives and
     * returns it as a new TcpSocket -- confirm in the implementation.
     */
    TcpSocket accept();

    /** Accept a connection, giving up after a timeout.
     *
     * @param timeout_ms Timeout. NOTE(review): unit presumably
     *                   milliseconds, going by the name.
     * @return A valid socket if a connection was accepted before the
     *         timeout expired, an invalid socket otherwise.
     */
    TcpSocket accept(int timeout_ms);

    /** Retrieve the address this socket is bound to. */
    InetAddress getOwnAddress() const;

    /** NOTE(review): presumably the address of the connected peer --
     * confirm in the implementation.
     */
    InetAddress getRemoteAddress() const;

private:
    /* Build a TcpSocket from an existing descriptor and its two
     * addresses. NOTE(review): presumably what accept() uses to create
     * the socket it returns -- confirm. */
    TcpSocket(SOCKET sock, InetAddress own, InetAddress remote);

    /* Data members: declaration order is kept as is, because it fixes
     * the order of construction and destruction. */

    /// Address on which the socket is bound.
    InetAddress m_own_address;

    /// Second address stored with the socket (see getRemoteAddress()).
    InetAddress m_remote_address;

    /// Low-level socket handed to the system functions.
    SOCKET m_sock;
};
/* Helper class for TCPDataDispatcher, contains a queue of pending data and
* a sender thread. */
class TCPConnection
{
public:
    /** Build a connection from a socket passed as an rvalue.
     * NOTE(review): presumably the socket is moved into m_sock and the
     * sender thread is started here -- confirm in the implementation.
     */
    TCPConnection(TcpSocket&& sock);

    ~TCPConnection();

    /* Not copyable. */
    TCPConnection(const TCPConnection&) = delete;
    TCPConnection& operator=(const TCPConnection&) = delete;

    /// Queue of pending data; each element is one buffer of bytes.
    ThreadsafeQueue<std::vector<uint8_t>> queue;

private:
    /* NOTE(review): presumably the function executed by
     * m_sender_thread -- confirm in the implementation. */
    void process();

    /* Data members: declaration order is kept as is, because it fixes
     * the order of construction and destruction. */

    /// Atomic flag. NOTE(review): presumably cleared to stop the sender
    /// thread -- confirm.
    std::atomic<bool> m_running;

    /// The sender thread.
    std::thread m_sender_thread;

    /// Socket belonging to this connection.
    TcpSocket m_sock;
};
/* Send a TCP stream to several destinations, and automatically disconnect destinations
* whose buffer overflows.
*/
class TCPDataDispatcher
{
public:
    /** @param max_queue_size Stored in m_max_queue_size.
     *  NOTE(review): presumably the per-destination queue length above
     *  which a destination counts as overflowing and gets
     *  disconnected -- confirm in the implementation.
     */
    TCPDataDispatcher(size_t max_queue_size);

    ~TCPDataDispatcher();

    /* Not copyable. */
    TCPDataDispatcher(const TCPDataDispatcher&) = delete;
    TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;

    /** @param port    Port number.
     *  @param address Address, given as a string.
     *  NOTE(review): presumably opens the listening socket on
     *  address:port and launches the listener thread -- confirm in the
     *  implementation.
     */
    void start(int port, const std::string& address);

    /** @param data Bytes of the stream.
     *  NOTE(review): presumably hands a copy of data to every connected
     *  destination -- confirm in the implementation.
     */
    void write(const std::vector<uint8_t>& data);

private:
    /* NOTE(review): presumably the function executed by
     * m_listener_thread -- confirm in the implementation. */
    void process();

    /* Data members: declaration order is kept as is, because it fixes
     * the order of construction and destruction. */

    /// Value received by the constructor.
    size_t m_max_queue_size;

    /// Atomic flag. NOTE(review): presumably cleared to stop the
    /// listener thread -- confirm.
    std::atomic<bool> m_running;

    /// Thread associated with the listening socket.
    std::thread m_listener_thread;

    /// Socket used for listening.
    TcpSocket m_listener_socket;

    /// One entry per destination currently held by the dispatcher.
    std::list<TCPConnection> m_connections;
};
#endif // _TCPSOCKET
|