summaryrefslogtreecommitdiffstats
path: root/lib/Socket.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-06-07 10:14:51 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-06-07 10:14:51 +0200
commit8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213 (patch)
treec0296c305409bb7fc373625aea05c2f57054eb5c /lib/Socket.cpp
parent43f4a3a2a695c303bd4fdfbd7fec6def29284f2e (diff)
downloaddabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.tar.gz
dabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.tar.bz2
dabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.zip
Work on STI-D/EDI input
Diffstat (limited to 'lib/Socket.cpp')
-rw-r--r--lib/Socket.cpp165
1 files changed, 126 insertions, 39 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index fe3df44..9b404eb 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -42,13 +42,10 @@ void InetAddress::resolveUdpDestination(const std::string& destination, int port
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
- hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
+ hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
hints.ai_flags = 0;
hints.ai_protocol = 0;
- hints.ai_canonname = nullptr;
- hints.ai_addr = nullptr;
- hints.ai_next = nullptr;
struct addrinfo *result, *rp;
int s = getaddrinfo(destination.c_str(), service, &hints, &result);
@@ -77,19 +74,19 @@ UDPPacket::UDPPacket(size_t initSize) :
UDPSocket::UDPSocket() :
- listenSocket(INVALID_SOCKET)
+ m_sock(INVALID_SOCKET)
{
reinit(0, "");
}
UDPSocket::UDPSocket(int port) :
- listenSocket(INVALID_SOCKET)
+ m_sock(INVALID_SOCKET)
{
reinit(port, "");
}
UDPSocket::UDPSocket(int port, const std::string& name) :
- listenSocket(INVALID_SOCKET)
+ m_sock(INVALID_SOCKET)
{
reinit(port, name);
}
@@ -97,16 +94,28 @@ UDPSocket::UDPSocket(int port, const std::string& name) :
void UDPSocket::setBlocking(bool block)
{
- int res = fcntl(listenSocket, F_SETFL, block ? 0 : O_NONBLOCK);
+ int res = fcntl(m_sock, F_SETFL, block ? 0 : O_NONBLOCK);
if (res == -1) {
throw runtime_error(string("Can't change blocking state of socket: ") + strerror(errno));
}
}
+void UDPSocket::reinit(int port)
+{
+ return reinit(port, "");
+}
+
void UDPSocket::reinit(int port, const std::string& name)
{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
+ }
+
+ if (port == 0) {
+ // No need to bind to a given port, creating the
+ // socket is enough
+ m_sock = ::socket(AF_INET, SOCK_DGRAM, 0);
+ return;
}
char service[NI_MAXSERV];
@@ -114,7 +123,7 @@ void UDPSocket::reinit(int port, const std::string& name)
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
- hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
+ hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
hints.ai_protocol = 0; /* Any protocol */
@@ -141,7 +150,7 @@ void UDPSocket::reinit(int port, const std::string& name)
}
if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
- listenSocket = sfd;
+ m_sock = sfd;
break;
}
@@ -157,17 +166,17 @@ void UDPSocket::reinit(int port, const std::string& name)
void UDPSocket::close()
{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
}
- listenSocket = INVALID_SOCKET;
+ m_sock = INVALID_SOCKET;
}
UDPSocket::~UDPSocket()
{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
}
}
@@ -177,7 +186,7 @@ UDPPacket UDPSocket::receive(size_t max_size)
UDPPacket packet(max_size);
socklen_t addrSize;
addrSize = sizeof(*packet.address.as_sockaddr());
- ssize_t ret = recvfrom(listenSocket,
+ ssize_t ret = recvfrom(m_sock,
packet.buffer.data(),
packet.buffer.size(),
0,
@@ -186,7 +195,13 @@ UDPPacket UDPSocket::receive(size_t max_size)
if (ret == SOCKET_ERROR) {
packet.buffer.resize(0);
+
+ // This suppresses the -Wlogical-op warning
+#if EAGAIN == EWOULDBLOCK
if (errno == EAGAIN) {
+#else
+ if (errno == EAGAIN or errno == EWOULDBLOCK) {
+#endif
return 0;
}
throw runtime_error(string("Can't receive data: ") + strerror(errno));
@@ -198,24 +213,24 @@ UDPPacket UDPSocket::receive(size_t max_size)
void UDPSocket::send(UDPPacket& packet)
{
- int ret = sendto(listenSocket, packet.buffer.data(), packet.buffer.size(), 0,
+ const int ret = sendto(m_sock, packet.buffer.data(), packet.buffer.size(), 0,
packet.address.as_sockaddr(), sizeof(*packet.address.as_sockaddr()));
if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
- throw runtime_error(string("Can't send UDP packet") + strerror(errno));
+ throw runtime_error(string("Can't send UDP packet: ") + strerror(errno));
}
}
void UDPSocket::send(const std::vector<uint8_t>& data, InetAddress destination)
{
- int ret = sendto(listenSocket, &data[0], data.size(), 0,
+ const int ret = sendto(m_sock, data.data(), data.size(), 0,
destination.as_sockaddr(), sizeof(*destination.as_sockaddr()));
if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
- throw runtime_error(string("Can't send UDP packet") + strerror(errno));
+ throw runtime_error(string("Can't send UDP packet: ") + strerror(errno));
}
}
-void UDPSocket::joinGroup(char* groupname)
+void UDPSocket::joinGroup(const char* groupname, const char* if_addr)
{
ip_mreqn group;
if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) {
@@ -224,9 +239,15 @@ void UDPSocket::joinGroup(char* groupname)
if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) {
throw runtime_error("Group name is not a multicast address");
}
- group.imr_address.s_addr = htons(INADDR_ANY);;
+
+ if (if_addr) {
+ group.imr_address.s_addr = inet_addr(if_addr);
+ }
+ else {
+ group.imr_address.s_addr = htons(INADDR_ANY);
+ }
group.imr_ifindex = 0;
- if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
+ if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
== SOCKET_ERROR) {
throw runtime_error(string("Can't join multicast group") + strerror(errno));
}
@@ -239,7 +260,7 @@ void UDPSocket::setMulticastSource(const char* source_addr)
throw runtime_error(string("Can't parse source address") + strerror(errno));
}
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
+ if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
== SOCKET_ERROR) {
throw runtime_error(string("Can't set source address") + strerror(errno));
}
@@ -247,26 +268,82 @@ void UDPSocket::setMulticastSource(const char* source_addr)
void UDPSocket::setMulticastTTL(int ttl)
{
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
+ if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
== SOCKET_ERROR) {
throw runtime_error(string("Can't set multicast ttl") + strerror(errno));
}
}
+UDPReceiver::~UDPReceiver() {
+ m_stop = true;
+ m_sock.close();
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) {
+ m_port = port;
+ m_bindto = bindto;
+ m_mcastaddr = mcastaddr;
+ m_max_packets_queued = max_packets_queued;
+ m_thread = std::thread(&UDPReceiver::m_run, this);
+}
-TCPSocket::TCPSocket()
+std::vector<uint8_t> UDPReceiver::get_packet_buffer()
{
- if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
- throw std::runtime_error("Can't create TCP socket");
+ if (m_stop) {
+ throw runtime_error("UDP Receiver not running");
}
-#if defined(HAVE_SO_NOSIGPIPE)
- int val = 1;
- if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE,
- &val, sizeof(val)) < 0) {
- throw std::runtime_error("Can't set SO_NOSIGPIPE");
+ UDPPacket p;
+ m_packets.wait_and_pop(p);
+
+ return p.buffer;
+}
+
+void UDPReceiver::m_run()
+{
+ // Ensure that stop is set to true in case of exception or return
+ struct SetStopOnDestruct {
+ SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {}
+ ~SetStopOnDestruct() { m_stop = true; }
+ private: atomic<bool>& m_stop;
+ } autoSetStop(m_stop);
+
+ if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) {
+ m_sock.reinit(m_port, m_mcastaddr);
+ m_sock.setMulticastSource(m_bindto.c_str());
+ m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str());
}
-#endif
+ else {
+ m_sock.reinit(m_port, m_bindto);
+ }
+
+ while (not m_stop) {
+ constexpr size_t packsize = 8192;
+ try {
+ auto packet = m_sock.receive(packsize);
+ if (packet.buffer.size() == packsize) {
+ // TODO replace fprintf
+ fprintf(stderr, "Warning, possible UDP truncation\n");
+ }
+
+ // If this blocks, the UDP socket will lose incoming packets
+ m_packets.push_wait_if_full(packet, m_max_packets_queued);
+ }
+ catch (const std::runtime_error& e) {
+ // TODO replace fprintf
+ // TODO handle intr
+ fprintf(stderr, "Socket error: %s\n", e.what());
+ m_stop = true;
+ }
+ }
+}
+
+
+TCPSocket::TCPSocket()
+{
}
TCPSocket::~TCPSocket()
@@ -314,7 +391,7 @@ void TCPSocket::connect(const std::string& hostname, int port)
/* Obtain address(es) matching host/port */
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
- hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
+ hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = 0;
hints.ai_protocol = 0;
@@ -376,7 +453,7 @@ void TCPSocket::listen(int port, const string& name)
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
- hints.ai_family = AF_UNSPEC;
+ hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
hints.ai_protocol = 0;
@@ -410,6 +487,14 @@ void TCPSocket::listen(int port, const string& name)
freeaddrinfo(result);
+#if defined(HAVE_SO_NOSIGPIPE)
+ int val = 1;
+ if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE,
+ &val, sizeof(val)) < 0) {
+ throw std::runtime_error("Can't set SO_NOSIGPIPE");
+ }
+#endif
+
if (rp == nullptr) {
throw runtime_error("Could not bind");
}
@@ -683,7 +768,9 @@ TCPDataDispatcher::~TCPDataDispatcher()
m_running = false;
m_connections.clear();
m_listener_socket.close();
- m_listener_thread.join();
+ if (m_listener_thread.joinable()) {
+ m_listener_thread.join();
+ }
}
void TCPDataDispatcher::start(int port, const string& address)