diff options
Diffstat (limited to 'host/lib/rfnoc/mgmt_portal.cpp')
-rw-r--r-- | host/lib/rfnoc/mgmt_portal.cpp | 94 |
1 files changed, 50 insertions, 44 deletions
diff --git a/host/lib/rfnoc/mgmt_portal.cpp b/host/lib/rfnoc/mgmt_portal.cpp index 79e297407..b490e0baf 100644 --- a/host/lib/rfnoc/mgmt_portal.cpp +++ b/host/lib/rfnoc/mgmt_portal.cpp @@ -7,6 +7,7 @@ #include <uhd/exception.hpp> #include <uhd/utils/log.hpp> +#include <uhdlib/rfnoc/chdr_ctrl_xport.hpp> #include <uhdlib/rfnoc/chdr_packet.hpp> #include <uhdlib/rfnoc/mgmt_portal.hpp> #include <unordered_set> @@ -174,7 +175,7 @@ mgmt_portal::~mgmt_portal() {} class mgmt_portal_impl : public mgmt_portal { public: - mgmt_portal_impl(const chdr_ctrl_xport_t& xport, + mgmt_portal_impl(chdr_ctrl_xport& xport, const chdr::chdr_packet_factory& pkt_factory, sep_addr_t my_sep_addr, sep_id_t my_epid) @@ -183,14 +184,12 @@ public: , _chdr_w(pkt_factory.get_chdr_w()) , _endianness(pkt_factory.get_endianness()) , _my_node_id(my_sep_addr.first, NODE_TYPE_STRM_EP, my_epid) - , _recv_xport(xport.recv) - , _send_xport(xport.send) , _send_seqnum(0) , _send_pkt(std::move(pkt_factory.make_mgmt())) , _recv_pkt(std::move(pkt_factory.make_mgmt())) { std::lock_guard<std::recursive_mutex> lock(_mutex); - _discover_topology(); + _discover_topology(xport); UHD_LOG_DEBUG("RFNOC::MGMT", "The following endpoints are reachable from " << _my_node_id.to_string()); for (const auto& ep : _discovered_ep_set) { @@ -205,7 +204,8 @@ public: return _discovered_ep_set; } - virtual void initialize_endpoint(const sep_addr_t& addr, const sep_id_t& epid) + virtual void initialize_endpoint( + chdr_ctrl_xport& xport, const sep_addr_t& addr, const sep_id_t& epid) { std::lock_guard<std::recursive_mutex> lock(_mutex); @@ -232,7 +232,7 @@ public: // Send the transaction and receive a response. // We don't care about the contents of the response. - _send_recv_mgmt_transaction(cfg_xact); + _send_recv_mgmt_transaction(xport, cfg_xact); // Add/update the entry in the stream endpoint ID map _epid_addr_map[epid] = addr; @@ -280,7 +280,7 @@ public: return retval; } - virtual void setup_local_route(const sep_id_t& dst_epid) + virtual void setup_local_route(chdr_ctrl_xport& xport, const sep_id_t& dst_epid) { std::lock_guard<std::recursive_mutex> lock(_mutex); @@ -339,7 +339,7 @@ public: cfg_xact.add_hop(discover_hop); // Send the transaction and validate that we saw a stream endpoint - const mgmt_payload sep_info_xact = _send_recv_mgmt_transaction(cfg_xact); + const mgmt_payload sep_info_xact = _send_recv_mgmt_transaction(xport, cfg_xact); const node_id_t sep_node = _pop_node_discovery_hop(sep_info_xact); if (sep_node.type != NODE_TYPE_STRM_EP) { throw uhd::routing_error( @@ -384,7 +384,8 @@ public: return false; } - virtual void setup_remote_route(const sep_id_t& dst_epid, const sep_id_t& src_epid) + virtual void setup_remote_route( + chdr_ctrl_xport& xport, const sep_id_t& dst_epid, const sep_id_t& src_epid) { std::lock_guard<std::recursive_mutex> lock(_mutex); @@ -412,8 +413,8 @@ public: // there is a need to optimize for routing table fullness, we can do a software // graph traversal here, find the closest common parent (crossbar) for the two // nodes and only configure the nodes downstream of that. - setup_local_route(dst_epid); - setup_local_route(src_epid); + setup_local_route(xport, dst_epid); + setup_local_route(xport, src_epid); UHD_LOG_DEBUG("RFNOC::MGMT", (boost::format( @@ -421,7 +422,8 @@ public: % src_epid % dst_epid)); } - virtual void config_local_rx_stream_start(const sep_id_t& epid, + virtual void config_local_rx_stream_start(chdr_ctrl_xport& xport, + const sep_id_t& epid, const bool lossy_xport, const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, @@ -464,29 +466,30 @@ public: // Send the transaction and receive a response. // We don't care about the contents of the response. cfg_xact.add_hop(cfg_hop); - _send_recv_mgmt_transaction(cfg_xact); + _send_recv_mgmt_transaction(xport, cfg_xact); UHD_LOG_DEBUG("RFNOC::MGMT", (boost::format("Initiated RX stream setup for EPID=%d") % epid)); } virtual stream_buff_params_t config_local_rx_stream_commit( - const sep_id_t& epid, const double timeout = 0.2) + chdr_ctrl_xport& xport, const sep_id_t& epid, const double timeout = 0.2) { std::lock_guard<std::recursive_mutex> lock(_mutex); // Wait for stream configuration to finish on the HW side const node_addr_t& node_addr = _lookup_sep_node_addr(epid); - _validate_stream_setup(node_addr, timeout); + _validate_stream_setup(xport, node_addr, timeout); UHD_LOG_DEBUG("RFNOC::MGMT", (boost::format("Finished RX stream setup for EPID=%d") % epid)); // Return discovered buffer parameters - return std::get<1>(_get_ostrm_status(node_addr)); + return std::get<1>(_get_ostrm_status(xport, node_addr)); } - virtual void config_local_tx_stream(const sep_id_t& epid, + virtual void config_local_tx_stream(chdr_ctrl_xport& xport, + const sep_id_t& epid, const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, const bool reset = false) @@ -494,7 +497,7 @@ public: std::lock_guard<std::recursive_mutex> lock(_mutex); // First setup a route between to the endpoint - setup_local_route(epid); + setup_local_route(xport, epid); const node_addr_t& node_addr = _lookup_sep_node_addr(epid); @@ -522,13 +525,14 @@ public: // Send the transaction and receive a response. // We don't care about the contents of the response. - _send_recv_mgmt_transaction(cfg_xact); + _send_recv_mgmt_transaction(xport, cfg_xact); UHD_LOG_DEBUG("RFNOC::MGMT", (boost::format("Finished TX stream setup for EPID=%d") % epid)); } - virtual stream_buff_params_t config_remote_stream(const sep_id_t& dst_epid, + virtual stream_buff_params_t config_remote_stream(chdr_ctrl_xport& xport, + const sep_id_t& dst_epid, const sep_id_t& src_epid, const bool lossy_xport, const stream_buff_params_t& fc_freq, @@ -539,7 +543,7 @@ public: std::lock_guard<std::recursive_mutex> lock(_mutex); // First setup a route between the two endpoints - setup_remote_route(dst_epid, src_epid); + setup_remote_route(xport, dst_epid, src_epid); const node_addr_t& dst_node_addr = _lookup_sep_node_addr(dst_epid); const node_addr_t& src_node_addr = _lookup_sep_node_addr(src_epid); @@ -557,7 +561,7 @@ public: (i == 0) ? RESET_AND_FLUSH_OSTRM : RESET_AND_FLUSH_ISTRM))); rst_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); rst_xact.add_hop(rst_hop); - _send_recv_mgmt_transaction(rst_xact); + _send_recv_mgmt_transaction(xport, rst_xact); } } @@ -579,18 +583,18 @@ public: // Send the transaction and receive a response. // We don't care about the contents of the response. cfg_xact.add_hop(cfg_hop); - _send_recv_mgmt_transaction(cfg_xact); + _send_recv_mgmt_transaction(xport, cfg_xact); } // Wait for stream configuration to finish on the HW side - _validate_stream_setup(src_node_addr, timeout); + _validate_stream_setup(xport, src_node_addr, timeout); UHD_LOG_DEBUG("RFNOC::MGMT", (boost::format("Setup a stream from EPID=%d to EPID=%d") % src_epid % dst_epid)); // Return discovered buffer parameters - return std::get<1>(_get_ostrm_status(src_node_addr)); + return std::get<1>(_get_ostrm_status(xport, src_node_addr)); } @@ -605,7 +609,7 @@ public: private: // Functions // Discover all nodes that are reachable from this software stream endpoint - void _discover_topology() + void _discover_topology(chdr_ctrl_xport& xport) { // Initialize a queue of pending paths. We will use this for a breadth-first // traversal of the dataflow graph. The queue consists of a previously discovered @@ -652,7 +656,7 @@ private: // Functions try { // Send the discovery transaction const mgmt_payload disc_resp_xact = - _send_recv_mgmt_transaction(disc_req_xact); + _send_recv_mgmt_transaction(xport, disc_req_xact); new_node = _pop_node_discovery_hop(disc_resp_xact); } catch (uhd::io_error& io_err) { // We received an IO error. This could happen if we have a legitimate @@ -695,7 +699,7 @@ private: // Functions mgmt_payload init_req_xact(route_xact); _push_node_init_hop(init_req_xact, new_node); const mgmt_payload init_resp_xact = - _send_recv_mgmt_transaction(init_req_xact); + _send_recv_mgmt_transaction(xport, init_req_xact); UHD_LOG_DEBUG("RFNOC::MGMT", "Initialized node " << new_node.to_string()); // If the new node is a stream endpoint then we are done traversing this @@ -823,7 +827,7 @@ private: // Functions // Send/recv a management transaction that will get the output stream status std::tuple<uint32_t, stream_buff_params_t> _get_ostrm_status( - const node_addr_t& node_addr) + chdr_ctrl_xport& xport, const node_addr_t& node_addr) { // Build a management transaction to first get to the node mgmt_payload status_xact; @@ -844,7 +848,7 @@ private: // Functions status_xact.add_hop(cfg_hop); // Send the transaction, receive a response and validate it - const mgmt_payload resp_xact = _send_recv_mgmt_transaction(status_xact); + const mgmt_payload resp_xact = _send_recv_mgmt_transaction(xport, status_xact); if (resp_xact.get_num_hops() != 1) { throw uhd::op_failed("Management operation failed. Incorrect format (hops)."); } @@ -875,13 +879,14 @@ private: // Functions } // Make sure that stream setup is complete and successful, else throw exception - void _validate_stream_setup(const node_addr_t& node_addr, const double timeout) + void _validate_stream_setup( + chdr_ctrl_xport& xport, const node_addr_t& node_addr, const double timeout) { // Get the status of the output stream uint32_t ostrm_status = 0; double sleep_s = 0.05; for (size_t i = 0; i < size_t(std::ceil(timeout / sleep_s)); i++) { - ostrm_status = std::get<0>(_get_ostrm_status(node_addr)); + ostrm_status = std::get<0>(_get_ostrm_status(xport, node_addr)); if ((ostrm_status & STRM_STATUS_SETUP_PENDING) != 0) { // Wait and retry std::chrono::milliseconds(static_cast<int64_t>(sleep_s * 1000)); @@ -975,7 +980,8 @@ private: // Functions } // Send the specified management transaction to the device - void _send_mgmt_transaction(const mgmt_payload& payload, double timeout = 0.1) + void _send_mgmt_transaction( + chdr_ctrl_xport& xport, const mgmt_payload& payload, double timeout = 0.1) { chdr_header header; header.set_pkt_type(PKT_TYPE_MGMT); @@ -984,18 +990,19 @@ private: // Functions header.set_length(payload.get_size_bytes() + (chdr_w_to_bits(_chdr_w) / 8)); header.set_dst_epid(0); - managed_send_buffer::sptr send_buff = _send_xport->get_send_buff(timeout); + auto send_buff = xport.get_send_buff(timeout * 1000); if (not send_buff) { UHD_LOG_ERROR("RFNOC::MGMT", "Timed out getting send buff for management transaction"); throw uhd::io_error("Timed out getting send buff for management transaction"); } - _send_pkt->refresh(send_buff->cast<void*>(), header, payload); - send_buff->commit(header.get_length()); + _send_pkt->refresh(send_buff->data(), header, payload); + send_buff->set_packet_size(header.get_length()); + xport.release_send_buff(std::move(send_buff)); } // Send the specified management transaction to the device and receive a response const mgmt_payload _send_recv_mgmt_transaction( - const mgmt_payload& transaction, double timeout = 0.1) + chdr_ctrl_xport& xport, const mgmt_payload& transaction, double timeout = 0.1) { mgmt_payload send(transaction); send.set_header(_my_epid, _protover, _chdr_w); @@ -1005,17 +1012,18 @@ private: // Functions nop_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP)); send.add_hop(nop_hop); // Send the transaction over the wire - _send_mgmt_transaction(send); + _send_mgmt_transaction(xport, send); - managed_recv_buffer::sptr recv_buff = _recv_xport->get_recv_buff(timeout); + auto recv_buff = xport.get_mgmt_buff(timeout * 1000); if (not recv_buff) { throw uhd::io_error("Timed out getting recv buff for management transaction"); } - _recv_pkt->refresh(recv_buff->cast<void*>()); + _recv_pkt->refresh(recv_buff->data()); mgmt_payload recv; recv.set_header(_my_epid, _protover, _chdr_w); _recv_pkt->fill_payload(recv); - return std::move(recv); + xport.release_recv_buff(std::move(recv_buff)); + return recv; } private: // Members @@ -1039,8 +1047,6 @@ private: // Members // endpoint std::map<sep_id_t, sep_addr_t> _epid_addr_map; // Send/recv transports - uhd::transport::zero_copy_if::sptr _recv_xport; - uhd::transport::zero_copy_if::sptr _send_xport; size_t _send_seqnum; // Management packet containers chdr_mgmt_packet::uptr _send_pkt; @@ -1053,7 +1059,7 @@ private: // Members }; // namespace mgmt -mgmt_portal::uptr mgmt_portal::make(const chdr_ctrl_xport_t& xport, +mgmt_portal::uptr mgmt_portal::make(chdr_ctrl_xport& xport, const chdr::chdr_packet_factory& pkt_factory, sep_addr_t my_sep_addr, sep_id_t my_epid) |