aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp17
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp175
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp323
-rw-r--r--host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp17
-rw-r--r--host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp9
-rw-r--r--host/lib/rfnoc/CMakeLists.txt2
-rw-r--r--host/lib/rfnoc/chdr_ctrl_xport.cpp19
-rw-r--r--host/lib/rfnoc/chdr_rx_data_xport.cpp203
-rw-r--r--host/lib/rfnoc/chdr_tx_data_xport.cpp249
9 files changed, 597 insertions, 417 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp
index 9cf2f305f..726ea7f6c 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp
@@ -7,6 +7,7 @@
#ifndef INCLUDED_RFNOC_CHDR_CTRL_XPORT_HPP
#define INCLUDED_RFNOC_CHDR_CTRL_XPORT_HPP
+#include <uhdlib/rfnoc/chdr_packet.hpp>
#include <uhdlib/rfnoc/chdr_types.hpp>
#include <uhdlib/transport/io_service.hpp>
#include <mutex>
@@ -41,12 +42,18 @@ public:
static sptr make(io_service::sptr io_srv,
send_link_if::sptr send_link,
recv_link_if::sptr recv_link,
+ const chdr::chdr_packet_factory& pkt_factory,
sep_id_t my_epid,
size_t num_send_frames,
size_t num_recv_frames)
{
- return std::make_shared<chdr_ctrl_xport>(
- io_srv, send_link, recv_link, my_epid, num_send_frames, num_recv_frames);
+ return std::make_shared<chdr_ctrl_xport>(io_srv,
+ send_link,
+ recv_link,
+ pkt_factory,
+ my_epid,
+ num_send_frames,
+ num_recv_frames);
}
/*!
@@ -62,6 +69,7 @@ public:
chdr_ctrl_xport(io_service::sptr io_srv,
send_link_if::sptr send_link,
recv_link_if::sptr recv_link,
+ const chdr::chdr_packet_factory& pkt_factory,
sep_id_t my_epid,
size_t num_send_frames,
size_t num_recv_frames);
@@ -122,10 +130,15 @@ private:
chdr_ctrl_xport(const chdr_ctrl_xport&) = delete;
sep_id_t _my_epid;
+
+ // Packet for received data
+ chdr::chdr_packet::uptr _recv_packet;
+
send_io_if::sptr _send_if;
recv_io_if::sptr _ctrl_recv_if;
recv_io_if::sptr _mgmt_recv_if;
+
// FIXME: Remove this when have threaded_io_service
std::mutex _mutex;
};
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp
index 41af766b4..9c5d88066 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp
@@ -8,9 +8,9 @@
#define INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP
#include <uhd/config.hpp>
+#include <uhd/exception.hpp>
#include <uhdlib/rfnoc/chdr_packet.hpp>
#include <uhdlib/rfnoc/chdr_types.hpp>
-#include <uhdlib/rfnoc/mgmt_portal.hpp>
#include <uhdlib/rfnoc/rfnoc_common.hpp>
#include <uhdlib/rfnoc/rx_flow_ctrl_state.hpp>
#include <uhdlib/transport/io_service.hpp>
@@ -19,6 +19,10 @@
namespace uhd { namespace rfnoc {
+namespace mgmt {
+class mgmt_portal;
+}
+
namespace detail {
/*!
@@ -29,21 +33,12 @@ class rx_flow_ctrl_sender
public:
//! Constructor
rx_flow_ctrl_sender(
- const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids)
- : _dst_epid(sep_ids.first)
- {
- _fc_packet = pkt_factory.make_strs();
- _fc_strs_pyld.src_epid = sep_ids.second;
- }
+ const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids);
- /*! Configure buffer capacity
+ /*! configure buffer capacity
* \param recv_capacity The buffer capacity of the receive link
*/
- void set_capacity(const stream_buff_params_t& recv_capacity)
- {
- _fc_strs_pyld.capacity_bytes = recv_capacity.bytes;
- _fc_strs_pyld.capacity_pkts = recv_capacity.packets;
- }
+ void set_capacity(const stream_buff_params_t& recv_capacity);
/*! Send a flow control response packet
*
@@ -119,113 +114,59 @@ public:
const void* payload = nullptr;
};
- /*! Constructor
+ //! Flow control parameters
+ struct fc_params_t
+ {
+ stream_buff_params_t buff_capacity;
+ stream_buff_params_t freq;
+ };
+
+ /*! Configure stream endpoint route and flow control
*
* \param io_srv The service that will schedule the xport I/O
* \param recv_link The recv link, already attached to the I/O service
* \param send_link The send link, already attached to the I/O service
* \param pkt_factory Factory to create packets with the desired chdr_w and endianness
+ * \param mgmt_portal Management portal to configure stream endpoint
* \param epids Source and destination endpoint IDs
* \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
* \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
- * \param num_recv_frames Num frames to reserve from the recv link
* \param recv_capacity Total capacity of the recv link
* \param fc_freq Frequency of flow control status messages
* \param fc_headroom Headroom for flow control status messages
* \param lossy_xport Whether the xport is lossy, for flow control configuration
+ * \return Parameters for xport flow control
*/
- chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv,
+ static fc_params_t configure_sep(uhd::transport::io_service::sptr io_srv,
uhd::transport::recv_link_if::sptr recv_link,
uhd::transport::send_link_if::sptr send_link,
- uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal,
const chdr::chdr_packet_factory& pkt_factory,
+ uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal,
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const size_t num_recv_frames,
const stream_buff_params_t& recv_capacity,
const stream_buff_params_t& fc_freq,
const stream_buff_params_t& fc_headroom,
- const bool lossy_xport)
- : _fc_state(epids), _fc_sender(pkt_factory, epids), _epid(epids.second)
- {
- const sep_id_t remote_epid = epids.first;
- const sep_id_t local_epid = epids.second;
-
- UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
- "Creating rx xport with local epid=" << local_epid
- << ", remote epid=" << remote_epid);
-
- _recv_packet = pkt_factory.make_generic();
- _fc_sender.set_capacity(recv_capacity);
-
- // Calculate max payload size
- const size_t pyld_offset =
- _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS);
- _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset;
-
- // Make data transport
- auto recv_cb = [this](buff_t::uptr& buff,
- transport::recv_link_if* recv_link,
- transport::send_link_if* send_link) {
- return this->_recv_callback(buff, recv_link, send_link);
- };
-
- auto fc_cb = [this](buff_t::uptr buff,
- transport::recv_link_if* recv_link,
- transport::send_link_if* send_link) {
- this->_fc_callback(std::move(buff), recv_link, send_link);
- };
-
- // Needs just a single send frame for responses
- _recv_io = io_srv->make_recv_client(recv_link,
- num_recv_frames,
- recv_cb,
- send_link,
- /* num_send_frames*/ 1,
- fc_cb);
-
- // Create a control transport with the rx data links to send mgmt packets
- // needed to setup the stream
- // Piggyback on frames from the recv_io_if
- auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv,
- send_link,
- recv_link,
- local_epid,
- 0, // num_send_frames
- 0); // num_recv_frames
-
- // Setup a route to the EPID
- // Note that this may be gratuitous--The endpoint may already have been set up
- mgmt_portal.setup_local_route(*ctrl_xport, remote_epid);
-
- // Initialize flow control - management portal sends a stream command
- // containing its requested flow control frequency, the rx transport
- // responds with a stream status containing its buffer capacity.
- mgmt_portal.config_local_rx_stream_start(*ctrl_xport,
- remote_epid,
- lossy_xport,
- pyld_buff_fmt,
- mdata_buff_fmt,
- fc_freq,
- fc_headroom);
-
- mgmt_portal.config_local_rx_stream_commit(*ctrl_xport, remote_epid);
-
- UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
- "Stream endpoint was configured with:"
- << std::endl
- << "capacity bytes=" << recv_capacity.bytes
- << ", packets=" << recv_capacity.packets << std::endl
- << "fc headroom bytes=" << fc_headroom.bytes
- << ", packets=" << fc_headroom.packets << std::endl
- << "fc frequency bytes=" << fc_freq.bytes
- << ", packets=" << fc_freq.packets);
-
- // We no longer need the control xport, release it so
- // the control xport is no longer connected to the I/O service.
- ctrl_xport.reset();
- }
+ const bool lossy_xport);
+
+ /*! Constructor
+ *
+ * \param io_srv The service that will schedule the xport I/O
+ * \param recv_link The recv link, already attached to the I/O service
+ * \param send_link The send link, already attached to the I/O service
+ * \param pkt_factory Factory to create packets with the desired chdr_w and endianness
+ * \param epids Source and destination endpoint IDs
+ * \param num_recv_frames Num frames to reserve from the recv link
+ * \param fc_params Parameters for flow control
+ */
+ chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const uhd::rfnoc::sep_id_pair_t& epids,
+ const size_t num_recv_frames,
+ const fc_params_t& fc_params);
/*! Returns maximum number payload bytes
*
@@ -286,15 +227,16 @@ private:
transport::send_link_if* send_link)
{
_recv_packet->refresh(buff->data());
- const auto header = _recv_packet->get_chdr_header();
- const auto type = header.get_pkt_type();
- const auto dst_epid = header.get_dst_epid();
- const auto packet_size = buff->packet_size();
+ const auto header = _recv_packet->get_chdr_header();
+ const auto dst_epid = header.get_dst_epid();
if (dst_epid != _epid) {
return false;
}
+ const auto type = header.get_pkt_type();
+ const auto packet_size = header.get_length();
+
if (type == chdr::PKT_TYPE_STRC) {
chdr::strc_payload strc;
strc.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(),
@@ -316,26 +258,6 @@ private:
buff = buff_t::uptr();
_fc_state.xfer_done(packet_size);
_send_fc_response(send_link);
- } else if (strc.op_code == chdr::STRC_INIT) {
- _fc_state.initialize(
- {strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)});
-
- UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
- "Received strc init with fc freq"
- << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts);
-
- // Make sure flow control was initialized
- assert(_fc_state.get_fc_freq().bytes > 0);
- assert(_fc_state.get_fc_freq().packets > 0);
-
- // Send a strs response to configure flow control on the sender
- _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts());
-
- // Reset counts, since mgmt_portal will do it to FPGA
- _fc_state.reset_counts();
-
- recv_link->release_recv_buff(std::move(buff));
- buff = buff_t::uptr();
} else {
throw uhd::value_error("Unexpected opcode value in STRC packet.");
}
@@ -373,7 +295,9 @@ private:
transport::recv_link_if* recv_link,
transport::send_link_if* send_link)
{
- const size_t packet_size = buff->packet_size();
+ _recv_packet->refresh(buff->data());
+ const auto header = _recv_packet->get_chdr_header();
+ const size_t packet_size = header.get_length();
recv_link->release_recv_buff(std::move(buff));
_fc_state.xfer_done(packet_size);
_send_fc_response(send_link);
@@ -393,8 +317,8 @@ private:
}
/*!
- * Checks if the sequence number is out of sequence, prints 'D' if it is
- * and returns result of check.
+ * Checks if the sequence number is out of sequence, increments sequence
+ * number for next packet.
*
* \return true if a sequence error occurred
*/
@@ -404,7 +328,6 @@ private:
_data_seq_num = seq_num + 1;
if (expected_packet_count != seq_num) {
- UHD_LOG_FASTPATH("D");
return true;
}
return false;
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
index 63c2b24cb..1e54a2f7a 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
@@ -7,10 +7,11 @@
#ifndef INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP
#define INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP
+#include <uhd/exception.hpp>
#include <uhd/types/metadata.hpp>
+#include <uhd/utils/log.hpp>
#include <uhdlib/rfnoc/chdr_packet.hpp>
#include <uhdlib/rfnoc/chdr_types.hpp>
-#include <uhdlib/rfnoc/mgmt_portal.hpp>
#include <uhdlib/rfnoc/rfnoc_common.hpp>
#include <uhdlib/rfnoc/tx_flow_ctrl_state.hpp>
#include <uhdlib/transport/io_service.hpp>
@@ -19,6 +20,10 @@
namespace uhd { namespace rfnoc {
+namespace mgmt {
+class mgmt_portal;
+}
+
namespace detail {
/*!
@@ -29,13 +34,7 @@ class tx_flow_ctrl_sender
public:
//! Constructor
tx_flow_ctrl_sender(
- const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids)
- : _dst_epid(sep_ids.second)
- {
- _fc_packet = pkt_factory.make_strc();
- _fc_strc_pyld.src_epid = sep_ids.first;
- _fc_strc_pyld.op_code = chdr::STRC_RESYNC;
- }
+ const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids);
/*!
* Sends a flow control resync packet
@@ -104,9 +103,10 @@ private:
class chdr_tx_data_xport
{
public:
- using uptr = std::unique_ptr<chdr_tx_data_xport>;
- using buff_t = transport::frame_buff;
- using enqueue_async_msg_fn_t = std::function<void(async_metadata_t::event_code_t, bool, uint64_t)>;
+ using uptr = std::unique_ptr<chdr_tx_data_xport>;
+ using buff_t = transport::frame_buff;
+ using enqueue_async_msg_fn_t =
+ std::function<void(async_metadata_t::event_code_t, bool, uint64_t)>;
//! Information about data packet
struct packet_info_t
@@ -117,84 +117,54 @@ public:
size_t payload_bytes = 0;
};
- /*! Constructor
+ //! Flow control parameters
+ struct fc_params_t
+ {
+ stream_buff_params_t buff_capacity;
+ };
+
+ /*! Configure route to the sep and flow control
*
- * \param io_srv The service that will schedule the xport I/O
+ * \param io_srv The I/O service to be used with the transport
* \param recv_link The recv link, already attached to the I/O service
* \param send_link The send link, already attached to the I/O service
* \param pkt_factory Factory to create packets with the desired chdr_w and endianness
+ * \param mgmt_portal Management portal to configure stream endpoint
* \param epids Source and destination endpoint IDs
* \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
* \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
- * \param num_send_frames Num frames to reserve from the send link
* \param fc_freq_ratio Ratio to use to configure the device fc frequency
* \param fc_headroom_ratio Ratio to use to configure the device fc headroom
+ * \return Parameters for xport flow control
*/
- chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv,
+ static fc_params_t configure_sep(uhd::transport::io_service::sptr io_srv,
uhd::transport::recv_link_if::sptr recv_link,
uhd::transport::send_link_if::sptr send_link,
- uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal,
const chdr::chdr_packet_factory& pkt_factory,
+ uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal,
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const size_t num_send_frames,
const double fc_freq_ratio,
- const double fc_headroom_ratio)
- : _fc_sender(pkt_factory, epids), _epid(epids.first)
- {
- const sep_id_t remote_epid = epids.second;
- const sep_id_t local_epid = epids.first;
-
- UHD_LOG_TRACE("XPORT::TX_DATA_XPORT",
- "Creating tx xport with local epid=" << local_epid
- << ", remote epid=" << remote_epid);
-
- _send_header.set_dst_epid(epids.second);
- _send_packet = pkt_factory.make_generic();
- _recv_packet = pkt_factory.make_generic();
-
- // Calculate max payload size
- const size_t pyld_offset =
- _send_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS);
- _max_payload_size = send_link->get_send_frame_size() - pyld_offset;
-
- _configure_sep(io_srv,
- recv_link,
- send_link,
- mgmt_portal,
- local_epid,
- remote_epid,
- pyld_buff_fmt,
- mdata_buff_fmt);
-
- _initialize_flow_ctrl(io_srv,
- recv_link,
- send_link,
- pkt_factory,
- epids,
- fc_freq_ratio,
- fc_headroom_ratio);
-
- // Now create the send I/O we will use for data
- auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) {
- this->_send_callback(buff, send_link);
- };
-
- auto recv_cb = [this](buff_t::uptr& buff,
- transport::recv_link_if* recv_link,
- transport::send_link_if* send_link) {
- return this->_recv_callback(buff, recv_link, send_link);
- };
-
- // Needs just a single recv frame for strs packets
- _send_io = io_srv->make_send_client(send_link,
- num_send_frames,
- send_cb,
- recv_link,
- /* num_recv_frames */ 1,
- recv_cb);
- }
+ const double fc_headroom_ratio);
+
+ /*! Constructor
+ *
+ * \param io_srv The service that will schedule the xport I/O
+ * \param recv_link The recv link, already attached to the I/O service
+ * \param send_link The send link, already attached to the I/O service
+ * \param pkt_factory Factory to create packets with the desired chdr_w and endianness
+ * \param epids Source and destination endpoint IDs
+ * \param num_send_frames Num frames to reserve from the send link
+ * \param fc_params Parameters for flow control
+ */
+ chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const uhd::rfnoc::sep_id_pair_t& epids,
+ const size_t num_send_frames,
+ const fc_params_t fc_params);
/*! Returns maximum number of payload bytes
*
@@ -300,23 +270,27 @@ private:
if (strs.status != chdr::STRS_OKAY) {
switch (strs.status) {
- case chdr::STRS_SEQERR:
- UHD_LOG_FASTPATH("S");
- if (_enqueue_async_msg) {
- _enqueue_async_msg(async_metadata_t::EVENT_CODE_SEQ_ERROR, false, 0);
- }
- break;
- case chdr::STRS_DATAERR:
- UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received data error in tx stream!");
- break;
- case chdr::STRS_RTERR:
- UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received routing error in tx stream!");
- break;
- case chdr::STRS_CMDERR:
- UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received command error in tx stream!");
- break;
- default:
- break;
+ case chdr::STRS_SEQERR:
+ UHD_LOG_FASTPATH("S");
+ if (_enqueue_async_msg) {
+ _enqueue_async_msg(
+ async_metadata_t::EVENT_CODE_SEQ_ERROR, false, 0);
+ }
+ break;
+ case chdr::STRS_DATAERR:
+ UHD_LOG_WARNING(
+ "XPORT::TX_DATA_XPORT", "Received data error in tx stream!");
+ break;
+ case chdr::STRS_RTERR:
+ UHD_LOG_WARNING("XPORT::TX_DATA_XPORT",
+ "Received routing error in tx stream!");
+ break;
+ case chdr::STRS_CMDERR:
+ UHD_LOG_WARNING("XPORT::TX_DATA_XPORT",
+ "Received command error in tx stream!");
+ break;
+ default:
+ break;
}
}
@@ -359,177 +333,6 @@ private:
}
}
- /*!
- * Configures the stream endpoint using mgmt_portal
- */
- void _configure_sep(uhd::transport::io_service::sptr io_srv,
- uhd::transport::recv_link_if::sptr recv_link,
- uhd::transport::send_link_if::sptr send_link,
- uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal,
- const uhd::rfnoc::sep_id_t& local_epid,
- const uhd::rfnoc::sep_id_t& remote_epid,
- const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
- const uhd::rfnoc::sw_buff_t mdata_buff_fmt)
- {
- // Create a control transport with the tx data links to send mgmt packets
- // needed to setup the stream. Only need one frame for this.
- auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv,
- send_link,
- recv_link,
- local_epid,
- 1, // num_send_frames
- 1); // num_recv_frames
-
- // Setup a route to the EPID
- mgmt_portal.setup_local_route(*ctrl_xport, remote_epid);
-
- mgmt_portal.config_local_tx_stream(
- *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt);
-
- // We no longer need the control xport, release it so
- // the control xport is no longer connected to the I/O service.
- ctrl_xport.reset();
- }
-
- /*!
- * Initializes flow control
- *
- * To initialize flow control, we need to send an init strc packet, then
- * receive a strs containing the stream endpoint ingress buffer size. We
- * then repeat this (now that we know the buffer size) to configure the flow
- * control frequency. To avoid having this logic within the data packet
- * processing flow, we use temporary send and recv I/O instances with
- * simple callbacks here.
- */
- void _initialize_flow_ctrl(uhd::transport::io_service::sptr io_srv,
- uhd::transport::recv_link_if::sptr recv_link,
- uhd::transport::send_link_if::sptr send_link,
- const chdr::chdr_packet_factory& pkt_factory,
- const sep_id_pair_t sep_ids,
- const double fc_freq_ratio,
- const double fc_headroom_ratio)
- {
- // No flow control at initialization, just release all send buffs
- auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) {
- send_link->release_send_buff(std::move(buff));
- buff = nullptr;
- };
-
- // For recv, just queue strs packets for recv_io to read
- auto recv_cb = [this](buff_t::uptr& buff,
- transport::recv_link_if* /*recv_link*/,
- transport::send_link_if* /*send_link*/) {
- _recv_packet->refresh(buff->data());
- const auto header = _recv_packet->get_chdr_header();
- const auto type = header.get_pkt_type();
- const auto dst_epid = header.get_dst_epid();
-
- return (dst_epid == _epid && type == chdr::PKT_TYPE_STRS);
- };
-
- // No flow control at initialization, just release all recv buffs
- auto fc_cb = [this](buff_t::uptr buff,
- transport::recv_link_if* recv_link,
- transport::send_link_if* /*send_link*/) {
- recv_link->release_recv_buff(std::move(buff));
- };
-
- auto send_io = io_srv->make_send_client(send_link,
- 1, // num_send_frames
- send_cb,
- nullptr,
- 0, // num_recv_frames
- nullptr);
-
- auto recv_io = io_srv->make_recv_client(recv_link,
- 1, // num_recv_frames
- recv_cb,
- nullptr,
- 0, // num_send_frames
- fc_cb);
-
- chdr::chdr_strc_packet::uptr strc_packet = pkt_factory.make_strc();
- chdr::chdr_packet::uptr& recv_packet = _recv_packet;
-
- // Function to send a strc init
- auto send_strc_init = [&send_io, sep_ids, &strc_packet](
- const stream_buff_params_t fc_freq = {0, 0}) {
- transport::frame_buff::uptr buff = send_io->get_send_buff(0);
-
- if (!buff) {
- throw uhd::runtime_error(
- "tx xport timed out getting a send buffer for strc init");
- }
-
- chdr::chdr_header header;
- header.set_seq_num(0);
- header.set_dst_epid(sep_ids.second);
-
- chdr::strc_payload strc_pyld;
- strc_pyld.src_epid = sep_ids.first;
- strc_pyld.op_code = chdr::STRC_INIT;
- strc_pyld.num_bytes = fc_freq.bytes;
- strc_pyld.num_pkts = fc_freq.packets;
- strc_packet->refresh(buff->data(), header, strc_pyld);
-
- const size_t size = header.get_length();
- buff->set_packet_size(size);
- send_io->release_send_buff(std::move(buff));
- };
-
- // Function to receive a strs, returns buffer capacity
- auto recv_strs = [&recv_io, &recv_packet]() -> stream_buff_params_t {
- transport::frame_buff::uptr buff = recv_io->get_recv_buff(200);
-
- if (!buff) {
- throw uhd::runtime_error(
- "tx xport timed out wating for a strs packet during initialization");
- }
-
- recv_packet->refresh(buff->data());
- UHD_ASSERT_THROW(
- recv_packet->get_chdr_header().get_pkt_type() == chdr::PKT_TYPE_STRS);
- chdr::strs_payload strs;
- strs.deserialize(recv_packet->get_payload_const_ptr_as<uint64_t>(),
- recv_packet->get_payload_size() / sizeof(uint64_t),
- recv_packet->conv_to_host<uint64_t>());
-
- recv_io->release_recv_buff(std::move(buff));
-
- return {strs.capacity_bytes, static_cast<uint32_t>(strs.capacity_pkts)};
- };
-
- // Send a strc init to get the buffer size
- send_strc_init();
- stream_buff_params_t capacity = recv_strs();
- _fc_state.set_dest_capacity(capacity);
-
- UHD_LOG_TRACE("XPORT::TX_DATA_XPORT",
- "Received strs initializing buffer capacity to " << capacity.bytes
- << " bytes");
-
- // Calculate the requested fc_freq parameters
- uhd::rfnoc::stream_buff_params_t fc_freq = {
- static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_freq_ratio)),
- static_cast<uint32_t>(std::ceil(double(capacity.packets) * fc_freq_ratio))};
-
- const size_t headroom_bytes =
- static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_headroom_ratio));
- const size_t headroom_packets = static_cast<uint32_t>(
- std::ceil(double(capacity.packets) * fc_headroom_ratio));
-
- fc_freq.bytes -= headroom_bytes;
- fc_freq.packets -= headroom_packets;
-
- // Send a strc init to configure fc freq
- send_strc_init(fc_freq);
- recv_strs();
-
- // Release temporary I/O service interfaces to disconnect from it
- send_io.reset();
- recv_io.reset();
- }
-
// Interface to the I/O service
transport::send_io_if::sptr _send_io;
diff --git a/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp
index 937baf982..ed6553bf3 100644
--- a/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp
+++ b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp
@@ -17,12 +17,10 @@ class rx_flow_ctrl_state
{
public:
//! Constructor
- rx_flow_ctrl_state(const rfnoc::sep_id_pair_t epids) : _epids(epids) {}
-
- //! Initialize frequency parameters
- void initialize(const stream_buff_params_t fc_freq)
+ rx_flow_ctrl_state(
+ const rfnoc::sep_id_pair_t epids, const stream_buff_params_t fc_freq)
+ : _fc_freq(fc_freq), _epids(epids)
{
- _fc_freq = fc_freq;
}
//! Resynchronize with transfer counts from the sender
@@ -50,15 +48,6 @@ public:
}
}
- //! Reset the transfer counts (happens during init)
- void reset_counts()
- {
- UHD_LOGGER_TRACE("rx_flow_ctrl_state")
- << "Resetting transfer counts" << std::endl;
- _recv_counts = {0, 0};
- _xfer_counts = {0, 0};
- }
-
//! Update state when data is received
void data_received(const size_t bytes)
{
diff --git a/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp
index 65fc1b093..0005e7584 100644
--- a/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp
+++ b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp
@@ -15,11 +15,8 @@ namespace uhd { namespace rfnoc {
class tx_flow_ctrl_state
{
public:
- //! Updates destination capacity
- void set_dest_capacity(const stream_buff_params_t& capacity)
- {
- _dest_capacity = capacity;
- }
+ //! Constructor
+ tx_flow_ctrl_state(const stream_buff_params_t& capacity) : _dest_capacity(capacity) {}
//! Updates destination received count
void update_dest_recv_count(const stream_buff_params_t& recv_count)
@@ -67,7 +64,7 @@ public:
//! Clears fc resync request pending status
void clear_fc_resync_req_pending()
{
- _fc_resync_req = false;
+ _fc_resync_req = false;
_last_fc_resync_bytes = _xfer_counts.bytes;
}
diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt
index 73de394e3..2892e0d6d 100644
--- a/host/lib/rfnoc/CMakeLists.txt
+++ b/host/lib/rfnoc/CMakeLists.txt
@@ -23,6 +23,8 @@ LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/chdr_types.cpp
${CMAKE_CURRENT_SOURCE_DIR}/chdr_packet.cpp
${CMAKE_CURRENT_SOURCE_DIR}/chdr_ctrl_xport.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/chdr_rx_data_xport.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/chdr_tx_data_xport.cpp
${CMAKE_CURRENT_SOURCE_DIR}/client_zero.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ctrl_iface.cpp
${CMAKE_CURRENT_SOURCE_DIR}/device_id.cpp
diff --git a/host/lib/rfnoc/chdr_ctrl_xport.cpp b/host/lib/rfnoc/chdr_ctrl_xport.cpp
index a6cfda0de..929875dbd 100644
--- a/host/lib/rfnoc/chdr_ctrl_xport.cpp
+++ b/host/lib/rfnoc/chdr_ctrl_xport.cpp
@@ -16,10 +16,11 @@ using namespace uhd::transport;
chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv,
send_link_if::sptr send_link,
recv_link_if::sptr recv_link,
+ const chdr::chdr_packet_factory& pkt_factory,
sep_id_t my_epid,
size_t num_send_frames,
size_t num_recv_frames)
- : _my_epid(my_epid)
+ : _my_epid(my_epid), _recv_packet(pkt_factory.make_generic())
{
/* Make dumb send pipe */
send_io_if::send_callback_t send_cb = [this](frame_buff::uptr& buff,
@@ -34,10 +35,10 @@ chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv,
recv_link_if* /*recv_link*/,
send_link_if
* /*send_link*/) -> bool {
- uint64_t* data = (uint64_t*)buff->data();
- auto hdr = chdr_header(uhd::ntohx<uint64_t>(*data));
- auto pkt_type = hdr.get_pkt_type();
- auto dst_epid = hdr.get_dst_epid();
+ _recv_packet->refresh(buff->data());
+ auto hdr = _recv_packet->get_chdr_header();
+ auto pkt_type = hdr.get_pkt_type();
+ auto dst_epid = hdr.get_dst_epid();
/* Check type and destination EPID */
if ((pkt_type == PKT_TYPE_CTRL) && (dst_epid == _my_epid)) {
@@ -59,10 +60,10 @@ chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv,
recv_link_if* /*recv_link*/,
send_link_if
* /*send_link*/) -> bool {
- uint64_t* data = (uint64_t*)buff->data();
- auto hdr = chdr_header(uhd::ntohx<uint64_t>(*data));
- auto pkt_type = hdr.get_pkt_type();
- auto dst_epid = hdr.get_dst_epid();
+ _recv_packet->refresh(buff->data());
+ auto hdr = _recv_packet->get_chdr_header();
+ auto pkt_type = hdr.get_pkt_type();
+ auto dst_epid = hdr.get_dst_epid();
/* Check type and destination EPID */
if ((pkt_type == PKT_TYPE_MGMT) && (dst_epid == _my_epid)) {
diff --git a/host/lib/rfnoc/chdr_rx_data_xport.cpp b/host/lib/rfnoc/chdr_rx_data_xport.cpp
new file mode 100644
index 000000000..bcd9f7ea9
--- /dev/null
+++ b/host/lib/rfnoc/chdr_rx_data_xport.cpp
@@ -0,0 +1,203 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
+#include <uhdlib/rfnoc/chdr_types.hpp>
+#include <uhdlib/rfnoc/mgmt_portal.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+#include <uhdlib/transport/io_service.hpp>
+#include <uhdlib/transport/link_if.hpp>
+
+using namespace uhd;
+using namespace uhd::rfnoc;
+using namespace uhd::rfnoc::detail;
+using namespace uhd::transport;
+
+rx_flow_ctrl_sender::rx_flow_ctrl_sender(
+ const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids)
+ : _dst_epid(sep_ids.first)
+{
+ _fc_packet = pkt_factory.make_strs();
+ _fc_strs_pyld.src_epid = sep_ids.second;
+}
+
+void rx_flow_ctrl_sender::set_capacity(const stream_buff_params_t& recv_capacity)
+{
+ _fc_strs_pyld.capacity_bytes = recv_capacity.bytes;
+ _fc_strs_pyld.capacity_pkts = recv_capacity.packets;
+}
+
+chdr_rx_data_xport::chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const uhd::rfnoc::sep_id_pair_t& epids,
+ const size_t num_recv_frames,
+ const fc_params_t& fc_params)
+ : _fc_state(epids, fc_params.freq)
+ , _fc_sender(pkt_factory, epids)
+ , _epid(epids.second)
+{
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Creating rx xport with local epid=" << epids.second
+ << ", remote epid=" << epids.first);
+
+ _recv_packet = pkt_factory.make_generic();
+ _fc_sender.set_capacity(fc_params.buff_capacity);
+
+ // Calculate max payload size
+ const size_t pyld_offset =
+ _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS);
+ _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset;
+
+ // Make data transport
+ auto recv_cb =
+ [this](buff_t::uptr& buff, recv_link_if* recv_link, send_link_if* send_link) {
+ return this->_recv_callback(buff, recv_link, send_link);
+ };
+
+ auto fc_cb =
+ [this](buff_t::uptr buff, recv_link_if* recv_link, send_link_if* send_link) {
+ this->_fc_callback(std::move(buff), recv_link, send_link);
+ };
+
+ // Needs just a single send frame for responses
+ _recv_io = io_srv->make_recv_client(recv_link,
+ num_recv_frames,
+ recv_cb,
+ send_link,
+ /* num_send_frames*/ 1,
+ fc_cb);
+
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Stream endpoint was configured with:"
+ << std::endl
+ << "capacity bytes=" << fc_params.buff_capacity.bytes
+ << ", packets=" << fc_params.buff_capacity.packets << std::endl
+ << "fc frequency bytes=" << fc_params.freq.bytes
+ << ", packets=" << fc_params.freq.packets);
+}
+
+chdr_rx_data_xport::fc_params_t chdr_rx_data_xport::configure_sep(io_service::sptr io_srv,
+ recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ mgmt::mgmt_portal& mgmt_portal,
+ const sep_id_pair_t& epids,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
+ const stream_buff_params_t& recv_capacity,
+ const stream_buff_params_t& fc_freq,
+ const stream_buff_params_t& fc_headroom,
+ const bool lossy_xport)
+{
+ const sep_id_t remote_epid = epids.first;
+ const sep_id_t local_epid = epids.second;
+
+ rx_flow_ctrl_sender fc_sender(pkt_factory, epids);
+ chdr::chdr_packet::uptr pkt = pkt_factory.make_generic();
+ fc_sender.set_capacity(recv_capacity);
+ chdr::strc_payload strc;
+
+ auto recv_cb = [&pkt, local_epid, &strc](buff_t::uptr& buff,
+ recv_link_if* /*recv_link*/,
+ send_link_if* /*send_link*/) {
+ pkt->refresh(buff->data());
+ const auto header = pkt->get_chdr_header();
+ const auto dst_epid = header.get_dst_epid();
+
+ if (dst_epid != local_epid) {
+ return false;
+ }
+
+ const auto type = header.get_pkt_type();
+
+ if (type != chdr::PKT_TYPE_STRC) {
+ return false;
+ }
+
+ strc.deserialize(pkt->get_payload_const_ptr_as<uint64_t>(),
+ pkt->get_payload_size() / sizeof(uint64_t),
+ pkt->conv_to_host<uint64_t>());
+
+ if (strc.op_code != chdr::STRC_INIT) {
+ throw uhd::value_error("Unexpected opcode value in STRC packet.");
+ }
+
+ return true;
+ };
+
+ auto fc_cb = [&fc_sender](buff_t::uptr buff,
+ recv_link_if* recv_link,
+ send_link_if* send_link) {
+ recv_link->release_recv_buff(std::move(buff));
+
+ // Send a strs response to configure flow control on the sender. The
+ // byte and packet counts are not important since they are reset by
+ // the stream endpoint on receipt of this packet.
+ fc_sender.send_strs(send_link, {0, 0});
+ };
+
+ // Create a temporary recv_io to receive the strc init
+ auto recv_io = io_srv->make_recv_client(recv_link,
+ /* num_recv_frames*/ 1,
+ recv_cb,
+ send_link,
+ /* num_send_frames*/ 1,
+ fc_cb);
+
+ // Create a control transport with the rx data links to send mgmt packets
+ // needed to setup the stream
+ // Piggyback on frames from the recv_io_if
+ auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv,
+ send_link,
+ recv_link,
+ pkt_factory,
+ local_epid,
+ 0, // num_send_frames
+ 0); // num_recv_frames
+
+ // Setup a route to the EPID
+ // Note that this may be gratuitous--The endpoint may already have been set up
+ mgmt_portal.setup_local_route(*ctrl_xport, remote_epid);
+
+ // Initialize flow control - first, the management portal sends a stream
+ // command containing its requested flow control frequency
+ mgmt_portal.config_local_rx_stream_start(*ctrl_xport,
+ remote_epid,
+ lossy_xport,
+ pyld_buff_fmt,
+ mdata_buff_fmt,
+ fc_freq,
+ fc_headroom);
+
+ // Now, release the buffer. In the flow control callback for the recv_io
+ // (fc_cb above), we send a stream status containing the xport buffer
+ // capacity.
+ auto buff = recv_io->get_recv_buff(100);
+ if (!buff) {
+ throw uhd::runtime_error(
+ "rx xport timed out getting a response from mgmt_portal");
+ }
+ recv_io->release_recv_buff(std::move(buff));
+
+ // Finally, let the management portal know the setup is complete
+ mgmt_portal.config_local_rx_stream_commit(*ctrl_xport, remote_epid);
+
+ // The flow control frequency requested is contained in the strc
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Received strc init with fc freq"
+ << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts);
+
+ fc_params_t fc_params;
+ fc_params.buff_capacity = recv_capacity;
+ fc_params.freq = {strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)};
+
+ recv_io.reset();
+ ctrl_xport.reset();
+
+ return fc_params;
+}
diff --git a/host/lib/rfnoc/chdr_tx_data_xport.cpp b/host/lib/rfnoc/chdr_tx_data_xport.cpp
new file mode 100644
index 000000000..cb28c7ac9
--- /dev/null
+++ b/host/lib/rfnoc/chdr_tx_data_xport.cpp
@@ -0,0 +1,249 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>
+#include <uhdlib/rfnoc/chdr_types.hpp>
+#include <uhdlib/rfnoc/mgmt_portal.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+#include <uhdlib/transport/io_service.hpp>
+#include <uhdlib/transport/link_if.hpp>
+
+using namespace uhd;
+using namespace uhd::rfnoc;
+using namespace uhd::rfnoc::detail;
+using namespace uhd::transport;
+
+tx_flow_ctrl_sender::tx_flow_ctrl_sender(
+ const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids)
+ : _dst_epid(sep_ids.second)
+{
+ _fc_packet = pkt_factory.make_strc();
+ _fc_strc_pyld.src_epid = sep_ids.first;
+ _fc_strc_pyld.op_code = chdr::STRC_RESYNC;
+}
+
+chdr_tx_data_xport::chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const uhd::rfnoc::sep_id_pair_t& epids,
+ const size_t num_send_frames,
+ const fc_params_t fc_params)
+ : _fc_state(fc_params.buff_capacity)
+ , _fc_sender(pkt_factory, epids)
+ , _epid(epids.first)
+{
+ UHD_LOG_TRACE("XPORT::TX_DATA_XPORT",
+ "Creating tx xport with local epid=" << epids.first
+ << ", remote epid=" << epids.second);
+
+ _send_header.set_dst_epid(epids.second);
+ _send_packet = pkt_factory.make_generic();
+ _recv_packet = pkt_factory.make_generic();
+
+ // Calculate max payload size
+ const size_t pyld_offset =
+ _send_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS);
+ _max_payload_size = send_link->get_send_frame_size() - pyld_offset;
+
+ // Now create the send I/O we will use for data
+ auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) {
+ this->_send_callback(buff, send_link);
+ };
+
+ auto recv_cb = [this](buff_t::uptr& buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link) {
+ return this->_recv_callback(buff, recv_link, send_link);
+ };
+
+ // Needs just a single recv frame for strs packets
+ _send_io = io_srv->make_send_client(send_link,
+ num_send_frames,
+ send_cb,
+ recv_link,
+ /* num_recv_frames */ 1,
+ recv_cb);
+}
+
+/*
+ * To configure flow control, we need to send an init strc packet, then
+ * receive a strs containing the stream endpoint ingress buffer size. We
+ * then repeat this (now that we know the buffer size) to configure the flow
+ * control frequency. To avoid having this logic within the data packet
+ * processing flow, we use temporary send and recv I/O instances with
+ * simple callbacks here.
+ */
+static chdr_tx_data_xport::fc_params_t configure_flow_ctrl(io_service::sptr io_srv,
+ recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const sep_id_pair_t epids,
+ const double fc_freq_ratio,
+ const double fc_headroom_ratio)
+{
+ chdr::chdr_strc_packet::uptr strc_packet = pkt_factory.make_strc();
+ chdr::chdr_packet::uptr recv_packet = pkt_factory.make_generic();
+
+ // No flow control at initialization, just release all send buffs
+ auto send_cb = [](frame_buff::uptr& buff, send_link_if* send_link) {
+ send_link->release_send_buff(std::move(buff));
+ buff = nullptr;
+ };
+
+ // For recv, just queue strs packets for recv_io to read
+ auto recv_cb = [&recv_packet, epids](frame_buff::uptr& buff,
+ recv_link_if* /*recv_link*/,
+ send_link_if* /*send_link*/) {
+ recv_packet->refresh(buff->data());
+ const auto header = recv_packet->get_chdr_header();
+ const auto type = header.get_pkt_type();
+ const auto dst_epid = header.get_dst_epid();
+
+ return (dst_epid == epids.first && type == chdr::PKT_TYPE_STRS);
+ };
+
+ // No flow control at initialization, just release all recv buffs
+ auto fc_cb =
+ [](frame_buff::uptr buff, recv_link_if* recv_link, send_link_if* /*send_link*/) {
+ recv_link->release_recv_buff(std::move(buff));
+ };
+
+ auto send_io = io_srv->make_send_client(send_link,
+ 1, // num_send_frames
+ send_cb,
+ nullptr,
+ 0, // num_recv_frames
+ nullptr);
+
+ auto recv_io = io_srv->make_recv_client(recv_link,
+ 1, // num_recv_frames
+ recv_cb,
+ nullptr,
+ 0, // num_send_frames
+ fc_cb);
+
+ // Function to send a strc init
+ auto send_strc_init = [&send_io, epids, &strc_packet](
+ const stream_buff_params_t fc_freq = {0, 0}) {
+ frame_buff::uptr buff = send_io->get_send_buff(0);
+
+ if (!buff) {
+ throw uhd::runtime_error(
+ "tx xport timed out getting a send buffer for strc init");
+ }
+
+ chdr::chdr_header header;
+ header.set_seq_num(0);
+ header.set_dst_epid(epids.second);
+
+ chdr::strc_payload strc_pyld;
+ strc_pyld.src_epid = epids.first;
+ strc_pyld.op_code = chdr::STRC_INIT;
+ strc_pyld.num_bytes = fc_freq.bytes;
+ strc_pyld.num_pkts = fc_freq.packets;
+ strc_packet->refresh(buff->data(), header, strc_pyld);
+
+ const size_t size = header.get_length();
+ buff->set_packet_size(size);
+ send_io->release_send_buff(std::move(buff));
+ };
+
+ // Function to receive a strs, returns buffer capacity
+ auto recv_strs = [&recv_io, &recv_packet]() -> stream_buff_params_t {
+ frame_buff::uptr buff = recv_io->get_recv_buff(200);
+
+ if (!buff) {
+ throw uhd::runtime_error(
+ "tx xport timed out wating for a strs packet during initialization");
+ }
+
+ recv_packet->refresh(buff->data());
+ UHD_ASSERT_THROW(
+ recv_packet->get_chdr_header().get_pkt_type() == chdr::PKT_TYPE_STRS);
+ chdr::strs_payload strs;
+ strs.deserialize(recv_packet->get_payload_const_ptr_as<uint64_t>(),
+ recv_packet->get_payload_size() / sizeof(uint64_t),
+ recv_packet->conv_to_host<uint64_t>());
+
+ recv_io->release_recv_buff(std::move(buff));
+
+ return {strs.capacity_bytes, static_cast<uint32_t>(strs.capacity_pkts)};
+ };
+
+ // Send a strc init to get the buffer size
+ send_strc_init();
+ stream_buff_params_t capacity = recv_strs();
+
+ UHD_LOG_TRACE("XPORT::TX_DATA_XPORT",
+ "Received strs initializing buffer capacity to " << capacity.bytes << " bytes");
+
+ // Calculate the requested fc_freq parameters
+ stream_buff_params_t fc_freq = {
+ static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_freq_ratio)),
+ static_cast<uint32_t>(std::ceil(double(capacity.packets) * fc_freq_ratio))};
+
+ const size_t headroom_bytes =
+ static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_headroom_ratio));
+ const size_t headroom_packets =
+ static_cast<uint32_t>(std::ceil(double(capacity.packets) * fc_headroom_ratio));
+
+ fc_freq.bytes -= headroom_bytes;
+ fc_freq.packets -= headroom_packets;
+
+ // Send a strc init to configure fc freq
+ send_strc_init(fc_freq);
+ recv_strs();
+
+ // Release temporary I/O service interfaces to disconnect from it
+ send_io.reset();
+ recv_io.reset();
+
+ return {capacity};
+}
+
+chdr_tx_data_xport::fc_params_t chdr_tx_data_xport::configure_sep(io_service::sptr io_srv,
+ recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ mgmt::mgmt_portal& mgmt_portal,
+ const sep_id_pair_t& epids,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
+ const double fc_freq_ratio,
+ const double fc_headroom_ratio)
+{
+ const sep_id_t remote_epid = epids.second;
+ const sep_id_t local_epid = epids.first;
+
+ // Create a control transport with the tx data links to send mgmt packets
+ // needed to setup the stream. Only need one frame for this.
+ auto ctrl_xport = chdr_ctrl_xport::make(io_srv,
+ send_link,
+ recv_link,
+ pkt_factory,
+ local_epid,
+ 1, // num_send_frames
+ 1); // num_recv_frames
+
+ // Setup a route to the EPID
+ mgmt_portal.setup_local_route(*ctrl_xport, remote_epid);
+
+ mgmt_portal.config_local_tx_stream(
+ *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt);
+
+ // We no longer need the control xport, release it so
+ // the control xport is no longer connected to the I/O service.
+ ctrl_xport.reset();
+
+ return configure_flow_ctrl(io_srv,
+ recv_link,
+ send_link,
+ pkt_factory,
+ epids,
+ fc_freq_ratio,
+ fc_headroom_ratio);
+}