diff options
Diffstat (limited to 'lib/edioutput/Transport.h')
-rw-r--r-- | lib/edioutput/Transport.h | 93 |
1 files changed, 69 insertions, 24 deletions
diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h index 2ca638e..b8a9008 100644 --- a/lib/edioutput/Transport.h +++ b/lib/edioutput/Transport.h @@ -33,8 +33,6 @@ #include "Socket.h" #include <chrono> #include <map> -#include <unordered_map> -#include <fstream> #include <cstdint> #include <thread> #include <mutex> @@ -43,12 +41,13 @@ namespace edi { /** ETI/STI sender for EDI output */ - class Sender { public: Sender(const configuration_t& conf); Sender(const Sender&) = delete; - Sender operator=(const Sender&) = delete; + Sender& operator=(const Sender&) = delete; + Sender(Sender&&) = delete; + Sender& operator=(Sender&&) = delete; ~Sender(); // Assemble the tagpacket into an AF packet, and if needed, @@ -72,32 +71,78 @@ class Sender { std::vector<stats_t> get_tcp_server_stats() const; private: - void run(); - - bool m_udp_fragmentation_warning_printed = false; - configuration_t m_conf; - std::ofstream edi_debug_file; // The TagPacket will then be placed into an AFPacket - edi::AFPacketiser edi_afPacketiser; + edi::AFPacketiser edi_af_packetiser; - // The AF Packet will be protected with reed-solomon and split in fragments - edi::PFT edi_pft; + // PFT spreading requires sending UDP packets at specific time, + // independently of time when write() gets called + bool m_running = false; + std::thread m_thread; + virtual void run(); - std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets; - std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers; - std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders; - // PFT spreading requires sending UDP packets at specific time, independently of - // time when write() gets called - std::thread m_thread; - std::mutex m_mutex; - bool m_running = false; - std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames; - size_t m_last_num_pft_fragments = 0; -}; -} + struct i_sender { + virtual void send_packet(const std::vector<uint8_t> &frame) = 0; + virtual ~i_sender() { } + }; + + struct udp_sender_t : public i_sender { + udp_sender_t( + std::string dest_addr, + uint16_t dest_port, + Socket::UDPSocket&& sock); + + std::string dest_addr; + uint16_t dest_port; + Socket::UDPSocket sock; + + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + struct tcp_dispatcher_t : public i_sender { + tcp_dispatcher_t( + uint16_t listen_port, + size_t max_frames_queued, + size_t tcp_server_preroll_buffers); + + uint16_t listen_port; + Socket::TCPDataDispatcher sock; + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + struct tcp_send_client_t : public i_sender { + tcp_send_client_t( + const std::string& dest_addr, + uint16_t dest_port); + + Socket::TCPSendClient sock; + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + class PFTSpreader { + public: + using sender_sp = std::shared_ptr<i_sender>; + PFTSpreader(const pft_settings_t &conf, sender_sp sender); + sender_sp sender; + edi::PFT edi_pft; + + void send_af_packet(const AFPacket &af_packet); + void tick(const std::chrono::steady_clock::time_point& now); + + private: + // send_af_packet() and tick() are called from different threads, both + // are accessing m_pending_frames + std::mutex m_mutex; + std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames; + pft_settings_t settings; + size_t last_num_pft_fragments = 0; + }; + + std::vector<std::shared_ptr<PFTSpreader>> m_pft_spreaders; +}; +} |