From 9719087ebcb4bd422c592f994f2e1691191e91d6 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sat, 16 Jan 2021 08:05:41 +0100 Subject: Common 405ad00: Reimplement UDPReceiver with poll instead of a thread --- lib/Socket.cpp | 142 ++++++++++++++++++++++++++++++++++----------------------- lib/Socket.h | 42 +++++++++-------- 2 files changed, 107 insertions(+), 77 deletions(-) (limited to 'lib') diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 6a20429..c876f32 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -66,6 +66,18 @@ void InetAddress::resolveUdpDestination(const std::string& destination, int port } } +string InetAddress::to_string() const +{ + char received_from_str[64] = {}; + sockaddr *addr = reinterpret_cast(&addr); + const char* ret = inet_ntop(AF_INET, addr, received_from_str, 63); + + if (ret == nullptr) { + throw invalid_argument(string("Error converting InetAddress") + strerror(errno)); + } + return ret; +} + UDPPacket::UDPPacket() { } UDPPacket::UDPPacket(size_t initSize) : @@ -74,24 +86,37 @@ UDPPacket::UDPPacket(size_t initSize) : { } -UDPSocket::UDPSocket() : - m_sock(INVALID_SOCKET) +UDPSocket::UDPSocket() { reinit(0, ""); } -UDPSocket::UDPSocket(int port) : - m_sock(INVALID_SOCKET) +UDPSocket::UDPSocket(int port) { reinit(port, ""); } -UDPSocket::UDPSocket(int port, const std::string& name) : - m_sock(INVALID_SOCKET) +UDPSocket::UDPSocket(int port, const std::string& name) { reinit(port, name); } +UDPSocket::UDPSocket(UDPSocket&& other) +{ + m_sock = other.m_sock; + m_port = other.m_port; + other.m_port = 0; + other.m_sock = INVALID_SOCKET; +} + +const UDPSocket& UDPSocket::operator=(UDPSocket&& other) +{ + m_sock = other.m_sock; + m_port = other.m_port; + other.m_port = 0; + other.m_sock = INVALID_SOCKET; + return *this; +} void UDPSocket::setBlocking(bool block) { @@ -112,6 +137,8 @@ void UDPSocket::reinit(int port, const std::string& name) ::close(m_sock); } + m_port = port; + if (port == 0) { // No need to bind to a given port, creating the // socket is enough @@ -276,72 +303,71 @@ void UDPSocket::setMulticastTTL(int ttl) } } -UDPReceiver::UDPReceiver() { } - -UDPReceiver::~UDPReceiver() { - m_stop = true; - m_sock.close(); - if (m_thread.joinable()) { - m_thread.join(); - } +SOCKET UDPSocket::getNativeSocket() const +{ + return m_sock; } -void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) { - m_port = port; - m_bindto = bindto; - m_mcastaddr = mcastaddr; - m_max_packets_queued = max_packets_queued; - m_thread = std::thread(&UDPReceiver::m_run, this); +int UDPSocket::getPort() const +{ + return m_port; } -std::vector UDPReceiver::get_packet_buffer() -{ - if (m_stop) { - throw runtime_error("UDP Receiver not running"); - } +void UDPReceiver::add_receive_port(int port, const string& bindto, const string& mcastaddr) { + UDPSocket sock; - UDPPacket p; - m_packets.wait_and_pop(p); + if (IN_MULTICAST(ntohl(inet_addr(mcastaddr.c_str())))) { + sock.reinit(port, mcastaddr); + sock.setMulticastSource(bindto.c_str()); + sock.joinGroup(mcastaddr.c_str(), bindto.c_str()); + } + else { + sock.reinit(port, bindto); + } - return p.buffer; + m_sockets.push_back(move(sock)); } -void UDPReceiver::m_run() +vector UDPReceiver::receive(int timeout_ms) { - // Ensure that stop is set to true in case of exception or return - struct SetStopOnDestruct { - SetStopOnDestruct(atomic& stop) : m_stop(stop) {} - ~SetStopOnDestruct() { m_stop = true; } - private: atomic& m_stop; - } autoSetStop(m_stop); - - if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) { - m_sock.reinit(m_port, m_mcastaddr); - m_sock.setMulticastSource(m_bindto.c_str()); - m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str()); + constexpr size_t MAX_FDS = 64; + struct pollfd fds[MAX_FDS]; + if (m_sockets.size() > MAX_FDS) { + throw std::runtime_error("UDPReceiver only supports up to 64 ports"); } - else { - m_sock.reinit(m_port, m_bindto); + + for (size_t i = 0; i < m_sockets.size(); i++) { + fds[i].fd = m_sockets[i].getNativeSocket(); + fds[i].events = POLLIN; } - while (not m_stop) { - constexpr size_t packsize = 8192; - try { - auto packet = m_sock.receive(packsize); - if (packet.buffer.size() == packsize) { - // TODO replace fprintf - fprintf(stderr, "Warning, possible UDP truncation\n"); - } + int retval = poll(fds, m_sockets.size(), timeout_ms); - // If this blocks, the UDP socket will lose incoming packets - m_packets.push_wait_if_full(packet, m_max_packets_queued); - } - catch (const std::runtime_error& e) { - // TODO replace fprintf - // TODO handle intr - fprintf(stderr, "Socket error: %s\n", e.what()); - m_stop = true; + if (retval == -1 and errno == EINTR) { + throw Interrupted(); + } + else if (retval == -1) { + std::string errstr(strerror(errno)); + throw std::runtime_error("UDP receive with poll() error: " + errstr); + } + else if (retval > 0) { + vector received; + + for (size_t i = 0; i < m_sockets.size(); i++) { + if (fds[i].revents & POLLIN) { + auto p = m_sockets[i].receive(2048); // This is larger than the usual MTU + ReceivedPacket rp; + rp.packetdata = move(p.buffer); + rp.received_from = move(p.address); + rp.port_received_on = m_sockets[i].getPort(); + received.push_back(move(rp)); + } } + + return received; + } + else { + throw Timeout(); } } diff --git a/lib/Socket.h b/lib/Socket.h index 2291dd5..33cdc05 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -56,6 +56,8 @@ struct InetAddress { struct sockaddr *as_sockaddr() { return reinterpret_cast(&addr); }; void resolveUdpDestination(const std::string& destination, int port); + + std::string to_string() const; }; /** This class represents a UDP packet. @@ -103,6 +105,8 @@ class UDPSocket ~UDPSocket(); UDPSocket(const UDPSocket& other) = delete; const UDPSocket& operator=(const UDPSocket& other) = delete; + UDPSocket(UDPSocket&& other); + const UDPSocket& operator=(UDPSocket&& other); /** Close the already open socket, and create a new one. Throws a runtime_error on error. */ void reinit(int port); @@ -121,36 +125,36 @@ class UDPSocket */ void setBlocking(bool block); + SOCKET getNativeSocket() const; + int getPort() const; + protected: - SOCKET m_sock; + SOCKET m_sock = INVALID_SOCKET; + int m_port = 0; }; -/* Threaded UDP receiver */ +/* UDP packet receiver supporting receiving from several ports at once */ class UDPReceiver { public: - UDPReceiver(); - ~UDPReceiver(); - UDPReceiver(const UDPReceiver&) = delete; - UDPReceiver operator=(const UDPReceiver&) = delete; + void add_receive_port(int port, const std::string& bindto, const std::string& mcastaddr); - // Start the receiver in a separate thread - void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued); + struct ReceivedPacket { + std::vector packetdata; + InetAddress received_from; + int port_received_on; + }; - // Get the data contained in a UDP packet, blocks if none available - // In case of error, throws a runtime_error - std::vector get_packet_buffer(void); + class Interrupted {}; + class Timeout {}; + /* Returns one or several packets, + * throws a Timeout on timeout, Interrupted on EINTR, a runtime_error + * on error. */ + std::vector receive(int timeout_ms); private: void m_run(void); - int m_port = 0; - std::string m_bindto; - std::string m_mcastaddr; - size_t m_max_packets_queued = 1; - std::thread m_thread; - std::atomic m_stop = ATOMIC_VAR_INIT(false); - ThreadsafeQueue m_packets; - UDPSocket m_sock; + std::vector m_sockets; }; class TCPSocket { -- cgit v1.2.3