aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/rfnoc/graph_stream_manager.cpp
blob: fdf294006b24a93a367426284e70f5b278577c32 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
//
// Copyright 2019 Ettus Research, a National Instruments Brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//

#include <uhd/exception.hpp>
#include <uhd/utils/log.hpp>
#include <uhdlib/rfnoc/graph_stream_manager.hpp>
#include <uhdlib/rfnoc/link_stream_manager.hpp>
#include <uhdlib/transport/links.hpp>
#include <boost/format.hpp>
#include <map>
#include <memory>
#include <set>

using namespace uhd;
using namespace uhd::rfnoc;
using namespace uhd::rfnoc::chdr;

graph_stream_manager::~graph_stream_manager() = default;

class graph_stream_manager_impl : public graph_stream_manager
{
public:
    graph_stream_manager_impl(const chdr::chdr_packet_factory& pkt_factory,
        const epid_allocator::sptr& epid_alloc,
        const std::vector<std::pair<device_id_t, mb_iface*>>& links)
        : _epid_alloc(epid_alloc)
    {
        for (const auto& lnk : links) {
            UHD_ASSERT_THROW(lnk.second);
            _link_mgrs.emplace(lnk.first,
                link_stream_manager::make(
                    pkt_factory, *lnk.second, epid_alloc, lnk.first));
            auto adapter = _link_mgrs.at(lnk.first)->get_adapter_id();
            if (_alloc_map.count(adapter) == 0) {
                _alloc_map[adapter] = allocation_info{0, 0};
            }
        }
        for (const auto& mgr_pair : _link_mgrs) {
            for (const auto& ep : mgr_pair.second->get_reachable_endpoints()) {
                // Add the (potential) destinations to the
                _reachable_endpoints.insert(ep);
                // Add entry to source map
                if (_src_map.count(ep) == 0) {
                    _src_map[ep] = std::vector<device_id_t>();
                }
                _src_map[ep].push_back(mgr_pair.first);
            }
        }
    }

    ~graph_stream_manager_impl() override = default;

    const std::set<sep_addr_t>& get_reachable_endpoints() const override
    {
        return _reachable_endpoints;
    }

    std::vector<device_id_t> get_local_devices() const override
    {
        std::vector<device_id_t> retval;
        for (const auto& mgr_pair : _link_mgrs) {
            retval.push_back(mgr_pair.first);
        }
        return retval;
    }

    sep_id_pair_t connect_host_to_device(sep_addr_t dst_addr,
        uhd::transport::adapter_id_t adapter = uhd::transport::NULL_ADAPTER_ID) override
    {
        UHD_LOGGER_DEBUG("RFNOC::GRAPH")
            << boost::format("Connecting the Host to Endpoint %d:%d through Adapter "
                             "%d (0 = no preference)... ")
                   % dst_addr.first % dst_addr.second % adapter;

        // When we connect, we setup a route and fire up a control stream between
        // the endpoints
        device_id_t gateway =
            _check_dst_and_find_src(dst_addr, adapter, uhd::transport::link_type_t::CTRL);
        sep_id_pair_t epid_pair =
            _link_mgrs.at(gateway)->connect_host_to_device(dst_addr);
        UHD_LOGGER_DEBUG("RFNOC::GRAPH")
            << boost::format("Connection to Endpoint %d:%d completed through Device %d. "
                             "Using EPIDs %d -> %d.")
                   % dst_addr.first % dst_addr.second % gateway % epid_pair.first
                   % epid_pair.second;

        return epid_pair;
    }

    sep_id_pair_t connect_device_to_device(
        sep_addr_t dst_addr, sep_addr_t src_addr) override
    {
        UHD_LOGGER_DEBUG("RFNOC::GRAPH")
            << boost::format("Connecting the Endpoint %d:%d to Endpoint %d:%d...")
                   % src_addr.first % src_addr.second % dst_addr.first % dst_addr.second;

        // Iterate through all link managers and check if they are capable of connecting
        // the requested endpoints. If no one can connect then the endpoints may actually
        // not share a common crossbar or we don't have enough connectivity in the
        // software session to reach the common crossbar.
        for (auto& kv : _link_mgrs) {
            if (kv.second->can_connect_device_to_device(dst_addr, src_addr)) {
                sep_id_pair_t epid_pair =
                    kv.second->connect_device_to_device(dst_addr, src_addr);
                UHD_LOGGER_DEBUG("RFNOC::GRAPH")
                    << boost::format("Connection from Endpoint %d:%d to Endpoint %d:%d "
                                     "completed through Device %d. Using "
                                     "EPIDs %d -> %d.")
                           % src_addr.first % src_addr.second % dst_addr.first
                           % dst_addr.second % kv.first % epid_pair.first
                           % epid_pair.second;
                return epid_pair;
            }
        }
        throw uhd::routing_error("The specified destination is unreachable from the "
                                 "specified source endpoint");
    }

