aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2022-08-19 17:12:54 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2022-08-19 17:18:09 +0200
commit913cd43139d7b5d6eac166a01ac09a754f2bd013 (patch)
tree98a7bb16c4a05bd7da8aa0fcd25d7620723b999d
parent7bfb88a7446e7faaee6e297e915a2bf95a699109 (diff)
downloaddabmux-913cd43139d7b5d6eac166a01ac09a754f2bd013.tar.gz
dabmux-913cd43139d7b5d6eac166a01ac09a754f2bd013.tar.bz2
dabmux-913cd43139d7b5d6eac166a01ac09a754f2bd013.zip
Support EDI TCP server pre-roll on client connect
Includes common code changes: socket changes for keepalive and preroll
-rw-r--r--doc/example.mux3
-rw-r--r--lib/Socket.cpp58
-rw-r--r--lib/Socket.h14
-rw-r--r--lib/edioutput/EDIConfig.h3
-rw-r--r--lib/edioutput/Transport.cpp13
-rw-r--r--lib/edioutput/Transport.h4
-rw-r--r--src/DabMux.cpp7
-rw-r--r--src/dabOutput/dabOutputTcp.cpp2
8 files changed, 89 insertions, 15 deletions
diff --git a/doc/example.mux b/doc/example.mux
index f170bda..6682f0a 100644
--- a/doc/example.mux
+++ b/doc/example.mux
@@ -285,6 +285,9 @@ outputs {
edi {
; Example EDI-over-TCP output
; If TIST is enabled, requires leap-second information
+ ;
+ ; When a new client connects, it will receive a pre-roll of 1.5x tist_offset seconds
+ ; worth of EDI data, so that it can quickly fill its buffers.
destinations {
example_tcp {
protocol tcp
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index 1ff6418..10ec1ca 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2020
+ Copyright (C) 2022
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -30,6 +30,7 @@
#include <cerrno>
#include <fcntl.h>
#include <poll.h>
+#include <netinet/tcp.h>
namespace Socket {
@@ -610,6 +611,37 @@ void TCPSocket::connect(const std::string& hostname, int port, bool nonblock)
}
}
+void TCPSocket::enable_keepalive(int time, int intvl, int probes)
+{
+ if (m_sock == INVALID_SOCKET) {
+ throw std::logic_error("You may not call enable_keepalive on invalid socket");
+ }
+ int optval = 1;
+ auto optlen = sizeof(optval);
+ if (setsockopt(m_sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set SO_KEEPALIVE: " + errstr);
+ }
+
+ optval = time;
+ if (setsockopt(m_sock, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set TCP_KEEPIDLE: " + errstr);
+ }
+
+ optval = intvl;
+ if (setsockopt(m_sock, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set TCP_KEEPINTVL: " + errstr);
+ }
+
+ optval = probes;
+ if (setsockopt(m_sock, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set TCP_KEEPCNT: " + errstr);
+ }
+}
+
void TCPSocket::listen(int port, const string& name)
{
if (m_sock != INVALID_SOCKET) {
@@ -938,8 +970,9 @@ void TCPConnection::process()
}
-TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) :
- m_max_queue_size(max_queue_size)
+TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) :
+ m_max_queue_size(max_queue_size),
+ m_buffers_to_preroll(buffers_to_preroll)
{
}
@@ -967,12 +1000,20 @@ void TCPDataDispatcher::write(const vector<uint8_t>& data)
throw runtime_error(m_exception_data);
}
+ auto lock = unique_lock<mutex>(m_mutex);
+
+ if (m_buffers_to_preroll > 0) {
+ m_preroll_queue.push_back(data);
+ if (m_preroll_queue.size() > m_buffers_to_preroll) {
+ m_preroll_queue.pop_front();
+ }
+ }
+
for (auto& connection : m_connections) {
connection.queue.push(data);
}
- m_connections.remove_if(
- [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });
+ m_connections.remove_if( [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });
}
void TCPDataDispatcher::process()
@@ -984,7 +1025,14 @@ void TCPDataDispatcher::process()
// Add a new TCPConnection to the list, constructing it from the client socket
auto sock = m_listener_socket.accept(timeout_ms);
if (sock.valid()) {
+ auto lock = unique_lock<mutex>(m_mutex);
m_connections.emplace(m_connections.begin(), move(sock));
+
+ if (m_buffers_to_preroll > 0) {
+ for (const auto& buf : m_preroll_queue) {
+ m_connections.front().queue.push(buf);
+ }
+ }
}
}
}
diff --git a/lib/Socket.h b/lib/Socket.h
index f5143a0..d8242e2 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -2,7 +2,7 @@
Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2020
+ Copyright (C) 2022
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -173,6 +173,11 @@ class TCPSocket {
void listen(int port, const std::string& name);
void close(void);
+ /* Enable TCP keepalive. See
+ * https://tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html
+ */
+ void enable_keepalive(int time, int intvl, int probes);
+
/* throws a runtime_error on failure, an invalid socket on timeout */
TCPSocket accept(int timeout_ms);
@@ -254,7 +259,7 @@ class TCPConnection
class TCPDataDispatcher
{
public:
- TCPDataDispatcher(size_t max_queue_size);
+ TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll);
~TCPDataDispatcher();
TCPDataDispatcher(const TCPDataDispatcher&) = delete;
TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;
@@ -266,11 +271,16 @@ class TCPDataDispatcher
void process();
size_t m_max_queue_size;
+ size_t m_buffers_to_preroll;
+
std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
std::string m_exception_data;
std::thread m_listener_thread;
TCPSocket m_listener_socket;
+
+ std::mutex m_mutex;
+ std::deque<std::vector<uint8_t> > m_preroll_queue;
std::list<TCPConnection> m_connections;
};
diff --git a/lib/edioutput/EDIConfig.h b/lib/edioutput/EDIConfig.h
index d57e9ce..a7225a7 100644
--- a/lib/edioutput/EDIConfig.h
+++ b/lib/edioutput/EDIConfig.h
@@ -74,6 +74,9 @@ struct configuration_t {
// Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms)
// Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets.
+ // TCP Server output can preroll a fixed number of previous buffers each time a new client connects.
+ size_t tcp_server_preroll_buffers = 0;
+
bool enabled() const { return destinations.size() > 0; }
void print() const;
diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp
index 5d34814..a870aa0 100644
--- a/lib/edioutput/Transport.cpp
+++ b/lib/edioutput/Transport.cpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2020
+ Copyright (C) 2022
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -66,7 +66,7 @@ Sender::Sender(const configuration_t& conf) :
edi_pft(m_conf)
{
if (m_conf.verbose) {
- etiLog.log(info, "Setup EDI Output");
+ etiLog.level(info) << "Setup EDI Output, TCP output preroll " << m_conf.tcp_server_preroll_buffers;
}
for (const auto& edi_dest : m_conf.destinations) {
@@ -81,7 +81,9 @@ Sender::Sender(const configuration_t& conf) :
udp_sockets.emplace(udp_dest.get(), udp_socket);
}
else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {
- auto dispatcher = make_shared<Socket::TCPDataDispatcher>(tcp_dest->max_frames_queued);
+ auto dispatcher = make_shared<Socket::TCPDataDispatcher>(
+ tcp_dest->max_frames_queued, m_conf.tcp_server_preroll_buffers);
+
dispatcher->start(tcp_dest->listen_port, "0.0.0.0");
tcp_dispatchers.emplace(tcp_dest.get(), dispatcher);
}
@@ -135,9 +137,10 @@ void Sender::write(const AFPacket& af_packet)
// Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet);
- if (m_conf.verbose) {
- fprintf(stderr, "EDI Output: Number of PFT fragments %zu\n",
+ if (m_conf.verbose and m_last_num_pft_fragments != edi_fragments.size()) {
+ etiLog.log(debug, "EDI Output: Number of PFT fragments %zu\n",
edi_fragments.size());
+ m_last_num_pft_fragments = edi_fragments.size();
}
/* Spread out the transmission of all fragments over part of the 24ms AF packet duration
diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h
index be93297..6a3f229 100644
--- a/lib/edioutput/Transport.h
+++ b/lib/edioutput/Transport.h
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2020
+ Copyright (C) 2022
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -90,6 +90,8 @@ class Sender {
std::mutex m_mutex;
bool m_running = false;
std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames;
+
+ size_t m_last_num_pft_fragments = 0;
};
}
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index 3938131..4373265 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2019
+ Copyright (C) 2022
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -355,6 +355,11 @@ int main(int argc, char *argv[])
}
}
+ const auto tist_offset = pt.get<int>("general.tist_offset", 0);
+ // By keeping 1.5 x tist_offset worth of EDI in the pre-roll buffer, we ensure that a new client can
+ // immediately send out frames according to their timestamp.
+ edi_conf.tcp_server_preroll_buffers = ceil(1.5 * (tist_offset / 24e-3));
+
edi_conf.dump = pt_edi.get<bool>("dump", false);
edi_conf.enable_pft = pt_edi.get<bool>("enable_pft", false);
edi_conf.verbose = pt_edi.get<bool>("verbose", false);
diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp
index 4dc3538..6453200 100644
--- a/src/dabOutput/dabOutputTcp.cpp
+++ b/src/dabOutput/dabOutputTcp.cpp
@@ -94,7 +94,7 @@ int DabOutputTcp::Open(const char* name)
uri_ = name;
if (success) {
- dispatcher_ = make_shared<Socket::TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES);
+ dispatcher_ = make_shared<Socket::TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES, 0);
dispatcher_->start(port, address);
}
else {