From 43f4a3a2a695c303bd4fdfbd7fec6def29284f2e Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 28 May 2019 16:56:43 +0200 Subject: Unify Socket abstractions --- lib/Socket.cpp | 730 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/Socket.h | 240 +++++++++++++++++++ 2 files changed, 970 insertions(+) create mode 100644 lib/Socket.cpp create mode 100644 lib/Socket.h (limited to 'lib') diff --git a/lib/Socket.cpp b/lib/Socket.cpp new file mode 100644 index 0000000..fe3df44 --- /dev/null +++ b/lib/Socket.cpp @@ -0,0 +1,730 @@ +/* + Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the + Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#include "Socket.h" + +#include +#include +#include +#include +#include +#include + +namespace Socket { + +using namespace std; + +void InetAddress::resolveUdpDestination(const std::string& destination, int port) +{ + char service[NI_MAXSERV]; + snprintf(service, NI_MAXSERV-1, "%d", port); + + struct addrinfo hints; + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ + hints.ai_flags = 0; + hints.ai_protocol = 0; + hints.ai_canonname = nullptr; + hints.ai_addr = nullptr; + hints.ai_next = nullptr; + + struct addrinfo *result, *rp; + int s = getaddrinfo(destination.c_str(), service, &hints, &result); + if (s != 0) { + throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); + } + + for (rp = result; rp != nullptr; rp = rp->ai_next) { + // Take the first result + memcpy(&addr, rp->ai_addr, rp->ai_addrlen); + break; + } + + freeaddrinfo(result); + + if (rp == nullptr) { + throw runtime_error("Could not resolve"); + } +} + +UDPPacket::UDPPacket() { } + +UDPPacket::UDPPacket(size_t initSize) : + buffer(initSize) +{ } + + +UDPSocket::UDPSocket() : + listenSocket(INVALID_SOCKET) +{ + reinit(0, ""); +} + +UDPSocket::UDPSocket(int port) : + listenSocket(INVALID_SOCKET) +{ + reinit(port, ""); +} + +UDPSocket::UDPSocket(int port, const std::string& name) : + listenSocket(INVALID_SOCKET) +{ + reinit(port, name); +} + + +void UDPSocket::setBlocking(bool block) +{ + int res = fcntl(listenSocket, F_SETFL, block ? 0 : O_NONBLOCK); + if (res == -1) { + throw runtime_error(string("Can't change blocking state of socket: ") + strerror(errno)); + } +} + +void UDPSocket::reinit(int port, const std::string& name) +{ + if (listenSocket != INVALID_SOCKET) { + ::close(listenSocket); + } + + char service[NI_MAXSERV]; + snprintf(service, NI_MAXSERV-1, "%d", port); + + struct addrinfo hints; + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ + hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */ + hints.ai_protocol = 0; /* Any protocol */ + hints.ai_canonname = nullptr; + hints.ai_addr = nullptr; + hints.ai_next = nullptr; + + struct addrinfo *result, *rp; + int s = getaddrinfo(name.empty() ? nullptr : name.c_str(), + port == 0 ? nullptr : service, + &hints, &result); + if (s != 0) { + throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); + } + + /* getaddrinfo() returns a list of address structures. + Try each address until we successfully bind(2). + If socket(2) (or bind(2)) fails, we (close the socket + and) try the next address. */ + for (rp = result; rp != nullptr; rp = rp->ai_next) { + int sfd = ::socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sfd == -1) { + continue; + } + + if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { + listenSocket = sfd; + break; + } + + ::close(sfd); + } + + freeaddrinfo(result); + + if (rp == nullptr) { + throw runtime_error("Could not bind"); + } +} + +void UDPSocket::close() +{ + if (listenSocket != INVALID_SOCKET) { + ::close(listenSocket); + } + + listenSocket = INVALID_SOCKET; +} + +UDPSocket::~UDPSocket() +{ + if (listenSocket != INVALID_SOCKET) { + ::close(listenSocket); + } +} + + +UDPPacket UDPSocket::receive(size_t max_size) +{ + UDPPacket packet(max_size); + socklen_t addrSize; + addrSize = sizeof(*packet.address.as_sockaddr()); + ssize_t ret = recvfrom(listenSocket, + packet.buffer.data(), + packet.buffer.size(), + 0, + packet.address.as_sockaddr(), + &addrSize); + + if (ret == SOCKET_ERROR) { + packet.buffer.resize(0); + if (errno == EAGAIN) { + return 0; + } + throw runtime_error(string("Can't receive data: ") + strerror(errno)); + } + + packet.buffer.resize(ret); + return packet; +} + +void UDPSocket::send(UDPPacket& packet) +{ + int ret = sendto(listenSocket, packet.buffer.data(), packet.buffer.size(), 0, + packet.address.as_sockaddr(), sizeof(*packet.address.as_sockaddr())); + if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { + throw runtime_error(string("Can't send UDP packet") + strerror(errno)); + } +} + + +void UDPSocket::send(const std::vector& data, InetAddress destination) +{ + int ret = sendto(listenSocket, &data[0], data.size(), 0, + destination.as_sockaddr(), sizeof(*destination.as_sockaddr())); + if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { + throw runtime_error(string("Can't send UDP packet") + strerror(errno)); + } +} + +void UDPSocket::joinGroup(char* groupname) +{ + ip_mreqn group; + if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { + throw runtime_error("Cannot convert multicast group name"); + } + if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { + throw runtime_error("Group name is not a multicast address"); + } + group.imr_address.s_addr = htons(INADDR_ANY);; + group.imr_ifindex = 0; + if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) + == SOCKET_ERROR) { + throw runtime_error(string("Can't join multicast group") + strerror(errno)); + } +} + +void UDPSocket::setMulticastSource(const char* source_addr) +{ + struct in_addr addr; + if (inet_aton(source_addr, &addr) == 0) { + throw runtime_error(string("Can't parse source address") + strerror(errno)); + } + + if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) + == SOCKET_ERROR) { + throw runtime_error(string("Can't set source address") + strerror(errno)); + } +} + +void UDPSocket::setMulticastTTL(int ttl) +{ + if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) + == SOCKET_ERROR) { + throw runtime_error(string("Can't set multicast ttl") + strerror(errno)); + } +} + + +TCPSocket::TCPSocket() +{ + if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { + throw std::runtime_error("Can't create TCP socket"); + } + +#if defined(HAVE_SO_NOSIGPIPE) + int val = 1; + if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, + &val, sizeof(val)) < 0) { + throw std::runtime_error("Can't set SO_NOSIGPIPE"); + } +#endif +} + +TCPSocket::~TCPSocket() +{ + if (m_sock != -1) { + ::close(m_sock); + } +} + +TCPSocket::TCPSocket(TCPSocket&& other) : + m_sock(other.m_sock), + m_remote_address(other.m_remote_address) +{ + if (other.m_sock != -1) { + other.m_sock = -1; + } +} + +TCPSocket& TCPSocket::operator=(TCPSocket&& other) +{ + m_sock = other.m_sock; + m_remote_address = other.m_remote_address; + + if (other.m_sock != -1) { + other.m_sock = -1; + } + + return *this; +} + +bool TCPSocket::valid() const +{ + return m_sock != -1; +} + +void TCPSocket::connect(const std::string& hostname, int port) +{ + if (m_sock != INVALID_SOCKET) { + throw std::logic_error("You may only connect an invalid TCPSocket"); + } + + char service[NI_MAXSERV]; + snprintf(service, NI_MAXSERV-1, "%d", port); + + /* Obtain address(es) matching host/port */ + struct addrinfo hints; + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = 0; + hints.ai_protocol = 0; + + struct addrinfo *result, *rp; + int s = getaddrinfo(hostname.c_str(), service, &hints, &result); + if (s != 0) { + throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); + } + + /* getaddrinfo() returns a list of address structures. + Try each address until we successfully connect(2). + If socket(2) (or connect(2)) fails, we (close the socket + and) try the next address. */ + + for (rp = result; rp != nullptr; rp = rp->ai_next) { + int sfd = ::socket(rp->ai_family, rp->ai_socktype, + rp->ai_protocol); + if (sfd == -1) + continue; + + int ret = ::connect(sfd, rp->ai_addr, rp->ai_addrlen); + if (ret != -1 or (ret == -1 and errno == EINPROGRESS)) { + // As the TCPClient could set the socket to nonblocking, we + // must handle EINPROGRESS here + m_sock = sfd; + break; + } + + ::close(sfd); + } + + if (m_sock != INVALID_SOCKET) { +#if defined(HAVE_SO_NOSIGPIPE) + int val = 1; + if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) + == SOCKET_ERROR) { + throw std::runtime_error("Can't set SO_NOSIGPIPE"); + } +#endif + } + + freeaddrinfo(result); /* No longer needed */ + + if (rp == nullptr) { + throw runtime_error("Could not connect"); + } + +} + +void TCPSocket::listen(int port, const string& name) +{ + if (m_sock != INVALID_SOCKET) { + throw std::logic_error("You may only listen with an invalid TCPSocket"); + } + + char service[NI_MAXSERV]; + snprintf(service, NI_MAXSERV-1, "%d", port); + + struct addrinfo hints; + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */ + hints.ai_protocol = 0; + hints.ai_canonname = nullptr; + hints.ai_addr = nullptr; + hints.ai_next = nullptr; + + struct addrinfo *result, *rp; + int s = getaddrinfo(name.empty() ? nullptr : name.c_str(), service, &hints, &result); + if (s != 0) { + throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); + } + + /* getaddrinfo() returns a list of address structures. + Try each address until we successfully bind(2). + If socket(2) (or bind(2)) fails, we (close the socket + and) try the next address. */ + for (rp = result; rp != nullptr; rp = rp->ai_next) { + int sfd = ::socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sfd == -1) { + continue; + } + + if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { + m_sock = sfd; + break; + } + + ::close(sfd); + } + + freeaddrinfo(result); + + if (rp == nullptr) { + throw runtime_error("Could not bind"); + } +} + +void TCPSocket::close() +{ + ::close(m_sock); + m_sock = -1; +} + +TCPSocket TCPSocket::accept(int timeout_ms) +{ + if (timeout_ms == 0) { + InetAddress remote_addr; + socklen_t client_len = sizeof(remote_addr.addr); + int sockfd = ::accept(m_sock, remote_addr.as_sockaddr(), &client_len); + TCPSocket s(sockfd, remote_addr); + return s; + } + else { + struct pollfd fds[1]; + fds[0].fd = m_sock; + fds[0].events = POLLIN; + + int retval = poll(fds, 1, timeout_ms); + + if (retval == -1) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP Socket accept error: " + errstr); + } + else if (retval > 0) { + InetAddress remote_addr; + socklen_t client_len = sizeof(remote_addr.addr); + int sockfd = ::accept(m_sock, remote_addr.as_sockaddr(), &client_len); + TCPSocket s(sockfd, remote_addr); + return s; + } + else { + TCPSocket s(-1); + return s; + } + } +} + +ssize_t TCPSocket::sendall(const void *buffer, size_t buflen) +{ + uint8_t *buf = (uint8_t*)buffer; + while (buflen > 0) { + /* On Linux, the MSG_NOSIGNAL flag ensures that the process + * would not receive a SIGPIPE and die. + * Other systems have SO_NOSIGPIPE set on the socket for the + * same effect. */ +#if defined(HAVE_MSG_NOSIGNAL) + const int flags = MSG_NOSIGNAL; +#else + const int flags = 0; +#endif + ssize_t sent = ::send(m_sock, buf, buflen, flags); + if (sent < 0) { + return -1; + } + else { + buf += sent; + buflen -= sent; + } + } + return buflen; +} + +ssize_t TCPSocket::send(const void* data, size_t size, int timeout_ms) +{ + if (timeout_ms) { + struct pollfd fds[1]; + fds[0].fd = m_sock; + fds[0].events = POLLOUT; + + const int retval = poll(fds, 1, timeout_ms); + + if (retval == -1) { + throw std::runtime_error(string("TCP Socket send error on poll(): ") + strerror(errno)); + } + else if (retval == 0) { + // Timed out + return 0; + } + } + + /* On Linux, the MSG_NOSIGNAL flag ensures that the process would not + * receive a SIGPIPE and die. + * Other systems have SO_NOSIGPIPE set on the socket for the same effect. */ +#if defined(HAVE_MSG_NOSIGNAL) + const int flags = MSG_NOSIGNAL; +#else + const int flags = 0; +#endif + const ssize_t ret = ::send(m_sock, (const char*)data, size, flags); + + if (ret == SOCKET_ERROR) { + throw std::runtime_error(string("TCP Socket send error: ") + strerror(errno)); + } + return ret; +} + +ssize_t TCPSocket::recv(void *buffer, size_t length, int flags) +{ + ssize_t ret = ::recv(m_sock, buffer, length, flags); + if (ret == -1) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP receive error: " + errstr); + } + return ret; +} + +ssize_t TCPSocket::recv(void *buffer, size_t length, int flags, int timeout_ms) +{ + struct pollfd fds[1]; + fds[0].fd = m_sock; + fds[0].events = POLLIN; + + int retval = poll(fds, 1, timeout_ms); + + if (retval == -1 and errno == EINTR) { + throw Interrupted(); + } + else if (retval == -1) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP receive with poll() error: " + errstr); + } + else if (retval > 0 and (fds[0].revents | POLLIN)) { + ssize_t ret = ::recv(m_sock, buffer, length, flags); + if (ret == -1) { + if (errno == ECONNREFUSED) { + return 0; + } + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP receive after poll() error: " + errstr); + } + return ret; + } + else { + throw Timeout(); + } +} + +TCPSocket::TCPSocket(int sockfd) : + m_sock(sockfd), + m_remote_address() +{ } + +TCPSocket::TCPSocket(int sockfd, InetAddress remote_address) : + m_sock(sockfd), + m_remote_address(remote_address) +{ } + +void TCPClient::connect(const std::string& hostname, int port) +{ + m_hostname = hostname; + m_port = port; + reconnect(); +} + +ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms) +{ + try { + ssize_t ret = m_sock.recv(buffer, length, flags, timeout_ms); + + if (ret == 0) { + m_sock.close(); + + TCPSocket newsock; + m_sock = std::move(newsock); + reconnect(); + } + + return ret; + } + catch (const TCPSocket::Interrupted&) { + return -1; + } + catch (const TCPSocket::Timeout&) { + return 0; + } + + return 0; +} + +void TCPClient::reconnect() +{ + int flags = fcntl(m_sock.m_sock, F_GETFL); + if (fcntl(m_sock.m_sock, F_SETFL, flags | O_NONBLOCK) == -1) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr); + } + + m_sock.connect(m_hostname, m_port); +} + +TCPConnection::TCPConnection(TCPSocket&& sock) : + queue(), + m_running(true), + m_sender_thread(), + m_sock(move(sock)) +{ +#if MISSING_OWN_ADDR + auto own_addr = m_sock.getOwnAddress(); + auto addr = m_sock.getRemoteAddress(); + etiLog.level(debug) << "New TCP Connection on port " << + own_addr.getPort() << " from " << + addr.getHostAddress() << ":" << addr.getPort(); +#endif + m_sender_thread = std::thread(&TCPConnection::process, this); +} + +TCPConnection::~TCPConnection() +{ + m_running = false; + vector termination_marker; + queue.push(termination_marker); + m_sender_thread.join(); +} + +void TCPConnection::process() +{ + while (m_running) { + vector data; + queue.wait_and_pop(data); + + if (data.empty()) { + // empty vector is the termination marker + m_running = false; + break; + } + + try { + ssize_t remaining = data.size(); + const uint8_t *buf = reinterpret_cast(data.data()); + const int timeout_ms = 10; // Less than one ETI frame + + while (m_running and remaining > 0) { + const ssize_t sent = m_sock.send(buf, remaining, timeout_ms); + if (sent < 0 or sent > remaining) { + throw std::logic_error("Invalid TCPSocket::send() return value"); + } + remaining -= sent; + buf += sent; + } + } + catch (const std::runtime_error& e) { + m_running = false; + } + } + +#if MISSING_OWN_ADDR + auto own_addr = m_sock.getOwnAddress(); + auto addr = m_sock.getRemoteAddress(); + etiLog.level(debug) << "Dropping TCP Connection on port " << + own_addr.getPort() << " from " << + addr.getHostAddress() << ":" << addr.getPort(); +#endif +} + + +TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) : + m_max_queue_size(max_queue_size) +{ +} + +TCPDataDispatcher::~TCPDataDispatcher() +{ + m_running = false; + m_connections.clear(); + m_listener_socket.close(); + m_listener_thread.join(); +} + +void TCPDataDispatcher::start(int port, const string& address) +{ + m_listener_socket.listen(port, address); + + m_running = true; + m_listener_thread = std::thread(&TCPDataDispatcher::process, this); +} + +void TCPDataDispatcher::write(const vector& data) +{ + if (not m_running) { + throw runtime_error(m_exception_data); + } + + for (auto& connection : m_connections) { + connection.queue.push(data); + } + + m_connections.remove_if( + [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); +} + +void TCPDataDispatcher::process() +{ + try { + const int timeout_ms = 1000; + + while (m_running) { + // Add a new TCPConnection to the list, constructing it from the client socket + auto sock = m_listener_socket.accept(timeout_ms); + if (sock.valid()) { + m_connections.emplace(m_connections.begin(), move(sock)); + } + } + } + catch (const std::runtime_error& e) { + m_exception_data = string("TCPDataDispatcher error: ") + e.what(); + m_running = false; + } +} + +} diff --git a/lib/Socket.h b/lib/Socket.h new file mode 100644 index 0000000..82ff5ad --- /dev/null +++ b/lib/Socket.h @@ -0,0 +1,240 @@ +/* + Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the + Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#pragma once + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include "ThreadsafeQueue.h" +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#define SOCKET int +#define INVALID_SOCKET -1 +#define SOCKET_ERROR -1 + + +namespace Socket { + +struct InetAddress { + struct sockaddr_storage addr; + + struct sockaddr *as_sockaddr() { return reinterpret_cast(&addr); }; + + void resolveUdpDestination(const std::string& destination, int port); +}; + +/** This class represents a UDP packet. + * + * A UDP packet contains a payload (sequence of bytes) and an address. For + * outgoing packets, the address is the destination address. For incoming + * packets, the address tells the user from what source the packet arrived from. + */ +class UDPPacket +{ + public: + UDPPacket(); + UDPPacket(size_t initSize); + + std::vector buffer; + InetAddress address; +}; + +/** + * This class represents a socket for sending and receiving UDP packets. + * + * A UDP socket is the sending or receiving point for a packet delivery service. + * Each packet sent or received on a datagram socket is individually + * addressed and routed. Multiple packets sent from one machine to another may + * be routed differently, and may arrive in any order. + */ +class UDPSocket +{ + public: + /** Create a new socket that will not be bound to any port. To be used + * for data output. + */ + UDPSocket(); + /** Create a new socket. + * @param port The port number on which the socket will be bound + */ + UDPSocket(int port); + /** Create a new socket. + * @param port The port number on which the socket will be bound + * @param name The IP address on which the socket will be bound. + * It is used to bind the socket on a specific interface if + * the computer have many NICs. + */ + UDPSocket(int port, const std::string& name); + ~UDPSocket(); + UDPSocket(const UDPSocket& other) = delete; + const UDPSocket& operator=(const UDPSocket& other) = delete; + + /** Close the already open socket, and create a new one. Throws a runtime_error on error. */ + void reinit(int port, const std::string& name); + + void close(void); + void send(UDPPacket& packet); + void send(const std::vector& data, InetAddress destination); + UDPPacket receive(size_t max_size); + void joinGroup(char* groupname); + void setMulticastSource(const char* source_addr); + void setMulticastTTL(int ttl); + + /** Set blocking mode. By default, the socket is blocking. + * throws a runtime_error on error. + */ + void setBlocking(bool block); + + protected: + SOCKET listenSocket; +}; + + +class TCPSocket { + public: + TCPSocket(); + ~TCPSocket(); + TCPSocket(const TCPSocket& other) = delete; + TCPSocket& operator=(const TCPSocket& other) = delete; + TCPSocket(TCPSocket&& other); + TCPSocket& operator=(TCPSocket&& other); + + bool valid(void) const; + void connect(const std::string& hostname, int port); + void listen(int port, const std::string& name); + void close(void); + + /* throws a runtime_error on failure, an invalid socket on timeout */ + TCPSocket accept(int timeout_ms); + + /* returns -1 on error, doesn't work on nonblocking sockets */ + ssize_t sendall(const void *buffer, size_t buflen); + + /** Send data over the TCP connection. + * @param data The buffer that will be sent. + * @param size Number of bytes to send. + * @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout + * return number of bytes sent, 0 on timeout, or throws runtime_error. + */ + ssize_t send(const void* data, size_t size, int timeout_ms=0); + + /* Returns number of bytes read, 0 on disconnect. Throws a + * runtime_error on error */ + ssize_t recv(void *buffer, size_t length, int flags); + + class Timeout {}; + class Interrupted {}; + /* Returns number of bytes read, 0 on disconnect or refused connection. + * Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error + * on error + */ + ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms); + + private: + explicit TCPSocket(int sockfd); + explicit TCPSocket(int sockfd, InetAddress remote_address); + SOCKET m_sock = -1; + + InetAddress m_remote_address; + + friend class TCPClient; +}; + +/* Implements a TCP receiver that auto-reconnects on errors */ +class TCPClient { + public: + void connect(const std::string& hostname, int port); + + /* Returns numer of bytes read, 0 on auto-reconnect, -1 + * on interruption. + * Throws a runtime_error on error */ + ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms); + + private: + void reconnect(void); + TCPSocket m_sock; + std::string m_hostname; + int m_port; +}; + +/* Helper class for TCPDataDispatcher, contains a queue of pending data and + * a sender thread. */ +class TCPConnection +{ + public: + TCPConnection(TCPSocket&& sock); + TCPConnection(const TCPConnection&) = delete; + TCPConnection& operator=(const TCPConnection&) = delete; + ~TCPConnection(); + + ThreadsafeQueue > queue; + + private: + std::atomic m_running; + std::thread m_sender_thread; + TCPSocket m_sock; + + void process(void); +}; + +/* Send a TCP stream to several destinations, and automatically disconnect destinations + * whose buffer overflows. + */ +class TCPDataDispatcher +{ + public: + TCPDataDispatcher(size_t max_queue_size); + ~TCPDataDispatcher(); + TCPDataDispatcher(const TCPDataDispatcher&) = delete; + TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete; + + void start(int port, const std::string& address); + void write(const std::vector& data); + + private: + void process(void); + + size_t m_max_queue_size; + + std::atomic m_running; + std::string m_exception_data; + std::thread m_listener_thread; + TCPSocket m_listener_socket; + std::list m_connections; +}; + +} -- cgit v1.2.3