aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/rfnoc/chdr_tx_data_xport.cpp
blob: cb28c7ac9a1beebf1316f88597a5dd2a376e668f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
//
// Copyright 2019 Ettus Research, a National Instruments Brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//

#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>
#include <uhdlib/rfnoc/chdr_types.hpp>
#include <uhdlib/rfnoc/mgmt_portal.hpp>
#include <uhdlib/rfnoc/rfnoc_common.hpp>
#include <uhdlib/transport/io_service.hpp>
#include <uhdlib/transport/link_if.hpp>

using namespace uhd;
using namespace uhd::rfnoc;
using namespace uhd::rfnoc::detail;
using namespace uhd::transport;

tx_flow_ctrl_sender::tx_flow_ctrl_sender(
    const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids)
    : _dst_epid(sep_ids.second)
{
    _fc_packet             = pkt_factory.make_strc();
    _fc_strc_pyld.src_epid = sep_ids.first;
    _fc_strc_pyld.op_code  = chdr::STRC_RESYNC;
}

chdr_tx_data_xport::chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv,
    uhd::transport::recv_link_if::sptr recv_link,
    uhd::transport::send_link_if::sptr send_link,
    const chdr::chdr_packet_factory& pkt_factory,
    const uhd::rfnoc::sep_id_pair_t& epids,
    const size_t num_send_frames,
    const fc_params_t fc_params)
    : _fc_state(fc_params.buff_capacity)
    , _fc_sender(pkt_factory, epids)
    , _epid(epids.first)
{
    UHD_LOG_TRACE("XPORT::TX_DATA_XPORT",
        "Creating tx xport with local epid=" << epids.first
                                             << ", remote epid=" << epids.second);

    _send_header.set_dst_epid(epids.second);
    _send_packet = pkt_factory.make_generic();
    _recv_packet = pkt_factory.make_generic();

    // Calculate max payload size
    const size_t pyld_offset =
        _send_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS);
    _max_payload_size = send_link->get_send_frame_size() - pyld_offset;

    // Now create the send I/O we will use for data
    auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) {
        this->_send_callback(buff, send_link);
    };

    auto recv_cb = [this](buff_t::uptr& buff,
                       transport::recv_link_if* recv_link,
                       transport::send_link_if* send_link) {
        return this->_recv_callback(buff, recv_link, send_link);
    };

    // Needs just a single recv frame for strs packets
    _send_io = io_srv->make_send_client(send_link,
        num_send_frames,
        send_cb,
        recv_link,
        /* num_recv_frames */ 1,
        recv_cb);
}

/*
 * To configure flow control, we need to send an init strc packet, then
 * receive a strs containing the stream endpoint ingress buffer size. We
 * then repeat this (now that we know the buffer size) to configure the flow
 * control frequency. To avoid having this logic within the data packet
 * processing flow, we use temporary send and recv I/O instances with
 * simple callbacks here.
 */
