diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/Json.cpp | 4 | ||||
-rw-r--r-- | lib/Json.h | 4 | ||||
-rw-r--r-- | lib/Socket.cpp | 58 | ||||
-rw-r--r-- | lib/Socket.h | 35 | ||||
-rw-r--r-- | lib/ThreadsafeQueue.h | 38 | ||||
-rw-r--r-- | lib/edi/STIDecoder.cpp | 21 | ||||
-rw-r--r-- | lib/edi/STIDecoder.hpp | 5 | ||||
-rw-r--r-- | lib/edioutput/EDIConfig.h | 31 | ||||
-rw-r--r-- | lib/edioutput/PFT.cpp | 12 | ||||
-rw-r--r-- | lib/edioutput/PFT.h | 15 | ||||
-rw-r--r-- | lib/edioutput/Transport.cpp | 268 | ||||
-rw-r--r-- | lib/edioutput/Transport.h | 106 |
12 files changed, 395 insertions, 202 deletions
diff --git a/lib/Json.cpp b/lib/Json.cpp index 361a149..ee33671 100644 --- a/lib/Json.cpp +++ b/lib/Json.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2023 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -27,7 +27,7 @@ #include <sstream> #include <iomanip> #include <string> -#include <algorithm> +#include <stdexcept> #include "Json.h" @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2023 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -34,10 +34,10 @@ #include <vector> #include <memory> #include <optional> -#include <stdexcept> #include <string> #include <unordered_map> #include <variant> +#include <cstdint> namespace json { diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 2df1559..5c920d7 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -24,6 +24,7 @@ #include "Socket.h" +#include <numeric> #include <stdexcept> #include <cstdio> #include <cstring> @@ -478,7 +479,7 @@ TCPSocket::~TCPSocket() TCPSocket::TCPSocket(TCPSocket&& other) : m_sock(other.m_sock), - m_remote_address(move(other.m_remote_address)) + m_remote_address(std::move(other.m_remote_address)) { if (other.m_sock != -1) { other.m_sock = -1; @@ -967,12 +968,22 @@ ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms) reconnect(); } + m_last_received_packet_ts = chrono::steady_clock::now(); + return ret; } catch (const TCPSocket::Interrupted&) { return -1; } catch (const TCPSocket::Timeout&) { + const auto timeout = chrono::milliseconds(timeout_ms * 5); + if (m_last_received_packet_ts.has_value() and + chrono::steady_clock::now() - *m_last_received_packet_ts > timeout) + { + // This is to catch half-closed TCP connections + reconnect(); + } + return 0; } @@ -983,6 +994,7 @@ void TCPClient::reconnect() { TCPSocket newsock; m_sock = std::move(newsock); + m_last_received_packet_ts = nullopt; m_sock.connect(m_hostname, m_port, true); } @@ -990,7 +1002,7 @@ TCPConnection::TCPConnection(TCPSocket&& sock) : queue(), m_running(true), m_sender_thread(), - m_sock(move(sock)) + m_sock(std::move(sock)) { #if MISSING_OWN_ADDR auto own_addr = m_sock.getOwnAddress(); @@ -1052,6 +1064,17 @@ void TCPConnection::process() #endif } +TCPConnection::stats_t TCPConnection::get_stats() const +{ + TCPConnection::stats_t s; + const vector<size_t> buffer_sizes = queue.map<size_t>( + [](const vector<uint8_t>& vec) { return vec.size(); } + ); + + s.buffer_fullness = std::accumulate(buffer_sizes.cbegin(), buffer_sizes.cend(), 0); + s.remote_address = m_sock.get_remote_address(); + return s; +} TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) : m_max_queue_size(max_queue_size), @@ -1109,7 +1132,7 @@ void TCPDataDispatcher::process() auto sock = m_listener_socket.accept(timeout_ms); if (sock.valid()) { auto lock = unique_lock<mutex>(m_mutex); - m_connections.emplace(m_connections.begin(), move(sock)); + m_connections.emplace(m_connections.begin(), std::move(sock)); if (m_buffers_to_preroll > 0) { for (const auto& buf : m_preroll_queue) { @@ -1125,6 +1148,16 @@ void TCPDataDispatcher::process() } } + +std::vector<TCPConnection::stats_t> TCPDataDispatcher::get_stats() const +{ + std::vector<TCPConnection::stats_t> s; + for (const auto& conn : m_connections) { + s.push_back(conn.get_stats()); + } + return s; +} + TCPReceiveServer::TCPReceiveServer(size_t blocksize) : m_blocksize(blocksize) { @@ -1181,7 +1214,7 @@ void TCPReceiveServer::process() } else { buf.resize(r); - m_queue.push(make_shared<TCPReceiveMessageData>(move(buf))); + m_queue.push(make_shared<TCPReceiveMessageData>(std::move(buf))); } } catch (const TCPSocket::Interrupted&) { @@ -1222,7 +1255,7 @@ TCPSendClient::~TCPSendClient() } } -void TCPSendClient::sendall(const std::vector<uint8_t>& buffer) +TCPSendClient::ErrorStats TCPSendClient::sendall(const std::vector<uint8_t>& buffer) { if (not m_running) { throw runtime_error(m_exception_data); @@ -1234,6 +1267,17 @@ void TCPSendClient::sendall(const std::vector<uint8_t>& buffer) vector<uint8_t> discard; m_queue.try_pop(discard); } + + TCPSendClient::ErrorStats es; + es.num_reconnects = m_num_reconnects.load(); + + es.has_seen_new_errors = es.num_reconnects != m_num_reconnects_prev; + m_num_reconnects_prev = es.num_reconnects; + + auto lock = unique_lock<mutex>(m_error_mutex); + es.last_error = m_last_error; + + return es; } void TCPSendClient::process() @@ -1255,12 +1299,16 @@ void TCPSendClient::process() } else { try { + m_num_reconnects.fetch_add(1, std::memory_order_seq_cst); m_sock.connect(m_hostname, m_port); m_is_connected = true; } catch (const runtime_error& e) { m_is_connected = false; this_thread::sleep_for(chrono::seconds(1)); + + auto lock = unique_lock<mutex>(m_error_mutex); + m_last_error = e.what(); } } } diff --git a/lib/Socket.h b/lib/Socket.h index 1320a64..29b618a 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -31,9 +31,11 @@ #include "ThreadsafeQueue.h" #include <cstdlib> #include <atomic> -#include <string> +#include <chrono> #include <list> #include <memory> +#include <optional> +#include <string> #include <thread> #include <vector> @@ -211,6 +213,8 @@ class TCPSocket { SOCKET get_sockfd() const { return m_sock; } + InetAddress get_remote_address() const { return m_remote_address; } + private: explicit TCPSocket(int sockfd); explicit TCPSocket(int sockfd, InetAddress remote_address); @@ -236,6 +240,8 @@ class TCPClient { TCPSocket m_sock; std::string m_hostname; int m_port; + + std::optional<std::chrono::steady_clock::time_point> m_last_received_packet_ts; }; /* Helper class for TCPDataDispatcher, contains a queue of pending data and @@ -250,6 +256,12 @@ class TCPConnection ThreadsafeQueue<std::vector<uint8_t> > queue; + struct stats_t { + size_t buffer_fullness = 0; + InetAddress remote_address; + }; + stats_t get_stats() const; + private: std::atomic<bool> m_running; std::thread m_sender_thread; @@ -272,6 +284,8 @@ class TCPDataDispatcher void start(int port, const std::string& address); void write(const std::vector<uint8_t>& data); + std::vector<TCPConnection::stats_t> get_stats() const; + private: void process(); @@ -329,10 +343,18 @@ class TCPSendClient { public: TCPSendClient(const std::string& hostname, int port); ~TCPSendClient(); + TCPSendClient(const TCPSendClient&) = delete; + TCPSendClient& operator=(const TCPSendClient&) = delete; - /* Throws a runtime_error on error - */ - void sendall(const std::vector<uint8_t>& buffer); + + struct ErrorStats { + std::string last_error = ""; + size_t num_reconnects = 0; + bool has_seen_new_errors = false; + }; + + /* Throws a runtime_error when the process thread isn't running */ + ErrorStats sendall(const std::vector<uint8_t>& buffer); private: void process(); @@ -349,6 +371,11 @@ class TCPSendClient { std::string m_exception_data; std::thread m_sender_thread; TCPSocket m_listener_socket; + + std::atomic<size_t> m_num_reconnects = ATOMIC_VAR_INIT(0); + size_t m_num_reconnects_prev = 0; + std::mutex m_error_mutex; + std::string m_last_error = ""; }; } diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index 8b385d6..13bc19e 100644 --- a/lib/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -2,7 +2,7 @@ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2023 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li An implementation for a threadsafe queue, depends on C++11 @@ -28,6 +28,7 @@ #pragma once +#include <functional> #include <mutex> #include <condition_variable> #include <queue> @@ -63,10 +64,10 @@ public: std::unique_lock<std::mutex> lock(the_mutex); size_t queue_size_before = the_queue.size(); if (max_size == 0) { - the_queue.push(val); + the_queue.push_back(val); } else if (queue_size_before < max_size) { - the_queue.push(val); + the_queue.push_back(val); } size_t queue_size = the_queue.size(); lock.unlock(); @@ -80,10 +81,10 @@ public: std::unique_lock<std::mutex> lock(the_mutex); size_t queue_size_before = the_queue.size(); if (max_size == 0) { - the_queue.emplace(std::move(val)); + the_queue.emplace_back(std::move(val)); } else if (queue_size_before < max_size) { - the_queue.emplace(std::move(val)); + the_queue.emplace_back(std::move(val)); } size_t queue_size = the_queue.size(); lock.unlock(); @@ -110,9 +111,9 @@ public: bool overflow = false; while (the_queue.size() >= max_size) { overflow = true; - the_queue.pop(); + the_queue.pop_front(); } - the_queue.push(val); + the_queue.push_back(val); const size_t queue_size = the_queue.size(); lock.unlock(); @@ -129,9 +130,9 @@ public: bool overflow = false; while (the_queue.size() >= max_size) { overflow = true; - the_queue.pop(); + the_queue.pop_front(); } - the_queue.emplace(std::move(val)); + the_queue.emplace_back(std::move(val)); const size_t queue_size = the_queue.size(); lock.unlock(); @@ -152,7 +153,7 @@ public: while (the_queue.size() >= threshold) { the_tx_notification.wait(lock); } - the_queue.push(val); + the_queue.push_back(val); size_t queue_size = the_queue.size(); lock.unlock(); @@ -198,7 +199,7 @@ public: } popped_value = the_queue.front(); - the_queue.pop(); + the_queue.pop_front(); lock.unlock(); the_tx_notification.notify_one(); @@ -220,15 +221,26 @@ public: } else { std::swap(popped_value, the_queue.front()); - the_queue.pop(); + the_queue.pop_front(); lock.unlock(); the_tx_notification.notify_one(); } } + template<typename R> + std::vector<R> map(std::function<R(const T&)> func) const + { + std::vector<R> result; + std::unique_lock<std::mutex> lock(the_mutex); + for (const T& elem : the_queue) { + result.push_back(func(elem)); + } + return result; + } + private: - std::queue<T> the_queue; + std::deque<T> the_queue; mutable std::mutex the_mutex; std::condition_variable the_rx_notification; std::condition_variable the_tx_notification; diff --git a/lib/edi/STIDecoder.cpp b/lib/edi/STIDecoder.cpp index 2de828b..0499c53 100644 --- a/lib/edi/STIDecoder.cpp +++ b/lib/edi/STIDecoder.cpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2020 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -20,7 +20,6 @@ */ #include "STIDecoder.hpp" #include "buffer_unpack.hpp" -#include "crc.h" #include "Log.h" #include <cstdio> #include <cassert> @@ -180,7 +179,13 @@ bool STIDecoder::decode_ssn(const std::vector<uint8_t>& value, const tag_name_t& n |= (uint16_t)(name[3]); if (n == 0) { - etiLog.level(warn) << "EDI: Stream index SSnn tag is zero"; + if (not m_ssnn_zero_warning_printed) { + etiLog.level(warn) << "EDI: Stream index SSnn tag is zero"; + } + m_ssnn_zero_warning_printed = true; + } + else { + m_ssnn_zero_warning_printed = false; } if (m_filter_stream and m_filtered_stream_index != n) { @@ -197,14 +202,20 @@ bool STIDecoder::decode_ssn(const std::vector<uint8_t>& value, const tag_name_t& sti.stid = istc & 0xFFF; if (sti.rfa != 0) { - etiLog.level(warn) << "EDI: rfa field in SSnn tag non-null"; + if (not m_rfa_nonnull_warning_printed) { + etiLog.level(warn) << "EDI: rfa field in SSnn tag non-null"; + } + m_rfa_nonnull_warning_printed = true; + } + else { + m_rfa_nonnull_warning_printed = false; } copy( value.cbegin() + 3, value.cend(), back_inserter(sti.istd)); - m_data_collector.add_payload(move(sti)); + m_data_collector.add_payload(std::move(sti)); return true; } diff --git a/lib/edi/STIDecoder.hpp b/lib/edi/STIDecoder.hpp index 5e71ce7..81fbd82 100644 --- a/lib/edi/STIDecoder.hpp +++ b/lib/edi/STIDecoder.hpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2020 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -139,6 +139,9 @@ class STIDecoder { bool m_filter_stream = false; uint16_t m_filtered_stream_index = 1; + + bool m_ssnn_zero_warning_printed = false; + bool m_rfa_nonnull_warning_printed = false; }; } diff --git a/lib/edioutput/EDIConfig.h b/lib/edioutput/EDIConfig.h index 1997210..7016e87 100644 --- a/lib/edioutput/EDIConfig.h +++ b/lib/edioutput/EDIConfig.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2019 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -36,17 +36,31 @@ namespace edi { /** Configuration for EDI output */ +struct pft_settings_t { + // protection and fragmentation settings + bool verbose = false; + bool enable_pft = false; + unsigned chunk_len = 207; // RSk, data length of each chunk + unsigned fec = 0; // number of fragments that can be recovered + double fragment_spreading_factor = 0.95; + // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms) + // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets. +}; + struct destination_t { virtual ~destination_t() {}; + + pft_settings_t pft_settings = {}; }; + // Can represent both unicast and multicast destinations struct udp_destination_t : public destination_t { std::string dest_addr; - unsigned int dest_port = 0; + uint16_t dest_port = 0; std::string source_addr; - unsigned int source_port = 0; - unsigned int ttl = 10; + uint16_t source_port = 0; + uint8_t ttl = 10; }; // TCP server that can accept multiple connections @@ -66,16 +80,9 @@ struct tcp_client_t : public destination_t { }; struct configuration_t { - unsigned chunk_len = 207; // RSk, data length of each chunk - unsigned fec = 0; // number of fragments that can be recovered - bool dump = false; // dump a file with the EDI packets - bool verbose = false; - bool enable_pft = false; // Enable protection and fragmentation + bool verbose = false; unsigned int tagpacket_alignment = 0; std::vector<std::shared_ptr<destination_t> > destinations; - double fragment_spreading_factor = 0.95; - // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms) - // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets. bool enabled() const { return destinations.size() > 0; } diff --git a/lib/edioutput/PFT.cpp b/lib/edioutput/PFT.cpp index 7e0e8e9..f65fd67 100644 --- a/lib/edioutput/PFT.cpp +++ b/lib/edioutput/PFT.cpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2021 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -31,7 +31,6 @@ */ #include <vector> -#include <list> #include <cstdio> #include <cstring> #include <cstdint> @@ -41,6 +40,7 @@ #include "PFT.h" #include "crc.h" #include "ReedSolomon.h" +#include "Log.h" namespace edi { @@ -51,11 +51,10 @@ using namespace std; PFT::PFT() { } -PFT::PFT(const configuration_t &conf) : +PFT::PFT(const pft_settings_t& conf) : + m_enabled(conf.enable_pft), m_k(conf.chunk_len), m_m(conf.fec), - m_pseq(0), - m_num_chunks(0), m_verbose(conf.verbose) { if (m_k > 207) { @@ -324,5 +323,4 @@ void PFT::OverridePSeq(uint16_t pseq) m_pseq = pseq; } -} - +} // namespace edi diff --git a/lib/edioutput/PFT.h b/lib/edioutput/PFT.h index 42569a0..52e9f46 100644 --- a/lib/edioutput/PFT.h +++ b/lib/edioutput/PFT.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2021 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -33,12 +33,8 @@ #pragma once #include <vector> -#include <list> -#include <stdexcept> #include <cstdint> #include "AFPacket.h" -#include "Log.h" -#include "ReedSolomon.h" #include "EDIConfig.h" namespace edi { @@ -52,21 +48,24 @@ class PFT static constexpr int PARITYBYTES = 48; PFT(); - PFT(const configuration_t& conf); + PFT(const pft_settings_t& conf); + + bool is_enabled() const { return m_enabled and m_k > 0; } // return a list of PFT fragments with the correct // PFT headers - std::vector< PFTFragment > Assemble(AFPacket af_packet); + std::vector<PFTFragment> Assemble(AFPacket af_packet); // Apply Reed-Solomon FEC to the AF Packet RSBlock Protect(AFPacket af_packet); // Cut a RSBlock into several fragments that can be transmitted - std::vector< std::vector<uint8_t> > ProtectAndFragment(AFPacket af_packet); + std::vector<std::vector<uint8_t>> ProtectAndFragment(AFPacket af_packet); void OverridePSeq(uint16_t pseq); private: + bool m_enabled = false; unsigned int m_k = 207; // length of RS data word unsigned int m_m = 3; // number of fragments that can be recovered if lost uint16_t m_pseq = 0; diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp index 8ebb9fc..e9559b5 100644 --- a/lib/edioutput/Transport.cpp +++ b/lib/edioutput/Transport.cpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2022 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -25,7 +25,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ #include "Transport.h" -#include <iterator> +#include "Log.h" #include <cmath> #include <thread> @@ -57,13 +57,18 @@ void configuration_t::print() const else { throw logic_error("EDI destination not implemented"); } + etiLog.level(info) << " PFT=" << edi_dest->pft_settings.enable_pft; + if (edi_dest->pft_settings.enable_pft) { + etiLog.level(info) << " FEC=" << edi_dest->pft_settings.fec; + etiLog.level(info) << " Chunk Len=" << edi_dest->pft_settings.chunk_len; + etiLog.level(info) << " Fragment spreading factor=" << edi_dest->pft_settings.fragment_spreading_factor; + } } } Sender::Sender(const configuration_t& conf) : - m_conf(conf), - edi_pft(m_conf) + m_conf(conf) { if (m_conf.verbose) { etiLog.level(info) << "Setup EDI Output"; @@ -71,37 +76,39 @@ Sender::Sender(const configuration_t& conf) : for (const auto& edi_dest : m_conf.destinations) { if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) { - auto udp_socket = std::make_shared<Socket::UDPSocket>(udp_dest->source_port); + Socket::UDPSocket udp_socket(udp_dest->source_port); if (not udp_dest->source_addr.empty()) { - udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); - udp_socket->setMulticastTTL(udp_dest->ttl); + udp_socket.setMulticastSource(udp_dest->source_addr.c_str()); + udp_socket.setMulticastTTL(udp_dest->ttl); } - udp_sockets.emplace(udp_dest.get(), udp_socket); + auto sender = make_shared<udp_sender_t>( + udp_dest->dest_addr, + udp_dest->dest_port, + std::move(udp_socket)); + m_pft_spreaders.emplace_back( + make_shared<PFTSpreader>(udp_dest->pft_settings, sender)); } else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) { - auto dispatcher = make_shared<Socket::TCPDataDispatcher>( - tcp_dest->max_frames_queued, tcp_dest->tcp_server_preroll_buffers); - - dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); - tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); + auto sender = make_shared<tcp_dispatcher_t>( + tcp_dest->listen_port, + tcp_dest->max_frames_queued, + tcp_dest->tcp_server_preroll_buffers); + m_pft_spreaders.emplace_back( + make_shared<PFTSpreader>(tcp_dest->pft_settings, sender)); } else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { - auto tcp_send_client = make_shared<Socket::TCPSendClient>(tcp_dest->dest_addr, tcp_dest->dest_port); - tcp_senders.emplace(tcp_dest.get(), tcp_send_client); + auto sender = make_shared<tcp_send_client_t>(tcp_dest->dest_addr, tcp_dest->dest_port); + m_pft_spreaders.emplace_back( + make_shared<PFTSpreader>(tcp_dest->pft_settings, sender)); } else { throw logic_error("EDI destination not implemented"); } } - if (m_conf.dump) { - edi_debug_file.open("./edi.debug"); - } - - if (m_conf.enable_pft) { - unique_lock<mutex> lock(m_mutex); + { m_running = true; m_thread = thread(&Sender::run, this); } @@ -111,10 +118,52 @@ Sender::Sender(const configuration_t& conf) : } } +void Sender::write(const TagPacket& tagpacket) +{ + // Assemble into one AF Packet + edi::AFPacket af_packet = edi_af_packetiser.Assemble(tagpacket); + + write(af_packet); +} + +void Sender::write(const AFPacket& af_packet) +{ + for (auto& sender : m_pft_spreaders) { + sender->send_af_packet(af_packet); + } +} + +void Sender::override_af_sequence(uint16_t seq) +{ + edi_af_packetiser.OverrideSeq(seq); +} + +void Sender::override_pft_sequence(uint16_t pseq) +{ + for (auto& spreader : m_pft_spreaders) { + spreader->edi_pft.OverridePSeq(pseq); + } +} + +std::vector<Sender::stats_t> Sender::get_tcp_server_stats() const +{ + std::vector<Sender::stats_t> stats; + + for (auto& spreader : m_pft_spreaders) { + if (auto sender = std::dynamic_pointer_cast<tcp_dispatcher_t>(spreader->sender)) { + Sender::stats_t s; + s.listen_port = sender->listen_port; + s.stats = sender->sock.get_stats(); + stats.push_back(s); + } + } + + return stats; +} + Sender::~Sender() { { - unique_lock<mutex> lock(m_mutex); m_running = false; } @@ -123,36 +172,89 @@ Sender::~Sender() } } -void Sender::write(const TagPacket& tagpacket) +void Sender::run() { - // Assemble into one AF Packet - edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket); + while (m_running) { + const auto now = chrono::steady_clock::now(); + for (auto& spreader : m_pft_spreaders) { + spreader->tick(now); + } - write(af_packet); + this_thread::sleep_for(chrono::microseconds(500)); + } } -void Sender::write(const AFPacket& af_packet) + +void Sender::udp_sender_t::send_packet(const std::vector<uint8_t> &frame) +{ + Socket::InetAddress addr; + addr.resolveUdpDestination(dest_addr, dest_port); + sock.send(frame, addr); +} + +void Sender::tcp_dispatcher_t::send_packet(const std::vector<uint8_t> &frame) { - if (m_conf.enable_pft) { + sock.write(frame); +} + +void Sender::tcp_send_client_t::send_packet(const std::vector<uint8_t> &frame) +{ + sock.sendall(frame); +} + +Sender::udp_sender_t::udp_sender_t(std::string dest_addr, + uint16_t dest_port, + Socket::UDPSocket&& sock) : + dest_addr(dest_addr), + dest_port(dest_port), + sock(std::move(sock)) +{ +} + +Sender::tcp_dispatcher_t::tcp_dispatcher_t(uint16_t listen_port, + size_t max_frames_queued, + size_t tcp_server_preroll_buffers) : + listen_port(listen_port), + sock(max_frames_queued, tcp_server_preroll_buffers) +{ + sock.start(listen_port, "0.0.0.0"); +} + +Sender::tcp_send_client_t::tcp_send_client_t(const std::string &dest_addr, + uint16_t dest_port) : + sock(dest_addr, dest_port) +{ +} + +Sender::PFTSpreader::PFTSpreader(const pft_settings_t& conf, sender_sp sender) : + sender(sender), + edi_pft(conf) +{ +} + +void Sender::PFTSpreader::send_af_packet(const AFPacket& af_packet) +{ + using namespace std::chrono; + if (edi_pft.is_enabled()) { // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet); - if (m_conf.verbose and m_last_num_pft_fragments != edi_fragments.size()) { + if (settings.verbose and last_num_pft_fragments != edi_fragments.size()) { etiLog.log(debug, "EDI Output: Number of PFT fragments %zu\n", edi_fragments.size()); - m_last_num_pft_fragments = edi_fragments.size(); + last_num_pft_fragments = edi_fragments.size(); } /* Spread out the transmission of all fragments over part of the 24ms AF packet duration * to reduce the risk of losing a burst of fragments because of congestion. */ - using namespace std::chrono; auto inter_fragment_wait_time = microseconds(1); if (edi_fragments.size() > 1) { - if (m_conf.fragment_spreading_factor > 0) { + if (settings.fragment_spreading_factor > 0) { inter_fragment_wait_time = - microseconds( - llrint(m_conf.fragment_spreading_factor * 24000.0 / edi_fragments.size()) - ); + microseconds(llrint( + settings.fragment_spreading_factor * 24000.0 / + edi_fragments.size() + )); } } @@ -162,99 +264,35 @@ void Sender::write(const AFPacket& af_packet) auto tp = now; unique_lock<mutex> lock(m_mutex); for (auto& edi_frag : edi_fragments) { - m_pending_frames[tp] = move(edi_frag); + m_pending_frames[tp] = std::move(edi_frag); tp += inter_fragment_wait_time; } } - - // Transmission done in run() function } else /* PFT disabled */ { - // Send over ethernet - if (m_conf.dump) { - ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - copy(af_packet.begin(), af_packet.end(), debug_iterator); - } - - for (auto& dest : m_conf.destinations) { - if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { - Socket::InetAddress addr; - addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); - - if (af_packet.size() > 1400 and not m_udp_fragmentation_warning_printed) { - fprintf(stderr, "EDI Output: AF packet larger than 1400," - " consider using PFT to avoid UP fragmentation.\n"); - m_udp_fragmentation_warning_printed = true; - } - - udp_sockets.at(udp_dest.get())->send(af_packet, addr); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { - tcp_dispatchers.at(tcp_dest.get())->write(af_packet); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { - tcp_senders.at(tcp_dest.get())->sendall(af_packet); - } - else { - throw logic_error("EDI destination not implemented"); - } - } + const auto now = steady_clock::now(); + unique_lock<mutex> lock(m_mutex); + m_pending_frames[now] = std::move(af_packet); } -} -void Sender::override_af_sequence(uint16_t seq) -{ - edi_afPacketiser.OverrideSeq(seq); + // Actual transmission done in tick() function } -void Sender::override_pft_sequence(uint16_t pseq) +void Sender::PFTSpreader::tick(const std::chrono::steady_clock::time_point& now) { - edi_pft.OverridePSeq(pseq); -} + unique_lock<mutex> lock(m_mutex); -void Sender::run() -{ - while (m_running) { - unique_lock<mutex> lock(m_mutex); - const auto now = chrono::steady_clock::now(); + for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) { + const auto& edi_frag = it->second; - // Send over ethernet - for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) { - const auto& edi_frag = it->second; - - if (it->first <= now) { - if (m_conf.dump) { - ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - copy(edi_frag.begin(), edi_frag.end(), debug_iterator); - } - - for (auto& dest : m_conf.destinations) { - if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { - Socket::InetAddress addr; - addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); - - udp_sockets.at(udp_dest.get())->send(edi_frag, addr); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { - tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { - tcp_senders.at(tcp_dest.get())->sendall(edi_frag); - } - else { - throw logic_error("EDI destination not implemented"); - } - } - it = m_pending_frames.erase(it); - } - else { - ++it; - } + if (it->first <= now) { + sender->send_packet(edi_frag); + it = m_pending_frames.erase(it); + } + else { + ++it; } - - lock.unlock(); - this_thread::sleep_for(chrono::microseconds(500)); } } -} +} // namespace edi diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h index 6a3f229..b8a9008 100644 --- a/lib/edioutput/Transport.h +++ b/lib/edioutput/Transport.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2022 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -31,25 +31,23 @@ #include "AFPacket.h" #include "PFT.h" #include "Socket.h" -#include <vector> #include <chrono> #include <map> -#include <unordered_map> -#include <stdexcept> -#include <fstream> #include <cstdint> #include <thread> #include <mutex> +#include <vector> namespace edi { -/** STI sender for EDI output */ - +/** ETI/STI sender for EDI output */ class Sender { public: Sender(const configuration_t& conf); Sender(const Sender&) = delete; - Sender operator=(const Sender&) = delete; + Sender& operator=(const Sender&) = delete; + Sender(Sender&&) = delete; + Sender& operator=(Sender&&) = delete; ~Sender(); // Assemble the tagpacket into an AF packet, and if needed, @@ -66,33 +64,85 @@ class Sender { void override_af_sequence(uint16_t seq); void override_pft_sequence(uint16_t pseq); - private: - void run(); - - bool m_udp_fragmentation_warning_printed = false; + struct stats_t { + uint16_t listen_port; + std::vector<Socket::TCPConnection::stats_t> stats; + }; + std::vector<stats_t> get_tcp_server_stats() const; + private: configuration_t m_conf; - std::ofstream edi_debug_file; // The TagPacket will then be placed into an AFPacket - edi::AFPacketiser edi_afPacketiser; + edi::AFPacketiser edi_af_packetiser; - // The AF Packet will be protected with reed-solomon and split in fragments - edi::PFT edi_pft; + // PFT spreading requires sending UDP packets at specific time, + // independently of time when write() gets called + bool m_running = false; + std::thread m_thread; + virtual void run(); - std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets; - std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers; - std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders; - // PFT spreading requires sending UDP packets at specific time, independently of - // time when write() gets called - std::thread m_thread; - std::mutex m_mutex; - bool m_running = false; - std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames; - size_t m_last_num_pft_fragments = 0; -}; -} + struct i_sender { + virtual void send_packet(const std::vector<uint8_t> &frame) = 0; + virtual ~i_sender() { } + }; + + struct udp_sender_t : public i_sender { + udp_sender_t( + std::string dest_addr, + uint16_t dest_port, + Socket::UDPSocket&& sock); + + std::string dest_addr; + uint16_t dest_port; + Socket::UDPSocket sock; + + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + struct tcp_dispatcher_t : public i_sender { + tcp_dispatcher_t( + uint16_t listen_port, + size_t max_frames_queued, + size_t tcp_server_preroll_buffers); + + uint16_t listen_port; + Socket::TCPDataDispatcher sock; + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + struct tcp_send_client_t : public i_sender { + tcp_send_client_t( + const std::string& dest_addr, + uint16_t dest_port); + + Socket::TCPSendClient sock; + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + class PFTSpreader { + public: + using sender_sp = std::shared_ptr<i_sender>; + PFTSpreader(const pft_settings_t &conf, sender_sp sender); + sender_sp sender; + edi::PFT edi_pft; + + void send_af_packet(const AFPacket &af_packet); + void tick(const std::chrono::steady_clock::time_point& now); + + private: + // send_af_packet() and tick() are called from different threads, both + // are accessing m_pending_frames + std::mutex m_mutex; + std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames; + pft_settings_t settings; + size_t last_num_pft_fragments = 0; + }; + + std::vector<std::shared_ptr<PFTSpreader>> m_pft_spreaders; +}; +} |