diff options
Diffstat (limited to 'lib/edioutput/Transport.h')
-rw-r--r-- | lib/edioutput/Transport.h | 106 |
1 files changed, 78 insertions, 28 deletions
diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h index 6a3f229..b8a9008 100644 --- a/lib/edioutput/Transport.h +++ b/lib/edioutput/Transport.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2022 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -31,25 +31,23 @@ #include "AFPacket.h" #include "PFT.h" #include "Socket.h" -#include <vector> #include <chrono> #include <map> -#include <unordered_map> -#include <stdexcept> -#include <fstream> #include <cstdint> #include <thread> #include <mutex> +#include <vector> namespace edi { -/** STI sender for EDI output */ - +/** ETI/STI sender for EDI output */ class Sender { public: Sender(const configuration_t& conf); Sender(const Sender&) = delete; - Sender operator=(const Sender&) = delete; + Sender& operator=(const Sender&) = delete; + Sender(Sender&&) = delete; + Sender& operator=(Sender&&) = delete; ~Sender(); // Assemble the tagpacket into an AF packet, and if needed, @@ -66,33 +64,85 @@ class Sender { void override_af_sequence(uint16_t seq); void override_pft_sequence(uint16_t pseq); - private: - void run(); - - bool m_udp_fragmentation_warning_printed = false; + struct stats_t { + uint16_t listen_port; + std::vector<Socket::TCPConnection::stats_t> stats; + }; + std::vector<stats_t> get_tcp_server_stats() const; + private: configuration_t m_conf; - std::ofstream edi_debug_file; // The TagPacket will then be placed into an AFPacket - edi::AFPacketiser edi_afPacketiser; + edi::AFPacketiser edi_af_packetiser; - // The AF Packet will be protected with reed-solomon and split in fragments - edi::PFT edi_pft; + // PFT spreading requires sending UDP packets at specific time, + // independently of time when write() gets called + bool m_running = false; + std::thread m_thread; + virtual void run(); - std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets; - std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers; - std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders; - // PFT spreading requires sending UDP packets at specific time, independently of - // time when write() gets called - std::thread m_thread; - std::mutex m_mutex; - bool m_running = false; - std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames; - size_t m_last_num_pft_fragments = 0; -}; -} + struct i_sender { + virtual void send_packet(const std::vector<uint8_t> &frame) = 0; + virtual ~i_sender() { } + }; + + struct udp_sender_t : public i_sender { + udp_sender_t( + std::string dest_addr, + uint16_t dest_port, + Socket::UDPSocket&& sock); + + std::string dest_addr; + uint16_t dest_port; + Socket::UDPSocket sock; + + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + struct tcp_dispatcher_t : public i_sender { + tcp_dispatcher_t( + uint16_t listen_port, + size_t max_frames_queued, + size_t tcp_server_preroll_buffers); + + uint16_t listen_port; + Socket::TCPDataDispatcher sock; + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + struct tcp_send_client_t : public i_sender { + tcp_send_client_t( + const std::string& dest_addr, + uint16_t dest_port); + + Socket::TCPSendClient sock; + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + class PFTSpreader { + public: + using sender_sp = std::shared_ptr<i_sender>; + PFTSpreader(const pft_settings_t &conf, sender_sp sender); + sender_sp sender; + edi::PFT edi_pft; + + void send_af_packet(const AFPacket &af_packet); + void tick(const std::chrono::steady_clock::time_point& now); + + private: + // send_af_packet() and tick() are called from different threads, both + // are accessing m_pending_frames + std::mutex m_mutex; + std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames; + pft_settings_t settings; + size_t last_num_pft_fragments = 0; + }; + + std::vector<std::shared_ptr<PFTSpreader>> m_pft_spreaders; +}; +} |