From 353dd9a70e66410e755d2bcaf17d1a9fc8ce8c20 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 19 Aug 2022 17:11:45 +0200 Subject: Common 036201c: socket changes --- lib/RemoteControl.cpp | 4 +--- lib/Socket.cpp | 58 ++++++++++++++++++++++++++++++++++++++++++++++----- lib/Socket.h | 14 +++++++++++-- 3 files changed, 66 insertions(+), 10 deletions(-) (limited to 'lib') diff --git a/lib/RemoteControl.cpp b/lib/RemoteControl.cpp index 30dcb60..9ca8d22 100644 --- a/lib/RemoteControl.cpp +++ b/lib/RemoteControl.cpp @@ -29,9 +29,7 @@ #include #include "RemoteControl.h" -#if defined(HAVE_ZEROMQ) - #include "zmq.hpp" -#endif +#include "zmq.hpp" using namespace std; diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 1ff6418..10ec1ca 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -2,7 +2,7 @@ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2020 + Copyright (C) 2022 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -30,6 +30,7 @@ #include #include #include +#include namespace Socket { @@ -610,6 +611,37 @@ void TCPSocket::connect(const std::string& hostname, int port, bool nonblock) } } +void TCPSocket::enable_keepalive(int time, int intvl, int probes) +{ + if (m_sock == INVALID_SOCKET) { + throw std::logic_error("You may not call enable_keepalive on invalid socket"); + } + int optval = 1; + auto optlen = sizeof(optval); + if (setsockopt(m_sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP: Could not set SO_KEEPALIVE: " + errstr); + } + + optval = time; + if (setsockopt(m_sock, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP: Could not set TCP_KEEPIDLE: " + errstr); + } + + optval = intvl; + if (setsockopt(m_sock, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP: Could not set TCP_KEEPINTVL: " + errstr); + } + + optval = probes; + if (setsockopt(m_sock, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP: Could not set TCP_KEEPCNT: " + errstr); + } +} + void TCPSocket::listen(int port, const string& name) { if (m_sock != INVALID_SOCKET) { @@ -938,8 +970,9 @@ void TCPConnection::process() } -TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) : - m_max_queue_size(max_queue_size) +TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) : + m_max_queue_size(max_queue_size), + m_buffers_to_preroll(buffers_to_preroll) { } @@ -967,12 +1000,20 @@ void TCPDataDispatcher::write(const vector& data) throw runtime_error(m_exception_data); } + auto lock = unique_lock(m_mutex); + + if (m_buffers_to_preroll > 0) { + m_preroll_queue.push_back(data); + if (m_preroll_queue.size() > m_buffers_to_preroll) { + m_preroll_queue.pop_front(); + } + } + for (auto& connection : m_connections) { connection.queue.push(data); } - m_connections.remove_if( - [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); + m_connections.remove_if( [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); } void TCPDataDispatcher::process() @@ -984,7 +1025,14 @@ void TCPDataDispatcher::process() // Add a new TCPConnection to the list, constructing it from the client socket auto sock = m_listener_socket.accept(timeout_ms); if (sock.valid()) { + auto lock = unique_lock(m_mutex); m_connections.emplace(m_connections.begin(), move(sock)); + + if (m_buffers_to_preroll > 0) { + for (const auto& buf : m_preroll_queue) { + m_connections.front().queue.push(buf); + } + } } } } diff --git a/lib/Socket.h b/lib/Socket.h index f5143a0..d8242e2 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -2,7 +2,7 @@ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2020 + Copyright (C) 2022 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -173,6 +173,11 @@ class TCPSocket { void listen(int port, const std::string& name); void close(void); + /* Enable TCP keepalive. See + * https://tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html + */ + void enable_keepalive(int time, int intvl, int probes); + /* throws a runtime_error on failure, an invalid socket on timeout */ TCPSocket accept(int timeout_ms); @@ -254,7 +259,7 @@ class TCPConnection class TCPDataDispatcher { public: - TCPDataDispatcher(size_t max_queue_size); + TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll); ~TCPDataDispatcher(); TCPDataDispatcher(const TCPDataDispatcher&) = delete; TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete; @@ -266,11 +271,16 @@ class TCPDataDispatcher void process(); size_t m_max_queue_size; + size_t m_buffers_to_preroll; + std::atomic m_running = ATOMIC_VAR_INIT(false); std::string m_exception_data; std::thread m_listener_thread; TCPSocket m_listener_socket; + + std::mutex m_mutex; + std::deque > m_preroll_queue; std::list m_connections; }; -- cgit v1.2.3