aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib
diff options
context:
space:
mode:
authorAlex Williams <alex.williams@ni.com>2019-06-04 15:35:44 -0700
committerMartin Braun <martin.braun@ettus.com>2019-11-26 11:49:27 -0800
commit20baa413a08cdf42ec30d6bc0aeb0c665ee590fe (patch)
treefe0cc0baa5658688d0690c07d4cf9bfc2e6094b5 /host/lib
parent53477bfc7ffb2dc09143e1c9e8e431efc8ada957 (diff)
downloaduhd-20baa413a08cdf42ec30d6bc0aeb0c665ee590fe.tar.gz
uhd-20baa413a08cdf42ec30d6bc0aeb0c665ee590fe.tar.bz2
uhd-20baa413a08cdf42ec30d6bc0aeb0c665ee590fe.zip
rfnoc: Make a chdr_ctrl_xport using the new link APIs
These changes add APIs to instantiate the new transports. However, only the control/management transport is currently implemented. It uses the chdr_ctrl_xport. Also update the mgmt_portal to use an ephemeral reference to the shared transport, to indicate that it has no ownership of the transport's memory.
Diffstat (limited to 'host/lib')
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp5
-rw-r--r--host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp10
-rw-r--r--host/lib/include/uhdlib/rfnoc/mb_iface.hpp51
-rw-r--r--host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp28
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp16
-rw-r--r--host/lib/rfnoc/chdr_ctrl_endpoint.cpp34
-rw-r--r--host/lib/rfnoc/link_stream_manager.cpp79
-rw-r--r--host/lib/rfnoc/mgmt_portal.cpp94
8 files changed, 182 insertions, 135 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp
index b3c3e0108..1281cc0ea 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp
@@ -7,6 +7,7 @@
#ifndef INCLUDED_LIBUHD_RFNOC_CHDR_CTRL_ENDPOINT_HPP
#define INCLUDED_LIBUHD_RFNOC_CHDR_CTRL_ENDPOINT_HPP
+#include <uhdlib/rfnoc/chdr_ctrl_xport.hpp>
#include <uhdlib/rfnoc/chdr_packet.hpp>
#include <uhdlib/rfnoc/ctrlport_endpoint.hpp>
#include <functional>
@@ -46,11 +47,11 @@ public:
//! Creates a control endpoint object
//
- // \param xports The transports used to send and recv packets
+ // \param xport The transport used to send and recv packets
// \param pkt_factor An instance of the CHDR packet factory
// \param my_epid The endpoint ID of this software endpoint
//
- static uptr make(const chdr_ctrl_xport_t& xports,
+ static uptr make(chdr_ctrl_xport::sptr xport,
const chdr::chdr_packet_factory& pkt_factory,
sep_id_t my_epid);
diff --git a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
index 4ff69bb3e..79121a498 100644
--- a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
+++ b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
@@ -17,8 +17,8 @@
namespace uhd { namespace rfnoc {
-/*! A class that is responsible managing all data endpoints, control endpoints and client
- * zero instances accessible via a logical link between the host device and
+/*! A class that is responsible for managing all data endpoints, control endpoints and
+ * client zero instances accessible via a logical link between the host device and
* motherboard.
*
* Note that each transport adapter on the host has its own set of streaming endpoints,
@@ -120,7 +120,8 @@ public:
* \param xport_args The transport arguments
* \return An transport instance
*/
- virtual chdr_data_xport_t create_host_to_device_data_stream(const sep_addr_t dst_addr,
+ virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
+ const sep_addr_t dst_addr,
const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
@@ -139,7 +140,8 @@ public:
* \param xport_args The transport arguments
* \return An transport instance
*/
- virtual chdr_data_xport_t create_device_to_host_data_stream(const sep_addr_t src_addr,
+ virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream(
+ const sep_addr_t src_addr,
const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
diff --git a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
index ce1106c4c..cca8dcab8 100644
--- a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
+++ b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
@@ -7,11 +7,20 @@
#ifndef INCLUDED_LIBUHD_MB_IFACE_HPP
#define INCLUDED_LIBUHD_MB_IFACE_HPP
+#include <uhdlib/rfnoc/chdr_ctrl_xport.hpp>
#include <uhdlib/rfnoc/rfnoc_common.hpp>
#include <memory>
namespace uhd { namespace rfnoc {
+// FIXME: Update this
+class chdr_rx_data_xport
+{
+public:
+ using uptr = std::unique_ptr<chdr_rx_data_xport>;
+};
+
+using chdr_tx_data_xport = chdr_rx_data_xport;
/*! Motherboard (backchannel) interface
*
@@ -59,14 +68,46 @@ public:
virtual void reset_network() = 0;
/*! Create a control transport
+ *
+ * This is usually called once per motherboard, since there is only one
+ * control transport required to talk to all the blocks on the control
+ * crossbar.
+ *
+ * \param local_device_id ID for the host transport adapter to use
+ * \param local_epid Host streaming endpoint ID
+ * \return A CHDR control transport
*/
- virtual chdr_ctrl_xport_t make_ctrl_transport(
- device_id_t local_device_id, const sep_id_t& src_epid) = 0;
+ virtual chdr_ctrl_xport::sptr make_ctrl_transport(
+ device_id_t local_device_id, const sep_id_t& local_epid) = 0;
- /*! Create a data transport
+ /*! Create an RX data transport
+ *
+ * This is typically called once per streaming channel.
+ *
+ * \param local_device_id ID for the host transport adapter to use
+ * \param local_epid Host (sink) streaming endpoint ID
+ * \param remote_epid Remote device (source) streaming endpoint ID
+ * \param xport_args Transport configuration args
+ * \return A CHDR RX data transport
+ */
+ virtual chdr_rx_data_xport::uptr make_rx_data_transport(device_id_t local_device_id,
+ const sep_id_t& local_epid,
+ const sep_id_t& remote_epid,
+ const device_addr_t& xport_args) = 0;
+
+ /*! Create an TX data transport
+ *
+ * This is typically called once per streaming channel.
+ *
+ * \param local_device_id ID for the host transport adapter to use
+ * \param local_epid Host (source) streaming endpoint ID
+ * \param remote_epid Remote device (sink) streaming endpoint ID
+ * \param xport_args Transport configuration args
+ * \return A CHDR TX data transport
*/
- virtual chdr_data_xport_t make_data_transport(device_id_t local_device_id,
- const sep_id_t& src_epid,
+ virtual chdr_tx_data_xport::uptr make_tx_data_transport(device_id_t local_device_id,
+ const sep_id_t& local_epid,
+ const sep_id_t& remote_epid,
const device_addr_t& xport_args) = 0;
};
diff --git a/host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp b/host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp
index 1412d0e3d..ac72931bf 100644
--- a/host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp
+++ b/host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp
@@ -7,6 +7,7 @@
#ifndef INCLUDED_LIBUHD_MGMT_PORTAL_HPP
#define INCLUDED_LIBUHD_MGMT_PORTAL_HPP
+#include <uhdlib/rfnoc/chdr_ctrl_xport.hpp>
#include <uhdlib/rfnoc/chdr_types.hpp>
#include <memory>
#include <set>
@@ -54,10 +55,12 @@ public:
//! Initialize a stream endpoint and assign an endpoint ID to it
//
+ // \param xport The host stream endpoint's CTRL transport
// \param addr The physical address of the stream endpoint
// \param epid The endpoint ID to assign to this endpoint
//
- virtual void initialize_endpoint(const sep_addr_t& addr, const sep_id_t& epid) = 0;
+ virtual void initialize_endpoint(
+ chdr_ctrl_xport& xport, const sep_addr_t& addr, const sep_id_t& epid) = 0;
//! Get information about a discovered (reachable) stream endpoint
//
@@ -77,9 +80,10 @@ public:
// destination simply by setting the DstEPID in the CHDR header to the specified
// dst_epid
//
+ // \param xport The host stream endpoint's CTRL transport
// \param dst_epid The endpoint ID of the destination
//
- virtual void setup_local_route(const sep_id_t& dst_epid) = 0;
+ virtual void setup_local_route(chdr_ctrl_xport& xport, const sep_id_t& dst_epid) = 0;
//! Can a route from between the source and destination endpoints be established?
//
@@ -95,11 +99,12 @@ public:
// to the destination simply by setting the DstEPID in the CHDR header to the
// specified dst_epid
//
+ // \param xport The host stream endpoint's CTRL transport
// \param dst_epid The endpoint ID of the destination
// \param src_epid The endpoint ID of the source
//
virtual void setup_remote_route(
- const sep_id_t& dst_epid, const sep_id_t& src_epid) = 0;
+ chdr_ctrl_xport& xport, const sep_id_t& dst_epid, const sep_id_t& src_epid) = 0;
//! Start configuring a flow controlled receive data stream from the endpoint with the
// specified ID to this SW mgmt portal.
@@ -108,6 +113,7 @@ public:
// control handler needs to acknoweledge the setup transaction then call the commit
// function below.
//
+ // \param xport The host stream endpoint's CTRL transport (same EPID as RX stream)
// \param epid The endpoint ID of the data source
// \param lossy_xport Is the transport lossy? (e.g. UDP, not liberio)
// \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
@@ -116,7 +122,8 @@ public:
// \param fc_freq Flow control headroom parameters
// \param reset Reset ingress stream endpoint state
//
- virtual void config_local_rx_stream_start(const sep_id_t& epid,
+ virtual void config_local_rx_stream_start(chdr_ctrl_xport& xport,
+ const sep_id_t& epid,
const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
@@ -127,19 +134,22 @@ public:
//! Finish configuring a flow controlled receive data stream from the endpoint with
// the specified ID to this SW mgmt portal.
//
+ // \param xport The host stream endpoint's CTRL transport (same EPID as RX stream)
// \param epid The endpoint ID of the data source
//
virtual stream_buff_params_t config_local_rx_stream_commit(
- const sep_id_t& epid, const double timeout = 0.2) = 0;
+ chdr_ctrl_xport& xport, const sep_id_t& epid, const double timeout = 0.2) = 0;
//! Configure a flow controlled transmit data stream from this SW mgmt portal to the
// endpoint with the specified ID.
//
+ // \param xport The host stream endpoint's CTRL transport (same EPID as TX stream)
// \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
// \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
// \param reset Reset ingress stream endpoint state
//
- virtual void config_local_tx_stream(const sep_id_t& epid,
+ virtual void config_local_tx_stream(chdr_ctrl_xport& xport,
+ const sep_id_t& epid,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
const bool reset = false) = 0;
@@ -147,6 +157,7 @@ public:
//! Configure a flow controlled data stream from the endpoint with ID src_epid to the
// endpoint with ID dst_epid
//
+ // \param xport The host stream endpoint's CTRL transport
// \param dst_epid The endpoint ID of the destination
// \param src_epid The endpoint ID of the source
// \param lossy_xport Is the transport lossy?
@@ -155,7 +166,8 @@ public:
// \param fc_freq Flow control response frequency parameters
// \param fc_freq Flow control headroom parameters
//
- virtual stream_buff_params_t config_remote_stream(const sep_id_t& dst_epid,
+ virtual stream_buff_params_t config_remote_stream(chdr_ctrl_xport& xport,
+ const sep_id_t& dst_epid,
const sep_id_t& src_epid,
const bool lossy_xport,
const stream_buff_params_t& fc_freq,
@@ -176,7 +188,7 @@ public:
//! Create an endpoint manager object
//
- static uptr make(const chdr_ctrl_xport_t& xport,
+ static uptr make(chdr_ctrl_xport& xport,
const chdr::chdr_packet_factory& pkt_factory,
sep_addr_t my_sep_addr,
sep_id_t my_epid);
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
index c08c8d74a..7ec1b7bb2 100644
--- a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
@@ -7,7 +7,7 @@
#ifndef INCLUDED_RFNOC_RFNOC_COMMON_HPP
#define INCLUDED_RFNOC_RFNOC_COMMON_HPP
-#include <uhd/transport/zero_copy.hpp>
+#include <uhdlib/transport/link_if.hpp>
#include <memory>
namespace uhd { namespace rfnoc {
@@ -66,20 +66,6 @@ struct stream_buff_params_t
//! The data type of the buffer used to capture/generate data
enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 };
-// TODO: Update these
-struct chdr_ctrl_xport_t
-{
- chdr_ctrl_xport_t() = default;
- uhd::transport::zero_copy_if::sptr recv;
- uhd::transport::zero_copy_if::sptr send;
- stream_buff_params_t recv_buff_params{0, 0};
- stream_buff_params_t send_buff_params{0, 0};
- sep_id_t src_epid{0};
- sep_id_t dst_epid{0};
-};
-
-using chdr_data_xport_t = chdr_ctrl_xport_t;
-
//----------------------------------------------
// Constants
//----------------------------------------------
diff --git a/host/lib/rfnoc/chdr_ctrl_endpoint.cpp b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp
index d1d1dccca..d3c7cd58f 100644
--- a/host/lib/rfnoc/chdr_ctrl_endpoint.cpp
+++ b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp
@@ -27,7 +27,7 @@ chdr_ctrl_endpoint::~chdr_ctrl_endpoint() = default;
class chdr_ctrl_endpoint_impl : public chdr_ctrl_endpoint
{
public:
- chdr_ctrl_endpoint_impl(const chdr_ctrl_xport_t& xport,
+ chdr_ctrl_endpoint_impl(chdr_ctrl_xport::sptr xport,
const chdr::chdr_packet_factory& pkt_factory,
sep_id_t my_epid)
: _my_epid(my_epid)
@@ -57,7 +57,14 @@ public:
// there are no timed blocks on the underlying.
_recv_thread.join();
// Flush base transport
- while (_xport.recv->get_recv_buff(0.0001)) /*NOP*/;
+ while (true) {
+ auto buff = _xport->get_recv_buff(100);
+ if (buff) {
+ _xport->release_recv_buff(std::move(buff));
+ } else {
+ break;
+ }
+ }
// Release child endpoints
_endpoint_map.clear(););
}
@@ -82,9 +89,10 @@ public:
header.set_dst_epid(dst_epid);
// Acquire send buffer and send the packet
std::lock_guard<std::mutex> lock(_send_mutex);
- auto send_buff = _xport.send->get_send_buff(timeout);
- _send_pkt->refresh(send_buff->cast<void*>(), header, payload);
- send_buff->commit(header.get_length());
+ auto send_buff = _xport->get_send_buff(timeout * 1000);
+ _send_pkt->refresh(send_buff->data(), header, payload);
+ send_buff->set_packet_size(header.get_length());
+ _xport->release_send_buff(std::move(send_buff));
};
if (_endpoint_map.find(key) == _endpoint_map.end()) {
@@ -118,11 +126,14 @@ private:
// - Route them based on the dst_port
// - Pass them to the ctrlport_endpoint for additional processing
while (not _stop_recv_thread) {
- auto buff = _xport.recv->get_recv_buff(0.0);
+ // FIXME Move lock back once have threaded_io_service
+ std::unique_lock<std::mutex> lock(_mutex);
+ auto buff = _xport->get_recv_buff(0);
if (buff) {
- std::lock_guard<std::mutex> lock(_mutex);
+ // FIXME Move lock back to here once have threaded_io_service
+ // std::lock_guard<std::mutex> lock(_mutex);
try {
- _recv_pkt->refresh(buff->cast<void*>());
+ _recv_pkt->refresh(buff->data());
const ctrl_payload payload = _recv_pkt->get_payload();
ep_map_key_t key{payload.src_epid, payload.dst_port};
if (_endpoint_map.find(key) != _endpoint_map.end()) {
@@ -131,7 +142,10 @@ private:
} catch (...) {
// Ignore all errors
}
+ _xport->release_recv_buff(std::move(buff));
} else {
+ // FIXME Move lock back to lock_guard once have threaded_io_service
+ lock.unlock();
// Be a good citizen and yield if no packet is processed
static const size_t MIN_DUR = 1;
boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR));
@@ -154,7 +168,7 @@ private:
// The endpoint ID of this software endpoint
const sep_id_t _my_epid;
// Send/recv transports
- const chdr_ctrl_xport_t _xport;
+ chdr_ctrl_xport::sptr _xport;
// The curent sequence number for a send packet
size_t _send_seqnum = 0;
// The number of packets dropped
@@ -173,7 +187,7 @@ private:
std::mutex _send_mutex;
};
-chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(const chdr_ctrl_xport_t& xport,
+chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(chdr_ctrl_xport::sptr xport,
const chdr::chdr_packet_factory& pkt_factory,
sep_id_t my_epid)
{
diff --git a/host/lib/rfnoc/link_stream_manager.cpp b/host/lib/rfnoc/link_stream_manager.cpp
index bd330e313..4fe183529 100644
--- a/host/lib/rfnoc/link_stream_manager.cpp
+++ b/host/lib/rfnoc/link_stream_manager.cpp
@@ -63,31 +63,10 @@ public:
// chdr_ctrl_endpoint. We have to use the same base transport here to ensure that
// the route setup logic in the FPGA transport works correctly.
// TODO: This needs to be cleaned up. A muxed_zero_copy_if is excessive here
- chdr_ctrl_xport_t base_xport =
- _mb_iface.make_ctrl_transport(_my_device_id, _my_mgmt_ctrl_epid);
- UHD_ASSERT_THROW(base_xport.send.get() == base_xport.recv.get())
-
- auto classify_fn = [&pkt_factory](void* buff, size_t) -> uint32_t {
- if (buff) {
- chdr_packet::cuptr pkt = pkt_factory.make_generic();
- pkt->refresh(buff);
- return (pkt->get_chdr_header().get_pkt_type() == PKT_TYPE_MGMT) ? 0 : 1;
- } else {
- throw uhd::assertion_error("null pointer");
- }
- };
- _muxed_xport = muxed_zero_copy_if::make(base_xport.send, classify_fn, 2);
-
- // Create child transports
- chdr_ctrl_xport_t mgmt_xport = base_xport;
- mgmt_xport.send = _muxed_xport->make_stream(0);
- mgmt_xport.recv = mgmt_xport.send;
- _ctrl_xport = base_xport;
- _ctrl_xport.send = _muxed_xport->make_stream(1);
- _ctrl_xport.recv = _ctrl_xport.send;
+ _ctrl_xport = _mb_iface.make_ctrl_transport(_my_device_id, _my_mgmt_ctrl_epid);
// Create management portal using one of the child transports
- _mgmt_portal = mgmt_portal::make(mgmt_xport,
+ _mgmt_portal = mgmt_portal::make(*_ctrl_xport,
_pkt_factory,
sep_addr_t(_my_device_id, SEP_INST_MGMT_CTRL),
_my_mgmt_ctrl_epid);
@@ -131,8 +110,8 @@ public:
}
// Setup a route to the EPID
- _mgmt_portal->initialize_endpoint(dst_addr, dst_epid);
- _mgmt_portal->setup_local_route(dst_epid);
+ _mgmt_portal->initialize_endpoint(*_ctrl_xport, dst_addr, dst_epid);
+ _mgmt_portal->setup_local_route(*_ctrl_xport, dst_epid);
if (!_mgmt_portal->get_endpoint_info(dst_epid).has_ctrl) {
throw uhd::rfnoc_error(
"Downstream endpoint does not support control traffic");
@@ -157,10 +136,10 @@ public:
sep_id_t src_epid = _epid_alloc->allocate_epid(src_addr);
// Initialize endpoints
- _mgmt_portal->initialize_endpoint(dst_addr, dst_epid);
- _mgmt_portal->initialize_endpoint(src_addr, src_epid);
+ _mgmt_portal->initialize_endpoint(*_ctrl_xport, dst_addr, dst_epid);
+ _mgmt_portal->initialize_endpoint(*_ctrl_xport, src_addr, src_epid);
- _mgmt_portal->setup_remote_route(dst_epid, src_epid);
+ _mgmt_portal->setup_remote_route(*_ctrl_xport, dst_epid, src_epid);
return sep_id_pair_t(src_epid, dst_epid);
}
@@ -214,13 +193,15 @@ public:
// EPIDs)
// Setup a stream
- stream_buff_params_t buff_params = _mgmt_portal->config_remote_stream(dst_epid,
- src_epid,
- lossy_xport,
- stream_buff_params_t{1, 1}, // Dummy frequency
- stream_buff_params_t{0, 0}, // Dummy headroom
- false,
- STREAM_SETUP_TIMEOUT);
+ stream_buff_params_t buff_params =
+ _mgmt_portal->config_remote_stream(*_ctrl_xport,
+ dst_epid,
+ src_epid,
+ lossy_xport,
+ stream_buff_params_t{1, 1}, // Dummy frequency
+ stream_buff_params_t{0, 0}, // Dummy headroom
+ false,
+ STREAM_SETUP_TIMEOUT);
// Compute FC frequency and headroom based on buff parameters
stream_buff_params_t fc_freq{
@@ -234,7 +215,8 @@ public:
std::ceil(double(buff_params.packets) * fc_headroom_ratio))};
// Reconfigure flow control using the new frequency and headroom
- return _mgmt_portal->config_remote_stream(dst_epid,
+ return _mgmt_portal->config_remote_stream(*_ctrl_xport,
+ dst_epid,
src_epid,
lossy_xport,
fc_freq,
@@ -243,7 +225,8 @@ public:
STREAM_SETUP_TIMEOUT);
}
- virtual chdr_data_xport_t create_host_to_device_data_stream(const sep_addr_t dst_addr,
+ virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
+ const sep_addr_t dst_addr,
const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
@@ -261,21 +244,22 @@ public:
sep_id_t dst_epid = _epid_alloc->allocate_epid(dst_addr);
// Create the data transport that we will return to the client
- chdr_data_xport_t xport =
- _mb_iface.make_data_transport(_my_device_id, src_epid, xport_args);
- xport.src_epid = src_epid;
- xport.dst_epid = dst_epid;
+ chdr_rx_data_xport::uptr xport = _mb_iface.make_rx_data_transport(
+ _my_device_id, src_epid, dst_epid, xport_args);
+
+ chdr_ctrl_xport::sptr mgmt_xport =
+ _mb_iface.make_ctrl_transport(_my_device_id, src_epid);
// Create new temporary management portal with the transports used for this stream
// TODO: This is a bit excessive. Maybe we can pair down the functionality of the
// portal just for route setup purposes. Whatever we do, we *must* use xport in it
// though otherwise the transport will not behave correctly.
mgmt_portal::uptr data_mgmt_portal =
- mgmt_portal::make(xport, _pkt_factory, sw_epid_addr, src_epid);
+ mgmt_portal::make(*mgmt_xport, _pkt_factory, sw_epid_addr, src_epid);
// Setup a route to the EPID
- data_mgmt_portal->initialize_endpoint(dst_addr, dst_epid);
- data_mgmt_portal->setup_local_route(dst_epid);
+ data_mgmt_portal->initialize_endpoint(*mgmt_xport, dst_addr, dst_epid);
+ data_mgmt_portal->setup_local_route(*mgmt_xport, dst_epid);
if (!_mgmt_portal->get_endpoint_info(dst_epid).has_data) {
throw uhd::rfnoc_error("Downstream endpoint does not support data traffic");
}
@@ -288,7 +272,8 @@ public:
return xport;
}
- virtual chdr_data_xport_t create_device_to_host_data_stream(const sep_addr_t src_addr,
+ virtual chdr_tx_data_xport::uptr create_device_to_host_data_stream(
+ const sep_addr_t src_addr,
const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
@@ -297,6 +282,7 @@ public:
const device_addr_t& xport_args)
{
// TODO: Implement me
+ return chdr_tx_data_xport::uptr();
}
private:
@@ -323,8 +309,7 @@ private:
// The software EPID for all management and control traffic
sep_id_t _my_mgmt_ctrl_epid;
// Transports
- muxed_zero_copy_if::sptr _muxed_xport;
- chdr_ctrl_xport_t _ctrl_xport;
+ chdr_ctrl_xport::sptr _ctrl_xport;
// Management portal for control endpoints
mgmt_portal::uptr _mgmt_portal;
// The CHDR control endpoint
diff --git a/host/lib/rfnoc/mgmt_portal.cpp b/host/lib/rfnoc/mgmt_portal.cpp
index 79e297407..b490e0baf 100644
--- a/host/lib/rfnoc/mgmt_portal.cpp
+++ b/host/lib/rfnoc/mgmt_portal.cpp
@@ -7,6 +7,7 @@
#include <uhd/exception.hpp>
#include <uhd/utils/log.hpp>
+#include <uhdlib/rfnoc/chdr_ctrl_xport.hpp>
#include <uhdlib/rfnoc/chdr_packet.hpp>
#include <uhdlib/rfnoc/mgmt_portal.hpp>
#include <unordered_set>
@@ -174,7 +175,7 @@ mgmt_portal::~mgmt_portal() {}
class mgmt_portal_impl : public mgmt_portal
{
public:
- mgmt_portal_impl(const chdr_ctrl_xport_t& xport,
+ mgmt_portal_impl(chdr_ctrl_xport& xport,
const chdr::chdr_packet_factory& pkt_factory,
sep_addr_t my_sep_addr,
sep_id_t my_epid)
@@ -183,14 +184,12 @@ public:
, _chdr_w(pkt_factory.get_chdr_w())
, _endianness(pkt_factory.get_endianness())
, _my_node_id(my_sep_addr.first, NODE_TYPE_STRM_EP, my_epid)
- , _recv_xport(xport.recv)
- , _send_xport(xport.send)
, _send_seqnum(0)
, _send_pkt(std::move(pkt_factory.make_mgmt()))
, _recv_pkt(std::move(pkt_factory.make_mgmt()))
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
- _discover_topology();
+ _discover_topology(xport);
UHD_LOG_DEBUG("RFNOC::MGMT",
"The following endpoints are reachable from " << _my_node_id.to_string());
for (const auto& ep : _discovered_ep_set) {
@@ -205,7 +204,8 @@ public:
return _discovered_ep_set;
}
- virtual void initialize_endpoint(const sep_addr_t& addr, const sep_id_t& epid)
+ virtual void initialize_endpoint(
+ chdr_ctrl_xport& xport, const sep_addr_t& addr, const sep_id_t& epid)
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
@@ -232,7 +232,7 @@ public:
// Send the transaction and receive a response.
// We don't care about the contents of the response.
- _send_recv_mgmt_transaction(cfg_xact);
+ _send_recv_mgmt_transaction(xport, cfg_xact);
// Add/update the entry in the stream endpoint ID map
_epid_addr_map[epid] = addr;
@@ -280,7 +280,7 @@ public:
return retval;
}
- virtual void setup_local_route(const sep_id_t& dst_epid)
+ virtual void setup_local_route(chdr_ctrl_xport& xport, const sep_id_t& dst_epid)
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
@@ -339,7 +339,7 @@ public:
cfg_xact.add_hop(discover_hop);
// Send the transaction and validate that we saw a stream endpoint
- const mgmt_payload sep_info_xact = _send_recv_mgmt_transaction(cfg_xact);
+ const mgmt_payload sep_info_xact = _send_recv_mgmt_transaction(xport, cfg_xact);
const node_id_t sep_node = _pop_node_discovery_hop(sep_info_xact);
if (sep_node.type != NODE_TYPE_STRM_EP) {
throw uhd::routing_error(
@@ -384,7 +384,8 @@ public:
return false;
}
- virtual void setup_remote_route(const sep_id_t& dst_epid, const sep_id_t& src_epid)
+ virtual void setup_remote_route(
+ chdr_ctrl_xport& xport, const sep_id_t& dst_epid, const sep_id_t& src_epid)
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
@@ -412,8 +413,8 @@ public:
// there is a need to optimize for routing table fullness, we can do a software
// graph traversal here, find the closest common parent (crossbar) for the two
// nodes and only configure the nodes downstream of that.
- setup_local_route(dst_epid);
- setup_local_route(src_epid);
+ setup_local_route(xport, dst_epid);
+ setup_local_route(xport, src_epid);
UHD_LOG_DEBUG("RFNOC::MGMT",
(boost::format(
@@ -421,7 +422,8 @@ public:
% src_epid % dst_epid));
}
- virtual void config_local_rx_stream_start(const sep_id_t& epid,
+ virtual void config_local_rx_stream_start(chdr_ctrl_xport& xport,
+ const sep_id_t& epid,
const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
@@ -464,29 +466,30 @@ public:
// Send the transaction and receive a response.
// We don't care about the contents of the response.
cfg_xact.add_hop(cfg_hop);
- _send_recv_mgmt_transaction(cfg_xact);
+ _send_recv_mgmt_transaction(xport, cfg_xact);
UHD_LOG_DEBUG("RFNOC::MGMT",
(boost::format("Initiated RX stream setup for EPID=%d") % epid));
}
virtual stream_buff_params_t config_local_rx_stream_commit(
- const sep_id_t& epid, const double timeout = 0.2)
+ chdr_ctrl_xport& xport, const sep_id_t& epid, const double timeout = 0.2)
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
// Wait for stream configuration to finish on the HW side
const node_addr_t& node_addr = _lookup_sep_node_addr(epid);
- _validate_stream_setup(node_addr, timeout);
+ _validate_stream_setup(xport, node_addr, timeout);
UHD_LOG_DEBUG("RFNOC::MGMT",
(boost::format("Finished RX stream setup for EPID=%d") % epid));
// Return discovered buffer parameters
- return std::get<1>(_get_ostrm_status(node_addr));
+ return std::get<1>(_get_ostrm_status(xport, node_addr));
}
- virtual void config_local_tx_stream(const sep_id_t& epid,
+ virtual void config_local_tx_stream(chdr_ctrl_xport& xport,
+ const sep_id_t& epid,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
const bool reset = false)
@@ -494,7 +497,7 @@ public:
std::lock_guard<std::recursive_mutex> lock(_mutex);
// First setup a route between to the endpoint
- setup_local_route(epid);
+ setup_local_route(xport, epid);
const node_addr_t& node_addr = _lookup_sep_node_addr(epid);
@@ -522,13 +525,14 @@ public:
// Send the transaction and receive a response.
// We don't care about the contents of the response.
- _send_recv_mgmt_transaction(cfg_xact);
+ _send_recv_mgmt_transaction(xport, cfg_xact);
UHD_LOG_DEBUG("RFNOC::MGMT",
(boost::format("Finished TX stream setup for EPID=%d") % epid));
}
- virtual stream_buff_params_t config_remote_stream(const sep_id_t& dst_epid,
+ virtual stream_buff_params_t config_remote_stream(chdr_ctrl_xport& xport,
+ const sep_id_t& dst_epid,
const sep_id_t& src_epid,
const bool lossy_xport,
const stream_buff_params_t& fc_freq,
@@ -539,7 +543,7 @@ public:
std::lock_guard<std::recursive_mutex> lock(_mutex);
// First setup a route between the two endpoints
- setup_remote_route(dst_epid, src_epid);
+ setup_remote_route(xport, dst_epid, src_epid);
const node_addr_t& dst_node_addr = _lookup_sep_node_addr(dst_epid);
const node_addr_t& src_node_addr = _lookup_sep_node_addr(src_epid);
@@ -557,7 +561,7 @@ public:
(i == 0) ? RESET_AND_FLUSH_OSTRM : RESET_AND_FLUSH_ISTRM)));
rst_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN));
rst_xact.add_hop(rst_hop);
- _send_recv_mgmt_transaction(rst_xact);
+ _send_recv_mgmt_transaction(xport, rst_xact);
}
}
@@ -579,18 +583,18 @@ public:
// Send the transaction and receive a response.
// We don't care about the contents of the response.
cfg_xact.add_hop(cfg_hop);
- _send_recv_mgmt_transaction(cfg_xact);
+ _send_recv_mgmt_transaction(xport, cfg_xact);
}
// Wait for stream configuration to finish on the HW side
- _validate_stream_setup(src_node_addr, timeout);
+ _validate_stream_setup(xport, src_node_addr, timeout);
UHD_LOG_DEBUG("RFNOC::MGMT",
(boost::format("Setup a stream from EPID=%d to EPID=%d") % src_epid
% dst_epid));
// Return discovered buffer parameters
- return std::get<1>(_get_ostrm_status(src_node_addr));
+ return std::get<1>(_get_ostrm_status(xport, src_node_addr));
}
@@ -605,7 +609,7 @@ public:
private: // Functions
// Discover all nodes that are reachable from this software stream endpoint
- void _discover_topology()
+ void _discover_topology(chdr_ctrl_xport& xport)
{
// Initialize a queue of pending paths. We will use this for a breadth-first
// traversal of the dataflow graph. The queue consists of a previously discovered
@@ -652,7 +656,7 @@ private: // Functions
try {
// Send the discovery transaction
const mgmt_payload disc_resp_xact =
- _send_recv_mgmt_transaction(disc_req_xact);
+ _send_recv_mgmt_transaction(xport, disc_req_xact);
new_node = _pop_node_discovery_hop(disc_resp_xact);
} catch (uhd::io_error& io_err) {
// We received an IO error. This could happen if we have a legitimate
@@ -695,7 +699,7 @@ private: // Functions
mgmt_payload init_req_xact(route_xact);
_push_node_init_hop(init_req_xact, new_node);
const mgmt_payload init_resp_xact =
- _send_recv_mgmt_transaction(init_req_xact);
+ _send_recv_mgmt_transaction(xport, init_req_xact);
UHD_LOG_DEBUG("RFNOC::MGMT", "Initialized node " << new_node.to_string());
// If the new node is a stream endpoint then we are done traversing this
@@ -823,7 +827,7 @@ private: // Functions
// Send/recv a management transaction that will get the output stream status
std::tuple<uint32_t, stream_buff_params_t> _get_ostrm_status(
- const node_addr_t& node_addr)
+ chdr_ctrl_xport& xport, const node_addr_t& node_addr)
{
// Build a management transaction to first get to the node
mgmt_payload status_xact;
@@ -844,7 +848,7 @@ private: // Functions
status_xact.add_hop(cfg_hop);
// Send the transaction, receive a response and validate it
- const mgmt_payload resp_xact = _send_recv_mgmt_transaction(status_xact);
+ const mgmt_payload resp_xact = _send_recv_mgmt_transaction(xport, status_xact);
if (resp_xact.get_num_hops() != 1) {
throw uhd::op_failed("Management operation failed. Incorrect format (hops).");
}
@@ -875,13 +879,14 @@ private: // Functions
}
// Make sure that stream setup is complete and successful, else throw exception
- void _validate_stream_setup(const node_addr_t& node_addr, const double timeout)
+ void _validate_stream_setup(
+ chdr_ctrl_xport& xport, const node_addr_t& node_addr, const double timeout)
{
// Get the status of the output stream
uint32_t ostrm_status = 0;
double sleep_s = 0.05;
for (size_t i = 0; i < size_t(std::ceil(timeout / sleep_s)); i++) {
- ostrm_status = std::get<0>(_get_ostrm_status(node_addr));
+ ostrm_status = std::get<0>(_get_ostrm_status(xport, node_addr));
if ((ostrm_status & STRM_STATUS_SETUP_PENDING) != 0) {
// Wait and retry
std::chrono::milliseconds(static_cast<int64_t>(sleep_s * 1000));
@@ -975,7 +980,8 @@ private: // Functions
}
// Send the specified management transaction to the device
- void _send_mgmt_transaction(const mgmt_payload& payload, double timeout = 0.1)
+ void _send_mgmt_transaction(
+ chdr_ctrl_xport& xport, const mgmt_payload& payload, double timeout = 0.1)
{
chdr_header header;
header.set_pkt_type(PKT_TYPE_MGMT);
@@ -984,18 +990,19 @@ private: // Functions
header.set_length(payload.get_size_bytes() + (chdr_w_to_bits(_chdr_w) / 8));
header.set_dst_epid(0);
- managed_send_buffer::sptr send_buff = _send_xport->get_send_buff(timeout);
+ auto send_buff = xport.get_send_buff(timeout * 1000);
if (not send_buff) {
UHD_LOG_ERROR("RFNOC::MGMT", "Timed out getting send buff for management transaction");
throw uhd::io_error("Timed out getting send buff for management transaction");
}
- _send_pkt->refresh(send_buff->cast<void*>(), header, payload);
- send_buff->commit(header.get_length());
+ _send_pkt->refresh(send_buff->data(), header, payload);
+ send_buff->set_packet_size(header.get_length());
+ xport.release_send_buff(std::move(send_buff));
}
// Send the specified management transaction to the device and receive a response
const mgmt_payload _send_recv_mgmt_transaction(
- const mgmt_payload& transaction, double timeout = 0.1)
+ chdr_ctrl_xport& xport, const mgmt_payload& transaction, double timeout = 0.1)
{
mgmt_payload send(transaction);
send.set_header(_my_epid, _protover, _chdr_w);
@@ -1005,17 +1012,18 @@ private: // Functions
nop_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP));
send.add_hop(nop_hop);
// Send the transaction over the wire
- _send_mgmt_transaction(send);
+ _send_mgmt_transaction(xport, send);
- managed_recv_buffer::sptr recv_buff = _recv_xport->get_recv_buff(timeout);
+ auto recv_buff = xport.get_mgmt_buff(timeout * 1000);
if (not recv_buff) {
throw uhd::io_error("Timed out getting recv buff for management transaction");
}
- _recv_pkt->refresh(recv_buff->cast<void*>());
+ _recv_pkt->refresh(recv_buff->data());
mgmt_payload recv;
recv.set_header(_my_epid, _protover, _chdr_w);
_recv_pkt->fill_payload(recv);
- return std::move(recv);
+ xport.release_recv_buff(std::move(recv_buff));
+ return recv;
}
private: // Members
@@ -1039,8 +1047,6 @@ private: // Members
// endpoint
std::map<sep_id_t, sep_addr_t> _epid_addr_map;
// Send/recv transports
- uhd::transport::zero_copy_if::sptr _recv_xport;
- uhd::transport::zero_copy_if::sptr _send_xport;
size_t _send_seqnum;
// Management packet containers
chdr_mgmt_packet::uptr _send_pkt;
@@ -1053,7 +1059,7 @@ private: // Members
}; // namespace mgmt
-mgmt_portal::uptr mgmt_portal::make(const chdr_ctrl_xport_t& xport,
+mgmt_portal::uptr mgmt_portal::make(chdr_ctrl_xport& xport,
const chdr::chdr_packet_factory& pkt_factory,
sep_addr_t my_sep_addr,
sep_id_t my_epid)