summaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-07-08 12:08:44 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-07-08 12:08:44 +0200
commitc8c9d5abf8238c3531ecae8fb272fdf5bbffd336 (patch)
tree30049a585baf782b3a0d74a319768efbdf66542e /lib
parent3a7202306c6aca5be2dad604f62063d605fd0982 (diff)
downloaddabmod-c8c9d5abf8238c3531ecae8fb272fdf5bbffd336.tar.gz
dabmod-c8c9d5abf8238c3531ecae8fb272fdf5bbffd336.tar.bz2
dabmod-c8c9d5abf8238c3531ecae8fb272fdf5bbffd336.zip
Unify Socket library with other mmbTools
Diffstat (limited to 'lib')
-rw-r--r--lib/InetAddress.cpp171
-rw-r--r--lib/InetAddress.h80
-rw-r--r--lib/Socket.cpp898
-rw-r--r--lib/Socket.h294
-rw-r--r--lib/ThreadsafeQueue.h176
-rw-r--r--lib/UdpSocket.cpp349
-rw-r--r--lib/UdpSocket.h206
7 files changed, 1368 insertions, 806 deletions
diff --git a/lib/InetAddress.cpp b/lib/InetAddress.cpp
deleted file mode 100644
index d92e1b9..0000000
--- a/lib/InetAddress.cpp
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2016
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "InetAddress.h"
-#include <iostream>
-#include <stdio.h>
-#include <errno.h>
-#include <string.h>
-
-#ifdef TRACE_ON
-# ifndef TRACE_CLASS
-# define TRACE_CLASS(clas, func) cout <<"-" <<(clas) <<"\t(" <<this <<")::" <<(func) <<endl
-# define TRACE_STATIC(clas, func) cout <<"-" <<(clas) <<"\t(static)::" <<(func) <<endl
-# endif
-#else
-# ifndef TRACE_CLASS
-# define TRACE_CLASS(clas, func)
-# define TRACE_STATIC(clas, func)
-# endif
-#endif
-
-
-int inetErrNo = 0;
-const char *inetErrMsg = nullptr;
-const char *inetErrDesc = nullptr;
-
-
-/**
- * Constructs an IP address.
- * @param port The port of this address
- * @param name The name of this address
- */
-InetAddress::InetAddress(int port, const char* name) {
- TRACE_CLASS("InetAddress", "InetAddress(int, char)");
- addr.sin_family = PF_INET;
- addr.sin_addr.s_addr = htons(INADDR_ANY);
- addr.sin_port = htons(port);
- if (name)
- setAddress(name);
-}
-
-
-/**
- * Constructs a copy of inet
- * @param inet The address to be copied
- */
-InetAddress::InetAddress(const InetAddress &inet) {
- TRACE_CLASS("InetAddress", "InetAddress(InetAddress)");
- memcpy(&addr, &inet.addr, sizeof(addr));
-}
-
-
-/// Destructor
-InetAddress::~InetAddress() {
- TRACE_CLASS("InetAddress" ,"~InetAddress()");
-}
-
-
-/// Returns the raw IP address of this InetAddress object.
-sockaddr *InetAddress::getAddress() {
- TRACE_CLASS("InetAddress", "getAddress()");
- return (sockaddr *)&addr;
-}
-
-
-/// Return the port of this address.
-int InetAddress::getPort()
-{
- TRACE_CLASS("InetAddress", "getPort()");
- return ntohs(addr.sin_port);
-}
-
-
-/**
- * Returns the IP address string "%d.%d.%d.%d".
- * @return IP address
- */
-const char *InetAddress::getHostAddress() {
- TRACE_CLASS("InetAddress", "getHostAddress()");
- return inet_ntoa(addr.sin_addr);
-}
-
-
-/// Returns true if this address is multicast
-bool InetAddress::isMulticastAddress() {
- TRACE_CLASS("InetAddress", "isMulticastAddress()");
- return IN_MULTICAST(ntohl(addr.sin_addr.s_addr)); // a modifier
-}
-
-
-/**
- * Set the port number
- * @param port The new port number
- */
-void InetAddress::setPort(int port)
-{
- TRACE_CLASS("InetAddress", "setPort(int)");
- addr.sin_port = htons(port);
-}
-
-
-/**
- * Set the address
- * @param name The new address name
- * @return 0 if ok
- * -1 if error
- */
-int InetAddress::setAddress(const std::string& name)
-{
- TRACE_CLASS("InetAddress", "setAddress(string)");
- if (!name.empty()) {
- if (atoi(name.c_str())) { // If it start with a number
- if ((addr.sin_addr.s_addr = inet_addr(name.c_str())) == INADDR_NONE) {
- addr.sin_addr.s_addr = htons(INADDR_ANY);
- inetErrNo = 0;
- inetErrMsg = "Invalid address";
- inetErrDesc = name.c_str();
- return -1;
- }
- }
- else { // Assume it's a real name
- hostent *host = gethostbyname(name.c_str());
- if (host) {
- addr.sin_addr = *(in_addr *)(host->h_addr);
- } else {
- addr.sin_addr.s_addr = htons(INADDR_ANY);
- inetErrNo = 0;
- inetErrMsg = "Could not find address";
- inetErrDesc = name.c_str();
- return -1;
- }
- }
- }
- else {
- addr.sin_addr.s_addr = INADDR_ANY;
- }
- return 0;
-}
-
-
-void setInetError(const char* description)
-{
- inetErrNo = 0;
- inetErrNo = errno;
- inetErrMsg = strerror(inetErrNo);
- inetErrDesc = description;
-}
-
diff --git a/lib/InetAddress.h b/lib/InetAddress.h
deleted file mode 100644
index 0ccc70b..0000000
--- a/lib/InetAddress.h
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2016
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef _InetAddress
-#define _InetAddress
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include <stdlib.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <unistd.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#include <pthread.h>
-#include <string>
-
-#define SOCKET int
-#define INVALID_SOCKET -1
-#define INVALID_PORT -1
-
-
-/// The last error number
-extern int inetErrNo;
-/// The last error message
-extern const char *inetErrMsg;
-/// The description of the last error
-extern const char *inetErrDesc;
-/// Set the number, message and description of the last error
-void setInetError(const char* description);
-
-
-/**
- * This class represents an Internet Protocol (IP) address.
- * @author Pascal Charest pascal.charest@crc.ca
- */
-class InetAddress {
- public:
- InetAddress(int port = 0, const char* name = NULL);
- InetAddress(const InetAddress &addr);
- ~InetAddress();
-
- sockaddr *getAddress();
- const char *getHostAddress();
- int getPort();
- int setAddress(const std::string& name);
- void setPort(int port);
- bool isMulticastAddress();
-
- private:
- sockaddr_in addr;
-};
-
-
-#endif
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
new file mode 100644
index 0000000..cd70a8e
--- /dev/null
+++ b/lib/Socket.cpp
@@ -0,0 +1,898 @@
+/*
+ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+*/
+
+#include "Socket.h"
+
+#include <iostream>
+#include <cstdio>
+#include <cstring>
+#include <cerrno>
+#include <fcntl.h>
+#include <poll.h>
+
+namespace Socket {
+
+using namespace std;
+
+void InetAddress::resolveUdpDestination(const std::string& destination, int port)
+{
+ char service[NI_MAXSERV];
+ snprintf(service, NI_MAXSERV-1, "%d", port);
+
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
+ hints.ai_flags = 0;
+ hints.ai_protocol = 0;
+
+ struct addrinfo *result, *rp;
+ int s = getaddrinfo(destination.c_str(), service, &hints, &result);
+ if (s != 0) {
+ throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s));
+ }
+
+ for (rp = result; rp != nullptr; rp = rp->ai_next) {
+ // Take the first result
+ memcpy(&addr, rp->ai_addr, rp->ai_addrlen);
+ break;
+ }
+
+ freeaddrinfo(result);
+
+ if (rp == nullptr) {
+ throw runtime_error("Could not resolve");
+ }
+}
+
+UDPPacket::UDPPacket() { }
+
+UDPPacket::UDPPacket(size_t initSize) :
+ buffer(initSize)
+{ }
+
+
+UDPSocket::UDPSocket() :
+ m_sock(INVALID_SOCKET)
+{
+ reinit(0, "");
+}
+
+UDPSocket::UDPSocket(int port) :
+ m_sock(INVALID_SOCKET)
+{
+ reinit(port, "");
+}
+
+UDPSocket::UDPSocket(int port, const std::string& name) :
+ m_sock(INVALID_SOCKET)
+{
+ reinit(port, name);
+}
+
+
+void UDPSocket::setBlocking(bool block)
+{
+ int res = fcntl(m_sock, F_SETFL, block ? 0 : O_NONBLOCK);
+ if (res == -1) {
+ throw runtime_error(string("Can't change blocking state of socket: ") + strerror(errno));
+ }
+}
+
+void UDPSocket::reinit(int port)
+{
+ return reinit(port, "");
+}
+
+void UDPSocket::reinit(int port, const std::string& name)
+{
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
+ }
+
+ if (port == 0) {
+ // No need to bind to a given port, creating the
+ // socket is enough
+ m_sock = ::socket(AF_INET, SOCK_DGRAM, 0);
+ return;
+ }
+
+ char service[NI_MAXSERV];
+ snprintf(service, NI_MAXSERV-1, "%d", port);
+
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
+ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
+ hints.ai_protocol = 0; /* Any protocol */
+ hints.ai_canonname = nullptr;
+ hints.ai_addr = nullptr;
+ hints.ai_next = nullptr;
+
+ struct addrinfo *result, *rp;
+ int s = getaddrinfo(name.empty() ? nullptr : name.c_str(),
+ port == 0 ? nullptr : service,
+ &hints, &result);
+ if (s != 0) {
+ throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s));
+ }
+
+ /* getaddrinfo() returns a list of address structures.
+ Try each address until we successfully bind(2).
+ If socket(2) (or bind(2)) fails, we (close the socket
+ and) try the next address. */
+ for (rp = result; rp != nullptr; rp = rp->ai_next) {
+ int sfd = ::socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+ if (sfd == -1) {
+ continue;
+ }
+
+ if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
+ m_sock = sfd;
+ break;
+ }
+
+ ::close(sfd);
+ }
+
+ freeaddrinfo(result);
+
+ if (rp == nullptr) {
+ throw runtime_error("Could not bind");
+ }
+}
+
+void UDPSocket::close()
+{
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
+ }
+
+ m_sock = INVALID_SOCKET;
+}
+
+UDPSocket::~UDPSocket()
+{
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
+ }
+}
+
+
+UDPPacket UDPSocket::receive(size_t max_size)
+{
+ UDPPacket packet(max_size);
+ socklen_t addrSize;
+ addrSize = sizeof(*packet.address.as_sockaddr());
+ ssize_t ret = recvfrom(m_sock,
+ packet.buffer.data(),
+ packet.buffer.size(),
+ 0,
+ packet.address.as_sockaddr(),
+ &addrSize);
+
+ if (ret == SOCKET_ERROR) {
+ packet.buffer.resize(0);
+
+ // This suppresses the -Wlogical-op warning
+#if EAGAIN == EWOULDBLOCK
+ if (errno == EAGAIN) {
+#else
+ if (errno == EAGAIN or errno == EWOULDBLOCK) {
+#endif
+ return 0;
+ }
+ throw runtime_error(string("Can't receive data: ") + strerror(errno));
+ }
+
+ packet.buffer.resize(ret);
+ return packet;
+}
+
+void UDPSocket::send(UDPPacket& packet)
+{
+ const int ret = sendto(m_sock, packet.buffer.data(), packet.buffer.size(), 0,
+ packet.address.as_sockaddr(), sizeof(*packet.address.as_sockaddr()));
+ if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
+ throw runtime_error(string("Can't send UDP packet: ") + strerror(errno));
+ }
+}
+
+
+void UDPSocket::send(const std::vector<uint8_t>& data, InetAddress destination)
+{
+ const int ret = sendto(m_sock, data.data(), data.size(), 0,
+ destination.as_sockaddr(), sizeof(*destination.as_sockaddr()));
+ if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
+ throw runtime_error(string("Can't send UDP packet: ") + strerror(errno));
+ }
+}
+
+void UDPSocket::joinGroup(const char* groupname, const char* if_addr)
+{
+ ip_mreqn group;
+ if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) {
+ throw runtime_error("Cannot convert multicast group name");
+ }
+ if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) {
+ throw runtime_error("Group name is not a multicast address");
+ }
+
+ if (if_addr) {
+ group.imr_address.s_addr = inet_addr(if_addr);
+ }
+ else {
+ group.imr_address.s_addr = htons(INADDR_ANY);
+ }
+ group.imr_ifindex = 0;
+ if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
+ == SOCKET_ERROR) {
+ throw runtime_error(string("Can't join multicast group") + strerror(errno));
+ }
+}
+
+void UDPSocket::setMulticastSource(const char* source_addr)
+{
+ struct in_addr addr;
+ if (inet_aton(source_addr, &addr) == 0) {
+ throw runtime_error(string("Can't parse source address") + strerror(errno));
+ }
+
+ if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
+ == SOCKET_ERROR) {
+ throw runtime_error(string("Can't set source address") + strerror(errno));
+ }
+}
+
+void UDPSocket::setMulticastTTL(int ttl)
+{
+ if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
+ == SOCKET_ERROR) {
+ throw runtime_error(string("Can't set multicast ttl") + strerror(errno));
+ }
+}
+
+UDPReceiver::UDPReceiver() { }
+
+UDPReceiver::~UDPReceiver() {
+ m_stop = true;
+ m_sock.close();
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) {
+ m_port = port;
+ m_bindto = bindto;
+ m_mcastaddr = mcastaddr;
+ m_max_packets_queued = max_packets_queued;
+ m_thread = std::thread(&UDPReceiver::m_run, this);
+}
+
+std::vector<uint8_t> UDPReceiver::get_packet_buffer()
+{
+ if (m_stop) {
+ throw runtime_error("UDP Receiver not running");
+ }
+
+ UDPPacket p;
+ m_packets.wait_and_pop(p);
+
+ return p.buffer;
+}
+
+void UDPReceiver::m_run()
+{
+ // Ensure that stop is set to true in case of exception or return
+ struct SetStopOnDestruct {
+ SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {}
+ ~SetStopOnDestruct() { m_stop = true; }
+ private: atomic<bool>& m_stop;
+ } autoSetStop(m_stop);
+
+ if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) {
+ m_sock.reinit(m_port, m_mcastaddr);
+ m_sock.setMulticastSource(m_bindto.c_str());
+ m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str());
+ }
+ else {
+ m_sock.reinit(m_port, m_bindto);
+ }
+
+ while (not m_stop) {
+ constexpr size_t packsize = 8192;
+ try {
+ auto packet = m_sock.receive(packsize);
+ if (packet.buffer.size() == packsize) {
+ // TODO replace fprintf
+ fprintf(stderr, "Warning, possible UDP truncation\n");
+ }
+
+ // If this blocks, the UDP socket will lose incoming packets
+ m_packets.push_wait_if_full(packet, m_max_packets_queued);
+ }
+ catch (const std::runtime_error& e) {
+ // TODO replace fprintf
+ // TODO handle intr
+ fprintf(stderr, "Socket error: %s\n", e.what());
+ m_stop = true;
+ }
+ }
+}
+
+
+TCPSocket::TCPSocket()
+{
+}
+
+TCPSocket::~TCPSocket()
+{
+ if (m_sock != -1) {
+ ::close(m_sock);
+ }
+}
+
+TCPSocket::TCPSocket(TCPSocket&& other) :
+ m_sock(other.m_sock),
+ m_remote_address(move(other.m_remote_address))
+{
+ if (other.m_sock != -1) {
+ other.m_sock = -1;
+ }
+}
+
+TCPSocket& TCPSocket::operator=(TCPSocket&& other)
+{
+ swap(m_remote_address, other.m_remote_address);
+
+ m_sock = other.m_sock;
+ if (other.m_sock != -1) {
+ other.m_sock = -1;
+ }
+
+ return *this;
+}
+
+bool TCPSocket::valid() const
+{
+ return m_sock != -1;
+}
+
+void TCPSocket::connect(const std::string& hostname, int port)
+{
+ if (m_sock != INVALID_SOCKET) {
+ throw std::logic_error("You may only connect an invalid TCPSocket");
+ }
+
+ char service[NI_MAXSERV];
+ snprintf(service, NI_MAXSERV-1, "%d", port);
+
+ /* Obtain address(es) matching host/port */
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = 0;
+ hints.ai_protocol = 0;
+
+ struct addrinfo *result, *rp;
+ int s = getaddrinfo(hostname.c_str(), service, &hints, &result);
+ if (s != 0) {
+ throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s));
+ }
+
+ /* getaddrinfo() returns a list of address structures.
+ Try each address until we successfully connect(2).
+ If socket(2) (or connect(2)) fails, we (close the socket
+ and) try the next address. */
+
+ for (rp = result; rp != nullptr; rp = rp->ai_next) {
+ int sfd = ::socket(rp->ai_family, rp->ai_socktype,
+ rp->ai_protocol);
+ if (sfd == -1)
+ continue;
+
+ int ret = ::connect(sfd, rp->ai_addr, rp->ai_addrlen);
+ if (ret != -1 or (ret == -1 and errno == EINPROGRESS)) {
+ // As the TCPClient could set the socket to nonblocking, we
+ // must handle EINPROGRESS here
+ m_sock = sfd;
+ break;
+ }
+
+ ::close(sfd);
+ }
+
+ if (m_sock != INVALID_SOCKET) {
+#if defined(HAVE_SO_NOSIGPIPE)
+ int val = 1;
+ if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))
+ == SOCKET_ERROR) {
+ throw std::runtime_error("Can't set SO_NOSIGPIPE");
+ }
+#endif
+ }
+
+ freeaddrinfo(result); /* No longer needed */
+
+ if (rp == nullptr) {
+ throw runtime_error("Could not connect");
+ }
+
+}
+
+void TCPSocket::listen(int port, const string& name)
+{
+ if (m_sock != INVALID_SOCKET) {
+ throw std::logic_error("You may only listen with an invalid TCPSocket");
+ }
+
+ char service[NI_MAXSERV];
+ snprintf(service, NI_MAXSERV-1, "%d", port);
+
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
+ hints.ai_protocol = 0;
+ hints.ai_canonname = nullptr;
+ hints.ai_addr = nullptr;
+ hints.ai_next = nullptr;
+
+ struct addrinfo *result, *rp;
+ int s = getaddrinfo(name.empty() ? nullptr : name.c_str(), service, &hints, &result);
+ if (s != 0) {
+ throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s));
+ }
+
+ /* getaddrinfo() returns a list of address structures.
+ Try each address until we successfully bind(2).
+ If socket(2) (or bind(2)) fails, we (close the socket
+ and) try the next address. */
+ for (rp = result; rp != nullptr; rp = rp->ai_next) {
+ int sfd = ::socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+ if (sfd == -1) {
+ continue;
+ }
+
+ if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
+ m_sock = sfd;
+ break;
+ }
+
+ ::close(sfd);
+ }
+
+ freeaddrinfo(result);
+
+ if (m_sock != INVALID_SOCKET) {
+#if defined(HAVE_SO_NOSIGPIPE)
+ int val = 1;
+ if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE,
+ &val, sizeof(val)) < 0) {
+ throw std::runtime_error("Can't set SO_NOSIGPIPE");
+ }
+#endif
+
+ int ret = ::listen(m_sock, 0);
+ if (ret == -1) {
+ throw std::runtime_error(string("Could not listen: ") + strerror(errno));
+ }
+ }
+
+ if (rp == nullptr) {
+ throw runtime_error("Could not bind");
+ }
+}
+
+void TCPSocket::close()
+{
+ ::close(m_sock);
+ m_sock = -1;
+}
+
+TCPSocket TCPSocket::accept(int timeout_ms)
+{
+ if (timeout_ms == 0) {
+ InetAddress remote_addr;
+ socklen_t client_len = sizeof(remote_addr.addr);
+ int sockfd = ::accept(m_sock, remote_addr.as_sockaddr(), &client_len);
+ TCPSocket s(sockfd, remote_addr);
+ return s;
+ }
+ else {
+ struct pollfd fds[1];
+ fds[0].fd = m_sock;
+ fds[0].events = POLLIN;
+
+ int retval = poll(fds, 1, timeout_ms);
+
+ if (retval == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP Socket accept error: " + errstr);
+ }
+ else if (retval > 0) {
+ InetAddress remote_addr;
+ socklen_t client_len = sizeof(remote_addr.addr);
+ int sockfd = ::accept(m_sock, remote_addr.as_sockaddr(), &client_len);
+ TCPSocket s(sockfd, remote_addr);
+ return s;
+ }
+ else {
+ TCPSocket s(-1);
+ return s;
+ }
+ }
+}
+
+ssize_t TCPSocket::sendall(const void *buffer, size_t buflen)
+{
+ uint8_t *buf = (uint8_t*)buffer;
+ while (buflen > 0) {
+ /* On Linux, the MSG_NOSIGNAL flag ensures that the process
+ * would not receive a SIGPIPE and die.
+ * Other systems have SO_NOSIGPIPE set on the socket for the
+ * same effect. */
+#if defined(HAVE_MSG_NOSIGNAL)
+ const int flags = MSG_NOSIGNAL;
+#else
+ const int flags = 0;
+#endif
+ ssize_t sent = ::send(m_sock, buf, buflen, flags);
+ if (sent < 0) {
+ return -1;
+ }
+ else {
+ buf += sent;
+ buflen -= sent;
+ }
+ }
+ return buflen;
+}
+
+ssize_t TCPSocket::send(const void* data, size_t size, int timeout_ms)
+{
+ if (timeout_ms) {
+ struct pollfd fds[1];
+ fds[0].fd = m_sock;
+ fds[0].events = POLLOUT;
+
+ const int retval = poll(fds, 1, timeout_ms);
+
+ if (retval == -1) {
+ throw std::runtime_error(string("TCP Socket send error on poll(): ") + strerror(errno));
+ }
+ else if (retval == 0) {
+ // Timed out
+ return 0;
+ }
+ }
+
+ /* On Linux, the MSG_NOSIGNAL flag ensures that the process would not
+ * receive a SIGPIPE and die.
+ * Other systems have SO_NOSIGPIPE set on the socket for the same effect. */
+#if defined(HAVE_MSG_NOSIGNAL)
+ const int flags = MSG_NOSIGNAL;
+#else
+ const int flags = 0;
+#endif
+ const ssize_t ret = ::send(m_sock, (const char*)data, size, flags);
+
+ if (ret == SOCKET_ERROR) {
+ throw std::runtime_error(string("TCP Socket send error: ") + strerror(errno));
+ }
+ return ret;
+}
+
+ssize_t TCPSocket::recv(void *buffer, size_t length, int flags)
+{
+ ssize_t ret = ::recv(m_sock, buffer, length, flags);
+ if (ret == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP receive error: " + errstr);
+ }
+ return ret;
+}
+
+ssize_t TCPSocket::recv(void *buffer, size_t length, int flags, int timeout_ms)
+{
+ struct pollfd fds[1];
+ fds[0].fd = m_sock;
+ fds[0].events = POLLIN;
+
+ int retval = poll(fds, 1, timeout_ms);
+
+ if (retval == -1 and errno == EINTR) {
+ throw Interrupted();
+ }
+ else if (retval == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP receive with poll() error: " + errstr);
+ }
+ else if (retval > 0 and (fds[0].revents | POLLIN)) {
+ ssize_t ret = ::recv(m_sock, buffer, length, flags);
+ if (ret == -1) {
+ if (errno == ECONNREFUSED) {
+ return 0;
+ }
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP receive after poll() error: " + errstr);
+ }
+ return ret;
+ }
+ else {
+ throw Timeout();
+ }
+}
+
+TCPSocket::TCPSocket(int sockfd) :
+ m_sock(sockfd),
+ m_remote_address()
+{ }
+
+TCPSocket::TCPSocket(int sockfd, InetAddress remote_address) :
+ m_sock(sockfd),
+ m_remote_address(remote_address)
+{ }
+
+void TCPClient::connect(const std::string& hostname, int port)
+{
+ m_hostname = hostname;
+ m_port = port;
+ reconnect();
+}
+
+ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms)
+{
+ try {
+ ssize_t ret = m_sock.recv(buffer, length, flags, timeout_ms);
+
+ if (ret == 0) {
+ m_sock.close();
+
+ TCPSocket newsock;
+ m_sock = std::move(newsock);
+ reconnect();
+ }
+
+ return ret;
+ }
+ catch (const TCPSocket::Interrupted&) {
+ return -1;
+ }
+ catch (const TCPSocket::Timeout&) {
+ return 0;
+ }
+
+ return 0;
+}
+
+void TCPClient::reconnect()
+{
+ int flags = fcntl(m_sock.m_sock, F_GETFL);
+ if (fcntl(m_sock.m_sock, F_SETFL, flags | O_NONBLOCK) == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr);
+ }
+
+ m_sock.connect(m_hostname, m_port);
+}
+
+TCPConnection::TCPConnection(TCPSocket&& sock) :
+ queue(),
+ m_running(true),
+ m_sender_thread(),
+ m_sock(move(sock))
+{
+#if MISSING_OWN_ADDR
+ auto own_addr = m_sock.getOwnAddress();
+ auto addr = m_sock.getRemoteAddress();
+ etiLog.level(debug) << "New TCP Connection on port " <<
+ own_addr.getPort() << " from " <<
+ addr.getHostAddress() << ":" << addr.getPort();
+#endif
+ m_sender_thread = std::thread(&TCPConnection::process, this);
+}
+
+TCPConnection::~TCPConnection()
+{
+ m_running = false;
+ vector<uint8_t> termination_marker;
+ queue.push(termination_marker);
+ m_sender_thread.join();
+}
+
+void TCPConnection::process()
+{
+ while (m_running) {
+ vector<uint8_t> data;
+ queue.wait_and_pop(data);
+
+ if (data.empty()) {
+ // empty vector is the termination marker
+ m_running = false;
+ break;
+ }
+
+ try {
+ ssize_t remaining = data.size();
+ const uint8_t *buf = reinterpret_cast<const uint8_t*>(data.data());
+ const int timeout_ms = 10; // Less than one ETI frame
+
+ while (m_running and remaining > 0) {
+ const ssize_t sent = m_sock.send(buf, remaining, timeout_ms);
+ if (sent < 0 or sent > remaining) {
+ throw std::logic_error("Invalid TCPSocket::send() return value");
+ }
+ remaining -= sent;
+ buf += sent;
+ }
+ }
+ catch (const std::runtime_error& e) {
+ m_running = false;
+ }
+ }
+
+#if MISSING_OWN_ADDR
+ auto own_addr = m_sock.getOwnAddress();
+ auto addr = m_sock.getRemoteAddress();
+ etiLog.level(debug) << "Dropping TCP Connection on port " <<
+ own_addr.getPort() << " from " <<
+ addr.getHostAddress() << ":" << addr.getPort();
+#endif
+}
+
+
+TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) :
+ m_max_queue_size(max_queue_size)
+{
+}
+
+TCPDataDispatcher::~TCPDataDispatcher()
+{
+ m_running = false;
+ m_connections.clear();
+ m_listener_socket.close();
+ if (m_listener_thread.joinable()) {
+ m_listener_thread.join();
+ }
+}
+
+void TCPDataDispatcher::start(int port, const string& address)
+{
+ m_listener_socket.listen(port, address);
+
+ m_running = true;
+ m_listener_thread = std::thread(&TCPDataDispatcher::process, this);
+}
+
+void TCPDataDispatcher::write(const vector<uint8_t>& data)
+{
+ if (not m_running) {
+ throw runtime_error(m_exception_data);
+ }
+
+ for (auto& connection : m_connections) {
+ connection.queue.push(data);
+ }
+
+ m_connections.remove_if(
+ [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });
+}
+
+void TCPDataDispatcher::process()
+{
+ try {
+ const int timeout_ms = 1000;
+
+ while (m_running) {
+ // Add a new TCPConnection to the list, constructing it from the client socket
+ auto sock = m_listener_socket.accept(timeout_ms);
+ if (sock.valid()) {
+ m_connections.emplace(m_connections.begin(), move(sock));
+ }
+ }
+ }
+ catch (const std::runtime_error& e) {
+ m_exception_data = string("TCPDataDispatcher error: ") + e.what();
+ m_running = false;
+ }
+}
+
+TCPReceiveServer::TCPReceiveServer(size_t blocksize) :
+ m_blocksize(blocksize)
+{
+}
+
+void TCPReceiveServer::start(int listen_port, const std::string& address)
+{
+ m_listener_socket.listen(listen_port, address);
+
+ m_running = true;
+ m_listener_thread = std::thread(&TCPReceiveServer::process, this);
+}
+
+TCPReceiveServer::~TCPReceiveServer()
+{
+ m_running = false;
+ if (m_listener_thread.joinable()) {
+ m_listener_thread.join();
+ }
+}
+
+vector<uint8_t> TCPReceiveServer::receive()
+{
+ vector<uint8_t> buffer;
+ m_queue.try_pop(buffer);
+
+ // we can ignore try_pop()'s return value, because
+ // if it is unsuccessful the buffer is not touched.
+ return buffer;
+}
+
+void TCPReceiveServer::process()
+{
+ constexpr int timeout_ms = 1000;
+ constexpr int disconnect_timeout_ms = 10000;
+ constexpr int max_num_timeouts = disconnect_timeout_ms / timeout_ms;
+
+ while (m_running) {
+ auto sock = m_listener_socket.accept(timeout_ms);
+
+ int num_timeouts = 0;
+
+ while (m_running and sock.valid()) {
+ try {
+ vector<uint8_t> buf(m_blocksize);
+ ssize_t r = sock.recv(buf.data(), buf.size(), 0, timeout_ms);
+ if (r < 0) {
+ throw logic_error("Invalid recv return value");
+ }
+ else if (r == 0) {
+ sock.close();
+ break;
+ }
+ else {
+ buf.resize(r);
+ m_queue.push(move(buf));
+ }
+ }
+ catch (const TCPSocket::Interrupted&) {
+ break;
+ }
+ catch (const TCPSocket::Timeout&) {
+ num_timeouts++;
+ }
+
+ if (num_timeouts > max_num_timeouts) {
+ sock.close();
+ }
+ }
+ }
+}
+
+}
diff --git a/lib/Socket.h b/lib/Socket.h
new file mode 100644
index 0000000..8bb7fe1
--- /dev/null
+++ b/lib/Socket.h
@@ -0,0 +1,294 @@
+/*
+ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include "ThreadsafeQueue.h"
+#include <cstdlib>
+#include <iostream>
+#include <vector>
+#include <atomic>
+#include <thread>
+#include <list>
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <unistd.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <pthread.h>
+#define SOCKET int
+#define INVALID_SOCKET -1
+#define SOCKET_ERROR -1
+
+
+namespace Socket {
+
+struct InetAddress {
+ struct sockaddr_storage addr;
+
+ struct sockaddr *as_sockaddr() { return reinterpret_cast<sockaddr*>(&addr); };
+
+ void resolveUdpDestination(const std::string& destination, int port);
+};
+
+/** This class represents a UDP packet.
+ *
+ * A UDP packet contains a payload (sequence of bytes) and an address. For
+ * outgoing packets, the address is the destination address. For incoming
+ * packets, the address tells the user from what source the packet arrived from.
+ */
+class UDPPacket
+{
+ public:
+ UDPPacket();
+ UDPPacket(size_t initSize);
+
+ std::vector<uint8_t> buffer;
+ InetAddress address;
+};
+
+/**
+ * This class represents a socket for sending and receiving UDP packets.
+ *
+ * A UDP socket is the sending or receiving point for a packet delivery service.
+ * Each packet sent or received on a datagram socket is individually
+ * addressed and routed. Multiple packets sent from one machine to another may
+ * be routed differently, and may arrive in any order.
+ */
+class UDPSocket
+{
+ public:
+ /** Create a new socket that will not be bound to any port. To be used
+ * for data output.
+ */
+ UDPSocket();
+ /** Create a new socket.
+ * @param port The port number on which the socket will be bound
+ */
+ UDPSocket(int port);
+ /** Create a new socket.
+ * @param port The port number on which the socket will be bound
+ * @param name The IP address on which the socket will be bound.
+ * It is used to bind the socket on a specific interface if
+ * the computer have many NICs.
+ */
+ UDPSocket(int port, const std::string& name);
+ ~UDPSocket();
+ UDPSocket(const UDPSocket& other) = delete;
+ const UDPSocket& operator=(const UDPSocket& other) = delete;
+
+ /** Close the already open socket, and create a new one. Throws a runtime_error on error. */
+ void reinit(int port);
+ void reinit(int port, const std::string& name);
+
+ void close(void);
+ void send(UDPPacket& packet);
+ void send(const std::vector<uint8_t>& data, InetAddress destination);
+ UDPPacket receive(size_t max_size);
+ void joinGroup(const char* groupname, const char* if_addr = nullptr);
+ void setMulticastSource(const char* source_addr);
+ void setMulticastTTL(int ttl);
+
+ /** Set blocking mode. By default, the socket is blocking.
+ * throws a runtime_error on error.
+ */
+ void setBlocking(bool block);
+
+ protected:
+ SOCKET m_sock;
+};
+
+/* Threaded UDP receiver */
+class UDPReceiver {
+ public:
+ UDPReceiver();
+ ~UDPReceiver();
+ UDPReceiver(const UDPReceiver&) = delete;
+ UDPReceiver operator=(const UDPReceiver&) = delete;
+
+ // Start the receiver in a separate thread
+ void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued);
+
+ // Get the data contained in a UDP packet, blocks if none available
+ // In case of error, throws a runtime_error
+ std::vector<uint8_t> get_packet_buffer(void);
+
+ private:
+ void m_run(void);
+
+ int m_port = 0;
+ std::string m_bindto;
+ std::string m_mcastaddr;
+ size_t m_max_packets_queued = 1;
+ std::thread m_thread;
+ std::atomic<bool> m_stop = ATOMIC_VAR_INIT(false);
+ ThreadsafeQueue<UDPPacket> m_packets;
+ UDPSocket m_sock;
+};
+
+class TCPSocket {
+ public:
+ TCPSocket();
+ ~TCPSocket();
+ TCPSocket(const TCPSocket& other) = delete;
+ TCPSocket& operator=(const TCPSocket& other) = delete;
+ TCPSocket(TCPSocket&& other);
+ TCPSocket& operator=(TCPSocket&& other);
+
+ bool valid(void) const;
+ void connect(const std::string& hostname, int port);
+ void listen(int port, const std::string& name);
+ void close(void);
+
+ /* throws a runtime_error on failure, an invalid socket on timeout */
+ TCPSocket accept(int timeout_ms);
+
+ /* returns -1 on error, doesn't work on nonblocking sockets */
+ ssize_t sendall(const void *buffer, size_t buflen);
+
+ /** Send data over the TCP connection.
+ * @param data The buffer that will be sent.
+ * @param size Number of bytes to send.
+ * @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout
+ * return number of bytes sent, 0 on timeout, or throws runtime_error.
+ */
+ ssize_t send(const void* data, size_t size, int timeout_ms=0);
+
+ /* Returns number of bytes read, 0 on disconnect. Throws a
+ * runtime_error on error */
+ ssize_t recv(void *buffer, size_t length, int flags);
+
+ class Timeout {};
+ class Interrupted {};
+ /* Returns number of bytes read, 0 on disconnect or refused connection.
+ * Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error
+ * on error
+ */
+ ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
+
+ private:
+ explicit TCPSocket(int sockfd);
+ explicit TCPSocket(int sockfd, InetAddress remote_address);
+ SOCKET m_sock = -1;
+
+ InetAddress m_remote_address;
+
+ friend class TCPClient;
+};
+
+/* Implements a TCP receiver that auto-reconnects on errors */
+class TCPClient {
+ public:
+ void connect(const std::string& hostname, int port);
+
+ /* Returns numer of bytes read, 0 on auto-reconnect, -1
+ * on interruption.
+ * Throws a runtime_error on error */
+ ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
+
+ private:
+ void reconnect(void);
+ TCPSocket m_sock;
+ std::string m_hostname;
+ int m_port;
+};
+
+/* Helper class for TCPDataDispatcher, contains a queue of pending data and
+ * a sender thread. */
+class TCPConnection
+{
+ public:
+ TCPConnection(TCPSocket&& sock);
+ TCPConnection(const TCPConnection&) = delete;
+ TCPConnection& operator=(const TCPConnection&) = delete;
+ ~TCPConnection();
+
+ ThreadsafeQueue<std::vector<uint8_t> > queue;
+
+ private:
+ std::atomic<bool> m_running;
+ std::thread m_sender_thread;
+ TCPSocket m_sock;
+
+ void process(void);
+};
+
+/* Send a TCP stream to several destinations, and automatically disconnect destinations
+ * whose buffer overflows.
+ */
+class TCPDataDispatcher
+{
+ public:
+ TCPDataDispatcher(size_t max_queue_size);
+ ~TCPDataDispatcher();
+ TCPDataDispatcher(const TCPDataDispatcher&) = delete;
+ TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;
+
+ void start(int port, const std::string& address);
+ void write(const std::vector<uint8_t>& data);
+
+ private:
+ void process();
+
+ size_t m_max_queue_size;
+
+ std::atomic<bool> m_running;
+ std::string m_exception_data;
+ std::thread m_listener_thread;
+ TCPSocket m_listener_socket;
+ std::list<TCPConnection> m_connections;
+};
+
+/* A TCP Server to receive data, which abstracts the handling of connects and disconnects.
+ */
+class TCPReceiveServer {
+ public:
+ TCPReceiveServer(size_t blocksize);
+ ~TCPReceiveServer();
+ TCPReceiveServer(const TCPReceiveServer&) = delete;
+ TCPReceiveServer& operator=(const TCPReceiveServer&) = delete;
+
+ void start(int listen_port, const std::string& address);
+
+ // Return a vector that contains up to blocksize bytes of data, or
+ // and empty vector if no data is available.
+ std::vector<uint8_t> receive();
+
+ private:
+ void process();
+
+ size_t m_blocksize = 0;
+ ThreadsafeQueue<std::vector<uint8_t> > m_queue;
+ std::atomic<bool> m_running;
+ std::string m_exception_data;
+ std::thread m_listener_thread;
+ TCPSocket m_listener_socket;
+};
+
+}
diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h
new file mode 100644
index 0000000..62f4c96
--- /dev/null
+++ b/lib/ThreadsafeQueue.h
@@ -0,0 +1,176 @@
+/*
+ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
+ Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ An implementation for a threadsafe queue, depends on C++11
+
+ When creating a ThreadsafeQueue, one can specify the minimal number
+ of elements it must contain before it is possible to take one
+ element out.
+ */
+/*
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <mutex>
+#include <condition_variable>
+#include <queue>
+#include <utility>
+
+/* This queue is meant to be used by two threads. One producer
+ * that pushes elements into the queue, and one consumer that
+ * retrieves the elements.
+ *
+ * The queue can make the consumer block until an element
+ * is available, or a wakeup requested.
+ */
+
+/* Class thrown by blocking pop to tell the consumer
+ * that there's a wakeup requested. */
+class ThreadsafeQueueWakeup {};
+
+template<typename T>
+class ThreadsafeQueue
+{
+public:
+ /* Push one element into the queue, and notify another thread that
+ * might be waiting.
+ *
+ * returns the new queue size.
+ */
+ size_t push(T const& val)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ the_queue.push(val);
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ size_t push(T&& val)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ the_queue.emplace(std::move(val));
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ /* Push one element into the queue, but wait until the
+ * queue size goes below the threshold.
+ *
+ * Notify waiting thread.
+ *
+ * returns the new queue size.
+ */
+ size_t push_wait_if_full(T const& val, size_t threshold)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ while (the_queue.size() >= threshold) {
+ the_tx_notification.wait(lock);
+ }
+ the_queue.push(val);
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ /* Trigger a wakeup event on a blocking consumer, which
+ * will receive a ThreadsafeQueueWakeup exception.
+ */
+ void trigger_wakeup(void)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ wakeup_requested = true;
+ lock.unlock();
+ the_rx_notification.notify_one();
+ }
+
+ /* Send a notification for the receiver thread */
+ void notify(void)
+ {
+ the_rx_notification.notify_one();
+ }
+
+ bool empty() const
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ return the_queue.empty();
+ }
+
+ size_t size() const
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ return the_queue.size();
+ }
+
+ bool try_pop(T& popped_value)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ if (the_queue.empty()) {
+ return false;
+ }
+
+ popped_value = the_queue.front();
+ the_queue.pop();
+
+ lock.unlock();
+ the_tx_notification.notify_one();
+
+ return true;
+ }
+
+ void wait_and_pop(T& popped_value, size_t prebuffering = 1)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ while (the_queue.size() < prebuffering and
+ not wakeup_requested) {
+ the_rx_notification.wait(lock);
+ }
+
+ if (wakeup_requested) {
+ wakeup_requested = false;
+ throw ThreadsafeQueueWakeup();
+ }
+ else {
+ std::swap(popped_value, the_queue.front());
+ the_queue.pop();
+
+ lock.unlock();
+ the_tx_notification.notify_one();
+ }
+ }
+
+private:
+ std::queue<T> the_queue;
+ mutable std::mutex the_mutex;
+ std::condition_variable the_rx_notification;
+ std::condition_variable the_tx_notification;
+ bool wakeup_requested = false;
+};
+
diff --git a/lib/UdpSocket.cpp b/lib/UdpSocket.cpp
deleted file mode 100644
index 345b971..0000000
--- a/lib/UdpSocket.cpp
+++ /dev/null
@@ -1,349 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2017
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "UdpSocket.h"
-#include "Utils.h"
-
-#include <iostream>
-#include <stdio.h>
-#include <errno.h>
-#include <fcntl.h>
-#include <string.h>
-
-using namespace std;
-
-UdpSocket::UdpSocket() :
- listenSocket(INVALID_SOCKET)
-{
- reinit(0, "");
-}
-
-UdpSocket::UdpSocket(int port) :
- listenSocket(INVALID_SOCKET)
-{
- reinit(port, "");
-}
-
-UdpSocket::UdpSocket(int port, const std::string& name) :
- listenSocket(INVALID_SOCKET)
-{
- reinit(port, name);
-}
-
-
-int UdpSocket::setBlocking(bool block)
-{
- int res;
- if (block)
- res = fcntl(listenSocket, F_SETFL, 0);
- else
- res = fcntl(listenSocket, F_SETFL, O_NONBLOCK);
- if (res == SOCKET_ERROR) {
- setInetError("Can't change blocking state of socket");
- return -1;
- }
- return 0;
-}
-
-int UdpSocket::reinit(int port, const std::string& name)
-{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
- }
-
- if ((listenSocket = socket(PF_INET, SOCK_DGRAM, 0)) == INVALID_SOCKET) {
- setInetError("Can't create socket");
- return -1;
- }
- reuseopt_t reuse = 1;
- if (setsockopt(listenSocket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))
- == SOCKET_ERROR) {
- setInetError("Can't reuse address");
- return -1;
- }
-
- if (port) {
- address.setAddress(name);
- address.setPort(port);
-
- if (::bind(listenSocket, address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) {
- setInetError("Can't bind socket");
- ::close(listenSocket);
- listenSocket = INVALID_SOCKET;
- return -1;
- }
- }
- return 0;
-}
-
-int UdpSocket::close()
-{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
- }
-
- listenSocket = INVALID_SOCKET;
-
- return 0;
-}
-
-UdpSocket::~UdpSocket()
-{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
- }
-}
-
-static inline bool wait_for_recv_ready(int sock_fd, const size_t timeout_ms)
-{
- //setup timeval for timeout
- timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = timeout_ms*1000;
-
- //setup rset for timeout
- fd_set rset;
- FD_ZERO(&rset);
- FD_SET(sock_fd, &rset);
-
- return ::select(sock_fd+1, &rset, NULL, NULL, &tv) > 0;
-}
-
-int UdpSocket::receive(UdpPacket& packet)
-{
- bool ready = wait_for_recv_ready(listenSocket, 2000);
-
- if (ready) {
- socklen_t addrSize;
- addrSize = sizeof(*packet.getAddress().getAddress());
- ssize_t ret = recvfrom(listenSocket,
- packet.getData(),
- packet.getSize(),
- 0,
- packet.getAddress().getAddress(),
- &addrSize);
-
- if (ret == SOCKET_ERROR) {
- packet.setSize(0);
- if (errno == EAGAIN) {
- return 0;
- }
- setInetError("Can't receive UDP packet");
- return -1;
- }
- packet.setSize(ret);
- return 0;
- }
- else {
- packet.setSize(0);
- return 0;
- }
-}
-
-int UdpSocket::send(UdpPacket& packet)
-{
- int ret = sendto(listenSocket, packet.getData(), packet.getSize(), 0,
- packet.getAddress().getAddress(), sizeof(*packet.getAddress().getAddress()));
- if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
- setInetError("Can't send UDP packet");
- return -1;
- }
- return 0;
-}
-
-
-int UdpSocket::send(const std::vector<uint8_t>& data, InetAddress destination)
-{
- int ret = sendto(listenSocket, &data[0], data.size(), 0,
- destination.getAddress(), sizeof(*destination.getAddress()));
- if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
- setInetError("Can't send UDP packet");
- return -1;
- }
- return 0;
-}
-
-
-/**
- * Must be called to receive data on a multicast address.
- * @param groupname The multicast address to join.
- * @return 0 if ok, -1 if error
- */
-int UdpSocket::joinGroup(const char* groupname, const char* if_addr)
-{
- ip_mreqn group;
- if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) {
- setInetError(groupname);
- return -1;
- }
- if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) {
- setInetError("Not a multicast address");
- return -1;
- }
- group.imr_address.s_addr = inet_addr(if_addr);
- group.imr_ifindex = 0;
- if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
- == SOCKET_ERROR) {
- setInetError("Can't join multicast group");
- }
- return 0;
-}
-
-int UdpSocket::setMulticastTTL(int ttl)
-{
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
- == SOCKET_ERROR) {
- setInetError("Can't set ttl");
- return -1;
- }
-
- return 0;
-}
-
-int UdpSocket::setMulticastSource(const char* source_addr)
-{
- struct in_addr addr;
- if (inet_aton(source_addr, &addr) == 0) {
- setInetError("Can't parse source address");
- return -1;
- }
-
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
- == SOCKET_ERROR) {
- setInetError("Can't set source address");
- return -1;
- }
-
- return 0;
-}
-
-UdpPacket::UdpPacket() { }
-
-UdpPacket::UdpPacket(size_t initSize) :
- m_buffer(initSize)
-{ }
-
-
-void UdpPacket::setSize(size_t newSize)
-{
- m_buffer.resize(newSize);
-}
-
-
-uint8_t* UdpPacket::getData()
-{
- return &m_buffer[0];
-}
-
-
-void UdpPacket::addData(const void *data, size_t size)
-{
- uint8_t *d = (uint8_t*)data;
- std::copy(d, d + size, std::back_inserter(m_buffer));
-}
-
-size_t UdpPacket::getSize()
-{
- return m_buffer.size();
-}
-
-InetAddress UdpPacket::getAddress()
-{
- return address;
-}
-
-UdpReceiver::~UdpReceiver() {
- m_stop = true;
- m_sock.close();
- if (m_thread.joinable()) {
- m_thread.join();
- }
-}
-
-void UdpReceiver::start(int port, std::string& bindto, std::string& mcastaddr, size_t max_packets_queued) {
- m_port = port;
- m_bindto = bindto;
- m_mcastaddr = mcastaddr;
- m_max_packets_queued = max_packets_queued;
- m_thread = std::thread(&UdpReceiver::m_run, this);
-}
-
-std::vector<uint8_t> UdpReceiver::get_packet_buffer()
-{
- if (m_stop) {
- throw runtime_error("UDP Receiver not running");
- }
-
- UdpPacket p;
- m_packets.wait_and_pop(p);
-
- return p.getBuffer();
-}
-
-void UdpReceiver::m_run()
-{
- // Ensure that stop is set to true in case of exception or return
- struct SetStopOnDestruct {
- SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {}
- ~SetStopOnDestruct() { m_stop = true; }
- private: atomic<bool>& m_stop;
- } autoSetStop(m_stop);
-
- set_thread_name("udp_rx");
-
- if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) {
- m_sock.reinit(m_port, m_mcastaddr);
- m_sock.setMulticastSource(m_bindto.c_str());
- m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str());
- }
- else {
- m_sock.reinit(m_port, m_bindto);
- }
-
- const size_t packsize = 8192;
- UdpPacket packet(packsize);
-
- while (not m_stop) {
- int ret = m_sock.receive(packet);
- if (ret == 0) {
- if (packet.getSize() == packsize) {
- // TODO replace fprintf
- fprintf(stderr, "Warning, possible UDP truncation\n");
- }
-
- // If this blocks, the UDP socket will lose incoming packets
- m_packets.push_wait_if_full(packet, m_max_packets_queued);
- }
- else {
- if (inetErrNo != EINTR) {
- // TODO replace fprintf
- fprintf(stderr, "Socket error: %s\n", inetErrMsg);
- }
- m_stop = true;
- }
- }
-}
-
diff --git a/lib/UdpSocket.h b/lib/UdpSocket.h
deleted file mode 100644
index b299c82..0000000
--- a/lib/UdpSocket.h
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2017
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include "InetAddress.h"
-#include "ThreadsafeQueue.h"
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <unistd.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#include <pthread.h>
-#define SOCKET int
-#define INVALID_SOCKET -1
-#define SOCKET_ERROR -1
-#define reuseopt_t int
-
-#include <stdlib.h>
-#include <iostream>
-#include <vector>
-#include <thread>
-#include <atomic>
-
-class UdpPacket;
-
-
-/**
- * This class represents a socket for sending and receiving UDP packets.
- *
- * A UDP socket is the sending or receiving point for a packet delivery service.
- * Each packet sent or received on a datagram socket is individually
- * addressed and routed. Multiple packets sent from one machine to another may
- * be routed differently, and may arrive in any order.
- */
-class UdpSocket
-{
- public:
- /** Create a new socket that will not be bound to any port. To be used
- * for data output.
- */
- UdpSocket();
- /** Create a new socket.
- * @param port The port number on which the socket will be bound
- */
- UdpSocket(int port);
- /** Create a new socket.
- * @param port The port number on which the socket will be bound
- * @param name The IP address on which the socket will be bound.
- * It is used to bind the socket on a specific interface if
- * the computer have many NICs.
- */
- UdpSocket(int port, const std::string& name);
- ~UdpSocket();
- UdpSocket(const UdpSocket& other) = delete;
- const UdpSocket& operator=(const UdpSocket& other) = delete;
-
- /** reinitialise socket. Close the already open socket, and
- * create a new one
- */
- int reinit(int port, const std::string& name);
-
- /** Close the socket
- */
- int close(void);
-
- /** Send an UDP packet.
- * @param packet The UDP packet to be sent. It includes the data and the
- * destination address
- * return 0 if ok, -1 if error
- */
- int send(UdpPacket& packet);
-
- /** Send an UDP packet
- *
- * return 0 if ok, -1 if error
- */
- int send(const std::vector<uint8_t>& data, InetAddress destination);
-
- /** Receive an UDP packet.
- * @param packet The packet that will receive the data. The address will be set
- * to the source address.
- * @return 0 if ok or timeout, -1 if error
- */
- int receive(UdpPacket& packet);
-
- int joinGroup(const char* groupname, const char *if_addr);
- int setMulticastSource(const char* source_addr);
- int setMulticastTTL(int ttl);
-
- /** Set blocking mode. By default, the socket is blocking.
- * @return 0 if ok
- * -1 if error
- */
- int setBlocking(bool block);
-
- protected:
-
- /// The address on which the socket is bound.
- InetAddress address;
- /// The low-level socket used by system functions.
- SOCKET listenSocket;
-};
-
-/** This class represents a UDP packet.
- *
- * A UDP packet contains a payload (sequence of bytes) and an address. For
- * outgoing packets, the address is the destination address. For incoming
- * packets, the address tells the user from what source the packet arrived from.
- */
-class UdpPacket
-{
- public:
- /** Construct an empty UDP packet.
- */
- UdpPacket();
- UdpPacket(size_t initSize);
-
- /** Give the pointer to data.
- * @return The pointer
- */
- uint8_t* getData(void);
-
- /** Append some data at the end of data buffer and adjust size.
- * @param data Pointer to the data to add
- * @param size Size in bytes of new data
- */
- void addData(const void *data, size_t size);
-
- size_t getSize(void);
-
- /** Changes size of the data buffer size. Keeps data intact unless
- * truncated.
- */
- void setSize(size_t newSize);
-
- /** Returns the UDP address of the packet.
- */
- InetAddress getAddress(void);
-
- const std::vector<uint8_t>& getBuffer(void) const {
- return m_buffer;
- }
-
-
- private:
- std::vector<uint8_t> m_buffer;
- InetAddress address;
-};
-
-/* Threaded UDP receiver */
-class UdpReceiver {
- public:
- UdpReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {}
- ~UdpReceiver();
- UdpReceiver(const UdpReceiver&) = delete;
- UdpReceiver operator=(const UdpReceiver&) = delete;
-
- // Start the receiver in a separate thread
- void start(int port, std::string& bindto, std::string& mcastaddr, size_t max_packets_queued);
-
- // Get the data contained in a UDP packet, blocks if none available
- // In case of error, throws a runtime_error
- std::vector<uint8_t> get_packet_buffer(void);
-
- private:
- void m_run(void);
-
- int m_port;
- std::string m_bindto;
- std::string m_mcastaddr;
- size_t m_max_packets_queued;
- std::thread m_thread;
- std::atomic<bool> m_stop;
- ThreadsafeQueue<UdpPacket> m_packets;
- UdpSocket m_sock;
-};
-
-