// path: root/lib/edioutput/Transport.h
// blob: b8a90082910c9ddf2b9aa08e0521440dfbc0cb5a
/*
   Copyright (C) 2025
   Matthias P. Braendli, matthias.braendli@mpb.li

    http://www.opendigitalradio.org

   EDI output,
   UDP and TCP transports and their configuration

   */
/*
   This file is part of the ODR-mmbTools.

   This program is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as
   published by the Free Software Foundation, either version 3 of the
   License, or (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#pragma once

#include "EDIConfig.h"
#include "AFPacket.h"
#include "PFT.h"
#include "Socket.h"
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

namespace edi {

/** ETI/STI sender for EDI output.
 *
 * Takes tag packets or already assembled AF packets, applies PFT if needed
 * and schedules the result for transmission.
 *
 * The class is neither copyable nor movable.
 */
class Sender {
    public:
        /** Create a sender using the given configuration. */
        Sender(const configuration_t& conf);
        Sender(const Sender&) = delete;
        Sender& operator=(const Sender&) = delete;
        Sender(Sender&&) = delete;
        Sender& operator=(Sender&&) = delete;

        // Virtual because the class declares a virtual member function
        // (run()): destroying a derived object through a Sender pointer
        // must be well defined.
        virtual ~Sender();

        /** Assemble the tagpacket into an AF packet, and if needed,
         * apply PFT and then schedule for transmission.
         */
        void write(const TagPacket& tagpacket);

        /** Schedule an already assembled AF Packet for transmission,
         * applying PFT if needed.
         */
        void write(const AFPacket& af_packet);

        /** Set the AF layer sequence number to be used for the next
         * call to write().
         */
        void override_af_sequence(uint16_t seq);

        /** Set the PFT layer sequence number to be used for the next
         * call to write().
         */
        void override_pft_sequence(uint16_t pseq);

        /** Connection statistics associated with one TCP listen port. */
        struct stats_t {
            uint16_t listen_port;
            std::vector<Socket::TCPConnection::stats_t> stats;
        };

        /** Return the collected TCP statistics.
         *
         * NOTE(review): presumably one entry per configured TCP server
         * output; confirm against the implementation.
         */
        std::vector<stats_t> get_tcp_server_stats() const;

    private:
        // Configuration of this sender, held by value.
        configuration_t m_conf;

        // The TagPacket will then be placed into an AFPacket
        edi::AFPacketiser edi_af_packetiser;

        // PFT spreading requires sending UDP packets at specific times,
        // independently of when write() gets called, hence the dedicated
        // thread.
        //
        // m_running is atomic so that it can be read by the worker thread
        // while another thread modifies it (e.g. to request shutdown)
        // without causing a data race. It remains usable like a plain bool
        // (assignment and boolean tests).
        std::atomic<bool> m_running{false};
        std::thread m_thread;

        // Worker function of the PFT spreading thread.
        // NOTE(review): presumably executed by m_thread; confirm in the
        // implementation file.
        virtual void run();

        /** Common interface of the transports: send one frame. */
        struct i_sender {
            virtual void send_packet(const std::vector<uint8_t> &frame) = 0;
            virtual ~i_sender() = default;
        };

        /** Transport that sends frames through a UDP socket to
         * dest_addr:dest_port.
         */
        struct udp_sender_t : public i_sender {
            udp_sender_t(
                    std::string dest_addr,
                    uint16_t dest_port,
                    Socket::UDPSocket&& sock);

            std::string dest_addr;
            uint16_t dest_port;
            Socket::UDPSocket sock;

            void send_packet(const std::vector<uint8_t> &frame) override;
        };

        /** Transport built on a Socket::TCPDataDispatcher, associated with
         * a local listen port.
         */
        struct tcp_dispatcher_t : public i_sender {
            tcp_dispatcher_t(
                    uint16_t listen_port,
                    size_t max_frames_queued,
                    size_t tcp_server_preroll_buffers);

            uint16_t listen_port;
            Socket::TCPDataDispatcher sock;

            void send_packet(const std::vector<uint8_t> &frame) override;
        };

        /** Transport built on a Socket::TCPSendClient, created from a
         * destination address and port.
         */
        struct tcp_send_client_t : public i_sender {
            tcp_send_client_t(
                    const std::string& dest_addr,
                    uint16_t dest_port);

            Socket::TCPSendClient sock;

            void send_packet(const std::vector<uint8_t> &frame) override;
        };

        /** Couples one transport with its own PFT instance and a queue of
         * fragments waiting for their transmission time.
         */
        class PFTSpreader {
            public:
                using sender_sp = std::shared_ptr<i_sender>;

                PFTSpreader(const pft_settings_t &conf, sender_sp sender);

                // Transport used to send out the packets
                sender_sp sender;
                edi::PFT edi_pft;

                // Hand over an AF packet for transmission through this
                // spreader.
                void send_af_packet(const AFPacket &af_packet);

                // Periodic processing, given the current time.
                // NOTE(review): presumably sends the pending fragments that
                // are due at `now`; confirm in the implementation file.
                void tick(const std::chrono::steady_clock::time_point& now);

            private:
                // send_af_packet() and tick() are called from different
                // threads, both are accessing m_pending_frames
                std::mutex m_mutex;

                // Pending fragments, ordered by steady_clock time point
                std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames;
                pft_settings_t settings;
                size_t last_num_pft_fragments = 0;
        };

        // PFT spreaders used by this Sender
        std::vector<std::shared_ptr<PFTSpreader>> m_pft_spreaders;
};
}