aboutsummaryrefslogtreecommitdiffstats
path: root/lib/edioutput/Transport.h
diff options
context:
space:
mode:
Diffstat (limited to 'lib/edioutput/Transport.h')
-rw-r--r--lib/edioutput/Transport.h93
1 files changed, 69 insertions, 24 deletions
diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h
index 2ca638e..b8a9008 100644
--- a/lib/edioutput/Transport.h
+++ b/lib/edioutput/Transport.h
@@ -33,8 +33,6 @@
#include "Socket.h"
#include <chrono>
#include <map>
-#include <unordered_map>
-#include <fstream>
#include <cstdint>
#include <thread>
#include <mutex>
@@ -43,12 +41,13 @@
namespace edi {
/** ETI/STI sender for EDI output */
-
class Sender {
public:
Sender(const configuration_t& conf);
Sender(const Sender&) = delete;
- Sender operator=(const Sender&) = delete;
+ Sender& operator=(const Sender&) = delete;
+ Sender(Sender&&) = delete;
+ Sender& operator=(Sender&&) = delete;
~Sender();
// Assemble the tagpacket into an AF packet, and if needed,
@@ -72,32 +71,78 @@ class Sender {
std::vector<stats_t> get_tcp_server_stats() const;
private:
- void run();
-
- bool m_udp_fragmentation_warning_printed = false;
-
configuration_t m_conf;
- std::ofstream edi_debug_file;
// The TagPacket will then be placed into an AFPacket
- edi::AFPacketiser edi_afPacketiser;
+ edi::AFPacketiser edi_af_packetiser;
- // The AF Packet will be protected with reed-solomon and split in fragments
- edi::PFT edi_pft;
+ // PFT spreading requires sending UDP packets at specific time,
+ // independently of time when write() gets called
+ bool m_running = false;
+ std::thread m_thread;
+ virtual void run();
- std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets;
- std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers;
- std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders;
- // PFT spreading requires sending UDP packets at specific time, independently of
- // time when write() gets called
- std::thread m_thread;
- std::mutex m_mutex;
- bool m_running = false;
- std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames;
- size_t m_last_num_pft_fragments = 0;
-};
-}
+ struct i_sender {
+ virtual void send_packet(const std::vector<uint8_t> &frame) = 0;
+ virtual ~i_sender() { }
+ };
+
+ struct udp_sender_t : public i_sender {
+ udp_sender_t(
+ std::string dest_addr,
+ uint16_t dest_port,
+ Socket::UDPSocket&& sock);
+
+ std::string dest_addr;
+ uint16_t dest_port;
+ Socket::UDPSocket sock;
+
+ virtual void send_packet(const std::vector<uint8_t> &frame) override;
+ };
+
+ struct tcp_dispatcher_t : public i_sender {
+ tcp_dispatcher_t(
+ uint16_t listen_port,
+ size_t max_frames_queued,
+ size_t tcp_server_preroll_buffers);
+
+ uint16_t listen_port;
+ Socket::TCPDataDispatcher sock;
+ virtual void send_packet(const std::vector<uint8_t> &frame) override;
+ };
+
+ struct tcp_send_client_t : public i_sender {
+ tcp_send_client_t(
+ const std::string& dest_addr,
+ uint16_t dest_port);
+
+ Socket::TCPSendClient sock;
+ virtual void send_packet(const std::vector<uint8_t> &frame) override;
+ };
+
+ class PFTSpreader {
+ public:
+ using sender_sp = std::shared_ptr<i_sender>;
+ PFTSpreader(const pft_settings_t &conf, sender_sp sender);
+ sender_sp sender;
+ edi::PFT edi_pft;
+
+ void send_af_packet(const AFPacket &af_packet);
+ void tick(const std::chrono::steady_clock::time_point& now);
+
+ private:
+ // send_af_packet() and tick() are called from different threads, both
+ // are accessing m_pending_frames
+ std::mutex m_mutex;
+ std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames;
+ pft_settings_t settings;
+ size_t last_num_pft_fragments = 0;
+ };
+
+ std::vector<std::shared_ptr<PFTSpreader>> m_pft_spreaders;
+};
+}