    ctrlport_endpoint::sptr get_block_register_iface(sep_addr_t dst_addr,
        uint16_t block_index,
        const clock_iface& client_clk,
        const clock_iface& timebase_clk,
        uhd::transport::adapter_id_t adapter = uhd::transport::NULL_ADAPTER_ID) override
    {
        // We must be connected to dst_addr before getting a register iface
        sep_id_t dst_epid = _epid_alloc->get_epid(dst_addr);
        auto dev =
            _check_dst_and_find_src(dst_addr, adapter, uhd::transport::link_type_t::CTRL);
        return _link_mgrs.at(dev)->get_block_register_iface(
            dst_epid, block_index, client_clk, timebase_clk);
    }

    detail::client_zero::sptr get_client_zero(sep_addr_t dst_addr,
        uhd::transport::adapter_id_t adapter =
            uhd::transport::NULL_ADAPTER_ID) const override
    {
        // We must be connected to dst_addr before getting a client zero
        sep_id_t dst_epid = _epid_alloc->get_epid(dst_addr);
        auto dev =
            _check_dst_and_find_src(dst_addr, adapter, uhd::transport::link_type_t::CTRL);
        return _link_mgrs.at(dev)->get_client_zero(dst_epid);
    }

    std::tuple<sep_id_pair_t, stream_buff_params_t> create_device_to_device_data_stream(
        const sep_addr_t dst_addr,
        const sep_addr_t src_addr,
        const bool lossy_xport,
        const double fc_freq_ratio,
        const double fc_headroom_ratio,
        const bool reset = false) override
    {
        UHD_LOGGER_DEBUG("RFNOC::GRAPH")
            << boost::format(
                   "Initializing data stream from Endpoint %d:%d to Endpoint %d:%d...")
                   % src_addr.first % src_addr.second % dst_addr.first % dst_addr.second;

        // Iterate through all link managers and check if they are capable of connecting
        // the requested endpoints. If no one can connect then the endpoints may actually
        // not share a common crossbar or we don't have enough connectivity in the
        // software session to reach the common crossbar.
        for (auto& kv : _link_mgrs) {
            if (kv.second->can_connect_device_to_device(dst_addr, src_addr)) {
                sep_id_pair_t epid_pair =
                    kv.second->connect_device_to_device(dst_addr, src_addr);
                UHD_LOGGER_DEBUG("RFNOC::GRAPH")
                    << boost::format("Connection from Endpoint %d:%d to Endpoint %d:%d "
                                     "completed through Device %d. Using "
                                     "EPIDs %d -> %d.")
                           % src_addr.first % src_addr.second % dst_addr.first
                           % dst_addr.second % kv.first % epid_pair.first
                           % epid_pair.second;
                stream_buff_params_t buff_params =
                    kv.second->create_device_to_device_data_stream(epid_pair.second,
                        epid_pair.first,
                        lossy_xport,
                        fc_freq_ratio,
                        fc_headroom_ratio,
                        reset);
                return std::make_tuple(epid_pair, buff_params);
            }
        }
        throw uhd::routing_error("The specified destination is unreachable from the "
                                 "specified source endpoint");
    }

    chdr_rx_data_xport::uptr create_device_to_host_data_stream(const sep_addr_t src_addr,
        const sw_buff_t pyld_buff_fmt,
        const sw_buff_t mdata_buff_fmt,
        const uhd::transport::adapter_id_t adapter,
        const device_addr_t& xport_args,
        const std::string& streamer_id) override
    {
        device_id_t dev = _check_dst_and_find_src(
            src_addr, adapter, uhd::transport::link_type_t::RX_DATA);
        uhd::transport::adapter_id_t chosen = _link_mgrs.at(dev)->get_adapter_id();
        auto allocs                         = _alloc_map.at(chosen);
        allocs.rx++;
        _alloc_map[chosen] = allocs;
        return _link_mgrs.at(dev)->create_device_to_host_data_stream(
            src_addr, pyld_buff_fmt, mdata_buff_fmt, xport_args, streamer_id);
    }

