aboutsummaryrefslogtreecommitdiffstats
path: root/lib/Socket.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Socket.cpp')
-rw-r--r--lib/Socket.cpp58
1 files changed, 53 insertions, 5 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index 1ff6418..10ec1ca 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2020
+ Copyright (C) 2022
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -30,6 +30,7 @@
#include <cerrno>
#include <fcntl.h>
#include <poll.h>
+#include <netinet/tcp.h>
namespace Socket {
@@ -610,6 +611,37 @@ void TCPSocket::connect(const std::string& hostname, int port, bool nonblock)
}
}
+void TCPSocket::enable_keepalive(int time, int intvl, int probes)
+{
+ if (m_sock == INVALID_SOCKET) {
+ throw std::logic_error("You may not call enable_keepalive on invalid socket");
+ }
+ int optval = 1;
+ auto optlen = sizeof(optval);
+ if (setsockopt(m_sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set SO_KEEPALIVE: " + errstr);
+ }
+
+ optval = time;
+ if (setsockopt(m_sock, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set TCP_KEEPIDLE: " + errstr);
+ }
+
+ optval = intvl;
+ if (setsockopt(m_sock, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set TCP_KEEPINTVL: " + errstr);
+ }
+
+ optval = probes;
+ if (setsockopt(m_sock, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set TCP_KEEPCNT: " + errstr);
+ }
+}
+
void TCPSocket::listen(int port, const string& name)
{
if (m_sock != INVALID_SOCKET) {
@@ -938,8 +970,9 @@ void TCPConnection::process()
}
-TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) :
- m_max_queue_size(max_queue_size)
+TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) :
+ m_max_queue_size(max_queue_size),
+ m_buffers_to_preroll(buffers_to_preroll)
{
}
@@ -967,12 +1000,20 @@ void TCPDataDispatcher::write(const vector<uint8_t>& data)
throw runtime_error(m_exception_data);
}
+ auto lock = unique_lock<mutex>(m_mutex);
+
+ if (m_buffers_to_preroll > 0) {
+ m_preroll_queue.push_back(data);
+ if (m_preroll_queue.size() > m_buffers_to_preroll) {
+ m_preroll_queue.pop_front();
+ }
+ }
+
for (auto& connection : m_connections) {
connection.queue.push(data);
}
- m_connections.remove_if(
- [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });
+ m_connections.remove_if( [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });
}
void TCPDataDispatcher::process()
@@ -984,7 +1025,14 @@ void TCPDataDispatcher::process()
// Add a new TCPConnection to the list, constructing it from the client socket
auto sock = m_listener_socket.accept(timeout_ms);
if (sock.valid()) {
+ auto lock = unique_lock<mutex>(m_mutex);
m_connections.emplace(m_connections.begin(), move(sock));
+
+ if (m_buffers_to_preroll > 0) {
+ for (const auto& buf : m_preroll_queue) {
+ m_connections.front().queue.push(buf);
+ }
+ }
}
}
}