From 353dd9a70e66410e755d2bcaf17d1a9fc8ce8c20 Mon Sep 17 00:00:00 2001
From: "Matthias P. Braendli" <matthias.braendli@mpb.li>
Date: Fri, 19 Aug 2022 17:11:45 +0200
Subject: Common 036201c: socket changes

---
 lib/RemoteControl.cpp |  4 +---
 lib/Socket.cpp        | 58 ++++++++++++++++++++++++++++++++++++++++++++++-----
 lib/Socket.h          | 14 +++++++++++--
 3 files changed, 66 insertions(+), 10 deletions(-)

(limited to 'lib')

diff --git a/lib/RemoteControl.cpp b/lib/RemoteControl.cpp
index 30dcb60..9ca8d22 100644
--- a/lib/RemoteControl.cpp
+++ b/lib/RemoteControl.cpp
@@ -29,9 +29,7 @@
 #include <algorithm>
 
 #include "RemoteControl.h"
-#if defined(HAVE_ZEROMQ)
-    #include "zmq.hpp"
-#endif
+#include "zmq.hpp"
 
 using namespace std;
 
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index 1ff6418..10ec1ca 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -2,7 +2,7 @@
    Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
    Queen in Right of Canada (Communications Research Center Canada)
 
-   Copyright (C) 2020
+   Copyright (C) 2022
    Matthias P. Braendli, matthias.braendli@mpb.li
 
     http://www.opendigitalradio.org
@@ -30,6 +30,7 @@
 #include <cerrno>
 #include <fcntl.h>
 #include <poll.h>
+#include <netinet/tcp.h>
 
 namespace Socket {
 
@@ -610,6 +611,37 @@ void TCPSocket::connect(const std::string& hostname, int port, bool nonblock)
     }
 }
 
+void TCPSocket::enable_keepalive(int time, int intvl, int probes)
+{
+    if (m_sock == INVALID_SOCKET) {
+        throw std::logic_error("You may not call enable_keepalive on invalid socket");
+    }
+    int optval = 1;
+    auto optlen = sizeof(optval);
+    if (setsockopt(m_sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
+        std::string errstr(strerror(errno));
+        throw std::runtime_error("TCP: Could not set SO_KEEPALIVE: " + errstr);
+    }
+
+    optval = time;
+    if (setsockopt(m_sock, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
+        std::string errstr(strerror(errno));
+        throw std::runtime_error("TCP: Could not set TCP_KEEPIDLE: " + errstr);
+    }
+
+    optval = intvl;
+    if (setsockopt(m_sock, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
+        std::string errstr(strerror(errno));
+        throw std::runtime_error("TCP: Could not set TCP_KEEPINTVL: " + errstr);
+    }
+
+    optval = probes;
+    if (setsockopt(m_sock, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
+        std::string errstr(strerror(errno));
+        throw std::runtime_error("TCP: Could not set TCP_KEEPCNT: " + errstr);
+    }
+}
+
 void TCPSocket::listen(int port, const string& name)
 {
     if (m_sock != INVALID_SOCKET) {
@@ -938,8 +970,9 @@ void TCPConnection::process()
 }
 
 
-TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) :
-    m_max_queue_size(max_queue_size)
+TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) :
+    m_max_queue_size(max_queue_size),
+    m_buffers_to_preroll(buffers_to_preroll)
 {
 }
 
@@ -967,12 +1000,20 @@ void TCPDataDispatcher::write(const vector<uint8_t>& data)
         throw runtime_error(m_exception_data);
     }
 
+    auto lock = unique_lock<mutex>(m_mutex);
+
+    if (m_buffers_to_preroll > 0) {
+        m_preroll_queue.push_back(data);
+        if (m_preroll_queue.size() > m_buffers_to_preroll) {
+            m_preroll_queue.pop_front();
+        }
+    }
+
     for (auto& connection : m_connections) {
         connection.queue.push(data);
     }
 
-    m_connections.remove_if(
-            [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });
+    m_connections.remove_if( [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });
 }
 
 void TCPDataDispatcher::process()
@@ -984,7 +1025,14 @@ void TCPDataDispatcher::process()
             // Add a new TCPConnection to the list, constructing it from the client socket
             auto sock = m_listener_socket.accept(timeout_ms);
             if (sock.valid()) {
+                auto lock = unique_lock<mutex>(m_mutex);
                 m_connections.emplace(m_connections.begin(), move(sock));
+
+                if (m_buffers_to_preroll > 0) {
+                    for (const auto& buf : m_preroll_queue) {
+                        m_connections.front().queue.push(buf);
+                    }
+                }
             }
         }
     }
diff --git a/lib/Socket.h b/lib/Socket.h
index f5143a0..d8242e2 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -2,7 +2,7 @@
    Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
    Queen in Right of Canada (Communications Research Center Canada)
 
-   Copyright (C) 2020
+   Copyright (C) 2022
    Matthias P. Braendli, matthias.braendli@mpb.li
 
     http://www.opendigitalradio.org
@@ -173,6 +173,11 @@ class TCPSocket {
         void listen(int port, const std::string& name);
         void close(void);
 
+        /* Enable TCP keepalive. See
+         * https://tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html
+         */
+        void enable_keepalive(int time, int intvl, int probes);
+
         /* throws a runtime_error on failure, an invalid socket on timeout */
         TCPSocket accept(int timeout_ms);
 
@@ -254,7 +259,7 @@ class TCPConnection
 class TCPDataDispatcher
 {
     public:
-        TCPDataDispatcher(size_t max_queue_size);
+        TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll);
         ~TCPDataDispatcher();
         TCPDataDispatcher(const TCPDataDispatcher&) = delete;
         TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;
@@ -266,11 +271,16 @@ class TCPDataDispatcher
         void process();
 
         size_t m_max_queue_size;
+        size_t m_buffers_to_preroll;
+
 
         std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
         std::string m_exception_data;
         std::thread m_listener_thread;
         TCPSocket m_listener_socket;
+
+        std::mutex m_mutex;
+        std::deque<std::vector<uint8_t> > m_preroll_queue;
         std::list<TCPConnection> m_connections;
 };
 
-- 
cgit v1.2.3