    chdr_tx_data_xport::uptr create_host_to_device_data_stream(sep_addr_t dst_addr,
        const sw_buff_t pyld_buff_fmt,
        const sw_buff_t mdata_buff_fmt,
        const uhd::transport::adapter_id_t adapter,
        const device_addr_t& xport_args,
        const std::string& streamer_id) override
    {
        device_id_t dev = _check_dst_and_find_src(
            dst_addr, adapter, uhd::transport::link_type_t::TX_DATA);
        uhd::transport::adapter_id_t chosen = _link_mgrs.at(dev)->get_adapter_id();
        auto allocs                         = _alloc_map.at(chosen);
        allocs.tx++;
        _alloc_map[chosen] = allocs;
        return _link_mgrs.at(dev)->create_host_to_device_data_stream(
            dst_addr, pyld_buff_fmt, mdata_buff_fmt, xport_args, streamer_id);
    }

    std::vector<uhd::transport::adapter_id_t> get_adapters(sep_addr_t addr) const override
    {
        auto adapters = std::vector<uhd::transport::adapter_id_t>();
        if (_src_map.count(addr) > 0) {
            const auto& src_devs = _src_map.at(addr);
            for (auto src : src_devs) {
                // Returns in order specified in device args
                // Assumption: the device_id_t will be incremented sequentially
                // and the std::map will then provide the link_stream_managers
                // in the same order as the adapters were specified
                adapters.push_back(_link_mgrs.at(src)->get_adapter_id());
            }
            return adapters;
        } else {
            throw uhd::rfnoc_error("Specified address is unreachable. No via_devices.");
        }
    }

private:
    device_id_t _check_dst_and_find_src(sep_addr_t dst_addr,
        uhd::transport::adapter_id_t adapter,
        uhd::transport::link_type_t link_type) const
    {
        if (_src_map.count(dst_addr) > 0) {
            const auto& src_devs = _src_map.at(dst_addr);
            if (adapter == uhd::transport::NULL_ADAPTER_ID) {
                // TODO: Maybe we can come up with a better heuristic for when the user
                // gives no preference
                auto dev       = src_devs[0];
                auto dev_alloc = _alloc_map.at(_link_mgrs.at(dev)->get_adapter_id());
                for (auto candidate : src_devs) {
                    auto candidate_alloc =
                        _alloc_map.at(_link_mgrs.at(candidate)->get_adapter_id());
                    switch (link_type) {
                        case uhd::transport::link_type_t::TX_DATA:
                            if (candidate_alloc.tx < dev_alloc.tx) {
                                dev       = candidate;
                                dev_alloc = candidate_alloc;
                            }
                            break;
                        case uhd::transport::link_type_t::RX_DATA:
                            if (candidate_alloc.rx < dev_alloc.rx) {
                                dev       = candidate;
                                dev_alloc = candidate_alloc;
                            }
                            break;
                        default:
                            // Just accept first device for CTRL and ASYNC_MSG
                            break;
                    }
                }
                return dev;
            } else {
                for (const auto& src : src_devs) {
                    if (_link_mgrs.at(src)->get_adapter_id() == adapter) {
                        return src;
                    }
                }
                throw uhd::rfnoc_error("Specified destination address is unreachable "
                                       "from the via device");
            }
        } else {
            throw uhd::rfnoc_error("Specified destination address is unreachable");
        }
    }

    // The cached EPID allocator object
    epid_allocator::sptr _epid_alloc;
    // A map the contains all link manager indexed by the device ID
    std::map<device_id_t, link_stream_manager::uptr> _link_mgrs;
    // A set of the addresses of all devices reachable from this graph
    std::set<sep_addr_t> _reachable_endpoints;
    // A map of addresses that can be taken to reach a particular destination
    std::map<sep_addr_t, std::vector<device_id_t>> _src_map;

    // Data used for heuristic to determine which link to use
    struct allocation_info
    {
        size_t rx;
        size_t tx;
    };

    // A map of allocations for each host transport adapter
    std::map<uhd::transport::adapter_id_t, allocation_info> _alloc_map;
};

graph_stream_manager::uptr graph_stream_manager::make(
    const chdr::chdr_packet_factory& pkt_factory,
    const epid_allocator::sptr& epid_alloc,
    const std::vector<std::pair<device_id_t, mb_iface*>>& links)
{
    return std::make_unique<graph_stream_manager_impl>(pkt_factory, epid_alloc, links);
}