diff options
Diffstat (limited to 'host/lib/rfnoc/mgmt_portal.cpp')
-rw-r--r-- | host/lib/rfnoc/mgmt_portal.cpp | 997 |
1 files changed, 997 insertions, 0 deletions
diff --git a/host/lib/rfnoc/mgmt_portal.cpp b/host/lib/rfnoc/mgmt_portal.cpp new file mode 100644 index 000000000..fcb19997e --- /dev/null +++ b/host/lib/rfnoc/mgmt_portal.cpp @@ -0,0 +1,997 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + + +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <uhdlib/rfnoc/chdr/chdr_packet.hpp> +#include <uhdlib/rfnoc/mgmt_portal.hpp> +#include <unordered_set> +#include <boost/format.hpp> +#include <cmath> +#include <mutex> +#include <queue> + +namespace uhd { namespace rfnoc { namespace mgmt { + +using namespace chdr; +using namespace transport; + +constexpr bool ALLOW_DAISY_CHAINING = false; + +constexpr uint16_t REG_EPID_SELF = 0x00; // RW +constexpr uint16_t REG_RESET_AND_FLUSH = 0x04; // W +constexpr uint16_t REG_OSTRM_CTRL_STATUS = 0x08; // RW +constexpr uint16_t REG_OSTRM_DST_EPID = 0x0C; // W +constexpr uint16_t REG_OSTRM_FC_FREQ_BYTES_LO = 0x10; // W +constexpr uint16_t REG_OSTRM_FC_FREQ_BYTES_HI = 0x14; // W +constexpr uint16_t REG_OSTRM_FC_FREQ_PKTS = 0x18; // W +constexpr uint16_t REG_OSTRM_FC_HEADROOM = 0x1C; // W +constexpr uint16_t REG_OSTRM_BUFF_CAP_BYTES_LO = 0x20; // R +constexpr uint16_t REG_OSTRM_BUFF_CAP_BYTES_HI = 0x24; // R +constexpr uint16_t REG_OSTRM_BUFF_CAP_PKTS = 0x28; // R +constexpr uint16_t REG_OSTRM_SEQ_ERR_CNT = 0x2C; // R +constexpr uint16_t REG_OSTRM_DATA_ERR_CNT = 0x30; // R +constexpr uint16_t REG_OSTRM_ROUTE_ERR_CNT = 0x34; // R +constexpr uint16_t REG_ISTRM_CTRL_STATUS = 0x38; // RW + +constexpr uint32_t RESET_AND_FLUSH_OSTRM = (1 << 0); +constexpr uint32_t RESET_AND_FLUSH_ISTRM = (1 << 1); +constexpr uint32_t RESET_AND_FLUSH_CTRL = (1 << 2); +constexpr uint32_t RESET_AND_FLUSH_ALL = 0x7; + +constexpr uint32_t BUILD_CTRL_STATUS_WORD(bool cfg_start, + bool xport_lossy, + mgmt_portal::sw_buff_t pyld_buff_fmt, + mgmt_portal::sw_buff_t mdata_buff_fmt) +{ + return (cfg_start ? 1 : 0) | (xport_lossy ? 2 : 0) + | (static_cast<uint32_t>(pyld_buff_fmt) << 2) + | (static_cast<uint32_t>(mdata_buff_fmt) << 4); +} + +constexpr uint32_t STRM_STATUS_FC_ENABLED = 0x80000000; +constexpr uint32_t STRM_STATUS_SETUP_ERR = 0x40000000; +constexpr uint32_t STRM_STATUS_SETUP_PENDING = 0x20000000; + +//! The type of a node in the data-flow graph +enum node_type { + //! Invalid type. The FPGA will never have a node with type = 0 + NODE_TYPE_INVALID = 0, + //! CHDR Crossbar + NODE_TYPE_XBAR = 1, + //! Stream Endpoint + NODE_TYPE_STRM_EP = 2, + //! Transport + NODE_TYPE_XPORT = 3 +}; + +//! A unique identifier for a node +struct node_id_t +{ + //! A unique ID for device that houses this node + device_id_t device_id; + //! The type of this node + node_type type; + //! The instance number of this node in the device + sep_inst_t inst; + //! Extended info about node (not used for comparisons) + uint32_t extended_info; + + // ctors and operators + node_id_t() = delete; + node_id_t(const node_id_t& rhs) = default; + node_id_t(device_id_t device_id_, node_type type_, sep_inst_t inst_) + : device_id(device_id_), type(type_), inst(inst_), extended_info(0) + { + } + node_id_t(device_id_t device_id_, + node_type type_, + sep_inst_t inst_, + uint32_t extended_info_) + : device_id(device_id_), type(type_), inst(inst_), extended_info(extended_info_) + { + } + node_id_t(const sep_addr_t& sep_addr) + : device_id(sep_addr.first) + , type(NODE_TYPE_STRM_EP) + , inst(sep_addr.second) + , extended_info(0) + { + } + + inline uint64_t unique_id() const + { + return (static_cast<uint64_t>(inst) + (static_cast<uint64_t>(device_id) << 16) + + (static_cast<uint64_t>(type) << 32)); + } + inline std::string to_string() const + { + static const std::map<node_type, std::string> NODE_STR = { + {NODE_TYPE_INVALID, "unknown"}, + {NODE_TYPE_XBAR, "xbar"}, + {NODE_TYPE_STRM_EP, "sep"}, + {NODE_TYPE_XPORT, "xport"}}; + return str( + boost::format("device:%d/%s:%d") % device_id % NODE_STR.at(type) % inst); + } + + inline friend bool operator<(const node_id_t& lhs, const node_id_t& rhs) + { + return (lhs.unique_id() < rhs.unique_id()); + } + inline friend bool operator==(const node_id_t& lhs, const node_id_t& rhs) + { + return (lhs.unique_id() == rhs.unique_id()); + } + inline friend bool operator!=(const node_id_t& lhs, const node_id_t& rhs) + { + return (lhs.unique_id() != rhs.unique_id()); + } + inline node_id_t& operator=(const node_id_t&) = default; +}; + +//! The local destination to take at the current node to reach the next node +// - If negative, then no specific action necessary +// - If non-negative, then route (select destination) to the value +using next_dest_t = int32_t; + +//! An address that allows locating a node in a data-flow network starting from +// a specific stream endpoint. The address is a collection (vector) of nodes and +// the respective routing decisions to get to the final node. +using node_addr_t = std::vector<std::pair<node_id_t, next_dest_t>>; + +std::string to_string(const node_addr_t& node_addr) +{ + if (!node_addr.empty()) { + std::string str(""); + for (const auto& hop : node_addr) { + str += hop.first.to_string() + std::string(",") + std::to_string(hop.second) + std::string("->"); + } + return str; + } else { + return std::string("<empty>"); + } +} + +// Empty dtor for stream_manager +mgmt_portal::~mgmt_portal() {} + +//--------------------------------------------------------------- +// Stream Manager Implementation +//--------------------------------------------------------------- +class mgmt_portal_impl : public mgmt_portal +{ +public: + mgmt_portal_impl(const both_xports_t& xports, + const chdr::chdr_packet_factory& pkt_factory, + uint16_t protover, + chdr_w_t chdr_w, + sep_id_t my_epid, + device_id_t my_device_id) + : _my_epid(my_epid) + , _protover(protover) + , _chdr_w(chdr_w) + , _my_node_id(my_device_id, NODE_TYPE_STRM_EP, my_epid) + , _recv_xport(xports.recv) + , _send_xport(xports.send) + , _send_seqnum(0) + , _send_pkt(std::move(pkt_factory.make_mgmt())) + , _recv_pkt(std::move(pkt_factory.make_mgmt())) + { + std::lock_guard<std::recursive_mutex> lock(_mutex); + _discover_topology(); + } + + virtual ~mgmt_portal_impl() {} + + virtual const std::vector<sep_addr_t>& get_reachable_endpoints() const + { + return _discovered_ep_vtr; + } + + virtual void initialize_endpoint(const sep_addr_t& addr, const sep_id_t& epid) + { + std::lock_guard<std::recursive_mutex> lock(_mutex); + + // Create a node ID from lookup info + node_id_t lookup_node(addr.first, NODE_TYPE_STRM_EP, addr.second); + if (_node_addr_map.count(lookup_node) == 0) { + throw uhd::lookup_error( + "initialize_endpoint(): Cannot reach node with specified address."); + } + const node_addr_t& node_addr = _node_addr_map.at(lookup_node); + + // Build a management transaction to first get to the node + mgmt_payload cfg_xact; + cfg_xact.set_header(_my_epid, _protover, _chdr_w); + _traverse_to_node(cfg_xact, node_addr); + + mgmt_hop_t cfg_hop; + cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, + mgmt_op_t::cfg_payload(REG_RESET_AND_FLUSH, RESET_AND_FLUSH_ALL))); + cfg_hop.add_op(mgmt_op_t( + mgmt_op_t::MGMT_OP_CFG_WR_REQ, mgmt_op_t::cfg_payload(REG_EPID_SELF, epid))); + cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); + cfg_xact.add_hop(cfg_hop); + + // Send the transaction and receive a response. + // We don't care about the contents of the response. + _send_recv_mgmt_transaction(cfg_xact); + + // Add/update the entry in the stream endpoint ID map + _epid_addr_map[epid] = addr; + UHD_LOG_DEBUG("RFNOC::MGMT", + (boost::format("Bound stream endpoint with Addr=(%d,%d) to EPID=%d") + % addr.first % addr.second % epid)); + UHD_LOG_TRACE("RFNOC::MGMT", + (boost::format( + "Stream endpoint with EPID=%d can be reached by taking the path: %s") + % epid % to_string(node_addr))); + } + + virtual bool is_endpoint_initialized(const sep_id_t& epid) const + { + std::lock_guard<std::recursive_mutex> lock(_mutex); + return (_epid_addr_map.count(epid) > 0); + } + + virtual sep_info_t get_endpoint_info(const sep_id_t& epid) const + { + std::lock_guard<std::recursive_mutex> lock(_mutex); + + // Lookup the destination node address using the endpoint ID + if (_epid_addr_map.count(epid) == 0) { + throw uhd::lookup_error( + "get_endpoint_info(): Could not find a stream with specified ID."); + } + node_id_t lookup_node(_epid_addr_map.at(epid)); + // If a node is in _epid_addr_map then it must be in _node_addr_map + UHD_ASSERT_THROW(_node_addr_map.count(lookup_node) > 0); + // Why is key_node different from lookup_node? + // Because it has additional extended info (look at operator< def) + const node_id_t& key_node = _node_addr_map.find(lookup_node)->first; + + // Build a return val + sep_info_t retval; + retval.has_ctrl = (key_node.extended_info >> 0) & 0x1; + retval.has_data = (key_node.extended_info >> 1) & 0x1; + retval.num_input_ports = retval.has_data ? ((key_node.extended_info >> 2) & 0x3F) + : 0; + retval.num_output_ports = retval.has_data ? ((key_node.extended_info >> 8) & 0x3F) + : 0; + retval.reports_strm_errs = (key_node.extended_info >> 14) & 0x1; + retval.addr = _epid_addr_map.at(epid); + return retval; + } + + virtual void setup_local_route(const sep_id_t& dst_epid) + { + std::lock_guard<std::recursive_mutex> lock(_mutex); + + // Lookup the physical stream endpoint address using the endpoint ID + const node_addr_t& node_addr = _lookup_sep_node_addr(dst_epid); + + // Build a management transaction to configure all the nodes in the path going to + // dst_epid + mgmt_payload cfg_xact; + cfg_xact.set_header(_my_epid, _protover, _chdr_w); + for (const auto& addr_pair : node_addr) { + const node_id_t& curr_node = addr_pair.first; + const next_dest_t& curr_dest = addr_pair.second; + mgmt_hop_t curr_cfg_hop; + switch (curr_node.type) { + case NODE_TYPE_XBAR: { + // Configure the routing table to route all packets going to dst_epid + // to the port with index next_dest_t + curr_cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, + mgmt_op_t::cfg_payload(dst_epid, curr_dest))); + curr_cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_SEL_DEST, + mgmt_op_t::sel_dest_payload(static_cast<uint16_t>(curr_dest)))); + } break; + case NODE_TYPE_XPORT: { + uint8_t node_subtype = + static_cast<uint8_t>(curr_node.extended_info & 0xFF); + // Run a hop configuration function for custom transports + if (_rtcfg_cfg_fns.count(node_subtype)) { + _rtcfg_cfg_fns.at(node_subtype)(curr_node.device_id, + curr_node.inst, + node_subtype, + curr_cfg_hop); + } else { + curr_cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP)); + } + } break; + case NODE_TYPE_STRM_EP: { + // Stream are not involved in routing, so do nothing + } break; + default: { + UHD_THROW_INVALID_CODE_PATH(); + } break; + } + // Add this hop to the trancation only if it's not empty + if (curr_cfg_hop.get_num_ops() > 0) { + cfg_xact.add_hop(curr_cfg_hop); + } + } + + // If we follow this route then we should end up at a stream endpoint + // so add an extra hop and return the packet back with the node info we will + // sanity check it later + mgmt_hop_t discover_hop; + discover_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_INFO_REQ)); + discover_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); + cfg_xact.add_hop(discover_hop); + + // Send the transaction and validate that we saw a stream endpoint + const mgmt_payload sep_info_xact = _send_recv_mgmt_transaction(cfg_xact); + const node_id_t sep_node = _pop_node_discovery_hop(sep_info_xact); + if (sep_node.type != NODE_TYPE_STRM_EP) { + throw uhd::routing_error( + "Route setup failed. Could not confirm terminal stream endpoint"); + } + + UHD_LOG_DEBUG("RFNOC::MGMT", + (boost::format("Established a route from EPID=%d (SW) to EPID=%d") % _my_epid + % dst_epid)); + UHD_LOG_TRACE("RFNOC::MGMT", + (boost::format("The destination for EPID=%d has been added to all routers in " + "the path: %s") + % dst_epid % to_string(node_addr))); + } + + virtual void setup_remote_route(const sep_id_t& dst_epid, const sep_id_t& src_epid) + { + std::lock_guard<std::recursive_mutex> lock(_mutex); + + // Lookup the src and dst node address using the endpoint ID + const node_addr_t& dst_node_addr = _lookup_sep_node_addr(dst_epid); + const node_addr_t& src_node_addr = _lookup_sep_node_addr(src_epid); + + // Find a common parent (could be faster than n^2 but meh, this is easier) + bool found_common_parent = false; + for (const auto& dnode : dst_node_addr) { + for (const auto& snode : src_node_addr) { + found_common_parent = + ((dnode.first == snode.first) && dnode.first.type == NODE_TYPE_XBAR); + if (found_common_parent) + break; + } + if (found_common_parent) + break; + } + if (!found_common_parent) { + throw uhd::routing_error("setup_remote_route: Route setup failed. The " + "endpoints don't share a common crossbar parent."); + } + + // If we setup local routes from this host to both the source and destination + // endpoints then the routing algorithm will guarantee that packet between src and + // dst will have a path between them as long as they share a common parent + // (crossbar). The assumption is verified above. It is also guaranteed that the + // path between them will be the shortest one. It is possible that we are + // configuring more crossbars than necessary but we do this for simplicity. If + // there is a need to optimize for routing table fullness, we can do a software + // graph traversal here, find the closest common parent (crossbar) for the two + // nodes and only configure the nodes downstream of that. + setup_local_route(dst_epid); + setup_local_route(src_epid); + + UHD_LOG_DEBUG("RFNOC::MGMT", + (boost::format( + "The two routes above now enable a route from EPID=%d to EPID=%s") + % src_epid % dst_epid)); + } + + virtual void config_local_rx_stream_start(const sep_id_t& epid, + const bool lossy_xport, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, + const buff_params_t& fc_freq, + const buff_params_t& fc_headroom, + const bool reset = false) + { + std::lock_guard<std::recursive_mutex> lock(_mutex); + + // The discovery process has already setup a route from the + // destination to us. No additional action is necessary. + + const node_addr_t& node_addr = _lookup_sep_node_addr(epid); + + // Build a management transaction to first get to the node + mgmt_payload cfg_xact; + cfg_xact.set_header(_my_epid, _protover, _chdr_w); + _traverse_to_node(cfg_xact, node_addr); + + mgmt_hop_t cfg_hop; + // Assert reset if requested + if (reset) { + cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, + mgmt_op_t::cfg_payload(REG_RESET_AND_FLUSH, RESET_AND_FLUSH_OSTRM))); + } + // Set destination of the stream to us (this endpoint) + cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, + mgmt_op_t::cfg_payload(REG_OSTRM_DST_EPID, _my_epid))); + // Configure flow control parameters + _push_ostrm_flow_control_config( + lossy_xport, pyld_buff_fmt, mdata_buff_fmt, fc_freq, fc_headroom, cfg_hop); + // Return the packet back to us + cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); + + // Send the transaction and receive a response. + // We don't care about the contents of the response. + cfg_xact.add_hop(cfg_hop); + _send_recv_mgmt_transaction(cfg_xact); + + UHD_LOG_DEBUG("RFNOC::MGMT", + (boost::format("Initiated RX stream setup for EPID=%d") % epid)); + } + + virtual buff_params_t config_local_rx_stream_commit( + const sep_id_t& epid, const double timeout = 0.2) + { + std::lock_guard<std::recursive_mutex> lock(_mutex); + + // Wait for stream configuration to finish on the HW side + const node_addr_t& node_addr = _lookup_sep_node_addr(epid); + _validate_stream_setup(node_addr, timeout); + + UHD_LOG_DEBUG("RFNOC::MGMT", + (boost::format("Finished RX stream setup for EPID=%d") % epid)); + + // Return discovered buffer parameters + return std::get<1>(_get_ostrm_status(node_addr)); + } + + virtual void config_local_tx_stream(const sep_id_t& epid, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, + const bool reset = false) + { + std::lock_guard<std::recursive_mutex> lock(_mutex); + + // First setup a route between to the endpoint + setup_local_route(epid); + + const node_addr_t& node_addr = _lookup_sep_node_addr(epid); + + // Build a management transaction to first get to the node + mgmt_payload cfg_xact; + cfg_xact.set_header(_my_epid, _protover, _chdr_w); + _traverse_to_node(cfg_xact, node_addr); + + mgmt_hop_t cfg_hop; + // Assert reset if requested + if (reset) { + cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, + mgmt_op_t::cfg_payload(REG_RESET_AND_FLUSH, RESET_AND_FLUSH_ISTRM))); + } + // Configure buffer types + cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, + mgmt_op_t::cfg_payload(REG_ISTRM_CTRL_STATUS, + BUILD_CTRL_STATUS_WORD(false, false, pyld_buff_fmt, mdata_buff_fmt)))); + cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); + cfg_xact.add_hop(cfg_hop); + + // Send the transaction and receive a response. + // We don't care about the contents of the response. + _send_recv_mgmt_transaction(cfg_xact); + + UHD_LOG_DEBUG("RFNOC::MGMT", + (boost::format("Finished TX stream setup for EPID=%d") % epid)); + } + + virtual buff_params_t config_remote_stream(const sep_id_t& dst_epid, + const sep_id_t& src_epid, + const bool lossy_xport, + const buff_params_t& fc_freq, + const buff_params_t& fc_headroom, + const bool reset = false, + const double timeout = 0.2) + { + std::lock_guard<std::recursive_mutex> lock(_mutex); + + // First setup a route between the two endpoints + setup_remote_route(dst_epid, src_epid); + + const node_addr_t& dst_node_addr = _lookup_sep_node_addr(dst_epid); + const node_addr_t& src_node_addr = _lookup_sep_node_addr(src_epid); + + // If requested, send transactions to reset and flush endpoints + if (reset) { + // Reset source and destination (in that order) + for (size_t i = 0; i < 2; i++) { + mgmt_payload rst_xact; + rst_xact.set_header(_my_epid, _protover, _chdr_w); + _traverse_to_node(rst_xact, (i == 0) ? src_node_addr : dst_node_addr); + mgmt_hop_t rst_hop; + rst_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, + mgmt_op_t::cfg_payload(REG_RESET_AND_FLUSH, + (i == 0) ? RESET_AND_FLUSH_OSTRM : RESET_AND_FLUSH_ISTRM))); + rst_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); + rst_xact.add_hop(rst_hop); + _send_recv_mgmt_transaction(rst_xact); + } + } + + // Build a management transaction to configure the source node + { + mgmt_payload cfg_xact; + cfg_xact.set_header(_my_epid, _protover, _chdr_w); + _traverse_to_node(cfg_xact, src_node_addr); + mgmt_hop_t cfg_hop; + // Set destination of the stream to dst_epid + cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, + mgmt_op_t::cfg_payload(REG_OSTRM_DST_EPID, dst_epid))); + // Configure flow control parameters + _push_ostrm_flow_control_config( + lossy_xport, BUFF_U64, BUFF_U64, fc_freq, fc_headroom, cfg_hop); + // Return the packet back to us + cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); + + // Send the transaction and receive a response. + // We don't care about the contents of the response. + cfg_xact.add_hop(cfg_hop); + _send_recv_mgmt_transaction(cfg_xact); + } + + // Wait for stream configuration to finish on the HW side + _validate_stream_setup(src_node_addr, timeout); + + UHD_LOG_DEBUG("RFNOC::MGMT", + (boost::format("Setup a stream from EPID=%d to EPID=%d") % src_epid + % dst_epid)); + + // Return discovered buffer parameters + return std::get<1>(_get_ostrm_status(src_node_addr)); + } + + + virtual void register_xport_hop_cfg_fns(uint8_t xport_subtype, + xport_cfg_fn_t init_hop_cfg_fn, + xport_cfg_fn_t rtcfg_hop_cfg_fn) + { + _init_cfg_fns[xport_subtype] = init_hop_cfg_fn; + _rtcfg_cfg_fns[xport_subtype] = rtcfg_hop_cfg_fn; + } + + +private: // Functions + // Discover all nodes that are reachable from this software stream endpoint + void _discover_topology() + { + // Initialize a queue of pending paths. We will use this for a breadth-first + // traversal of the dataflow graph. The queue consists of a previously discovered + // node and the next destination to take from that node. + std::queue<std::pair<node_id_t, next_dest_t>> pending_paths; + + // Add ourselves to the the pending queue to kick off the search + UHD_LOG_DEBUG("RFNOC::MGMT", + "Starting topology discovery from " << _my_node_id.to_string()); + bool is_first_path = true; + pending_paths.push(std::make_pair(_my_node_id, next_dest_t(-1))); + + while (not pending_paths.empty()) { + // Pop the next path to discover from the pending queue + const auto& next_path = pending_paths.front(); + pending_paths.pop(); + + // We need to build a node_addr_t to allow us to get to next_path + // To do so we first lookup how to get to next_path.first. This location has + // already been discovered so we should just be able to look it up in + // _node_addr_map. The only exception for that is when we are just starting + // out, in which case our previous node is "us". + node_addr_t next_addr = is_first_path ? node_addr_t() + : _node_addr_map.at(next_path.first); + // Once we know how to get to the base node, then add the next destination + next_addr.push_back(next_path); + is_first_path = false; + + // Build a management transaction to first get to our destination so that we + // can ask it to identify itself + mgmt_payload route_xact; + route_xact.set_header(_my_epid, _protover, _chdr_w); + _traverse_to_node(route_xact, next_addr); + + // Discover downstream node (we ask the node to identify itself) + mgmt_payload disc_req_xact(route_xact); + // Push a node discovery hop + mgmt_hop_t disc_hop; + disc_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_INFO_REQ)); + disc_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); + disc_req_xact.add_hop(disc_hop); + const mgmt_payload disc_resp_xact = + _send_recv_mgmt_transaction(disc_req_xact); + const node_id_t new_node = _pop_node_discovery_hop(disc_resp_xact); + + // We found a node! + // First check if we have already seen this node in the past. If not, we have + // to add it to our internal data structures. If we have already seen it then + // we just skip it. It is OK to skip the node because we are doing a BFS, + // which means that the first time a node is discovered during the traversal, + // the distance from this EP to that node will be the shortest path. The core + // design philosophy for RFNoC is that the data will always take the shortest + // path, because we make the assumption that a shorter path *always* has + // better QoS compared to a longer one. If this assumption is not true, we + // have to handle ordering by QoS for which we need to modify this search a + // bit and provide QoS preferences in the API. That may be a future feature. + if (_node_addr_map.count(new_node) > 0) { + UHD_LOG_DEBUG("RFNOC::MGMT", + "Re-discovered node " << new_node.to_string() << ". Skipping it"); + } else { + UHD_LOG_DEBUG("RFNOC::MGMT", "Discovered node " << new_node.to_string()); + _node_addr_map[new_node] = next_addr; + + // Initialize the node (first time config) + mgmt_payload init_req_xact(route_xact); + _push_node_init_hop(init_req_xact, new_node); + const mgmt_payload init_resp_xact = + _send_recv_mgmt_transaction(init_req_xact); + UHD_LOG_DEBUG("RFNOC::MGMT", "Initialized node " << new_node.to_string()); + + // If the new node is a stream endpoint then we are done traversing this + // path. If not, then check all ports downstream of the new node and add + // them to pending_paths for further traversal + switch (new_node.type) { + case NODE_TYPE_XBAR: { + // Total ports on this crossbar + size_t nports = + static_cast<size_t>(new_node.extended_info & 0xFF); + // Total transport ports on this crossbar (the first nports_xport + // ports are transport ports) + size_t nports_xport = + static_cast<size_t>((new_node.extended_info >> 8) & 0xFF); + // When we allow daisy chaining, we need to recursively check + // other transports + size_t start_port = ALLOW_DAISY_CHAINING ? 0 : nports_xport; + for (size_t i = start_port; i < nports; i++) { + // Skip the current port because it's the input + if (i != static_cast<size_t>(new_node.inst)) { + // If there is a single downstream port then do nothing + pending_paths.push(std::make_pair( + new_node, static_cast<next_dest_t>(i))); + } + } + UHD_LOG_TRACE("RFNOC::MGMT", + "Discovered crossbar has " + << nports << " ports, " << nports_xport + << " transports and we are hooked up on port " + << new_node.inst); + } break; + case NODE_TYPE_STRM_EP: { + // Stop searching when we find a stream endpoint + // Add the endpoint to the discovered endpoint vector + _discovered_ep_vtr.push_back( + std::make_pair(new_node.device_id, new_node.inst)); + } break; + case NODE_TYPE_XPORT: { + // A transport has only one output. We don't need to take + // any action to reach + pending_paths.push(std::make_pair(new_node, -1)); + } break; + default: { + UHD_THROW_INVALID_CODE_PATH(); + break; + } + } + } + } + } + + // Add hops to the management transaction to reach the specified node + void _traverse_to_node(mgmt_payload& transaction, const node_addr_t& node_addr) + { + for (const auto& addr_pair : node_addr) { + const node_id_t& curr_node = addr_pair.first; + const next_dest_t& curr_dest = addr_pair.second; + if (curr_node.type != NODE_TYPE_STRM_EP) { + // If a node is a crossbar, then it have have a non-negative destination + UHD_ASSERT_THROW((curr_node.type != NODE_TYPE_XBAR || curr_dest >= 0)); + _push_advance_hop(transaction, curr_dest); + } else { + // This is a stream endpoint. Nothing needs to be done to advance + // here. The behavior of this operation is identical whether or + // not the stream endpoint is in software or not. + } + } + } + + // Add a hop to the transaction simply to get to the next node + void _push_advance_hop(mgmt_payload& transaction, const next_dest_t& next_dst) + { + if (next_dst >= 0) { + mgmt_hop_t sel_dest_hop; + sel_dest_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_SEL_DEST, + mgmt_op_t::sel_dest_payload(static_cast<uint16_t>(next_dst)))); + transaction.add_hop(sel_dest_hop); + } else { + mgmt_hop_t nop_hop; + nop_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP)); + transaction.add_hop(nop_hop); + } + } + + // Add operations to a hop to configure flow control for an output stream + void _push_ostrm_flow_control_config(const bool lossy_xport, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, + const buff_params_t& fc_freq, + const buff_params_t& fc_headroom, + mgmt_hop_t& hop) + { + // Validate flow control parameters + if (fc_freq.bytes >= (uint64_t(1) << 40) + || fc_freq.packets >= (uint64_t(1) << 24)) { + throw uhd::value_error("Flow control frequency parameters out of bounds"); + } + if (fc_headroom.bytes >= (uint64_t(1) << 16) + || fc_headroom.packets >= (uint64_t(1) << 8)) { + throw uhd::value_error("Flow control headroom parameters out of bounds"); + } + + // Add flow control parameters to hop + hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, + mgmt_op_t::cfg_payload(REG_OSTRM_FC_FREQ_BYTES_LO, + static_cast<uint32_t>(fc_freq.bytes & uint64_t(0xFFFFFFFF))))); + hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, + mgmt_op_t::cfg_payload( + REG_OSTRM_FC_FREQ_BYTES_HI, static_cast<uint32_t>(fc_freq.bytes >> 32)))); + hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, + mgmt_op_t::cfg_payload( + REG_OSTRM_FC_FREQ_PKTS, static_cast<uint32_t>(fc_freq.packets)))); + const uint32_t headroom_reg = + (static_cast<uint32_t>(fc_headroom.bytes) & 0xFFFF) + | ((static_cast<uint32_t>(fc_headroom.packets) & 0xFF) << 16); + hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, + mgmt_op_t::cfg_payload(REG_OSTRM_FC_HEADROOM, headroom_reg))); + // Configure buffer types and lossy_xport, then start configuration + hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, + mgmt_op_t::cfg_payload(REG_OSTRM_CTRL_STATUS, + BUILD_CTRL_STATUS_WORD( + true, lossy_xport, pyld_buff_fmt, mdata_buff_fmt)))); + } + + // Send/recv a management transaction that will get the output stream status + std::tuple<uint32_t, buff_params_t> _get_ostrm_status(const node_addr_t& node_addr) + { + // Build a management transaction to first get to the node + mgmt_payload status_xact; + status_xact.set_header(_my_epid, _protover, _chdr_w); + _traverse_to_node(status_xact, node_addr); + + // Read all the status registers + mgmt_hop_t cfg_hop; + cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_RD_REQ, + mgmt_op_t::cfg_payload(REG_OSTRM_CTRL_STATUS))); + cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_RD_REQ, + mgmt_op_t::cfg_payload(REG_OSTRM_BUFF_CAP_BYTES_LO))); + cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_RD_REQ, + mgmt_op_t::cfg_payload(REG_OSTRM_BUFF_CAP_BYTES_HI))); + cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_RD_REQ, + mgmt_op_t::cfg_payload(REG_OSTRM_BUFF_CAP_PKTS))); + cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); + status_xact.add_hop(cfg_hop); + + // Send the transaction, receive a response and validate it + const mgmt_payload resp_xact = _send_recv_mgmt_transaction(status_xact); + if (resp_xact.get_num_hops() != 1) { + throw uhd::op_failed("Management operation failed. Incorrect format (hops)."); + } + const mgmt_hop_t& rhop = resp_xact.get_hop(0); + if (rhop.get_num_ops() <= 1 + || rhop.get_op(0).get_op_code() != mgmt_op_t::MGMT_OP_NOP) { + throw uhd::op_failed( + "Management operation failed. Incorrect format (operations)."); + } + for (size_t i = 1; i < rhop.get_num_ops(); i++) { + if (rhop.get_op(i).get_op_code() != mgmt_op_t::MGMT_OP_CFG_RD_RESP) { + throw uhd::op_failed( + "Management operation failed. Incorrect format (operations)."); + } + } + + // Extract peek data from transaction + mgmt_op_t::cfg_payload status_pl = rhop.get_op(1).get_op_payload(); + mgmt_op_t::cfg_payload cap_bytes_lo = rhop.get_op(2).get_op_payload(); + mgmt_op_t::cfg_payload cap_bytes_hi = rhop.get_op(3).get_op_payload(); + mgmt_op_t::cfg_payload cap_pkts = rhop.get_op(4).get_op_payload(); + + buff_params_t buff_params; + buff_params.bytes = static_cast<uint64_t>(cap_bytes_lo.data) + | (static_cast<uint64_t>(cap_bytes_hi.data) << 32); + buff_params.packets = static_cast<uint32_t>(cap_pkts.data); + return std::make_tuple(status_pl.data, buff_params); + } + + // Make sure that stream setup is complete and successful, else throw exception + void _validate_stream_setup(const node_addr_t& node_addr, const double timeout) + { + // Get the status of the output stream + uint32_t ostrm_status = 0; + double sleep_s = 0.05; + for (size_t i = 0; i < size_t(std::ceil(timeout / sleep_s)); i++) { + ostrm_status = std::get<0>(_get_ostrm_status(node_addr)); + if ((ostrm_status & STRM_STATUS_SETUP_PENDING) != 0) { + // Wait and retry + std::chrono::milliseconds(static_cast<int64_t>(sleep_s * 1000)); + } else { + // Configuration is done + break; + } + } + + if ((ostrm_status & STRM_STATUS_SETUP_PENDING) != 0) { + throw uhd::op_timeout("config_stream: Operation timed out"); + } + if ((ostrm_status & STRM_STATUS_SETUP_ERR) != 0) { + throw uhd::op_failed("config_stream: Setup failure"); + } + if ((ostrm_status & STRM_STATUS_FC_ENABLED) == 0) { + throw uhd::op_failed("config_stream: Flow control negotiation failed"); + } + } + + + // Pop a node discovery response from a transaction and parse it + const node_id_t _pop_node_discovery_hop(const mgmt_payload& transaction) + { + if (transaction.get_num_hops() != 1) { + throw uhd::op_failed("Management operation failed. Incorrect format (hops)."); + } + const mgmt_hop_t& rhop = transaction.get_hop(0); + const mgmt_op_t& nop_resp = rhop.get_op(0); + const mgmt_op_t& info_resp = rhop.get_op(1); + if (rhop.get_num_ops() <= 1 || nop_resp.get_op_code() != mgmt_op_t::MGMT_OP_NOP + || info_resp.get_op_code() != mgmt_op_t::MGMT_OP_INFO_RESP) { + throw uhd::op_failed( + "Management operation failed. Incorrect format (operations)."); + } + mgmt_op_t::node_info_payload resp_pl(info_resp.get_op_payload()); + return std::move(node_id_t(resp_pl.device_id, + static_cast<node_type>(resp_pl.node_type), + resp_pl.node_inst, + resp_pl.ext_info)); + } + + // Push a hop onto a transaction to initialize the current node + void _push_node_init_hop(mgmt_payload& transaction, const node_id_t& node) + { + mgmt_hop_t init_hop; + switch (node.type) { + case NODE_TYPE_XBAR: { + // Configure the routing table to route all packets going to _my_epid back + // to the port where the packet is entering + // The address for the transaction is the EPID and the data is the port # + init_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ, + mgmt_op_t::cfg_payload(_my_epid, node.inst))); + } break; + case NODE_TYPE_STRM_EP: { + // Do nothing + init_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP)); + } break; + case NODE_TYPE_XPORT: { + uint8_t node_subtype = static_cast<uint8_t>(node.extended_info & 0xFF); + // Run a hop configuration function for custom transports + if (_rtcfg_cfg_fns.count(node_subtype)) { + _rtcfg_cfg_fns.at(node_subtype)( + node.device_id, node.inst, node_subtype, init_hop); + } else { + // For a generic transport, just advertise the transaction to the + // outside world. The generic xport adapter will do the rest + init_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_ADVERTISE)); + } + } break; + default: { + UHD_THROW_INVALID_CODE_PATH(); + } break; + } + init_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); + transaction.add_hop(init_hop); + } + + // Lookup the full address of a stream endpoint node given the EPID + const node_addr_t& _lookup_sep_node_addr(const sep_id_t& epid) + { + // Lookup the destination node address using the endpoint ID + if (_epid_addr_map.count(epid) == 0) { + throw uhd::lookup_error( + "Could not find a stream endpoint with the requested ID."); + } + node_id_t sep_node(_epid_addr_map.at(epid)); + // If a node is in _epid_addr_map then it must be in _node_addr_map + UHD_ASSERT_THROW(_node_addr_map.count(sep_node) > 0); + return _node_addr_map.at(sep_node); + } + + // Send the specified management transaction to the device + void _send_mgmt_transaction(const mgmt_payload& payload, double timeout = 0.1) + { + chdr_header header; + header.set_pkt_type(PKT_TYPE_MGMT); + header.set_num_mdata(0); + header.set_seq_num(_send_seqnum++); + header.set_length(payload.get_size_bytes() + (chdr_w_to_bits(_chdr_w) / 8)); + header.set_dst_epid(0); + + managed_send_buffer::sptr send_buff = _send_xport->get_send_buff(timeout); + if (not send_buff) { + UHD_LOG_ERROR("RFNOC::MGMT", "Timed out getting send buff for management transaction"); + throw uhd::io_error("Timed out getting send buff for management transaction"); + } + _send_pkt->refresh(send_buff->cast<void*>(), header, payload); + send_buff->commit(header.get_length()); + } + + // Send the specified management transaction to the device and receive a response + const mgmt_payload _send_recv_mgmt_transaction( + const mgmt_payload& transaction, double timeout = 0.1) + { + mgmt_payload send(transaction); + send.set_header(_my_epid, _protover, _chdr_w); + // If we are expecting to receive a response then we have to add an additional + // NO-OP hop for the receive endpoint. All responses will be appended to this hop. + mgmt_hop_t nop_hop; + nop_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP)); + send.add_hop(nop_hop); + // Send the transaction over the wire + _send_mgmt_transaction(send); + + managed_recv_buffer::sptr recv_buff = _recv_xport->get_recv_buff(timeout); + if (not recv_buff) { + throw uhd::io_error("Timed out getting recv buff for management transaction"); + } + _recv_pkt->refresh(recv_buff->cast<void*>()); + mgmt_payload recv; + recv.set_header(_my_epid, _protover, _chdr_w); + _recv_pkt->fill_payload(recv); + return std::move(recv); + } + +private: // Members + // The endpoint ID of this software endpoint + const sep_id_t _my_epid; + // The software RFNoC protocol version + const uint16_t _protover; + // CHDR Width for this design/application + const chdr_w_t _chdr_w; + // The node ID for this software endpoint + const node_id_t _my_node_id; + // A table that maps a node_id_t to a node_addr_t. This map allows looking up the + // address of a node given the node ID. There may be multiple ways to get to the + // node but we only store the shortest path here. + std::map<node_id_t, node_addr_t> _node_addr_map; + // A list of all discovered endpoints + std::vector<sep_addr_t> _discovered_ep_vtr; + // A table that maps a stream endpoint ID to the physical address of the stream + // endpoint + std::map<sep_id_t, sep_addr_t> _epid_addr_map; + // Send/recv transports + uhd::transport::zero_copy_if::sptr _recv_xport; + uhd::transport::zero_copy_if::sptr _send_xport; + size_t _send_seqnum; + // Management packet containers + chdr_mgmt_packet::uptr _send_pkt; + chdr_mgmt_packet::cuptr _recv_pkt; + // Hop configuration function maps + std::map<uint8_t, xport_cfg_fn_t> _init_cfg_fns; + std::map<uint8_t, xport_cfg_fn_t> _rtcfg_cfg_fns; + // Mutex that protects all state in this class + mutable std::recursive_mutex _mutex; +}; // namespace mgmt + + +mgmt_portal::uptr mgmt_portal::make(const both_xports_t& xports, + const chdr::chdr_packet_factory& pkt_factory, + uint16_t protover, + chdr_w_t chdr_w, + sep_id_t epid, + device_id_t device_id) +{ + return std::make_unique<mgmt_portal_impl>( + xports, pkt_factory, protover, chdr_w, epid, device_id); +} + +}}} // namespace uhd::rfnoc::mgmt |