aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/rfnoc/mgmt_portal.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/rfnoc/mgmt_portal.cpp')
-rw-r--r--host/lib/rfnoc/mgmt_portal.cpp94
1 files changed, 50 insertions, 44 deletions
diff --git a/host/lib/rfnoc/mgmt_portal.cpp b/host/lib/rfnoc/mgmt_portal.cpp
index 79e297407..b490e0baf 100644
--- a/host/lib/rfnoc/mgmt_portal.cpp
+++ b/host/lib/rfnoc/mgmt_portal.cpp
@@ -7,6 +7,7 @@
#include <uhd/exception.hpp>
#include <uhd/utils/log.hpp>
+#include <uhdlib/rfnoc/chdr_ctrl_xport.hpp>
#include <uhdlib/rfnoc/chdr_packet.hpp>
#include <uhdlib/rfnoc/mgmt_portal.hpp>
#include <unordered_set>
@@ -174,7 +175,7 @@ mgmt_portal::~mgmt_portal() {}
class mgmt_portal_impl : public mgmt_portal
{
public:
- mgmt_portal_impl(const chdr_ctrl_xport_t& xport,
+ mgmt_portal_impl(chdr_ctrl_xport& xport,
const chdr::chdr_packet_factory& pkt_factory,
sep_addr_t my_sep_addr,
sep_id_t my_epid)
@@ -183,14 +184,12 @@ public:
, _chdr_w(pkt_factory.get_chdr_w())
, _endianness(pkt_factory.get_endianness())
, _my_node_id(my_sep_addr.first, NODE_TYPE_STRM_EP, my_epid)
- , _recv_xport(xport.recv)
- , _send_xport(xport.send)
, _send_seqnum(0)
, _send_pkt(std::move(pkt_factory.make_mgmt()))
, _recv_pkt(std::move(pkt_factory.make_mgmt()))
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
- _discover_topology();
+ _discover_topology(xport);
UHD_LOG_DEBUG("RFNOC::MGMT",
"The following endpoints are reachable from " << _my_node_id.to_string());
for (const auto& ep : _discovered_ep_set) {
@@ -205,7 +204,8 @@ public:
return _discovered_ep_set;
}
- virtual void initialize_endpoint(const sep_addr_t& addr, const sep_id_t& epid)
+ virtual void initialize_endpoint(
+ chdr_ctrl_xport& xport, const sep_addr_t& addr, const sep_id_t& epid)
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
@@ -232,7 +232,7 @@ public:
// Send the transaction and receive a response.
// We don't care about the contents of the response.
- _send_recv_mgmt_transaction(cfg_xact);
+ _send_recv_mgmt_transaction(xport, cfg_xact);
// Add/update the entry in the stream endpoint ID map
_epid_addr_map[epid] = addr;
@@ -280,7 +280,7 @@ public:
return retval;
}
- virtual void setup_local_route(const sep_id_t& dst_epid)
+ virtual void setup_local_route(chdr_ctrl_xport& xport, const sep_id_t& dst_epid)
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
@@ -339,7 +339,7 @@ public:
cfg_xact.add_hop(discover_hop);
// Send the transaction and validate that we saw a stream endpoint
- const mgmt_payload sep_info_xact = _send_recv_mgmt_transaction(cfg_xact);
+ const mgmt_payload sep_info_xact = _send_recv_mgmt_transaction(xport, cfg_xact);
const node_id_t sep_node = _pop_node_discovery_hop(sep_info_xact);
if (sep_node.type != NODE_TYPE_STRM_EP) {
throw uhd::routing_error(
@@ -384,7 +384,8 @@ public:
return false;
}
- virtual void setup_remote_route(const sep_id_t& dst_epid, const sep_id_t& src_epid)
+ virtual void setup_remote_route(
+ chdr_ctrl_xport& xport, const sep_id_t& dst_epid, const sep_id_t& src_epid)
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
@@ -412,8 +413,8 @@ public:
// there is a need to optimize for routing table fullness, we can do a software
// graph traversal here, find the closest common parent (crossbar) for the two
// nodes and only configure the nodes downstream of that.
- setup_local_route(dst_epid);
- setup_local_route(src_epid);
+ setup_local_route(xport, dst_epid);
+ setup_local_route(xport, src_epid);
UHD_LOG_DEBUG("RFNOC::MGMT",
(boost::format(
@@ -421,7 +422,8 @@ public:
% src_epid % dst_epid));
}
- virtual void config_local_rx_stream_start(const sep_id_t& epid,
+ virtual void config_local_rx_stream_start(chdr_ctrl_xport& xport,
+ const sep_id_t& epid,
const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
@@ -464,29 +466,30 @@ public:
// Send the transaction and receive a response.
// We don't care about the contents of the response.
cfg_xact.add_hop(cfg_hop);
- _send_recv_mgmt_transaction(cfg_xact);
+ _send_recv_mgmt_transaction(xport, cfg_xact);
UHD_LOG_DEBUG("RFNOC::MGMT",
(boost::format("Initiated RX stream setup for EPID=%d") % epid));
}
virtual stream_buff_params_t config_local_rx_stream_commit(
- const sep_id_t& epid, const double timeout = 0.2)
+ chdr_ctrl_xport& xport, const sep_id_t& epid, const double timeout = 0.2)
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
// Wait for stream configuration to finish on the HW side
const node_addr_t& node_addr = _lookup_sep_node_addr(epid);
- _validate_stream_setup(node_addr, timeout);
+ _validate_stream_setup(xport, node_addr, timeout);
UHD_LOG_DEBUG("RFNOC::MGMT",
(boost::format("Finished RX stream setup for EPID=%d") % epid));
// Return discovered buffer parameters
- return std::get<1>(_get_ostrm_status(node_addr));
+ return std::get<1>(_get_ostrm_status(xport, node_addr));
}
- virtual void config_local_tx_stream(const sep_id_t& epid,
+ virtual void config_local_tx_stream(chdr_ctrl_xport& xport,
+ const sep_id_t& epid,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
const bool reset = false)
@@ -494,7 +497,7 @@ public:
std::lock_guard<std::recursive_mutex> lock(_mutex);
// First setup a route between to the endpoint
- setup_local_route(epid);
+ setup_local_route(xport, epid);
const node_addr_t& node_addr = _lookup_sep_node_addr(epid);
@@ -522,13 +525,14 @@ public:
// Send the transaction and receive a response.
// We don't care about the contents of the response.
- _send_recv_mgmt_transaction(cfg_xact);
+ _send_recv_mgmt_transaction(xport, cfg_xact);
UHD_LOG_DEBUG("RFNOC::MGMT",
(boost::format("Finished TX stream setup for EPID=%d") % epid));
}
- virtual stream_buff_params_t config_remote_stream(const sep_id_t& dst_epid,
+ virtual stream_buff_params_t config_remote_stream(chdr_ctrl_xport& xport,
+ const sep_id_t& dst_epid,
const sep_id_t& src_epid,
const bool lossy_xport,
const stream_buff_params_t& fc_freq,
@@ -539,7 +543,7 @@ public:
std::lock_guard<std::recursive_mutex> lock(_mutex);
// First setup a route between the two endpoints
- setup_remote_route(dst_epid, src_epid);
+ setup_remote_route(xport, dst_epid, src_epid);
const node_addr_t& dst_node_addr = _lookup_sep_node_addr(dst_epid);
const node_addr_t& src_node_addr = _lookup_sep_node_addr(src_epid);
@@ -557,7 +561,7 @@ public:
(i == 0) ? RESET_AND_FLUSH_OSTRM : RESET_AND_FLUSH_ISTRM)));
rst_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN));
rst_xact.add_hop(rst_hop);
- _send_recv_mgmt_transaction(rst_xact);
+ _send_recv_mgmt_transaction(xport, rst_xact);
}
}
@@ -579,18 +583,18 @@ public:
// Send the transaction and receive a response.
// We don't care about the contents of the response.
cfg_xact.add_hop(cfg_hop);
- _send_recv_mgmt_transaction(cfg_xact);
+ _send_recv_mgmt_transaction(xport, cfg_xact);
}
// Wait for stream configuration to finish on the HW side
- _validate_stream_setup(src_node_addr, timeout);
+ _validate_stream_setup(xport, src_node_addr, timeout);
UHD_LOG_DEBUG("RFNOC::MGMT",
(boost::format("Setup a stream from EPID=%d to EPID=%d") % src_epid
% dst_epid));
// Return discovered buffer parameters
- return std::get<1>(_get_ostrm_status(src_node_addr));
+ return std::get<1>(_get_ostrm_status(xport, src_node_addr));
}
@@ -605,7 +609,7 @@ public:
private: // Functions
// Discover all nodes that are reachable from this software stream endpoint
- void _discover_topology()
+ void _discover_topology(chdr_ctrl_xport& xport)
{
// Initialize a queue of pending paths. We will use this for a breadth-first
// traversal of the dataflow graph. The queue consists of a previously discovered
@@ -652,7 +656,7 @@ private: // Functions
try {
// Send the discovery transaction
const mgmt_payload disc_resp_xact =
- _send_recv_mgmt_transaction(disc_req_xact);
+ _send_recv_mgmt_transaction(xport, disc_req_xact);
new_node = _pop_node_discovery_hop(disc_resp_xact);
} catch (uhd::io_error& io_err) {
// We received an IO error. This could happen if we have a legitimate
@@ -695,7 +699,7 @@ private: // Functions
mgmt_payload init_req_xact(route_xact);
_push_node_init_hop(init_req_xact, new_node);
const mgmt_payload init_resp_xact =
- _send_recv_mgmt_transaction(init_req_xact);
+ _send_recv_mgmt_transaction(xport, init_req_xact);
UHD_LOG_DEBUG("RFNOC::MGMT", "Initialized node " << new_node.to_string());
// If the new node is a stream endpoint then we are done traversing this
@@ -823,7 +827,7 @@ private: // Functions
// Send/recv a management transaction that will get the output stream status
std::tuple<uint32_t, stream_buff_params_t> _get_ostrm_status(
- const node_addr_t& node_addr)
+ chdr_ctrl_xport& xport, const node_addr_t& node_addr)
{
// Build a management transaction to first get to the node
mgmt_payload status_xact;
@@ -844,7 +848,7 @@ private: // Functions
status_xact.add_hop(cfg_hop);
// Send the transaction, receive a response and validate it
- const mgmt_payload resp_xact = _send_recv_mgmt_transaction(status_xact);
+ const mgmt_payload resp_xact = _send_recv_mgmt_transaction(xport, status_xact);
if (resp_xact.get_num_hops() != 1) {
throw uhd::op_failed("Management operation failed. Incorrect format (hops).");
}
@@ -875,13 +879,14 @@ private: // Functions
}
// Make sure that stream setup is complete and successful, else throw exception
- void _validate_stream_setup(const node_addr_t& node_addr, const double timeout)
+ void _validate_stream_setup(
+ chdr_ctrl_xport& xport, const node_addr_t& node_addr, const double timeout)
{
// Get the status of the output stream
uint32_t ostrm_status = 0;
double sleep_s = 0.05;
for (size_t i = 0; i < size_t(std::ceil(timeout / sleep_s)); i++) {
- ostrm_status = std::get<0>(_get_ostrm_status(node_addr));
+ ostrm_status = std::get<0>(_get_ostrm_status(xport, node_addr));
if ((ostrm_status & STRM_STATUS_SETUP_PENDING) != 0) {
// Wait and retry
std::chrono::milliseconds(static_cast<int64_t>(sleep_s * 1000));
@@ -975,7 +980,8 @@ private: // Functions
}
// Send the specified management transaction to the device
- void _send_mgmt_transaction(const mgmt_payload& payload, double timeout = 0.1)
+ void _send_mgmt_transaction(
+ chdr_ctrl_xport& xport, const mgmt_payload& payload, double timeout = 0.1)
{
chdr_header header;
header.set_pkt_type(PKT_TYPE_MGMT);
@@ -984,18 +990,19 @@ private: // Functions
header.set_length(payload.get_size_bytes() + (chdr_w_to_bits(_chdr_w) / 8));
header.set_dst_epid(0);
- managed_send_buffer::sptr send_buff = _send_xport->get_send_buff(timeout);
+ auto send_buff = xport.get_send_buff(timeout * 1000);
if (not send_buff) {
UHD_LOG_ERROR("RFNOC::MGMT", "Timed out getting send buff for management transaction");
throw uhd::io_error("Timed out getting send buff for management transaction");
}
- _send_pkt->refresh(send_buff->cast<void*>(), header, payload);
- send_buff->commit(header.get_length());
+ _send_pkt->refresh(send_buff->data(), header, payload);
+ send_buff->set_packet_size(header.get_length());
+ xport.release_send_buff(std::move(send_buff));
}
// Send the specified management transaction to the device and receive a response
const mgmt_payload _send_recv_mgmt_transaction(
- const mgmt_payload& transaction, double timeout = 0.1)
+ chdr_ctrl_xport& xport, const mgmt_payload& transaction, double timeout = 0.1)
{
mgmt_payload send(transaction);
send.set_header(_my_epid, _protover, _chdr_w);
@@ -1005,17 +1012,18 @@ private: // Functions
nop_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP));
send.add_hop(nop_hop);
// Send the transaction over the wire
- _send_mgmt_transaction(send);
+ _send_mgmt_transaction(xport, send);
- managed_recv_buffer::sptr recv_buff = _recv_xport->get_recv_buff(timeout);
+ auto recv_buff = xport.get_mgmt_buff(timeout * 1000);
if (not recv_buff) {
throw uhd::io_error("Timed out getting recv buff for management transaction");
}
- _recv_pkt->refresh(recv_buff->cast<void*>());
+ _recv_pkt->refresh(recv_buff->data());
mgmt_payload recv;
recv.set_header(_my_epid, _protover, _chdr_w);
_recv_pkt->fill_payload(recv);
- return std::move(recv);
+ xport.release_recv_buff(std::move(recv_buff));
+ return recv;
}
private: // Members
@@ -1039,8 +1047,6 @@ private: // Members
// endpoint
std::map<sep_id_t, sep_addr_t> _epid_addr_map;
// Send/recv transports
- uhd::transport::zero_copy_if::sptr _recv_xport;
- uhd::transport::zero_copy_if::sptr _send_xport;
size_t _send_seqnum;
// Management packet containers
chdr_mgmt_packet::uptr _send_pkt;
@@ -1053,7 +1059,7 @@ private: // Members
}; // namespace mgmt
-mgmt_portal::uptr mgmt_portal::make(const chdr_ctrl_xport_t& xport,
+mgmt_portal::uptr mgmt_portal::make(chdr_ctrl_xport& xport,
const chdr::chdr_packet_factory& pkt_factory,
sep_addr_t my_sep_addr,
sep_id_t my_epid)