diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2022-08-19 17:12:54 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2022-08-19 17:18:09 +0200 |
commit | 913cd43139d7b5d6eac166a01ac09a754f2bd013 (patch) | |
tree | 98a7bb16c4a05bd7da8aa0fcd25d7620723b999d | |
parent | 7bfb88a7446e7faaee6e297e915a2bf95a699109 (diff) | |
download | dabmux-913cd43139d7b5d6eac166a01ac09a754f2bd013.tar.gz dabmux-913cd43139d7b5d6eac166a01ac09a754f2bd013.tar.bz2 dabmux-913cd43139d7b5d6eac166a01ac09a754f2bd013.zip |
Support EDI TCP server pre-roll on client connect
Includes common code changes: socket changes for keepalive and preroll
-rw-r--r-- | doc/example.mux | 3 | ||||
-rw-r--r-- | lib/Socket.cpp | 58 | ||||
-rw-r--r-- | lib/Socket.h | 14 | ||||
-rw-r--r-- | lib/edioutput/EDIConfig.h | 3 | ||||
-rw-r--r-- | lib/edioutput/Transport.cpp | 13 | ||||
-rw-r--r-- | lib/edioutput/Transport.h | 4 | ||||
-rw-r--r-- | src/DabMux.cpp | 7 | ||||
-rw-r--r-- | src/dabOutput/dabOutputTcp.cpp | 2 |
8 files changed, 89 insertions, 15 deletions
diff --git a/doc/example.mux b/doc/example.mux index f170bda..6682f0a 100644 --- a/doc/example.mux +++ b/doc/example.mux @@ -285,6 +285,9 @@ outputs { edi { ; Example EDI-over-TCP output ; If TIST is enabled, requires leap-second information + ; + ; When a new client connects, it will receive a pre-roll of 1.5x tist_offset seconds + ; worth of EDI data, so that it can quickly fill its buffers. destinations { example_tcp { protocol tcp diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 1ff6418..10ec1ca 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -2,7 +2,7 @@ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2020 + Copyright (C) 2022 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -30,6 +30,7 @@ #include <cerrno> #include <fcntl.h> #include <poll.h> +#include <netinet/tcp.h> namespace Socket { @@ -610,6 +611,37 @@ void TCPSocket::connect(const std::string& hostname, int port, bool nonblock) } } +void TCPSocket::enable_keepalive(int time, int intvl, int probes) +{ + if (m_sock == INVALID_SOCKET) { + throw std::logic_error("You may not call enable_keepalive on invalid socket"); + } + int optval = 1; + auto optlen = sizeof(optval); + if (setsockopt(m_sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP: Could not set SO_KEEPALIVE: " + errstr); + } + + optval = time; + if (setsockopt(m_sock, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP: Could not set TCP_KEEPIDLE: " + errstr); + } + + optval = intvl; + if (setsockopt(m_sock, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP: Could not set TCP_KEEPINTVL: " + errstr); + } + + optval = probes; + if (setsockopt(m_sock, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP: Could not set TCP_KEEPCNT: " + errstr); + } +} + void TCPSocket::listen(int port, const string& name) { if (m_sock != INVALID_SOCKET) { @@ -938,8 +970,9 @@ void TCPConnection::process() } -TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) : - m_max_queue_size(max_queue_size) +TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) : + m_max_queue_size(max_queue_size), + m_buffers_to_preroll(buffers_to_preroll) { } @@ -967,12 +1000,20 @@ void TCPDataDispatcher::write(const vector<uint8_t>& data) throw runtime_error(m_exception_data); } + auto lock = unique_lock<mutex>(m_mutex); + + if (m_buffers_to_preroll > 0) { + m_preroll_queue.push_back(data); + if (m_preroll_queue.size() > m_buffers_to_preroll) { + m_preroll_queue.pop_front(); + } + } + for (auto& connection : m_connections) { connection.queue.push(data); } - m_connections.remove_if( - [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); + m_connections.remove_if( [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); } void TCPDataDispatcher::process() @@ -984,7 +1025,14 @@ void TCPDataDispatcher::process() // Add a new TCPConnection to the list, constructing it from the client socket auto sock = m_listener_socket.accept(timeout_ms); if (sock.valid()) { + auto lock = unique_lock<mutex>(m_mutex); m_connections.emplace(m_connections.begin(), move(sock)); + + if (m_buffers_to_preroll > 0) { + for (const auto& buf : m_preroll_queue) { + m_connections.front().queue.push(buf); + } + } } } } diff --git a/lib/Socket.h b/lib/Socket.h index f5143a0..d8242e2 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -2,7 +2,7 @@ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2020 + Copyright (C) 2022 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -173,6 +173,11 @@ class TCPSocket { void listen(int port, const std::string& name); void close(void); + /* Enable TCP keepalive. See + * https://tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html + */ + void enable_keepalive(int time, int intvl, int probes); + /* throws a runtime_error on failure, an invalid socket on timeout */ TCPSocket accept(int timeout_ms); @@ -254,7 +259,7 @@ class TCPConnection class TCPDataDispatcher { public: - TCPDataDispatcher(size_t max_queue_size); + TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll); ~TCPDataDispatcher(); TCPDataDispatcher(const TCPDataDispatcher&) = delete; TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete; @@ -266,11 +271,16 @@ class TCPDataDispatcher void process(); size_t m_max_queue_size; + size_t m_buffers_to_preroll; + std::atomic<bool> m_running = ATOMIC_VAR_INIT(false); std::string m_exception_data; std::thread m_listener_thread; TCPSocket m_listener_socket; + + std::mutex m_mutex; + std::deque<std::vector<uint8_t> > m_preroll_queue; std::list<TCPConnection> m_connections; }; diff --git a/lib/edioutput/EDIConfig.h b/lib/edioutput/EDIConfig.h index d57e9ce..a7225a7 100644 --- a/lib/edioutput/EDIConfig.h +++ b/lib/edioutput/EDIConfig.h @@ -74,6 +74,9 @@ struct configuration_t { // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms) // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets. + // TCP Server output can preroll a fixed number of previous buffers each time a new client connects. + size_t tcp_server_preroll_buffers = 0; + bool enabled() const { return destinations.size() > 0; } void print() const; diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp index 5d34814..a870aa0 100644 --- a/lib/edioutput/Transport.cpp +++ b/lib/edioutput/Transport.cpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2020 + Copyright (C) 2022 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -66,7 +66,7 @@ Sender::Sender(const configuration_t& conf) : edi_pft(m_conf) { if (m_conf.verbose) { - etiLog.log(info, "Setup EDI Output"); + etiLog.level(info) << "Setup EDI Output, TCP output preroll " << m_conf.tcp_server_preroll_buffers; } for (const auto& edi_dest : m_conf.destinations) { @@ -81,7 +81,9 @@ Sender::Sender(const configuration_t& conf) : udp_sockets.emplace(udp_dest.get(), udp_socket); } else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) { - auto dispatcher = make_shared<Socket::TCPDataDispatcher>(tcp_dest->max_frames_queued); + auto dispatcher = make_shared<Socket::TCPDataDispatcher>( + tcp_dest->max_frames_queued, m_conf.tcp_server_preroll_buffers); + dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); } @@ -135,9 +137,10 @@ void Sender::write(const AFPacket& af_packet) // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet); - if (m_conf.verbose) { - fprintf(stderr, "EDI Output: Number of PFT fragments %zu\n", + if (m_conf.verbose and m_last_num_pft_fragments != edi_fragments.size()) { + etiLog.log(debug, "EDI Output: Number of PFT fragments %zu\n", edi_fragments.size()); + m_last_num_pft_fragments = edi_fragments.size(); } /* Spread out the transmission of all fragments over part of the 24ms AF packet duration diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h index be93297..6a3f229 100644 --- a/lib/edioutput/Transport.h +++ b/lib/edioutput/Transport.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2020 + Copyright (C) 2022 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -90,6 +90,8 @@ class Sender { std::mutex m_mutex; bool m_running = false; std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames; + + size_t m_last_num_pft_fragments = 0; }; } diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 3938131..4373265 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2019 + Copyright (C) 2022 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -355,6 +355,11 @@ int main(int argc, char *argv[]) } } + const auto tist_offset = pt.get<int>("general.tist_offset", 0); + // By keeping 1.5 x tist_offset worth of EDI in the pre-roll buffer, we ensure that a new client can + // immediately send out frames according to their timestamp. + edi_conf.tcp_server_preroll_buffers = ceil(1.5 * (tist_offset / 24e-3)); + edi_conf.dump = pt_edi.get<bool>("dump", false); edi_conf.enable_pft = pt_edi.get<bool>("enable_pft", false); edi_conf.verbose = pt_edi.get<bool>("verbose", false); diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 4dc3538..6453200 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -94,7 +94,7 @@ int DabOutputTcp::Open(const char* name) uri_ = name; if (success) { - dispatcher_ = make_shared<Socket::TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES); + dispatcher_ = make_shared<Socket::TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES, 0); dispatcher_->start(port, address); } else { |