aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/lib/rfnoc/graph_stream_manager.cpp77
1 files changed, 65 insertions, 12 deletions
diff --git a/host/lib/rfnoc/graph_stream_manager.cpp b/host/lib/rfnoc/graph_stream_manager.cpp
index fe28eab54..f3c42c87f 100644
--- a/host/lib/rfnoc/graph_stream_manager.cpp
+++ b/host/lib/rfnoc/graph_stream_manager.cpp
@@ -8,7 +8,7 @@
#include <uhd/utils/log.hpp>
#include <uhdlib/rfnoc/graph_stream_manager.hpp>
#include <uhdlib/rfnoc/link_stream_manager.hpp>
-#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
+#include <uhdlib/transport/links.hpp>
#include <boost/format.hpp>
#include <boost/make_shared.hpp>
#include <map>
@@ -18,6 +18,9 @@ using namespace uhd;
using namespace uhd::rfnoc;
using namespace uhd::rfnoc::chdr;
+namespace uhd { namespace rfnoc { namespace detail {
+}}} // uhd::rfnoc::detail
+
graph_stream_manager::~graph_stream_manager() = default;
class graph_stream_manager_impl : public graph_stream_manager
@@ -33,6 +36,9 @@ public:
_link_mgrs.emplace(lnk.first,
std::move(link_stream_manager::make(
pkt_factory, *lnk.second, epid_alloc, lnk.first)));
+ if (_alloc_map.count(lnk.first) == 0) {
+ _alloc_map[lnk.first] = allocation_info{0, 0};
+ }
}
for (const auto& mgr_pair : _link_mgrs) {
for (const auto& ep : mgr_pair.second->get_reachable_endpoints()) {
@@ -73,7 +79,8 @@ public:
// When we connect, we setup a route and fire up a control stream between
// the endpoints
- device_id_t gateway = _check_dst_and_find_src(dst_addr, host_device);
+ device_id_t gateway = _check_dst_and_find_src(dst_addr, host_device,
+ uhd::transport::link_type_t::CTRL);
sep_id_pair_t epid_pair =
_link_mgrs.at(gateway)->connect_host_to_device(dst_addr);
UHD_LOGGER_DEBUG("RFNOC::GRAPH")
@@ -122,8 +129,10 @@ public:
{
// We must be connected to dst_addr before getting a register iface
sep_id_t dst_epid = _epid_alloc->get_epid(dst_addr);
- return _link_mgrs.at(_check_dst_and_find_src(dst_addr, via_device))
- ->get_block_register_iface(dst_epid, block_index, client_clk, timebase_clk);
+ auto dev = _check_dst_and_find_src(dst_addr, via_device,
+ uhd::transport::link_type_t::CTRL);
+ return _link_mgrs.at(dev)->get_block_register_iface(dst_epid, block_index,
+ client_clk, timebase_clk);
}
virtual detail::client_zero::sptr get_client_zero(
@@ -131,8 +140,9 @@ public:
{
// We must be connected to dst_addr before getting a client zero
sep_id_t dst_epid = _epid_alloc->get_epid(dst_addr);
- return _link_mgrs.at(_check_dst_and_find_src(dst_addr, via_device))
- ->get_client_zero(dst_epid);
+ auto dev = _check_dst_and_find_src(dst_addr, via_device,
+ uhd::transport::link_type_t::CTRL);
+ return _link_mgrs.at(dev)->get_client_zero(dst_epid);
}
virtual std::tuple<sep_id_pair_t, stream_buff_params_t>
@@ -184,8 +194,12 @@ public:
const device_id_t via_device,
const device_addr_t& xport_args)
{
- return _link_mgrs.at(_check_dst_and_find_src(src_addr, via_device))
- ->create_device_to_host_data_stream(src_addr,
+ auto dev = _check_dst_and_find_src(src_addr, via_device,
+ uhd::transport::link_type_t::RX_DATA);
+ auto allocs = _alloc_map.at(dev);
+ allocs.rx++;
+ _alloc_map[dev] = allocs;
+ return _link_mgrs.at(dev)->create_device_to_host_data_stream(src_addr,
pyld_buff_fmt,
mdata_buff_fmt,
xport_args);
@@ -198,8 +212,12 @@ public:
const device_id_t via_device,
const device_addr_t& xport_args)
{
- return _link_mgrs.at(_check_dst_and_find_src(dst_addr, via_device))
- ->create_host_to_device_data_stream(dst_addr,
+ auto dev = _check_dst_and_find_src(dst_addr, via_device,
+ uhd::transport::link_type_t::TX_DATA);
+ auto allocs = _alloc_map.at(dev);
+ allocs.tx++;
+ _alloc_map[dev] = allocs;
+ return _link_mgrs.at(dev)->create_host_to_device_data_stream(dst_addr,
pyld_buff_fmt,
mdata_buff_fmt,
xport_args);
@@ -214,14 +232,37 @@ public:
}
}
private:
- device_id_t _check_dst_and_find_src(sep_addr_t dst_addr, device_id_t via_device) const
+ device_id_t _check_dst_and_find_src(sep_addr_t dst_addr, device_id_t via_device,
+ uhd::transport::link_type_t link_type) const
{
if (_src_map.count(dst_addr) > 0) {
const auto& src_devs = _src_map.at(dst_addr);
if (via_device == NULL_DEVICE_ID) {
// TODO: Maybe we can come up with a better heuristic for when the user
// gives no preference
- return src_devs[0];
+ auto dev = src_devs[0];
+ auto dev_alloc = _alloc_map.at(dev);
+ for (auto candidate : src_devs) {
+ auto candidate_alloc = _alloc_map.at(candidate);
+ switch (link_type) {
+ case uhd::transport::link_type_t::TX_DATA:
+ if (candidate_alloc.tx < dev_alloc.tx) {
+ dev = candidate;
+ dev_alloc = candidate_alloc;
+ }
+ break;
+ case uhd::transport::link_type_t::RX_DATA:
+ if (candidate_alloc.rx < dev_alloc.rx) {
+ dev = candidate;
+ dev_alloc = candidate_alloc;
+ }
+ break;
+ default:
+ // Just accept first device for CTRL and ASYNC_MSG
+ break;
+ }
+ }
+ return dev;
} else {
for (const auto& src : src_devs) {
if (src == via_device) {
@@ -244,6 +285,18 @@ private:
std::set<sep_addr_t> _reachable_endpoints;
// A map of addresses that can be taken to reach a particular destination
std::map<sep_addr_t, std::vector<device_id_t>> _src_map;
+
+ // Data used for heuristic to determine which link to use
+ struct allocation_info {
+ size_t rx;
+ size_t tx;
+ };
+
+ // A map of allocations for each local_device
+ // NOTE: Multiple local device IDs can refer to the same local xport adapter.
+ // The scheme does not account for a NIC that is used to connect to multiple
+ // remote devices, for example.
+ std::map<device_id_t, allocation_info> _alloc_map;
};
graph_stream_manager::uptr graph_stream_manager::make(