static chdr_tx_data_xport::fc_params_t configure_flow_ctrl(io_service::sptr io_srv,
    recv_link_if::sptr recv_link,
    send_link_if::sptr send_link,
    const chdr::chdr_packet_factory& pkt_factory,
    const sep_id_pair_t epids,
    const double fc_freq_ratio,
    const double fc_headroom_ratio)
{
    chdr::chdr_strc_packet::uptr strc_packet = pkt_factory.make_strc();
    chdr::chdr_packet::uptr recv_packet      = pkt_factory.make_generic();

    // No flow control at initialization, just release all send buffs
    auto send_cb = [](frame_buff::uptr& buff, send_link_if* send_link) {
        send_link->release_send_buff(std::move(buff));
        buff = nullptr;
    };

    // For recv, just queue strs packets for recv_io to read
    auto recv_cb = [&recv_packet, epids](frame_buff::uptr& buff,
                       recv_link_if* /*recv_link*/,
                       send_link_if* /*send_link*/) {
        recv_packet->refresh(buff->data());
        const auto header   = recv_packet->get_chdr_header();
        const auto type     = header.get_pkt_type();
        const auto dst_epid = header.get_dst_epid();

        return (dst_epid == epids.first && type == chdr::PKT_TYPE_STRS);
    };

    // No flow control at initialization, just release all recv buffs
    auto fc_cb =
        [](frame_buff::uptr buff, recv_link_if* recv_link, send_link_if* /*send_link*/) {
            recv_link->release_recv_buff(std::move(buff));
        };

    auto send_io = io_srv->make_send_client(send_link,
        1, // num_send_frames
        send_cb,
        nullptr,
        0, // num_recv_frames
        nullptr);

    auto recv_io = io_srv->make_recv_client(recv_link,
        1, // num_recv_frames
        recv_cb,
        nullptr,
        0, // num_send_frames
        fc_cb);

    // Function to send a strc init
    auto send_strc_init = [&send_io, epids, &strc_packet](
                              const stream_buff_params_t fc_freq = {0, 0}) {
        frame_buff::uptr buff = send_io->get_send_buff(0);

        if (!buff) {
            throw uhd::runtime_error(
                "tx xport timed out getting a send buffer for strc init");
        }

        chdr::chdr_header header;
        header.set_seq_num(0);
        header.set_dst_epid(epids.second);

        chdr::strc_payload strc_pyld;
        strc_pyld.src_epid  = epids.first;
        strc_pyld.op_code   = chdr::STRC_INIT;
        strc_pyld.num_bytes = fc_freq.bytes;
        strc_pyld.num_pkts  = fc_freq.packets;
        strc_packet->refresh(buff->data(), header, strc_pyld);

        const size_t size = header.get_length();
        buff->set_packet_size(size);
        send_io->release_send_buff(std::move(buff));
    };

    // Function to receive a strs, returns buffer capacity
    auto recv_strs = [&recv_io, &recv_packet]() -> stream_buff_params_t {
        frame_buff::uptr buff = recv_io->get_recv_buff(200);

        if (!buff) {
            throw uhd::runtime_error(
                "tx xport timed out wating for a strs packet during initialization");
        }

        recv_packet->refresh(buff->data());
        UHD_ASSERT_THROW(
            recv_packet->get_chdr_header().get_pkt_type() == chdr::PKT_TYPE_STRS);
        chdr::strs_payload strs;
        strs.deserialize(recv_packet->get_payload_const_ptr_as<uint64_t>(),
            recv_packet->get_payload_size() / sizeof(uint64_t),
            recv_packet->conv_to_host<uint64_t>());

        recv_io->release_recv_buff(std::move(buff));

        return {strs.capacity_bytes, static_cast<uint32_t>(strs.capacity_pkts)};
    };

    // Send a strc init to get the buffer size
    send_strc_init();
    stream_buff_params_t capacity = recv_strs();

    UHD_LOG_TRACE("XPORT::TX_DATA_XPORT",
        "Received strs initializing buffer capacity to " << capacity.bytes << " bytes");

    // Calculate the requested fc_freq parameters
    stream_buff_params_t fc_freq = {
        static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_freq_ratio)),
        static_cast<uint32_t>(std::ceil(double(capacity.packets) * fc_freq_ratio))};

    const size_t headroom_bytes =
        static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_headroom_ratio));
    const size_t headroom_packets =
        static_cast<uint32_t>(std::ceil(double(capacity.packets) * fc_headroom_ratio));

    fc_freq.bytes -= headroom_bytes;
    fc_freq.packets -= headroom_packets;

    // Send a strc init to configure fc freq
    send_strc_init(fc_freq);
    recv_strs();

    // Release temporary I/O service interfaces to disconnect from it
    send_io.reset();
    recv_io.reset();

    return {capacity};
}

chdr_tx_data_xport::fc_params_t chdr_tx_data_xport::configure_sep(io_service::sptr io_srv,
    recv_link_if::sptr recv_link,
    send_link_if::sptr send_link,
    const chdr::chdr_packet_factory& pkt_factory,
    mgmt::mgmt_portal& mgmt_portal,
    const sep_id_pair_t& epids,
    const sw_buff_t pyld_buff_fmt,
    const sw_buff_t mdata_buff_fmt,
    const double fc_freq_ratio,
    const double fc_headroom_ratio)
{
    const sep_id_t remote_epid = epids.second;
    const sep_id_t local_epid  = epids.first;

    // Create a control transport with the tx data links to send mgmt packets
    // needed to setup the stream. Only need one frame for this.
    auto ctrl_xport = chdr_ctrl_xport::make(io_srv,
        send_link,
        recv_link,
        pkt_factory,
        local_epid,
        1, // num_send_frames
        1); // num_recv_frames

    // Setup a route to the EPID
    mgmt_portal.setup_local_route(*ctrl_xport, remote_epid);

    mgmt_portal.config_local_tx_stream(
        *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt);

    // We no longer need the control xport, release it so
    // the control xport is no longer connected to the I/O service.
    ctrl_xport.reset();

    return configure_flow_ctrl(io_srv,
        recv_link,
        send_link,
        pkt_factory,
        epids,
        fc_freq_ratio,
        fc_headroom_ratio);
}