aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/rfnoc
diff options
context:
space:
mode:
authorAlex Williams <alex.williams@ni.com>2019-06-04 15:35:44 -0700
committerMartin Braun <martin.braun@ettus.com>2019-11-26 11:49:27 -0800
commit20baa413a08cdf42ec30d6bc0aeb0c665ee590fe (patch)
treefe0cc0baa5658688d0690c07d4cf9bfc2e6094b5 /host/lib/rfnoc
parent53477bfc7ffb2dc09143e1c9e8e431efc8ada957 (diff)
downloaduhd-20baa413a08cdf42ec30d6bc0aeb0c665ee590fe.tar.gz
uhd-20baa413a08cdf42ec30d6bc0aeb0c665ee590fe.tar.bz2
uhd-20baa413a08cdf42ec30d6bc0aeb0c665ee590fe.zip
rfnoc: Make a chdr_ctrl_xport using the new link APIs
These changes add APIs to instantiate the new transports. However, only the control/management transport is currently implemented. It uses the chdr_ctrl_xport. Also update the mgmt_portal to use an ephemeral reference to the shared transport, to indicate that it has no ownership of the transport's memory.
Diffstat (limited to 'host/lib/rfnoc')
-rw-r--r--host/lib/rfnoc/chdr_ctrl_endpoint.cpp34
-rw-r--r--host/lib/rfnoc/link_stream_manager.cpp79
-rw-r--r--host/lib/rfnoc/mgmt_portal.cpp94
3 files changed, 106 insertions, 101 deletions
diff --git a/host/lib/rfnoc/chdr_ctrl_endpoint.cpp b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp
index d1d1dccca..d3c7cd58f 100644
--- a/host/lib/rfnoc/chdr_ctrl_endpoint.cpp
+++ b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp
@@ -27,7 +27,7 @@ chdr_ctrl_endpoint::~chdr_ctrl_endpoint() = default;
class chdr_ctrl_endpoint_impl : public chdr_ctrl_endpoint
{
public:
- chdr_ctrl_endpoint_impl(const chdr_ctrl_xport_t& xport,
+ chdr_ctrl_endpoint_impl(chdr_ctrl_xport::sptr xport,
const chdr::chdr_packet_factory& pkt_factory,
sep_id_t my_epid)
: _my_epid(my_epid)
@@ -57,7 +57,14 @@ public:
// there are no timed blocks on the underlying.
_recv_thread.join();
// Flush base transport
- while (_xport.recv->get_recv_buff(0.0001)) /*NOP*/;
+ while (true) {
+ auto buff = _xport->get_recv_buff(100);
+ if (buff) {
+ _xport->release_recv_buff(std::move(buff));
+ } else {
+ break;
+ }
+ }
// Release child endpoints
_endpoint_map.clear(););
}
@@ -82,9 +89,10 @@ public:
header.set_dst_epid(dst_epid);
// Acquire send buffer and send the packet
std::lock_guard<std::mutex> lock(_send_mutex);
- auto send_buff = _xport.send->get_send_buff(timeout);
- _send_pkt->refresh(send_buff->cast<void*>(), header, payload);
- send_buff->commit(header.get_length());
+ auto send_buff = _xport->get_send_buff(timeout * 1000);
+ _send_pkt->refresh(send_buff->data(), header, payload);
+ send_buff->set_packet_size(header.get_length());
+ _xport->release_send_buff(std::move(send_buff));
};
if (_endpoint_map.find(key) == _endpoint_map.end()) {
@@ -118,11 +126,14 @@ private:
// - Route them based on the dst_port
// - Pass them to the ctrlport_endpoint for additional processing
while (not _stop_recv_thread) {
- auto buff = _xport.recv->get_recv_buff(0.0);
+ // FIXME Move lock back once have threaded_io_service
+ std::unique_lock<std::mutex> lock(_mutex);
+ auto buff = _xport->get_recv_buff(0);
if (buff) {
- std::lock_guard<std::mutex> lock(_mutex);
+ // FIXME Move lock back to here once have threaded_io_service
+ // std::lock_guard<std::mutex> lock(_mutex);
try {
- _recv_pkt->refresh(buff->cast<void*>());
+ _recv_pkt->refresh(buff->data());
const ctrl_payload payload = _recv_pkt->get_payload();
ep_map_key_t key{payload.src_epid, payload.dst_port};
if (_endpoint_map.find(key) != _endpoint_map.end()) {
@@ -131,7 +142,10 @@ private:
} catch (...) {
// Ignore all errors
}
+ _xport->release_recv_buff(std::move(buff));
} else {
+ // FIXME Move lock back to lock_guard once have threaded_io_service
+ lock.unlock();
// Be a good citizen and yield if no packet is processed
static const size_t MIN_DUR = 1;
boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR));
@@ -154,7 +168,7 @@ private:
// The endpoint ID of this software endpoint
const sep_id_t _my_epid;
// Send/recv transports
- const chdr_ctrl_xport_t _xport;
+ chdr_ctrl_xport::sptr _xport;
// The curent sequence number for a send packet
size_t _send_seqnum = 0;
// The number of packets dropped
@@ -173,7 +187,7 @@ private:
std::mutex _send_mutex;
};
-chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(const chdr_ctrl_xport_t& xport,
+chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(chdr_ctrl_xport::sptr xport,
const chdr::chdr_packet_factory& pkt_factory,
sep_id_t my_epid)
{
diff --git a/host/lib/rfnoc/link_stream_manager.cpp b/host/lib/rfnoc/link_stream_manager.cpp
index bd330e313..4fe183529 100644
--- a/host/lib/rfnoc/link_stream_manager.cpp
+++ b/host/lib/rfnoc/link_stream_manager.cpp
@@ -63,31 +63,10 @@ public:
// chdr_ctrl_endpoint. We have to use the same base transport here to ensure that
// the route setup logic in the FPGA transport works correctly.
// TODO: This needs to be cleaned up. A muxed_zero_copy_if is excessive here
- chdr_ctrl_xport_t base_xport =
- _mb_iface.make_ctrl_transport(_my_device_id, _my_mgmt_ctrl_epid);
- UHD_ASSERT_THROW(base_xport.send.get() == base_xport.recv.get())
-
- auto classify_fn = [&pkt_factory](void* buff, size_t) -> uint32_t {
- if (buff) {
- chdr_packet::cuptr pkt = pkt_factory.make_generic();
- pkt->refresh(buff);
- return (pkt->get_chdr_header().get_pkt_type() == PKT_TYPE_MGMT) ? 0 : 1;
- } else {
- throw uhd::assertion_error("null pointer");
- }
- };
- _muxed_xport = muxed_zero_copy_if::make(base_xport.send, classify_fn, 2);
-
- // Create child transports
- chdr_ctrl_xport_t mgmt_xport = base_xport;
- mgmt_xport.send = _muxed_xport->make_stream(0);
- mgmt_xport.recv = mgmt_xport.send;
- _ctrl_xport = base_xport;
- _ctrl_xport.send = _muxed_xport->make_stream(1);
- _ctrl_xport.recv = _ctrl_xport.send;
+ _ctrl_xport = _mb_iface.make_ctrl_transport(_my_device_id, _my_mgmt_ctrl_epid);
// Create management portal using one of the child transports
- _mgmt_portal = mgmt_portal::make(mgmt_xport,
+ _mgmt_portal = mgmt_portal::make(*_ctrl_xport,
_pkt_factory,
sep_addr_t(_my_device_id, SEP_INST_MGMT_CTRL),
_my_mgmt_ctrl_epid);
@@ -131,8 +110,8 @@ public:
}
// Setup a route to the EPID
- _mgmt_portal->initialize_endpoint(dst_addr, dst_epid);
- _mgmt_portal->setup_local_route(dst_epid);
+ _mgmt_portal->initialize_endpoint(*_ctrl_xport, dst_addr, dst_epid);
+ _mgmt_portal->setup_local_route(*_ctrl_xport, dst_epid);
if (!_mgmt_portal->get_endpoint_info(dst_epid).has_ctrl) {
throw uhd::rfnoc_error(
"Downstream endpoint does not support control traffic");
@@ -157,10 +136,10 @@ public:
sep_id_t src_epid = _epid_alloc->allocate_epid(src_addr);
// Initialize endpoints
- _mgmt_portal->initialize_endpoint(dst_addr, dst_epid);
- _mgmt_portal->initialize_endpoint(src_addr, src_epid);
+ _mgmt_portal->initialize_endpoint(*_ctrl_xport, dst_addr, dst_epid);
+ _mgmt_portal->initialize_endpoint(*_ctrl_xport, src_addr, src_epid);
- _mgmt_portal->setup_remote_route(dst_epid, src_epid);
+ _mgmt_portal->setup_remote_route(*_ctrl_xport, dst_epid, src_epid);
return sep_id_pair_t(src_epid, dst_epid);
}
@@ -214,13 +193,15 @@ public:
// EPIDs)
// Setup a stream
- stream_buff_params_t buff_params = _mgmt_portal->config_remote_stream(dst_epid,
- src_epid,
- lossy_xport,
- stream_buff_params_t{1, 1}, // Dummy frequency
- stream_buff_params_t{0, 0}, // Dummy headroom
- false,
- STREAM_SETUP_TIMEOUT);
+ stream_buff_params_t buff_params =
+ _mgmt_portal->config_remote_stream(*_ctrl_xport,
+ dst_epid,
+ src_epid,
+ lossy_xport,
+ stream_buff_params_t{1, 1}, // Dummy frequency
+ stream_buff_params_t{0, 0}, // Dummy headroom
+ false,
+ STREAM_SETUP_TIMEOUT);
// Compute FC frequency and headroom based on buff parameters
stream_buff_params_t fc_freq{
@@ -234,7 +215,8 @@ public:
std::ceil(double(buff_params.packets) * fc_headroom_ratio))};
// Reconfigure flow control using the new frequency and headroom
- return _mgmt_portal->config_remote_stream(dst_epid,
+ return _mgmt_portal->config_remote_stream(*_ctrl_xport,
+ dst_epid,
src_epid,
lossy_xport,
fc_freq,
@@ -243,7 +225,8 @@ public:
STREAM_SETUP_TIMEOUT);
}
- virtual chdr_data_xport_t create_host_to_device_data_stream(const sep_addr_t dst_addr,
+ virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
+ const sep_addr_t dst_addr,
const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
@@ -261,21 +244,22 @@ public:
sep_id_t dst_epid = _epid_alloc->allocate_epid(dst_addr);
// Create the data transport that we will return to the client
- chdr_data_xport_t xport =
- _mb_iface.make_data_transport(_my_device_id, src_epid, xport_args);
- xport.src_epid = src_epid;
- xport.dst_epid = dst_epid;
+ chdr_rx_data_xport::uptr xport = _mb_iface.make_rx_data_transport(
+ _my_device_id, src_epid, dst_epid, xport_args);
+
+ chdr_ctrl_xport::sptr mgmt_xport =
+ _mb_iface.make_ctrl_transport(_my_device_id, src_epid);
// Create new temporary management portal with the transports used for this stream
// TODO: This is a bit excessive. Maybe we can pair down the functionality of the
// portal just for route setup purposes. Whatever we do, we *must* use xport in it
// though otherwise the transport will not behave correctly.
mgmt_portal::uptr data_mgmt_portal =
- mgmt_portal::make(xport, _pkt_factory, sw_epid_addr, src_epid);
+ mgmt_portal::make(*mgmt_xport, _pkt_factory, sw_epid_addr, src_epid);
// Setup a route to the EPID
- data_mgmt_portal->initialize_endpoint(dst_addr, dst_epid);
- data_mgmt_portal->setup_local_route(dst_epid);
+ data_mgmt_portal->initialize_endpoint(*mgmt_xport, dst_addr, dst_epid);
+ data_mgmt_portal->setup_local_route(*mgmt_xport, dst_epid);
if (!_mgmt_portal->get_endpoint_info(dst_epid).has_data) {
throw uhd::rfnoc_error("Downstream endpoint does not support data traffic");
}
@@ -288,7 +272,8 @@ public:
return xport;
}
- virtual chdr_data_xport_t create_device_to_host_data_stream(const sep_addr_t src_addr,
+ virtual chdr_tx_data_xport::uptr create_device_to_host_data_stream(
+ const sep_addr_t src_addr,
const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
@@ -297,6 +282,7 @@ public:
const device_addr_t& xport_args)
{
// TODO: Implement me
+ return chdr_tx_data_xport::uptr();
}
private:
@@ -323,8 +309,7 @@ private:
// The software EPID for all management and control traffic
sep_id_t _my_mgmt_ctrl_epid;
// Transports
- muxed_zero_copy_if::sptr _muxed_xport;
- chdr_ctrl_xport_t _ctrl_xport;
+ chdr_ctrl_xport::sptr _ctrl_xport;
// Management portal for control endpoints
mgmt_portal::uptr _mgmt_portal;
// The CHDR control endpoint
diff --git a/host/lib/rfnoc/mgmt_portal.cpp b/host/lib/rfnoc/mgmt_portal.cpp
index 79e297407..b490e0baf 100644
--- a/host/lib/rfnoc/mgmt_portal.cpp
+++ b/host/lib/rfnoc/mgmt_portal.cpp
@@ -7,6 +7,7 @@
#include <uhd/exception.hpp>
#include <uhd/utils/log.hpp>
+#include <uhdlib/rfnoc/chdr_ctrl_xport.hpp>
#include <uhdlib/rfnoc/chdr_packet.hpp>
#include <uhdlib/rfnoc/mgmt_portal.hpp>
#include <unordered_set>
@@ -174,7 +175,7 @@ mgmt_portal::~mgmt_portal() {}
class mgmt_portal_impl : public mgmt_portal
{
public:
- mgmt_portal_impl(const chdr_ctrl_xport_t& xport,
+ mgmt_portal_impl(chdr_ctrl_xport& xport,
const chdr::chdr_packet_factory& pkt_factory,
sep_addr_t my_sep_addr,
sep_id_t my_epid)
@@ -183,14 +184,12 @@ public:
, _chdr_w(pkt_factory.get_chdr_w())
, _endianness(pkt_factory.get_endianness())
, _my_node_id(my_sep_addr.first, NODE_TYPE_STRM_EP, my_epid)
- , _recv_xport(xport.recv)
- , _send_xport(xport.send)
, _send_seqnum(0)
, _send_pkt(std::move(pkt_factory.make_mgmt()))
, _recv_pkt(std::move(pkt_factory.make_mgmt()))
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
- _discover_topology();
+ _discover_topology(xport);
UHD_LOG_DEBUG("RFNOC::MGMT",
"The following endpoints are reachable from " << _my_node_id.to_string());
for (const auto& ep : _discovered_ep_set) {
@@ -205,7 +204,8 @@ public:
return _discovered_ep_set;
}
- virtual void initialize_endpoint(const sep_addr_t& addr, const sep_id_t& epid)
+ virtual void initialize_endpoint(
+ chdr_ctrl_xport& xport, const sep_addr_t& addr, const sep_id_t& epid)
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
@@ -232,7 +232,7 @@ public:
// Send the transaction and receive a response.
// We don't care about the contents of the response.
- _send_recv_mgmt_transaction(cfg_xact);
+ _send_recv_mgmt_transaction(xport, cfg_xact);
// Add/update the entry in the stream endpoint ID map
_epid_addr_map[epid] = addr;
@@ -280,7 +280,7 @@ public:
return retval;
}
- virtual void setup_local_route(const sep_id_t& dst_epid)
+ virtual void setup_local_route(chdr_ctrl_xport& xport, const sep_id_t& dst_epid)
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
@@ -339,7 +339,7 @@ public:
cfg_xact.add_hop(discover_hop);
// Send the transaction and validate that we saw a stream endpoint
- const mgmt_payload sep_info_xact = _send_recv_mgmt_transaction(cfg_xact);
+ const mgmt_payload sep_info_xact = _send_recv_mgmt_transaction(xport, cfg_xact);
const node_id_t sep_node = _pop_node_discovery_hop(sep_info_xact);
if (sep_node.type != NODE_TYPE_STRM_EP) {
throw uhd::routing_error(
@@ -384,7 +384,8 @@ public:
return false;
}
- virtual void setup_remote_route(const sep_id_t& dst_epid, const sep_id_t& src_epid)
+ virtual void setup_remote_route(
+ chdr_ctrl_xport& xport, const sep_id_t& dst_epid, const sep_id_t& src_epid)
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
@@ -412,8 +413,8 @@ public:
// there is a need to optimize for routing table fullness, we can do a software
// graph traversal here, find the closest common parent (crossbar) for the two
// nodes and only configure the nodes downstream of that.
- setup_local_route(dst_epid);
- setup_local_route(src_epid);
+ setup_local_route(xport, dst_epid);
+ setup_local_route(xport, src_epid);
UHD_LOG_DEBUG("RFNOC::MGMT",
(boost::format(
@@ -421,7 +422,8 @@ public:
% src_epid % dst_epid));
}
- virtual void config_local_rx_stream_start(const sep_id_t& epid,
+ virtual void config_local_rx_stream_start(chdr_ctrl_xport& xport,
+ const sep_id_t& epid,
const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
@@ -464,29 +466,30 @@ public:
// Send the transaction and receive a response.
// We don't care about the contents of the response.
cfg_xact.add_hop(cfg_hop);
- _send_recv_mgmt_transaction(cfg_xact);
+ _send_recv_mgmt_transaction(xport, cfg_xact);
UHD_LOG_DEBUG("RFNOC::MGMT",
(boost::format("Initiated RX stream setup for EPID=%d") % epid));
}
virtual stream_buff_params_t config_local_rx_stream_commit(
- const sep_id_t& epid, const double timeout = 0.2)
+ chdr_ctrl_xport& xport, const sep_id_t& epid, const double timeout = 0.2)
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
// Wait for stream configuration to finish on the HW side
const node_addr_t& node_addr = _lookup_sep_node_addr(epid);
- _validate_stream_setup(node_addr, timeout);
+ _validate_stream_setup(xport, node_addr, timeout);
UHD_LOG_DEBUG("RFNOC::MGMT",
(boost::format("Finished RX stream setup for EPID=%d") % epid));
// Return discovered buffer parameters
- return std::get<1>(_get_ostrm_status(node_addr));
+ return std::get<1>(_get_ostrm_status(xport, node_addr));
}
- virtual void config_local_tx_stream(const sep_id_t& epid,
+ virtual void config_local_tx_stream(chdr_ctrl_xport& xport,
+ const sep_id_t& epid,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
const bool reset = false)
@@ -494,7 +497,7 @@ public:
std::lock_guard<std::recursive_mutex> lock(_mutex);
// First setup a route between to the endpoint
- setup_local_route(epid);
+ setup_local_route(xport, epid);
const node_addr_t& node_addr = _lookup_sep_node_addr(epid);
@@ -522,13 +525,14 @@ public:
// Send the transaction and receive a response.
// We don't care about the contents of the response.
- _send_recv_mgmt_transaction(cfg_xact);
+ _send_recv_mgmt_transaction(xport, cfg_xact);
UHD_LOG_DEBUG("RFNOC::MGMT",
(boost::format("Finished TX stream setup for EPID=%d") % epid));
}
- virtual stream_buff_params_t config_remote_stream(const sep_id_t& dst_epid,
+ virtual stream_buff_params_t config_remote_stream(chdr_ctrl_xport& xport,
+ const sep_id_t& dst_epid,
const sep_id_t& src_epid,
const bool lossy_xport,
const stream_buff_params_t& fc_freq,
@@ -539,7 +543,7 @@ public:
std::lock_guard<std::recursive_mutex> lock(_mutex);
// First setup a route between the two endpoints
- setup_remote_route(dst_epid, src_epid);
+ setup_remote_route(xport, dst_epid, src_epid);
const node_addr_t& dst_node_addr = _lookup_sep_node_addr(dst_epid);
const node_addr_t& src_node_addr = _lookup_sep_node_addr(src_epid);
@@ -557,7 +561,7 @@ public:
(i == 0) ? RESET_AND_FLUSH_OSTRM : RESET_AND_FLUSH_ISTRM)));
rst_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN));
rst_xact.add_hop(rst_hop);
- _send_recv_mgmt_transaction(rst_xact);
+ _send_recv_mgmt_transaction(xport, rst_xact);
}
}
@@ -579,18 +583,18 @@ public:
// Send the transaction and receive a response.
// We don't care about the contents of the response.
cfg_xact.add_hop(cfg_hop);
- _send_recv_mgmt_transaction(cfg_xact);
+ _send_recv_mgmt_transaction(xport, cfg_xact);
}
// Wait for stream configuration to finish on the HW side
- _validate_stream_setup(src_node_addr, timeout);
+ _validate_stream_setup(xport, src_node_addr, timeout);
UHD_LOG_DEBUG("RFNOC::MGMT",
(boost::format("Setup a stream from EPID=%d to EPID=%d") % src_epid
% dst_epid));
// Return discovered buffer parameters
- return std::get<1>(_get_ostrm_status(src_node_addr));
+ return std::get<1>(_get_ostrm_status(xport, src_node_addr));
}
@@ -605,7 +609,7 @@ public:
private: // Functions
// Discover all nodes that are reachable from this software stream endpoint
- void _discover_topology()
+ void _discover_topology(chdr_ctrl_xport& xport)
{
// Initialize a queue of pending paths. We will use this for a breadth-first
// traversal of the dataflow graph. The queue consists of a previously discovered
@@ -652,7 +656,7 @@ private: // Functions
try {
// Send the discovery transaction
const mgmt_payload disc_resp_xact =
- _send_recv_mgmt_transaction(disc_req_xact);
+ _send_recv_mgmt_transaction(xport, disc_req_xact);
new_node = _pop_node_discovery_hop(disc_resp_xact);
} catch (uhd::io_error& io_err) {
// We received an IO error. This could happen if we have a legitimate
@@ -695,7 +699,7 @@ private: // Functions
mgmt_payload init_req_xact(route_xact);
_push_node_init_hop(init_req_xact, new_node);
const mgmt_payload init_resp_xact =
- _send_recv_mgmt_transaction(init_req_xact);
+ _send_recv_mgmt_transaction(xport, init_req_xact);
UHD_LOG_DEBUG("RFNOC::MGMT", "Initialized node " << new_node.to_string());
// If the new node is a stream endpoint then we are done traversing this
@@ -823,7 +827,7 @@ private: // Functions
// Send/recv a management transaction that will get the output stream status
std::tuple<uint32_t, stream_buff_params_t> _get_ostrm_status(
- const node_addr_t& node_addr)
+ chdr_ctrl_xport& xport, const node_addr_t& node_addr)
{
// Build a management transaction to first get to the node
mgmt_payload status_xact;
@@ -844,7 +848,7 @@ private: // Functions
status_xact.add_hop(cfg_hop);
// Send the transaction, receive a response and validate it
- const mgmt_payload resp_xact = _send_recv_mgmt_transaction(status_xact);
+ const mgmt_payload resp_xact = _send_recv_mgmt_transaction(xport, status_xact);
if (resp_xact.get_num_hops() != 1) {
throw uhd::op_failed("Management operation failed. Incorrect format (hops).");
}
@@ -875,13 +879,14 @@ private: // Functions
}
// Make sure that stream setup is complete and successful, else throw exception
- void _validate_stream_setup(const node_addr_t& node_addr, const double timeout)
+ void _validate_stream_setup(
+ chdr_ctrl_xport& xport, const node_addr_t& node_addr, const double timeout)
{
// Get the status of the output stream
uint32_t ostrm_status = 0;
double sleep_s = 0.05;
for (size_t i = 0; i < size_t(std::ceil(timeout / sleep_s)); i++) {
- ostrm_status = std::get<0>(_get_ostrm_status(node_addr));
+ ostrm_status = std::get<0>(_get_ostrm_status(xport, node_addr));
if ((ostrm_status & STRM_STATUS_SETUP_PENDING) != 0) {
// Wait and retry
std::chrono::milliseconds(static_cast<int64_t>(sleep_s * 1000));
@@ -975,7 +980,8 @@ private: // Functions
}
// Send the specified management transaction to the device
- void _send_mgmt_transaction(const mgmt_payload& payload, double timeout = 0.1)
+ void _send_mgmt_transaction(
+ chdr_ctrl_xport& xport, const mgmt_payload& payload, double timeout = 0.1)
{
chdr_header header;
header.set_pkt_type(PKT_TYPE_MGMT);
@@ -984,18 +990,19 @@ private: // Functions
header.set_length(payload.get_size_bytes() + (chdr_w_to_bits(_chdr_w) / 8));
header.set_dst_epid(0);
- managed_send_buffer::sptr send_buff = _send_xport->get_send_buff(timeout);
+ auto send_buff = xport.get_send_buff(timeout * 1000);
if (not send_buff) {
UHD_LOG_ERROR("RFNOC::MGMT", "Timed out getting send buff for management transaction");
throw uhd::io_error("Timed out getting send buff for management transaction");
}
- _send_pkt->refresh(send_buff->cast<void*>(), header, payload);
- send_buff->commit(header.get_length());
+ _send_pkt->refresh(send_buff->data(), header, payload);
+ send_buff->set_packet_size(header.get_length());
+ xport.release_send_buff(std::move(send_buff));
}
// Send the specified management transaction to the device and receive a response
const mgmt_payload _send_recv_mgmt_transaction(
- const mgmt_payload& transaction, double timeout = 0.1)
+ chdr_ctrl_xport& xport, const mgmt_payload& transaction, double timeout = 0.1)
{
mgmt_payload send(transaction);
send.set_header(_my_epid, _protover, _chdr_w);
@@ -1005,17 +1012,18 @@ private: // Functions
nop_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP));
send.add_hop(nop_hop);
// Send the transaction over the wire
- _send_mgmt_transaction(send);
+ _send_mgmt_transaction(xport, send);
- managed_recv_buffer::sptr recv_buff = _recv_xport->get_recv_buff(timeout);
+ auto recv_buff = xport.get_mgmt_buff(timeout * 1000);
if (not recv_buff) {
throw uhd::io_error("Timed out getting recv buff for management transaction");
}
- _recv_pkt->refresh(recv_buff->cast<void*>());
+ _recv_pkt->refresh(recv_buff->data());
mgmt_payload recv;
recv.set_header(_my_epid, _protover, _chdr_w);
_recv_pkt->fill_payload(recv);
- return std::move(recv);
+ xport.release_recv_buff(std::move(recv_buff));
+ return recv;
}
private: // Members
@@ -1039,8 +1047,6 @@ private: // Members
// endpoint
std::map<sep_id_t, sep_addr_t> _epid_addr_map;
// Send/recv transports
- uhd::transport::zero_copy_if::sptr _recv_xport;
- uhd::transport::zero_copy_if::sptr _send_xport;
size_t _send_seqnum;
// Management packet containers
chdr_mgmt_packet::uptr _send_pkt;
@@ -1053,7 +1059,7 @@ private: // Members
}; // namespace mgmt
-mgmt_portal::uptr mgmt_portal::make(const chdr_ctrl_xport_t& xport,
+mgmt_portal::uptr mgmt_portal::make(chdr_ctrl_xport& xport,
const chdr::chdr_packet_factory& pkt_factory,
sep_addr_t my_sep_addr,
sep_id_t my_epid)