aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib
diff options
context:
space:
mode:
authorCiro Nishiguchi <ciro.nishiguchi@ni.com>2019-05-23 20:38:07 -0500
committerMartin Braun <martin.braun@ettus.com>2019-11-26 11:49:29 -0800
commit75a090543b8fb8e7c875387eee6d3fe7227e4450 (patch)
tree2904b48607cc07158aa6b068ada35ab56c4da516 /host/lib
parentd8e9705bc6c34b8d015b56a76955ee2f15426bd8 (diff)
downloaduhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.tar.gz
uhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.tar.bz2
uhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.zip
rfnoc: add rx and tx transports, and amend rfnoc_graph
transports: Transports build on I/O service and implements flow control and sequence number checking. The rx streamer subclass extends the streamer implementation to connect it to the rfnoc graph. It receives configuration values from property propagation and configures the streamer accordingly. It also implements the issue_stream_cmd rx_streamer API method. Add implementation of rx streamer creation and method to connect it to an rfnoc block. rfnoc_graph: Cache more connection info, clarify contract Summary of changes: - rfnoc_graph stores more information about static connections at the beginning. Some search algorithms are replaced by simpler lookups. - The contract for connect() was clarified. It is required to call connect, even for static connections.
Diffstat (limited to 'host/lib')
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_packet.hpp9
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp481
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp550
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_types.hpp2
-rw-r--r--host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp32
-rw-r--r--host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp13
-rw-r--r--host/lib/include/uhdlib/rfnoc/mb_iface.hpp44
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp21
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp95
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp90
-rw-r--r--host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp130
-rw-r--r--host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp99
-rw-r--r--host/lib/include/uhdlib/transport/get_aligned_buffs.hpp175
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_impl.hpp341
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp207
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_impl.hpp307
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp147
-rw-r--r--host/lib/rfnoc/CMakeLists.txt2
-rw-r--r--host/lib/rfnoc/chdr_packet.cpp18
-rw-r--r--host/lib/rfnoc/graph_stream_manager.cpp34
-rw-r--r--host/lib/rfnoc/link_stream_manager.cpp94
-rw-r--r--host/lib/rfnoc/mgmt_portal.cpp10
-rw-r--r--host/lib/rfnoc/rfnoc_graph.cpp494
-rw-r--r--host/lib/rfnoc/rfnoc_rx_streamer.cpp141
-rw-r--r--host/lib/rfnoc/rfnoc_tx_streamer.cpp124
25 files changed, 3334 insertions, 326 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp b/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp
index 770c6cf6f..cc729de6c 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp
@@ -114,6 +114,15 @@ public:
*/
virtual void* get_payload_ptr() = 0;
+ /*! Return the payload offset in bytes for a given type and num_mdata
+ *
+ * \param pkt_type The packet type for calculation
+ * \param num_mdata The number of metadata words for calculation
+ * \return The offset of the payload in a packet with the given params
+ */
+ virtual size_t calculate_payload_offset(const packet_type_t pkt_type,
+ const uint8_t num_mdata = 0) const = 0;
+
//! Shortcut to return the const metadata pointer cast as a specific type
template <typename data_t>
inline const data_t* get_mdata_const_ptr_as() const
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp
new file mode 100644
index 000000000..69dceebe1
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp
@@ -0,0 +1,481 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP
+#define INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP
+
+#include <uhd/config.hpp>
+#include <uhdlib/rfnoc/chdr_packet.hpp>
+#include <uhdlib/rfnoc/chdr_types.hpp>
+#include <uhdlib/rfnoc/mgmt_portal.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+#include <uhdlib/rfnoc/rx_flow_ctrl_state.hpp>
+#include <uhdlib/transport/io_service.hpp>
+#include <uhdlib/transport/link_if.hpp>
+#include <memory>
+
+namespace uhd { namespace rfnoc {
+
+namespace detail {
+
+/*!
+ * Utility class to send rx flow control responses
+ */
+class rx_flow_ctrl_sender
+{
+public:
+ //! Constructor
+ rx_flow_ctrl_sender(
+ const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids)
+ : _dst_epid(sep_ids.first)
+ {
+ _fc_packet = pkt_factory.make_strs();
+ _fc_strs_pyld.src_epid = sep_ids.second;
+ }
+
+ /*! Configure buffer capacity
+ * \param recv_capacity The buffer capacity of the receive link
+ */
+ void set_capacity(const stream_buff_params_t& recv_capacity)
+ {
+ _fc_strs_pyld.capacity_bytes = recv_capacity.bytes;
+ _fc_strs_pyld.capacity_pkts = recv_capacity.packets;
+ }
+
+ /*! Send a flow control response packet
+ *
+ * \param send_link the link to use to send the packet
+ * \counts transfer counts for packet contents
+ */
+ void send_strs(transport::send_link_if* send_link, const stream_buff_params_t& counts)
+ {
+ auto buff = send_link->get_send_buff(0);
+ if (!buff) {
+ throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer");
+ }
+
+ chdr::chdr_header header;
+ header.set_seq_num(_fc_seq_num++);
+ header.set_dst_epid(_dst_epid);
+
+ chdr::strs_payload fc_payload(_fc_strs_pyld);
+ fc_payload.xfer_count_bytes = counts.bytes;
+ fc_payload.xfer_count_pkts = counts.packets;
+
+ _fc_packet->refresh(buff->data(), header, fc_payload);
+ const size_t size = header.get_length();
+
+ buff->set_packet_size(size);
+ send_link->release_send_buff(std::move(buff));
+ }
+
+private:
+ // Endpoint ID for recipient of flow control response
+ const sep_id_t _dst_epid;
+
+ // Packet for writing flow control info
+ chdr::chdr_strs_packet::uptr _fc_packet;
+
+ // Pre-configured strs payload to hold values that don't change
+ chdr::strs_payload _fc_strs_pyld;
+
+ // Sequence number for flow control packets
+ uint16_t _fc_seq_num = 0;
+};
+} // namespace detail
+
+/*!
+ * Flow-controlled transport for RX chdr data
+ *
+ * This transport provides the streamer an interface to read RX data packets.
+ * The transport implements flow control and sequence number checking.
+ *
+ * The transport uses I/O services to provide options for work scheduling. I/O
+ * services allow the I/O work to be offloaded to a worker thread or to be
+ * performed in the same thread as the streamer API calls.
+ *
+ * For an rx transport, the device sends data packets, and the host sends strs
+ * packets letting the device know that buffer space in the host has been freed.
+ * For lossy links, the device also sends strc packets to resynchronize the
+ * transfer counts between host and device, to correct for any dropped packets
+ * in the link.
+ */
+class chdr_rx_data_xport
+{
+public:
+ using uptr = std::unique_ptr<chdr_rx_data_xport>;
+ using buff_t = transport::frame_buff;
+
+ //! Values extracted from received RX data packets
+ struct packet_info_t
+ {
+ bool eob = false;
+ bool has_tsf = false;
+ uint64_t tsf = 0;
+ size_t payload_bytes = 0;
+ const void* payload = nullptr;
+ };
+
+ /*! Constructor
+ *
+ * \param io_srv The service that will schedule the xport I/O
+ * \param recv_link The recv link, already attached to the I/O service
+ * \param send_link The send link, already attached to the I/O service
+ * \param pkt_factory Factory to create packets with the desired chdr_w and endianness
+ * \param addrs Source and destination addresses
+ * \param epids Source and destination endpoint IDs
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
+ * \param num_recv_frames Num frames to reserve from the recv link
+ * \param recv_capacity Total capacity of the recv link
+ * \param fc_freq Frequency of flow control status messages
+ * \param fc_headroom Headroom for flow control status messages
+ * \param lossy_xport Whether the xport is lossy, for flow control configuration
+ */
+ chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const uhd::rfnoc::sep_addr_pair_t& addrs,
+ const uhd::rfnoc::sep_id_pair_t& epids,
+ const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
+ const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
+ const size_t num_recv_frames,
+ const stream_buff_params_t& recv_capacity,
+ const stream_buff_params_t& fc_freq,
+ const stream_buff_params_t& fc_headroom,
+ const bool lossy_xport)
+ : _fc_state(epids), _fc_sender(pkt_factory, epids), _epid(epids.second)
+ {
+ const sep_addr_t remote_sep_addr = addrs.first;
+ const sep_addr_t local_sep_addr = addrs.second;
+ const sep_id_t remote_epid = epids.first;
+ const sep_id_t local_epid = epids.second;
+
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Creating rx xport with local epid=" << local_epid
+ << ", remote epid=" << remote_epid);
+
+ _recv_packet = pkt_factory.make_generic();
+ _fc_sender.set_capacity(recv_capacity);
+
+ // Calculate max payload size
+ const size_t pyld_offset =
+ _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS);
+ _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset;
+
+ // Make data transport
+ auto recv_cb = [this](buff_t::uptr& buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link) {
+ return this->_recv_callback(buff, recv_link, send_link);
+ };
+
+ auto fc_cb = [this](buff_t::uptr buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link) {
+ this->_fc_callback(std::move(buff), recv_link, send_link);
+ };
+
+ // Needs just a single send frame for responses
+ _recv_io = io_srv->make_recv_client(recv_link,
+ num_recv_frames,
+ recv_cb,
+ send_link,
+ /* num_send_frames*/ 1,
+ fc_cb);
+
+ // Create a control transport with the rx data links to send mgmt packets
+ // needed to setup the stream
+ // Piggyback on frames from the recv_io_if
+ auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv,
+ send_link,
+ recv_link,
+ local_epid,
+ 0, // num_send_frames
+ 0); // num_recv_frames
+
+ // Create new temporary management portal with the transports used for this stream
+ // TODO: This is a bit excessive. Maybe we can pare down the functionality of the
+ // portal just for route setup purposes. Whatever we do, we *must* use xport in it
+ // though otherwise the transport will not behave correctly.
+ auto data_mgmt_portal = uhd::rfnoc::mgmt::mgmt_portal::make(
+ *ctrl_xport, pkt_factory, local_sep_addr, local_epid);
+
+ // Setup a route to the EPID
+ // Note that this may be gratuitous--The endpoint may already have been set up
+ data_mgmt_portal->initialize_endpoint(*ctrl_xport, remote_sep_addr, remote_epid);
+ data_mgmt_portal->setup_local_route(*ctrl_xport, remote_epid);
+
+ // Initialize flow control - management portal sends a stream command
+ // containing its requested flow control frequency, the rx transport
+ // responds with a stream status containing its buffer capacity.
+ data_mgmt_portal->config_local_rx_stream_start(*ctrl_xport,
+ remote_epid,
+ lossy_xport,
+ pyld_buff_fmt,
+ mdata_buff_fmt,
+ fc_freq,
+ fc_headroom);
+
+ data_mgmt_portal->config_local_rx_stream_commit(*ctrl_xport, remote_epid);
+
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Stream endpoint was configured with:"
+ << std::endl
+ << "capacity bytes=" << recv_capacity.bytes
+ << ", packets=" << recv_capacity.packets << std::endl
+ << "fc headroom bytes=" << fc_headroom.bytes
+ << ", packets=" << fc_headroom.packets << std::endl
+ << "fc frequency bytes=" << fc_freq.bytes
+ << ", packets=" << fc_freq.packets);
+
+ // We no longer need the control xport and mgmt_portal, release them so
+ // the control xport is no longer connected to the I/O service.
+ data_mgmt_portal.reset();
+ ctrl_xport.reset();
+ }
+
+ /*! Returns maximum number payload bytes
+ *
+ * \return maximum payload bytes per packet
+ */
+ size_t get_max_payload_size() const
+ {
+ return _max_payload_size;
+ }
+
+ /*!
+ * Gets an RX frame buffer containing a recv packet
+ *
+ * \param timeout_ms timeout in milliseconds
+ * \return returns a tuple containing:
+ * - a frame_buff, or null if timeout occurs
+ * - info struct corresponding to the packet
+ * - whether the packet was out of sequence
+ */
+ std::tuple<typename buff_t::uptr, packet_info_t, bool> get_recv_buff(
+ const int32_t timeout_ms)
+ {
+ buff_t::uptr buff = _recv_io->get_recv_buff(timeout_ms);
+
+ if (!buff) {
+ return std::make_tuple(typename buff_t::uptr(), packet_info_t(), false);
+ }
+
+ auto info = _read_data_packet_info(buff);
+ bool seq_error = _is_out_of_sequence(std::get<1>(info));
+
+ return std::make_tuple(std::move(buff), std::get<0>(info), seq_error);
+ }
+
+ /*!
+ * Releases an RX frame buffer
+ *
+ * \param buff the frame buffer to release
+ */
+ void release_recv_buff(typename buff_t::uptr buff)
+ {
+ _recv_io->release_recv_buff(std::move(buff));
+ }
+
+private:
+ /*!
+ * Recv callback for I/O service
+ *
+ * The I/O service invokes this callback when it reads a packet from the
+ * recv link.
+ *
+ * \param buff the frame buffer containing the packet data
+ * \param recv_link the recv link from which buff was read
+ * \param send_link the send link for flow control messages
+ */
+ bool _recv_callback(buff_t::uptr& buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link)
+ {
+ _recv_packet->refresh(buff->data());
+ const auto header = _recv_packet->get_chdr_header();
+ const auto type = header.get_pkt_type();
+ const auto dst_epid = header.get_dst_epid();
+ const auto packet_size = buff->packet_size();
+
+ if (dst_epid != _epid) {
+ return false;
+ }
+
+ if (type == chdr::PKT_TYPE_STRC) {
+ chdr::strc_payload strc;
+ strc.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(),
+ _recv_packet->get_payload_size() / sizeof(uint64_t),
+ _recv_packet->conv_to_host<uint64_t>());
+
+ const stream_buff_params_t strc_counts = {
+ strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)};
+
+ if (strc.op_code == chdr::STRC_RESYNC) {
+ // Resynchronize before updating fc_state, the strc payload
+ // contains counts before the strc packet itself
+ _fc_state.resynchronize(strc_counts);
+
+ // Update state that we received a packet
+ _fc_state.data_received(packet_size);
+
+ recv_link->release_recv_buff(std::move(buff));
+ buff = buff_t::uptr();
+ _fc_state.xfer_done(packet_size);
+ _send_fc_response(send_link);
+ } else if (strc.op_code == chdr::STRC_INIT) {
+ _fc_state.initialize(
+ {strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)});
+
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Received strc init with fc freq"
+ << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts);
+
+ // Make sure flow control was initialized
+ assert(_fc_state.get_fc_freq().bytes > 0);
+ assert(_fc_state.get_fc_freq().packets > 0);
+
+ // Send a strs response to configure flow control on the sender
+ _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts());
+
+ // Reset counts, since mgmt_portal will do it to FPGA
+ _fc_state.reset_counts();
+
+ recv_link->release_recv_buff(std::move(buff));
+ buff = buff_t::uptr();
+ } else {
+ throw uhd::value_error("Unexpected opcode value in STRC packet.");
+ }
+
+ // For stream commands, we return true (packet was destined to this
+ // client) but release the buffer. The I/O service won't queue this
+ // packet in the recv_io_if.
+ return true;
+
+ } else if (type == chdr::PKT_TYPE_DATA_NO_TS
+ || type == chdr::PKT_TYPE_DATA_WITH_TS) {
+ // Update state that we received a packet
+ _fc_state.data_received(packet_size);
+
+ // If this is a data packet, just claim it by returning true. The
+ // I/O service will queue this packet in the recv_io_if.
+ return true;
+
+ } else {
+ return false;
+ }
+ }
+
+ /*!
+ * Flow control callback for I/O service
+ *
+ * The I/O service invokes this callback when a packet needs to be released
+ * to the recv link.
+ *
+ * \param buff the frame buffer containing the packet data
+ * \param recv_link the recv link to which to release the buffer
+ * \param send_link the send link for flow control messages
+ */
+ void _fc_callback(buff_t::uptr buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link)
+ {
+ const size_t packet_size = buff->packet_size();
+ recv_link->release_recv_buff(std::move(buff));
+ _fc_state.xfer_done(packet_size);
+ _send_fc_response(send_link);
+ }
+
+ /*!
+ * Sends a flow control response packet if necessary.
+ *
+ * \param send_link the send link for flow control messages
+ */
+ void _send_fc_response(transport::send_link_if* send_link)
+ {
+ if (_fc_state.fc_resp_due()) {
+ _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts());
+ _fc_state.fc_resp_sent();
+ }
+ }
+
+ /*!
+ * Checks if the sequence number is out of sequence, prints 'D' if it is
+ * and returns result of check.
+ *
+ * \return true if a sequence error occurred
+ */
+ UHD_FORCE_INLINE bool _is_out_of_sequence(uint16_t seq_num)
+ {
+ const uint16_t expected_packet_count = _data_seq_num;
+ _data_seq_num = seq_num + 1;
+
+ if (expected_packet_count != seq_num) {
+ UHD_LOG_FASTPATH("D");
+ return true;
+ }
+ return false;
+ }
+
+ /*!
+ * Reads packet header and returns information in a struct.
+ *
+ * \return a tuple containing the packet info and packet sequence number
+ */
+ std::tuple<packet_info_t, uint16_t> _read_data_packet_info(buff_t::uptr& buff)
+ {
+ const void* data = buff->data();
+ _recv_packet->refresh(data);
+ const auto header = _recv_packet->get_chdr_header();
+ const auto optional_time = _recv_packet->get_timestamp();
+
+ packet_info_t info;
+ info.eob = header.get_eob();
+ info.has_tsf = optional_time.is_initialized();
+ info.tsf = optional_time ? *optional_time : 0;
+ info.payload_bytes = _recv_packet->get_payload_size();
+ info.payload = _recv_packet->get_payload_const_ptr();
+
+ const uint8_t* pkt_end =
+ reinterpret_cast<uint8_t*>(buff->data()) + buff->packet_size();
+ const size_t pyld_pkt_len =
+ pkt_end - reinterpret_cast<const uint8_t*>(info.payload);
+
+ if (pyld_pkt_len < info.payload_bytes) {
+ _recv_io->release_recv_buff(std::move(buff));
+ throw uhd::value_error("Bad CHDR header or invalid packet length.");
+ }
+
+ return std::make_tuple(info, header.get_seq_num());
+ }
+
+ // Interface to the I/O service
+ transport::recv_io_if::sptr _recv_io;
+
+ // Flow control state
+ rx_flow_ctrl_state _fc_state;
+
+ // Maximum data payload in bytes
+ size_t _max_payload_size = 0;
+
+ // Sequence number for data packets
+ uint16_t _data_seq_num = 0;
+
+ // Packet for received data
+ chdr::chdr_packet::uptr _recv_packet;
+
+ // Handles sending of strs flow control response packets
+ detail::rx_flow_ctrl_sender _fc_sender;
+
+ // Local / Sink EPID
+ sep_id_t _epid;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP */
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
new file mode 100644
index 000000000..62b811bf5
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
@@ -0,0 +1,550 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP
+#define INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP
+
+#include <uhdlib/rfnoc/chdr_packet.hpp>
+#include <uhdlib/rfnoc/chdr_types.hpp>
+#include <uhdlib/rfnoc/mgmt_portal.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+#include <uhdlib/rfnoc/tx_flow_ctrl_state.hpp>
+#include <uhdlib/transport/io_service.hpp>
+#include <uhdlib/transport/link_if.hpp>
+#include <memory>
+
+namespace uhd { namespace rfnoc {
+
+namespace detail {
+
+/*!
+ * Utility class to send tx flow control messages
+ */
+class tx_flow_ctrl_sender
+{
+public:
+ //! Constructor
+ tx_flow_ctrl_sender(
+ const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids)
+ : _dst_epid(sep_ids.second)
+ {
+ _fc_packet = pkt_factory.make_strc();
+ _fc_strc_pyld.src_epid = sep_ids.first;
+ _fc_strc_pyld.op_code = chdr::STRC_RESYNC;
+ }
+
+ /*!
+ * Sends a flow control resync packet
+ *
+ * Sends a strc packet with the resync opcode to make the device transfer
+ * counts match those of the host, to correct for dropped packets.
+ *
+ * \param send_link the link to use to send the packet
+ * \counts transfer counts for packet contents
+ */
+ size_t send_strc_resync(
+ transport::send_link_if* send_link, const stream_buff_params_t& counts)
+ {
+ auto buff = send_link->get_send_buff(0);
+ if (!buff) {
+ throw uhd::runtime_error("tx_flowctrl timed out getting a send buffer");
+ }
+
+ chdr::chdr_header header;
+ header.set_seq_num(_fc_seq_num++);
+ header.set_dst_epid(_dst_epid);
+
+ chdr::strc_payload fc_payload(_fc_strc_pyld);
+ fc_payload.num_bytes = counts.bytes;
+ fc_payload.num_pkts = counts.packets;
+
+ _fc_packet->refresh(buff->data(), header, fc_payload);
+ const size_t size = header.get_length();
+
+ buff->set_packet_size(size);
+ send_link->release_send_buff(std::move(buff));
+ return size;
+ }
+
+private:
+ // Endpoint ID for recipient of flow control response
+ const sep_id_t _dst_epid;
+
+ // Packet for writing flow control info
+ chdr::chdr_strc_packet::uptr _fc_packet;
+
+ // Pre-configured strc payload to hold values that don't change
+ chdr::strc_payload _fc_strc_pyld;
+
+ // Sequence number for flow control packets
+ uint16_t _fc_seq_num = 0;
+};
+} // namespace detail
+
+/*!
+ * Flow-controlled transport for TX chdr data
+ *
+ * This transport provides the streamer an interface to send TX data packets.
+ * The transport implements flow control and keeps track of sequence numbers.
+ *
+ * The transport uses I/O services to provide options for work scheduling. I/O
+ * services allow the I/O work to be offloaded to a worker thread or to be
+ * performed in the same thread as the streamer API calls.
+ *
+ * For a tx transport, the host sends data packets, and the device sends strs
+ * packets letting the host know that buffer space in the device stream endpoint
+ * has been freed. For lossy links, the host also sends strc packets to
+ * resynchronize the transfer counts between host and device, to correct for
+ * any dropped packets in the link.
+ */
+class chdr_tx_data_xport
+{
+public:
+ using uptr = std::unique_ptr<chdr_tx_data_xport>;
+ using buff_t = transport::frame_buff;
+
+ //! Information about data packet
+ struct packet_info_t
+ {
+ bool eob = false;
+ bool has_tsf = false;
+ uint64_t tsf = 0;
+ size_t payload_bytes = 0;
+ };
+
+ /*! Constructor
+ *
+ * \param io_srv The service that will schedule the xport I/O
+ * \param recv_link The recv link, already attached to the I/O service
+ * \param send_link The send link, already attached to the I/O service
+ * \param pkt_factory Factory to create packets with the desired chdr_w and endianness
+ * \param addrs Source and destination addresses
+ * \param epids Source and destination endpoint IDs
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
+ * \param num_send_frames Num frames to reserve from the send link
+ * \param fc_freq_ratio Ratio to use to configure the device fc frequency
+ * \param fc_headroom_ratio Ratio to use to configure the device fc headroom
+ */
+ chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const uhd::rfnoc::sep_addr_pair_t& addrs,
+ const uhd::rfnoc::sep_id_pair_t& epids,
+ const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
+ const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
+ const size_t num_send_frames,
+ const double fc_freq_ratio,
+ const double fc_headroom_ratio)
+ : _fc_sender(pkt_factory, epids), _epid(epids.first)
+ {
+ const sep_addr_t remote_sep_addr = addrs.second;
+ const sep_addr_t local_sep_addr = addrs.first;
+ const sep_id_t remote_epid = epids.second;
+ const sep_id_t local_epid = epids.first;
+
+ UHD_LOG_TRACE("XPORT::TX_DATA_XPORT",
+ "Creating tx xport with local epid=" << local_epid
+ << ", remote epid=" << remote_epid);
+
+ _send_header.set_dst_epid(epids.second);
+ _send_packet = pkt_factory.make_generic();
+ _recv_packet = pkt_factory.make_generic();
+
+ // Calculate max payload size
+ const size_t pyld_offset =
+ _send_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS);
+ _max_payload_size = send_link->get_send_frame_size() - pyld_offset;
+
+ _configure_sep(io_srv,
+ recv_link,
+ send_link,
+ pkt_factory,
+ local_sep_addr,
+ local_epid,
+ remote_sep_addr,
+ remote_epid,
+ pyld_buff_fmt,
+ mdata_buff_fmt);
+
+ _initialize_flow_ctrl(io_srv,
+ recv_link,
+ send_link,
+ pkt_factory,
+ epids,
+ fc_freq_ratio,
+ fc_headroom_ratio);
+
+ // Now create the send I/O we will use for data
+ auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) {
+ this->_send_callback(buff, send_link);
+ };
+
+ auto recv_cb = [this](buff_t::uptr& buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link) {
+ return this->_recv_callback(buff, recv_link, send_link);
+ };
+
+ // Needs just a single recv frame for strs packets
+ _send_io = io_srv->make_send_client(send_link,
+ num_send_frames,
+ send_cb,
+ recv_link,
+ /* num_recv_frames */ 1,
+ recv_cb);
+ }
+
+ /*! Returns maximum number of payload bytes
+ *
+ * \return maximum number of payload bytes
+ */
+ size_t get_max_payload_size() const
+ {
+ return _max_payload_size;
+ }
+
+ /*!
+ * Gets a TX frame buffer
+ *
+ * \param timeout_ms timeout in milliseconds
+ * \return the frame buffer, or nullptr if timeout occurs
+ */
+ buff_t::uptr get_send_buff(const int32_t timeout_ms)
+ {
+ return _send_io->get_send_buff(timeout_ms);
+ }
+
+ /*!
+ * Sends a TX data packet
+ *
+ * \param buff the frame buffer containing the packet to send
+ */
+ void release_send_buff(buff_t::uptr buff)
+ {
+ _send_io->release_send_buff(std::move(buff));
+ }
+
+ /*!
+ * Writes header into frame buffer and returns payload pointer
+ *
+ * \param buff Frame buffer to write header into
+ * \param info Information to include in the header
+ * \return A pointer to the payload data area and the packet size in bytes
+ */
+ std::pair<void*, size_t> write_packet_header(buff_t::uptr& buff,
+ const packet_info_t& info)
+ {
+ uint64_t tsf = 0;
+
+ if (info.has_tsf) {
+ _send_header.set_pkt_type(chdr::PKT_TYPE_DATA_WITH_TS);
+ tsf = info.tsf;
+ } else {
+ _send_header.set_pkt_type(chdr::PKT_TYPE_DATA_NO_TS);
+ }
+
+ _send_header.set_eob(info.eob);
+ _send_header.set_seq_num(_data_seq_num++);
+
+ _send_packet->refresh(buff->data(), _send_header, tsf);
+ _send_packet->update_payload_size(info.payload_bytes);
+
+ return std::make_pair(
+ _send_packet->get_payload_ptr(),
+ _send_packet->get_chdr_header().get_length());
+ }
+
+private:
+ /*!
+ * Recv callback for I/O service
+ *
+ * The I/O service invokes this callback when it reads a packet from the
+ * recv link.
+ *
+ * \param buff the frame buffer containing the packet data
+ * \param recv_link the recv link from which buff was read
+ * \param send_link the send link for flow control messages
+ */
+ bool _recv_callback(buff_t::uptr& buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* /*send_link*/)
+ {
+ _recv_packet->refresh(buff->data());
+ const auto header = _recv_packet->get_chdr_header();
+ const auto type = header.get_pkt_type();
+ const auto dst_epid = header.get_dst_epid();
+
+ if (dst_epid != _epid) {
+ return false;
+ }
+
+ if (type == chdr::PKT_TYPE_STRS) {
+ chdr::strs_payload strs;
+ strs.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(),
+ _recv_packet->get_payload_size() / sizeof(uint64_t),
+ _recv_packet->conv_to_host<uint64_t>());
+
+ _fc_state.update_dest_recv_count({strs.xfer_count_bytes,
+ static_cast<uint32_t>(strs.xfer_count_pkts)});
+
+ // TODO: check strs status here and push into async msg queue
+
+ // Packet belongs to this transport, release buff and return true
+ recv_link->release_recv_buff(std::move(buff));
+ buff = nullptr;
+ return true;
+ } else {
+ UHD_THROW_INVALID_CODE_PATH();
+ }
+ }
+
+ /*!
+ * Send callback for I/O service
+ *
+ * The I/O service invokes this callback when it is requested to release
+ * a send buffer to the send link.
+ *
+ * \param buff the frame buffer to release
+ * \param send_link the send link for flow control messages
+ */
+ void _send_callback(buff_t::uptr& buff, transport::send_link_if* send_link)
+ {
+ const size_t packet_size = buff->packet_size();
+
+ if (_fc_state.dest_has_space(packet_size)) {
+ send_link->release_send_buff(std::move(buff));
+ buff = nullptr;
+
+ _fc_state.data_sent(packet_size);
+
+ if (_fc_state.get_fc_resync_req_pending()
+ && _fc_state.dest_has_space(chdr::strc_payload::PACKET_SIZE)) {
+ const auto& xfer_counts = _fc_state.get_xfer_counts();
+ const size_t strc_size =
+ _fc_sender.send_strc_resync(send_link, xfer_counts);
+ _fc_state.clear_fc_resync_req_pending();
+ _fc_state.data_sent(strc_size);
+ }
+ }
+ }
+
+ /*!
+ * Configures the stream endpoint using mgmt_portal
+ */
+ void _configure_sep(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const uhd::rfnoc::sep_addr_t& local_sep_addr,
+ const uhd::rfnoc::sep_id_t& local_epid,
+ const uhd::rfnoc::sep_addr_t& remote_sep_addr,
+ const uhd::rfnoc::sep_id_t& remote_epid,
+ const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
+ const uhd::rfnoc::sw_buff_t mdata_buff_fmt)
+ {
+ // Create a control transport with the tx data links to send mgmt packets
+ // needed to setup the stream. Only need one frame for this.
+ auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv,
+ send_link,
+ recv_link,
+ local_epid,
+ 1, // num_send_frames
+ 1); // num_recv_frames
+
+ // Create new temporary management portal with the transports used for this stream
+ // TODO: This is a bit excessive. Maybe we can pare down the functionality of the
+ // portal just for route setup purposes. Whatever we do, we *must* use xport in it
+ // though otherwise the transport will not behave correctly.
+ auto data_mgmt_portal = uhd::rfnoc::mgmt::mgmt_portal::make(
+ *ctrl_xport, pkt_factory, local_sep_addr, local_epid);
+
+ // Setup a route to the EPID
+ data_mgmt_portal->initialize_endpoint(*ctrl_xport, remote_sep_addr, remote_epid);
+ data_mgmt_portal->setup_local_route(*ctrl_xport, remote_epid);
+
+ data_mgmt_portal->config_local_tx_stream(
+ *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt);
+
+ // We no longer need the control xport and mgmt_portal, release them so
+ // the control xport is no longer connected to the I/O service.
+ data_mgmt_portal.reset();
+ ctrl_xport.reset();
+ }
+
+ /*!
+ * Initializes flow control
+ *
+ * To initialize flow control, we need to send an init strc packet, then
+ * receive a strs containing the stream endpoint ingress buffer size. We
+ * then repeat this (now that we know the buffer size) to configure the flow
+ * control frequency. To avoid having this logic within the data packet
+ * processing flow, we use temporary send and recv I/O instances with
+ * simple callbacks here.
+ */
+ void _initialize_flow_ctrl(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const sep_id_pair_t sep_ids,
+ const double fc_freq_ratio,
+ const double fc_headroom_ratio)
+ {
+ // No flow control at initialization, just release all send buffs
+ auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) {
+ send_link->release_send_buff(std::move(buff));
+ buff = nullptr;
+ };
+
+ // For recv, just queue strs packets for recv_io to read
+ auto recv_cb = [this](buff_t::uptr& buff,
+ transport::recv_link_if* /*recv_link*/,
+ transport::send_link_if* /*send_link*/) {
+ _recv_packet->refresh(buff->data());
+ const auto header = _recv_packet->get_chdr_header();
+ const auto type = header.get_pkt_type();
+ const auto dst_epid = header.get_dst_epid();
+
+ return (dst_epid == _epid && type == chdr::PKT_TYPE_STRS);
+ };
+
+ // No flow control at initialization, just release all recv buffs
+ auto fc_cb = [this](buff_t::uptr buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* /*send_link*/) {
+ recv_link->release_recv_buff(std::move(buff));
+ };
+
+ auto send_io = io_srv->make_send_client(send_link,
+ 1, // num_send_frames
+ send_cb,
+ nullptr,
+ 0, // num_recv_frames
+ nullptr);
+
+ auto recv_io = io_srv->make_recv_client(recv_link,
+ 1, // num_recv_frames
+ recv_cb,
+ nullptr,
+ 0, // num_send_frames
+ fc_cb);
+
+ chdr::chdr_strc_packet::uptr strc_packet = pkt_factory.make_strc();
+ chdr::chdr_packet::uptr& recv_packet = _recv_packet;
+
+ // Function to send a strc init
+ auto send_strc_init = [&send_io, sep_ids, &strc_packet](
+ const stream_buff_params_t fc_freq = {0, 0}) {
+ transport::frame_buff::uptr buff = send_io->get_send_buff(0);
+
+ if (!buff) {
+ throw uhd::runtime_error(
+ "tx xport timed out getting a send buffer for strc init");
+ }
+
+ chdr::chdr_header header;
+ header.set_seq_num(0);
+ header.set_dst_epid(sep_ids.second);
+
+ chdr::strc_payload strc_pyld;
+ strc_pyld.src_epid = sep_ids.first;
+ strc_pyld.op_code = chdr::STRC_INIT;
+ strc_pyld.num_bytes = fc_freq.bytes;
+ strc_pyld.num_pkts = fc_freq.packets;
+ strc_packet->refresh(buff->data(), header, strc_pyld);
+
+ const size_t size = header.get_length();
+ buff->set_packet_size(size);
+ send_io->release_send_buff(std::move(buff));
+ };
+
+ // Function to receive a strs, returns buffer capacity
+ auto recv_strs = [&recv_io, &recv_packet]() -> stream_buff_params_t {
+ transport::frame_buff::uptr buff = recv_io->get_recv_buff(200);
+
+ if (!buff) {
+ throw uhd::runtime_error(
+ "tx xport timed out wating for a strs packet during initialization");
+ }
+
+ recv_packet->refresh(buff->data());
+ UHD_ASSERT_THROW(
+ recv_packet->get_chdr_header().get_pkt_type() == chdr::PKT_TYPE_STRS);
+ chdr::strs_payload strs;
+ strs.deserialize(recv_packet->get_payload_const_ptr_as<uint64_t>(),
+ recv_packet->get_payload_size() / sizeof(uint64_t),
+ recv_packet->conv_to_host<uint64_t>());
+
+ recv_io->release_recv_buff(std::move(buff));
+
+ return {strs.capacity_bytes,
+ static_cast<uint32_t>(strs.capacity_pkts)};
+ };
+
+ // Send a strc init to get the buffer size
+ send_strc_init();
+ stream_buff_params_t capacity = recv_strs();
+ _fc_state.set_dest_capacity(capacity);
+
+ UHD_LOG_TRACE("XPORT::TX_DATA_XPORT",
+ "Received strs initializing buffer capacity to "
+ << capacity.bytes << " bytes");
+
+ // Calculate the requested fc_freq parameters
+ uhd::rfnoc::stream_buff_params_t fc_freq = {
+ static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_freq_ratio)),
+ static_cast<uint32_t>(
+ std::ceil(double(capacity.packets) * fc_freq_ratio))};
+
+ const size_t headroom_bytes =
+ static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_headroom_ratio));
+ const size_t headroom_packets = static_cast<uint32_t>(
+ std::ceil(double(capacity.packets) * fc_headroom_ratio));
+
+ fc_freq.bytes -= headroom_bytes;
+ fc_freq.packets -= headroom_packets;
+
+ // Send a strc init to configure fc freq
+ send_strc_init(fc_freq);
+ recv_strs();
+
+ // Release temporary I/O service interfaces to disconnect from it
+ send_io.reset();
+ recv_io.reset();
+ }
+
+ // Interface to the I/O service
+ transport::send_io_if::sptr _send_io;
+
+ // Flow control state
+ tx_flow_ctrl_state _fc_state;
+
+ // Maximum data payload in bytes
+ size_t _max_payload_size = 0;
+
+ // Sequence number for data packets
+ uint16_t _data_seq_num = 0;
+
+ // Header to write into send packets
+ chdr::chdr_header _send_header;
+
+ // Packet for send data
+ chdr::chdr_packet::uptr _send_packet;
+
+ // Packet to receive strs messages
+ chdr::chdr_packet::uptr _recv_packet;
+
+ // Handles sending of strc flow control ack packets
+ detail::tx_flow_ctrl_sender _fc_sender;
+
+ // Local / Source EPID
+ sep_id_t _epid;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP */
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_types.hpp b/host/lib/include/uhdlib/rfnoc/chdr_types.hpp
index 62b24ab61..1f14ea7d0 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_types.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_types.hpp
@@ -482,6 +482,8 @@ public: // Members
uint64_t num_pkts = 0;
//! Number of bytes to use for operation (64 bits)
uint64_t num_bytes = 0;
+ //! Size of a strc packet (including header)
+ static constexpr size_t PACKET_SIZE = 24;
public: // Functions
strc_payload() = default;
diff --git a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp
index 120b0e0f8..28fa8ec7c 100644
--- a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp
+++ b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp
@@ -7,11 +7,14 @@
#ifndef INCLUDED_LIBUHD_RFNOC_GRAPH_STREAM_MANAGER_HPP
#define INCLUDED_LIBUHD_RFNOC_GRAPH_STREAM_MANAGER_HPP
+#include <uhd/stream.hpp>
#include <uhdlib/rfnoc/chdr_packet.hpp>
#include <uhdlib/rfnoc/client_zero.hpp>
#include <uhdlib/rfnoc/ctrlport_endpoint.hpp>
#include <uhdlib/rfnoc/epid_allocator.hpp>
#include <uhdlib/rfnoc/mb_iface.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
+#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>
#include <functional>
#include <memory>
#include <set>
@@ -84,6 +87,7 @@ public:
virtual detail::client_zero::sptr get_client_zero(
sep_addr_t dst_addr, device_id_t via_device = NULL_DEVICE_ID) const = 0;
+
/*! Configure a flow controlled data stream from the endpoint with ID src_epid to the
* endpoint with ID dst_epid
*
@@ -102,7 +106,33 @@ public:
const double fc_headroom_ratio,
const bool reset = false) = 0;
- // TODO: Implement functions to get graph-wide streamers
+ /*! \brief Create a data stream going from the device to the host
+ *
+ * \param dst_addr The address of the destination stream endpoint
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
+ * \param xport_args The transport arguments
+ * \return An transport instance
+ */
+ virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream(
+ sep_addr_t dst_addr,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
+ const device_addr_t& xport_args) = 0;
+
+ /*! \brief Create a data stream going from the host to the device
+ *
+ * \param dst_addr The address of the destination stream endpoint
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
+ * \param xport_args The transport arguments
+ * \return An transport instance
+ */
+ virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
+ sep_addr_t dst_addr,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
+ const device_addr_t& xport_args) = 0;
/*!
* \brief Create a graph_stream_manager and return a unique_ptr to it
diff --git a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
index 79121a498..72f1cf1c7 100644
--- a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
+++ b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
@@ -11,6 +11,7 @@
#include <uhdlib/rfnoc/ctrlport_endpoint.hpp>
#include <uhdlib/rfnoc/epid_allocator.hpp>
#include <uhdlib/rfnoc/mb_iface.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
#include <functional>
#include <memory>
#include <set>
@@ -112,41 +113,29 @@ public:
/*! \brief Create a data stream going from the host to the device
*
* \param dst_addr The address of the destination stream endpoint
- * \param lossy_xport Is the transport lossy?
* \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
* \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
- * \param fc_freq_ratio Flow control response frequency as a ratio of the buff params
- * \param fc_headroom_ratio Flow control headroom as a ratio of the buff params
* \param xport_args The transport arguments
* \return An transport instance
*/
virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
const sep_addr_t dst_addr,
- const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
- const double fc_freq_ratio,
- const double fc_headroom_ratio,
const device_addr_t& xport_args) = 0;
/*! \brief Create a data stream going from the device to the host
*
* \param dst_addr The address of the destination stream endpoint
- * \param lossy_xport Is the transport lossy?
* \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
* \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
- * \param fc_freq_ratio Flow control response frequency as a ratio of the buff params
- * \param fc_headroom_ratio Flow control headroom as a ratio of the buff params
* \param xport_args The transport arguments
* \return An transport instance
*/
virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream(
const sep_addr_t src_addr,
- const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
- const double fc_freq_ratio,
- const double fc_headroom_ratio,
const device_addr_t& xport_args) = 0;
static uptr make(const chdr::chdr_packet_factory& pkt_factory,
diff --git a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
index 0a2790a34..33a0e3df0 100644
--- a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
+++ b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
@@ -8,21 +8,14 @@
#define INCLUDED_LIBUHD_MB_IFACE_HPP
#include <uhdlib/rfnoc/chdr_ctrl_xport.hpp>
-#include <uhdlib/rfnoc/rfnoc_common.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
+#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>
#include <uhdlib/rfnoc/clock_iface.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
#include <memory>
namespace uhd { namespace rfnoc {
-// FIXME: Update this
-class chdr_rx_data_xport
-{
-public:
- using uptr = std::unique_ptr<chdr_rx_data_xport>;
-};
-
-using chdr_tx_data_xport = chdr_rx_data_xport;
-
/*! Motherboard (backchannel) interface
*
* In RFNoC devices, the RFNoC subystem needs a backchannel interface to talk to
@@ -70,7 +63,8 @@ public:
/*! Return a reference to a clock iface
*/
- virtual std::shared_ptr<clock_iface> get_clock_iface(const std::string& clock_name) = 0;
+ virtual std::shared_ptr<clock_iface> get_clock_iface(
+ const std::string& clock_name) = 0;
/*! Create a control transport
*
@@ -89,30 +83,34 @@ public:
*
* This is typically called once per streaming channel.
*
- * \param local_device_id ID for the host transport adapter to use
- * \param local_epid Host (sink) streaming endpoint ID
- * \param remote_epid Remote device (source) streaming endpoint ID
+ * \param addrs Address of the device and host stream endpoints
+ * \param epids Endpoint IDs of the device and host stream endpoints
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
* \param xport_args Transport configuration args
* \return A CHDR RX data transport
*/
- virtual chdr_rx_data_xport::uptr make_rx_data_transport(device_id_t local_device_id,
- const sep_id_t& local_epid,
- const sep_id_t& remote_epid,
+ virtual chdr_rx_data_xport::uptr make_rx_data_transport(const sep_addr_pair_t& addrs,
+ const sep_id_pair_t& epids,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
const device_addr_t& xport_args) = 0;
/*! Create an TX data transport
*
* This is typically called once per streaming channel.
*
- * \param local_device_id ID for the host transport adapter to use
- * \param local_epid Host (source) streaming endpoint ID
- * \param remote_epid Remote device (sink) streaming endpoint ID
+ * \param addrs Address of the host and device stream endpoints
+ * \param epids Endpoint IDs of the host and device stream endpoints
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
* \param xport_args Transport configuration args
* \return A CHDR TX data transport
*/
- virtual chdr_tx_data_xport::uptr make_tx_data_transport(device_id_t local_device_id,
- const sep_id_t& local_epid,
- const sep_id_t& remote_epid,
+ virtual chdr_tx_data_xport::uptr make_tx_data_transport(const sep_addr_pair_t& addrs,
+ const sep_id_pair_t& epids,
+ const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
+ const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
const device_addr_t& xport_args) = 0;
};
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
index 7ec1b7bb2..bc56fd311 100644
--- a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
@@ -41,6 +41,8 @@ using device_id_t = uint16_t;
using sep_inst_t = uint16_t;
//! Stream Endpoint Physical Address Type
using sep_addr_t = std::pair<device_id_t, sep_inst_t>;
+//! Stream Endpoint Physical Address Type (first = source, second = destination)
+using sep_addr_pair_t = std::pair<sep_addr_t, sep_addr_t>;
//! Stream Endpoint ID Type
using sep_id_t = uint16_t;
//! Stream Endpoint pair Type (first = source, second = destination)
@@ -65,6 +67,19 @@ struct stream_buff_params_t
//! The data type of the buffer used to capture/generate data
enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 };
+//! Conversion from number of bits to sw_buff
+constexpr sw_buff_t bits_to_sw_buff(size_t bits)
+{
+ if (bits <= 8) {
+ return BUFF_U8;
+ } else if (bits <= 16) {
+ return BUFF_U16;
+ } else if (bits <= 32) {
+ return BUFF_U32;
+ } else {
+ return BUFF_U64;
+ }
+}
//----------------------------------------------
// Constants
@@ -72,6 +87,12 @@ enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 };
constexpr uint16_t RFNOC_PROTO_VER = 0x0100;
+constexpr uint64_t MAX_FC_CAPACITY_BYTES = (uint64_t(1) << 40) - 1;
+constexpr uint32_t MAX_FC_CAPACITY_PKTS = (uint32_t(1) << 24) - 1;
+constexpr uint64_t MAX_FC_FREQ_BYTES = (uint64_t(1) << 40) - 1;
+constexpr uint32_t MAX_FC_FREQ_PKTS = (uint32_t(1) << 24) - 1;
+constexpr uint64_t MAX_FC_HEADROOM_BYTES = (uint64_t(1) << 16) - 1;
+constexpr uint32_t MAX_FC_HEADROOM_PKTS = (uint32_t(1) << 8) - 1;
}} // namespace uhd::rfnoc
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
new file mode 100644
index 000000000..6ced60d19
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
@@ -0,0 +1,95 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP
+#define INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP
+
+#include <uhd/rfnoc/node.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
+#include <uhdlib/transport/rx_streamer_impl.hpp>
+#include <string>
+
+namespace uhd { namespace rfnoc {
+
+/*!
+ * Extends the streamer_impl to be an rfnoc node so it can connect to the
+ * graph. Configures the streamer conversion and rate parameters with values
+ * received through property propagation.
+ */
+class rfnoc_rx_streamer : public node_t,
+ public transport::rx_streamer_impl<chdr_rx_data_xport>
+{
+public:
+ /*! Constructor
+ *
+ * \param num_ports The number of ports
+ * \param stream_args Arguments to aid the construction of the streamer
+ */
+ rfnoc_rx_streamer(const size_t num_ports, const uhd::stream_args_t stream_args);
+
+ /*! Returns a unique identifier string for this node. In every RFNoC graph,
+ * no two nodes cannot have the same ID. Returns a string in the form of
+ * "RxStreamer#0".
+ *
+ * \returns The unique ID as a string
+ */
+ std::string get_unique_id() const;
+
+ /*! Returns the number of input ports for this block.
+ *
+ * \return noc_id The number of ports
+ */
+ size_t get_num_input_ports() const;
+
+ /*! Returns the number of output ports for this block.
+ *
+ * Always returns 0 for this block.
+ *
+ * \return noc_id The number of ports
+ */
+ size_t get_num_output_ports() const;
+
+ /*! Implementation of rx_streamer API method
+ *
+ * \param stream_cmd the stream command to issue
+ */
+ void issue_stream_cmd(const stream_cmd_t& stream_cmd);
+
+ /*! Returns stream args provided at creation
+ *
+ * \return stream args provided when streamer is created
+ */
+ const uhd::stream_args_t& get_stream_args() const;
+
+ /*! Check that all streamer ports are connected to blocks
+ *
+ * Overrides node_t to ensure there are no unconnected ports.
+ *
+ * \param connected_inputs A list of input ports that are connected
+ * \param connected_outputs A list of output ports that are connected
+ * \returns true if the block can deal with this configuration
+ */
+ bool check_topology(const std::vector<size_t>& connected_inputs,
+ const std::vector<size_t>& connected_outputs);
+private:
+ void _register_props(const size_t chan, const std::string& otw_format);
+
+ // Properties
+ std::vector<property_t<double>> _scaling_in;
+ std::vector<property_t<double>> _samp_rate_in;
+ std::vector<property_t<double>> _tick_rate_in;
+ std::vector<property_t<std::string>> _type_in;
+
+ // Streamer unique ID
+ const std::string _unique_id;
+
+ // Stream args provided at construction
+ const uhd::stream_args_t _stream_args;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP */
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp
new file mode 100644
index 000000000..4acee45cc
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp
@@ -0,0 +1,90 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP
+#define INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP
+
+#include <uhd/rfnoc/node.hpp>
+#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>
+#include <uhdlib/transport/tx_streamer_impl.hpp>
+#include <string>
+
+namespace uhd { namespace rfnoc {
+
+/*!
+ * Extends the streamer_impl to be an rfnoc node so it can connect to the
+ * graph. Configures the streamer conversion and rate parameters with values
+ * received through property propagation.
+ */
+class rfnoc_tx_streamer : public node_t,
+ public transport::tx_streamer_impl<chdr_tx_data_xport>
+{
+public:
+ /*! Constructor
+ *
+ * \param num_ports The number of ports
+ * \param stream_args Arguments to aid the construction of the streamer
+ */
+ rfnoc_tx_streamer(const size_t num_chans, const uhd::stream_args_t stream_args);
+
+ /*! Returns a unique identifier string for this node. In every RFNoC graph,
+ * no two nodes cannot have the same ID. Returns a string in the form of
+ * "TxStreamer#0".
+ *
+ * \returns The unique ID as a string
+ */
+ std::string get_unique_id() const;
+
+ /*! Returns the number of input ports for this block.
+ *
+ * Always returns 0 for this block.
+ *
+ * \return noc_id The number of ports
+ */
+ size_t get_num_input_ports() const;
+
+ /*! Returns the number of output ports for this block.
+ *
+ * \return noc_id The number of ports
+ */
+ size_t get_num_output_ports() const;
+
+ /*! Returns stream args provided at creation
+ *
+ * \return stream args provided when streamer is created
+ */
+ const uhd::stream_args_t& get_stream_args() const;
+
+ /*! Check that all streamer ports are connected to blocks
+ *
+ * Overrides node_t to ensure there are no unconnected ports.
+ *
+ * \param connected_inputs A list of input ports that are connected
+ * \param connected_outputs A list of output ports that are connected
+ * \returns true if the block can deal with this configuration
+ */
+ bool check_topology(const std::vector<size_t>& connected_inputs,
+ const std::vector<size_t>& connected_outputs);
+
+private:
+ void _register_props(const size_t chan, const std::string& otw_format);
+
+ // Properties
+ std::vector<property_t<double>> _scaling_out;
+ std::vector<property_t<double>> _samp_rate_out;
+ std::vector<property_t<double>> _tick_rate_out;
+ std::vector<property_t<std::string>> _type_out;
+
+ // Streamer unique ID
+ const std::string _unique_id;
+
+ // Stream args provided at construction
+ const uhd::stream_args_t _stream_args;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP */
diff --git a/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp
new file mode 100644
index 000000000..937baf982
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp
@@ -0,0 +1,130 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP
+#define INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP
+
+#include <uhd/utils/log.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+
+namespace uhd { namespace rfnoc {
+
+//! Class to manage rx flow control state
+class rx_flow_ctrl_state
+{
+public:
+ //! Constructor
+ rx_flow_ctrl_state(const rfnoc::sep_id_pair_t epids) : _epids(epids) {}
+
+ //! Initialize frequency parameters
+ void initialize(const stream_buff_params_t fc_freq)
+ {
+ _fc_freq = fc_freq;
+ }
+
+ //! Resynchronize with transfer counts from the sender
+ void resynchronize(const stream_buff_params_t counts)
+ {
+ if (_recv_counts.bytes != counts.bytes
+ || _recv_counts.packets != counts.packets) {
+ // If there is a discrepancy between the amount of data sent by
+ // the device and received by the transport, adjust the counts
+ // of data received and transferred to include the dropped data.
+ auto bytes_dropped = counts.bytes - _recv_counts.bytes;
+ auto pkts_dropped = counts.packets - _recv_counts.packets;
+ _xfer_counts.bytes += bytes_dropped;
+ _xfer_counts.packets += pkts_dropped;
+
+ UHD_LOGGER_DEBUG("rx_flow_ctrl_state")
+ << "oh noes: bytes_sent=" << counts.bytes
+ << " bytes_received=" << _recv_counts.bytes
+ << " pkts_sent=" << counts.packets
+ << " pkts_received=" << _recv_counts.packets
+ << " src_epid=" << _epids.first << " dst_epid=" << _epids.second
+ << std::endl;
+
+ _recv_counts = counts;
+ }
+ }
+
+ //! Reset the transfer counts (happens during init)
+ void reset_counts()
+ {
+ UHD_LOGGER_TRACE("rx_flow_ctrl_state")
+ << "Resetting transfer counts" << std::endl;
+ _recv_counts = {0, 0};
+ _xfer_counts = {0, 0};
+ }
+
+ //! Update state when data is received
+ void data_received(const size_t bytes)
+ {
+ _recv_counts.bytes += bytes;
+ _recv_counts.packets++;
+ }
+
+ //! Update state when transfer is complete (buffer space freed)
+ void xfer_done(const size_t bytes)
+ {
+ _xfer_counts.bytes += bytes;
+ _xfer_counts.packets++;
+ }
+
+ //! Returns whether a flow control response is needed
+ bool fc_resp_due() const
+ {
+ stream_buff_params_t accum_counts = {
+ _xfer_counts.bytes - _last_fc_resp_counts.bytes,
+ _xfer_counts.packets - _last_fc_resp_counts.packets};
+
+ return accum_counts.bytes >= _fc_freq.bytes
+ || accum_counts.packets >= _fc_freq.packets;
+ }
+
+ //! Update state after flow control response was sent
+ void fc_resp_sent()
+ {
+ _last_fc_resp_counts = _xfer_counts;
+ }
+
+ //! Returns counts for completed transfers
+ stream_buff_params_t get_xfer_counts() const
+ {
+ return _xfer_counts;
+ }
+
+ //! Returns counts for completed transfers
+ stream_buff_params_t get_recv_counts() const
+ {
+ return _recv_counts;
+ }
+
+ //! Returns configured flow control frequency
+ stream_buff_params_t get_fc_freq() const
+ {
+ return _fc_freq;
+ }
+
+private:
+ // Counts for data received, including any data still in use
+ stream_buff_params_t _recv_counts{0, 0};
+
+ // Counts for data read and whose buffer space is ok to reuse
+ stream_buff_params_t _xfer_counts{0, 0};
+
+ // Counts sent in last flow control response
+ stream_buff_params_t _last_fc_resp_counts{0, 0};
+
+ // Frequency of flow control responses
+ stream_buff_params_t _fc_freq{0, 0};
+
+ // Endpoint ID for log messages
+ const sep_id_pair_t _epids;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP */
diff --git a/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp
new file mode 100644
index 000000000..65fc1b093
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp
@@ -0,0 +1,99 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP
+#define INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP
+
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+
+namespace uhd { namespace rfnoc {
+
+//! Class to manage tx flow control state
+class tx_flow_ctrl_state
+{
+public:
+ //! Updates destination capacity
+ void set_dest_capacity(const stream_buff_params_t& capacity)
+ {
+ _dest_capacity = capacity;
+ }
+
+ //! Updates destination received count
+ void update_dest_recv_count(const stream_buff_params_t& recv_count)
+ {
+ _recv_counts = recv_count;
+ }
+
+ /*! Returns whether the destination has buffer space for the requested
+ * packet size
+ */
+ bool dest_has_space(const size_t packet_size)
+ {
+ // The stream endpoint only cares about bytes, the packet count is not
+ // important to determine the space available.
+ const auto buffer_fullness = _xfer_counts.bytes - _recv_counts.bytes;
+ const auto space_available = _dest_capacity.bytes - buffer_fullness;
+ return space_available >= packet_size;
+ }
+
+ //! Increments transfer count with amount of data sent
+ void data_sent(const size_t packet_size)
+ {
+ _xfer_counts.bytes += packet_size;
+ _xfer_counts.packets++;
+
+ // Request an fc resync after we have transferred a number of bytes >=
+ // to the destination capacity. There is no strict requirement on how
+ // often we need to send this, as it is only needed to correct for
+ // dropped packets. One buffer's worth of bytes is probably a good
+ // cadence.
+ if (_xfer_counts.bytes - _last_fc_resync_bytes >= _dest_capacity.bytes) {
+ _fc_resync_req = true;
+ }
+ }
+
+ /*! Returns whether an fc resync request is pending. The policy we use
+ * here is to send an fc resync every time we send a number of bytes
+ * equal to the destination buffer capacity.
+ */
+ bool get_fc_resync_req_pending() const
+ {
+ return _fc_resync_req;
+ }
+
+ //! Clears fc resync request pending status
+ void clear_fc_resync_req_pending()
+ {
+ _fc_resync_req = false;
+ _last_fc_resync_bytes = _xfer_counts.bytes;
+ }
+
+ //! Returns counts for packets sent
+ stream_buff_params_t get_xfer_counts() const
+ {
+ return _xfer_counts;
+ }
+
+private:
+ // Counts for data sent
+ stream_buff_params_t _xfer_counts{0, 0};
+
+ // Counts for data received by the destination
+ stream_buff_params_t _recv_counts{0, 0};
+
+ // Buffer size at the destination
+ stream_buff_params_t _dest_capacity{0, 0};
+
+ // Counts sent in last flow control resync
+ size_t _last_fc_resync_bytes = 0;
+
+ // Track when to send ack packets
+ bool _fc_resync_req = false;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP */
diff --git a/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp
new file mode 100644
index 000000000..662be6d2d
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp
@@ -0,0 +1,175 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP
+#define INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP
+
+#include <uhd/exception.hpp>
+#include <uhd/utils/log.hpp>
+#include <boost/dynamic_bitset.hpp>
+#include <boost/format.hpp>
+
+namespace uhd { namespace transport {
+
+// Number of iterations that get_aligned_buffs will attempt to time align
+// packets before returning an alignment failure. get_aligned_buffs increments
+// the iteration count when it finds a timestamp that is larger than the
+// timestamps on channels it has already aligned and thus has to restart
+// aligning timestamps on all channels to the new timestamp.
+constexpr size_t ALIGNMENT_FAILURE_THRESHOLD = 1000;
+
+/*!
+ * Implementation of rx time alignment. This method reads packets from the
+ * transports for each channel and discards any packets whose tsf does not
+ * match those of other channels due to dropped packets. Packets that do not
+ * have a tsf are not checked for alignment and never dropped.
+ */
+template <typename transport_t>
+class get_aligned_buffs
+{
+public:
+ enum alignment_result_t {
+ SUCCESS,
+ TIMEOUT,
+ SEQUENCE_ERROR,
+ ALIGNMENT_FAILURE,
+ BAD_PACKET
+ };
+
+ get_aligned_buffs(std::vector<typename transport_t::uptr>& xports,
+ std::vector<typename transport_t::buff_t::uptr>& frame_buffs,
+ std::vector<typename transport_t::packet_info_t>& infos)
+ : _xports(xports)
+ , _frame_buffs(frame_buffs)
+ , _infos(infos)
+ , _prev_tsf(_xports.size(), 0)
+ , _channels_to_align(_xports.size())
+ {
+ }
+
+ alignment_result_t operator()(const int32_t timeout_ms)
+ {
+ // Clear state
+ _channels_to_align.set();
+ bool time_valid = false;
+ uint64_t tsf = 0;
+ size_t iterations = 0;
+
+ while (_channels_to_align.any()) {
+ const size_t chan = _channels_to_align.find_first();
+ auto& xport = _xports[chan];
+ auto& info = _infos[chan];
+ auto& frame_buff = _frame_buffs[chan];
+ bool seq_error = false;
+
+ // Receive a data packet for the channel if we don't have one. A
+ // packet may already be there if the previous call was interrupted
+ // by an error.
+ if (!frame_buff) {
+ try {
+ std::tie(frame_buff, info, seq_error) =
+ xport->get_recv_buff(timeout_ms);
+ } catch (const uhd::value_error& e) {
+ // Bad packet
+ UHD_LOGGER_ERROR("STREAMER")
+ << boost::format(
+ "The receive transport caught a value exception.\n%s")
+ % e.what();
+ return BAD_PACKET;
+ }
+ }
+
+ if (!frame_buff) {
+ return TIMEOUT;
+ }
+
+ if (info.has_tsf) {
+ const bool time_out_of_order = _prev_tsf[chan] > info.tsf;
+ _prev_tsf[chan] = info.tsf;
+
+ // If the user changes the device time while streaming, we can
+ // receive a packet that comes before the previous packet in
+ // time. This would cause the alignment logic to discard future
+ // received packets. Therefore, when this occurs, we reset the
+ // info to restart the alignment.
+ if (time_out_of_order) {
+ time_valid = false;
+ }
+
+ // Check if the time is larger than packets received for other
+ // channels, and if so, use this time to align all channels
+ if (!time_valid || info.tsf > tsf) {
+ // If we haven't found a set of aligned packets after many
+ // iterations, return an alignment failure
+ if (iterations++ > ALIGNMENT_FAILURE_THRESHOLD) {
+ UHD_LOGGER_ERROR("STREAMER")
+ << "The rx streamer failed to time-align packets.";
+ return ALIGNMENT_FAILURE;
+ }
+
+ // Release buffers for channels aligned previously. Keep
+ // buffers that don't have a tsf since we don't need to
+ // align those.
+ for (size_t i = 0; i < _xports.size(); i++) {
+ if (!_channels_to_align.test(i) && _infos[i].has_tsf) {
+ _xports[i]->release_recv_buff(std::move(_frame_buffs[i]));
+ _frame_buffs[i] = nullptr;
+ }
+ }
+
+ // Mark only this channel as aligned and save its tsf
+ _channels_to_align.set();
+ _channels_to_align.reset(chan);
+ time_valid = true;
+ tsf = info.tsf;
+ }
+
+ // Check if the time matches that of other aligned channels
+ else if (info.tsf == tsf) {
+ _channels_to_align.reset(chan);
+ }
+
+ // Otherwise, time is smaller than other channels, release the buffer
+ else {
+ _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan]));
+ _frame_buffs[chan] = nullptr;
+ }
+ } else {
+ // Packet doesn't have a tsf, just mark it as aligned
+ _channels_to_align.reset(chan);
+ }
+
+ // If this packet had a sequence error, stop to return the error.
+ // Keep the packet for the next call to get_aligned_buffs.
+ if (seq_error) {
+ return SEQUENCE_ERROR;
+ }
+ }
+
+ // All channels aligned
+ return SUCCESS;
+ }
+
+private:
+ // Transports for each channel
+ std::vector<typename transport_t::uptr>& _xports;
+
+ // Storage for buffers resulting from alignment
+ std::vector<typename transport_t::buff_t::uptr>& _frame_buffs;
+
+ // Packet info corresponding to aligned buffers
+ std::vector<typename transport_t::packet_info_t>& _infos;
+
+ // Time of previous packet for each channel
+ std::vector<uint64_t> _prev_tsf;
+
+ // Keeps track of channels that are aligned
+ boost::dynamic_bitset<> _channels_to_align;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP */
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
new file mode 100644
index 000000000..d66e867bc
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
@@ -0,0 +1,341 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP
+#define INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/convert.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/stream.hpp>
+#include <uhd/types/endianness.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/rx_streamer_zero_copy.hpp>
+#include <limits>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+namespace detail {
+
+/*!
+ * Cache of metadata for error handling
+ *
+ * If a recv call reads data from multiple packets, and an error occurs in the
+ * second or later packets, recv stops short of the num samps requested and
+ * returns no error. The error is cached for the next call to recv.
+ *
+ * Timeout errors are an exception. Timeouts that occur in the second or later
+ * packets of a recv call stop the recv method but the error is not returned in
+ * the next call. The user can check for this condition since fewer samples are
+ * returned than the number requested.
+ */
+class rx_metadata_cache
+{
+public:
+ //! Stores metadata in the cache, ignoring timeout errors
+ UHD_FORCE_INLINE void store(const rx_metadata_t& metadata)
+ {
+ if (metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) {
+ _metadata_cache = metadata;
+ _cached_metadata = true;
+ }
+ }
+
+ //! Checks for cached metadata
+ UHD_FORCE_INLINE bool check(rx_metadata_t& metadata)
+ {
+ if (_cached_metadata) {
+ metadata = _metadata_cache;
+ _cached_metadata = false;
+ return true;
+ }
+ return false;
+ }
+
+private:
+ // Whether there is a cached metadata object
+ bool _cached_metadata = false;
+
+ // Cached metadata value
+ uhd::rx_metadata_t _metadata_cache;
+};
+
+} // namespace detail
+
+/*!
+ * Implementation of rx streamer API
+ */
+template <typename transport_t>
+class rx_streamer_impl : public rx_streamer
+{
+public:
+ //! Constructor
+ rx_streamer_impl(const size_t num_ports, const uhd::stream_args_t stream_args)
+ : _zero_copy_streamer(num_ports)
+ , _in_buffs(num_ports)
+ {
+ if (stream_args.cpu_format.empty()) {
+ throw uhd::value_error("[rx_stream] Must provide a cpu_format!");
+ }
+ if (stream_args.otw_format.empty()) {
+ throw uhd::value_error("[rx_stream] Must provide a otw_format!");
+ }
+ _setup_converters(num_ports, stream_args);
+ _zero_copy_streamer.set_samp_rate(_samp_rate);
+ _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item);
+ }
+
+ //! Connect a new channel to the streamer
+ void connect_channel(const size_t channel, typename transport_t::uptr xport)
+ {
+ const size_t max_pyld_size = xport->get_max_payload_size();
+ _zero_copy_streamer.connect_channel(channel, std::move(xport));
+
+ // Set spp based on the transport frame size
+ const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item;
+ _spp = std::min(_spp, xport_spp);
+ }
+
+ //! Implementation of rx_streamer API method
+ size_t get_num_channels() const
+ {
+ return _zero_copy_streamer.get_num_channels();
+ }
+
+ //! Implementation of rx_streamer API method
+ size_t get_max_num_samps() const
+ {
+ return _spp;
+ }
+
+ /*! Get width of each over-the-wire item component. For complex items,
+ * returns the width of one component only (real or imaginary).
+ */
+ size_t get_otw_item_comp_bit_width() const
+ {
+ return _convert_info.otw_item_bit_width;
+ }
+
+ //! Implementation of rx_streamer API method
+ UHD_INLINE size_t recv(const uhd::rx_streamer::buffs_type& buffs,
+ const size_t nsamps_per_buff,
+ uhd::rx_metadata_t& metadata,
+ const double timeout,
+ const bool one_packet)
+ {
+ if (_error_metadata_cache.check(metadata)) {
+ return 0;
+ }
+
+ const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
+
+ size_t total_samps_recv =
+ _recv_one_packet(buffs, nsamps_per_buff, metadata, timeout_ms);
+
+ if (one_packet or metadata.end_of_burst) {
+ return total_samps_recv;
+ }
+
+ // First set of packets recv had an error, return immediately
+ if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) {
+ return total_samps_recv;
+ }
+
+ // Loop until buffer is filled or error code. This method returns the
+ // metadata from the first packet received, with the exception of
+ // end-of-burst.
+ uhd::rx_metadata_t loop_metadata;
+
+ while (total_samps_recv < nsamps_per_buff) {
+ size_t num_samps = _recv_one_packet(buffs,
+ nsamps_per_buff - total_samps_recv,
+ loop_metadata,
+ timeout_ms,
+ total_samps_recv * _convert_info.bytes_per_cpu_item);
+
+ // If metadata had an error code set, store for next call and return
+ if (loop_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) {
+ _error_metadata_cache.store(loop_metadata);
+ break;
+ }
+
+ total_samps_recv += num_samps;
+
+ // Return immediately if end of burst
+ if (loop_metadata.end_of_burst) {
+ metadata.end_of_burst = true;
+ break;
+ }
+ }
+
+ return total_samps_recv;
+ }
+
+protected:
+ //! Configures scaling factor for conversion
+ void set_scale_factor(const size_t chan, const double scale_factor)
+ {
+ _converters[chan]->set_scalar(scale_factor);
+ }
+
+ //! Configures sample rate for conversion of timestamp
+ void set_samp_rate(const double rate)
+ {
+ _samp_rate = rate;
+ _zero_copy_streamer.set_samp_rate(rate);
+ }
+
+ //! Configures tick rate for conversion of timestamp
+ void set_tick_rate(const double rate)
+ {
+ _zero_copy_streamer.set_tick_rate(rate);
+ }
+
+private:
+ //! Converter and associated item sizes
+ struct convert_info
+ {
+ size_t bytes_per_otw_item;
+ size_t bytes_per_cpu_item;
+ size_t otw_item_bit_width;
+ };
+
+ //! Receive a single packet
+ UHD_FORCE_INLINE size_t _recv_one_packet(const uhd::rx_streamer::buffs_type& buffs,
+ const size_t nsamps_per_buff,
+ uhd::rx_metadata_t& metadata,
+ const int32_t timeout_ms,
+ const size_t buffer_offset_bytes = 0)
+ {
+ if (_buff_samps_remaining == 0) {
+ // Current set of buffers has expired, get the next one
+ _buff_samps_remaining =
+ _zero_copy_streamer.get_recv_buffs(_in_buffs, metadata, timeout_ms);
+ _fragment_offset_in_samps = 0;
+ } else {
+ // There are samples still left in the current set of buffers
+ metadata = _last_fragment_metadata;
+ metadata.time_spec += time_spec_t::from_ticks(
+ _fragment_offset_in_samps - metadata.fragment_offset, _samp_rate);
+ }
+
+ if (_buff_samps_remaining != 0) {
+ const size_t num_samps = std::min(nsamps_per_buff, _buff_samps_remaining);
+
+ // Convert samples to the streamer's output format
+ for (size_t i = 0; i < get_num_channels(); i++) {
+ char* b = reinterpret_cast<char*>(buffs[i]);
+ const uhd::rx_streamer::buffs_type out_buffs(b + buffer_offset_bytes);
+ _convert_to_out_buff(out_buffs, i, num_samps);
+ }
+
+ _buff_samps_remaining -= num_samps;
+
+ // Write the fragment flags and offset
+ metadata.more_fragments = _buff_samps_remaining != 0;
+ metadata.fragment_offset = _fragment_offset_in_samps;
+
+ if (metadata.more_fragments) {
+ _fragment_offset_in_samps += num_samps;
+ _last_fragment_metadata = metadata;
+ }
+
+ return num_samps;
+ } else {
+ return 0;
+ }
+ }
+
+ //! Convert samples for one channel into its buffer
+ UHD_FORCE_INLINE void _convert_to_out_buff(
+ const uhd::rx_streamer::buffs_type& out_buffs,
+ const size_t chan,
+ const size_t num_samps)
+ {
+ const char* buffer_ptr = reinterpret_cast<const char*>(_in_buffs[chan]);
+
+ _converters[chan]->conv(buffer_ptr, out_buffs, num_samps);
+
+ // Advance the pointer for the source buffer
+ _in_buffs[chan] =
+ buffer_ptr + num_samps * _convert_info.bytes_per_otw_item;
+
+ if (_buff_samps_remaining == num_samps) {
+ _zero_copy_streamer.release_recv_buff(chan);
+ }
+ }
+
+ //! Create converters and initialize _convert_info
+ void _setup_converters(const size_t num_ports,
+ const uhd::stream_args_t stream_args)
+ {
+ // Note to code archaeologists: In the past, we had to also specify the
+ // endianness here, but that is no longer necessary because we can make
+ // the wire endianness match the host endianness.
+ convert::id_type id;
+ id.input_format = stream_args.otw_format + "_chdr";
+ id.num_inputs = 1;
+ id.output_format = stream_args.cpu_format;
+ id.num_outputs = 1;
+
+ auto starts_with = [](const std::string& s, const std::string v) {
+ return s.find(v) == 0;
+ };
+
+ const bool otw_is_complex = starts_with(stream_args.otw_format, "fc")
+ || starts_with(stream_args.otw_format, "sc");
+
+ convert_info info;
+ info.bytes_per_otw_item = convert::get_bytes_per_item(id.input_format);
+ info.bytes_per_cpu_item = convert::get_bytes_per_item(id.output_format);
+
+ if (otw_is_complex) {
+ info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2;
+ } else {
+ info.otw_item_bit_width = info.bytes_per_otw_item * 8;
+ }
+
+ _convert_info = info;
+
+ for (size_t i = 0; i < num_ports; i++) {
+ _converters.push_back(convert::get_converter(id)());
+ _converters.back()->set_scalar(1 / 32767.0);
+ }
+ }
+
+ // Converter and item sizes
+ convert_info _convert_info;
+
+ // Converters
+ std::vector<uhd::convert::converter::sptr> _converters;
+
+ // Implementation of frame buffer management and packet info
+ rx_streamer_zero_copy<transport_t> _zero_copy_streamer;
+
+ // Container for buffer pointers used in recv method
+ std::vector<const void*> _in_buffs;
+
+ // Sample rate used to calculate metadata time_spec_t
+ double _samp_rate = 1.0;
+
+ // Maximum number of samples per packet
+ size_t _spp = std::numeric_limits<std::size_t>::max();
+
+ // Num samps remaining in buffer currently held by zero copy streamer
+ size_t _buff_samps_remaining = 0;
+
+ // Metadata cache for error handling
+ detail::rx_metadata_cache _error_metadata_cache;
+
+ // Fragment (partially read packet) information
+ size_t _fragment_offset_in_samps = 0;
+ rx_metadata_t _last_fragment_metadata;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP */
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
new file mode 100644
index 000000000..36f568f2d
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
@@ -0,0 +1,207 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP
+#define INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/types/metadata.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/get_aligned_buffs.hpp>
+#include <boost/format.hpp>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Implementation of rx streamer manipulation of frame buffers and packet info.
+ * This class is part of rx_streamer_impl, split into a separate unit as it is
+ * a mostly self-contained portion of the streamer logic.
+ */
+template <typename transport_t>
+class rx_streamer_zero_copy
+{
+public:
+ //! Constructor
+ rx_streamer_zero_copy(const size_t num_ports)
+ : _xports(num_ports)
+ , _frame_buffs(num_ports)
+ , _infos(num_ports)
+ , _get_aligned_buffs(_xports, _frame_buffs, _infos)
+ {
+ }
+
+ ~rx_streamer_zero_copy()
+ {
+ for (size_t i = 0; i < _frame_buffs.size(); i++) {
+ if (_frame_buffs[i]) {
+ _xports[i]->release_recv_buff(std::move(_frame_buffs[i]));
+ }
+ }
+ }
+
+ //! Connect a new channel to the streamer
+ void connect_channel(const size_t port, typename transport_t::uptr xport)
+ {
+ if (port >= get_num_channels()) {
+ throw uhd::index_error(
+ "Port number indexes beyond the number of streamer ports");
+ }
+
+ if (_xports[port]) {
+ throw uhd::runtime_error(
+ "Streamer port number is already connected to a port");
+ }
+
+ _xports[port] = std::move(xport);
+ }
+
+ //! Returns number of channels handled by this streamer
+ size_t get_num_channels() const
+ {
+ return _xports.size();
+ }
+
+ //! Configures tick rate for conversion of timestamp
+ void set_tick_rate(const double rate)
+ {
+ _tick_rate = rate;
+ }
+
+ //! Configures sample rate for conversion of timestamp
+ void set_samp_rate(const double rate)
+ {
+ _samp_rate = rate;
+ }
+
+ //! Configures the size of each sample
+ void set_bytes_per_item(const size_t bpi)
+ {
+ _bytes_per_item = bpi;
+ }
+
+ /*!
+ * Gets a set of time-aligned buffers, one per channel.
+ *
+ * \param buffs returns a pointer to the buffer data
+ * \param metadata returns the metadata corresponding to the buffer
+ * \param timeout_ms timeout in milliseconds
+ * \return the size in samples of each packet, or 0 if timeout
+ */
+ size_t get_recv_buffs(std::vector<const void*>& buffs,
+ rx_metadata_t& metadata,
+ const int32_t timeout_ms)
+ {
+ metadata.reset();
+
+ switch (_get_aligned_buffs(timeout_ms)) {
+ case get_aligned_buffs_t::SUCCESS:
+ break;
+
+ case get_aligned_buffs_t::BAD_PACKET:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
+ return 0;
+
+ case get_aligned_buffs_t::TIMEOUT:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
+ return 0;
+
+ case get_aligned_buffs_t::ALIGNMENT_FAILURE:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
+ return 0;
+
+ case get_aligned_buffs_t::SEQUENCE_ERROR:
+ metadata.has_time_spec = _last_read_time_info.has_time_spec;
+ metadata.time_spec =
+ _last_read_time_info.time_spec
+ + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate);
+ metadata.out_of_sequence = true;
+ metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ return 0;
+
+ default:
+ UHD_THROW_INVALID_CODE_PATH();
+ }
+
+ // Get payload pointers for each buffer and aggregate eob. We set eob to
+ // true if any channel has it set, since no more data will be received for
+ // that channel. In most cases, all channels should have the same value.
+ bool eob = false;
+ for (size_t i = 0; i < buffs.size(); i++) {
+ buffs[i] = _infos[i].payload;
+ eob |= _infos[i].eob;
+ }
+
+ // Set the metadata from the buffer information at index zero
+ const auto& info_0 = _infos[0];
+
+ metadata.has_time_spec = info_0.has_tsf;
+ metadata.time_spec = time_spec_t::from_ticks(info_0.tsf, _tick_rate);
+ metadata.start_of_burst = false;
+ metadata.end_of_burst = eob;
+ metadata.error_code = rx_metadata_t::ERROR_CODE_NONE;
+
+ // Done with these packets, save timestamp info for next call
+ _last_read_time_info.has_time_spec = metadata.has_time_spec;
+ _last_read_time_info.time_spec = metadata.time_spec;
+ _last_read_time_info.num_samps = info_0.payload_bytes / _bytes_per_item;
+
+ return _last_read_time_info.num_samps;
+ }
+
+ /*!
+ * Release the packet for the specified channel
+ *
+ * \param channel the channel for which to release the packet
+ */
+ void release_recv_buff(const size_t channel)
+ {
+ _xports[channel]->release_recv_buff(std::move(_frame_buffs[channel]));
+ _frame_buffs[channel] = typename transport_t::buff_t::uptr();
+ }
+
+private:
+ using get_aligned_buffs_t = get_aligned_buffs<transport_t>;
+
+ // Information recorded by streamer about the last data packet processed,
+ // used to create the metadata when there is a sequence error.
+ struct last_read_time_info_t
+ {
+ size_t num_samps = 0;
+ bool has_time_spec = false;
+ time_spec_t time_spec;
+ };
+
+ // Transports for each channel
+ std::vector<typename transport_t::uptr> _xports;
+
+ // Storage for buffers for each channel while they are in flight (between
+ // calls to get_recv_buff and release_recv_buff).
+ std::vector<typename transport_t::buff_t::uptr> _frame_buffs;
+
+ // Packet info corresponding to the packets in flight
+ std::vector<typename transport_t::packet_info_t> _infos;
+
+ // Rate used in conversion of timestamp to time_spec_t
+ double _tick_rate = 1.0;
+
+ // Rate used in conversion of timestamp to time_spec_t
+ double _samp_rate = 1.0;
+
+ // Size of a sample on the device
+ size_t _bytes_per_item = 0;
+
+ // Implementation of packet time alignment
+ get_aligned_buffs_t _get_aligned_buffs;
+
+ // Information about the last data packet processed
+ last_read_time_info_t _last_read_time_info;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP */
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
new file mode 100644
index 000000000..60881dad2
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
@@ -0,0 +1,307 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP
+#define INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/convert.hpp>
+#include <uhd/stream.hpp>
+#include <uhd/types/metadata.hpp>
+#include <uhd/utils/tasks.hpp>
+#include <uhdlib/transport/tx_streamer_zero_copy.hpp>
+#include <limits>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+namespace detail {
+
+/*!
+ * Cache of metadata for send calls with zero samples
+ *
+ * Metadata is cached when we get a send requesting a start of burst with no
+ * samples. It is applied here on the next call to send() that actually has
+ * samples to send.
+ */
+class tx_metadata_cache
+{
+public:
+ //! Stores metadata in the cache
+ UHD_FORCE_INLINE void store(const tx_metadata_t& metadata)
+ {
+ _metadata_cache = metadata;
+ _cached_metadata = true;
+ }
+
+ //! Checks for cached metadata
+ UHD_FORCE_INLINE void check(tx_metadata_t& metadata)
+ {
+ if (_cached_metadata) {
+ // Only use cached time_spec if metadata does not have one
+ if (!metadata.has_time_spec) {
+ metadata.has_time_spec = _metadata_cache.has_time_spec;
+ metadata.time_spec = _metadata_cache.time_spec;
+ }
+ metadata.start_of_burst = _metadata_cache.start_of_burst;
+ metadata.end_of_burst = _metadata_cache.end_of_burst;
+ _cached_metadata = false;
+ }
+ }
+
+private:
+ // Whether there is a cached metadata object
+ bool _cached_metadata = false;
+
+ // Cached metadata value
+ uhd::tx_metadata_t _metadata_cache;
+};
+
+} // namespace detail
+
+/*!
+ * Implementation of tx streamer API
+ */
+template <typename transport_t>
+class tx_streamer_impl : public tx_streamer
+{
+public:
+ tx_streamer_impl(const size_t num_chans, const uhd::stream_args_t stream_args)
+ : _zero_copy_streamer(num_chans)
+ , _zero_buffs(num_chans, &_zero)
+ , _out_buffs(num_chans)
+ {
+ _setup_converters(num_chans, stream_args);
+ _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item);
+ _spp = stream_args.args.cast<size_t>("spp", _spp);
+ }
+
+ void connect_channel(const size_t channel, typename transport_t::uptr xport)
+ {
+ const size_t max_pyld_size = xport->get_max_payload_size();
+ _zero_copy_streamer.connect_channel(channel, std::move(xport));
+
+ // Set spp based on the transport frame size
+ const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item;
+ _spp = std::min(_spp, xport_spp);
+ }
+
+ size_t get_num_channels() const
+ {
+ return _zero_copy_streamer.get_num_channels();
+ }
+
+ size_t get_max_num_samps() const
+ {
+ return _spp;
+ }
+
+
+ /*! Get width of each over-the-wire item component. For complex items,
+ * returns the width of one component only (real or imaginary).
+ */
+ size_t get_otw_item_comp_bit_width() const
+ {
+ return _convert_info.otw_item_bit_width;
+ }
+
+ size_t send(const uhd::tx_streamer::buffs_type& buffs,
+ const size_t nsamps_per_buff,
+ const uhd::tx_metadata_t& metadata_,
+ const double timeout)
+ {
+ uhd::tx_metadata_t metadata(metadata_);
+
+ if (nsamps_per_buff == 0 && metadata.start_of_burst) {
+ _metadata_cache.store(metadata);
+ return 0;
+ }
+
+ _metadata_cache.check(metadata);
+
+ const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
+
+ if (nsamps_per_buff == 0) {
+ // Send requests with no samples are handled here, such as end of
+ // burst. Send packets need to have at least one sample based on the
+ // chdr specification, so we use _zero_buffs here.
+ _send_one_packet(_zero_buffs.data(),
+ 0, // buffer offset
+ 1, // num samples
+ metadata,
+ timeout_ms);
+
+ return 0;
+ } else if (nsamps_per_buff <= _spp) {
+ return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms);
+
+ } else {
+ size_t total_num_samps_sent = 0;
+ const bool eob = metadata.end_of_burst;
+ metadata.end_of_burst = false;
+
+ const size_t num_fragments = (nsamps_per_buff - 1) / _spp;
+ const size_t final_length = ((nsamps_per_buff - 1) % _spp) + 1;
+
+ for (size_t i = 0; i < num_fragments; i++) {
+ const size_t num_samps_sent = _send_one_packet(
+ buffs, total_num_samps_sent, _spp, metadata, timeout_ms);
+
+ total_num_samps_sent += num_samps_sent;
+
+ if (num_samps_sent == 0) {
+ return total_num_samps_sent;
+ }
+
+ // Setup timespec for the next fragment
+ if (metadata.has_time_spec) {
+ metadata.time_spec =
+ metadata.time_spec
+ + time_spec_t::from_ticks(num_samps_sent, _samp_rate);
+ }
+
+ metadata.start_of_burst = false;
+ }
+
+ // Send the final fragment
+ metadata.end_of_burst = eob;
+
+ size_t nsamps_sent =
+ total_num_samps_sent
+ + _send_one_packet(
+ buffs, total_num_samps_sent, final_length, metadata, timeout);
+
+ return nsamps_sent;
+ }
+ }
+
+ //! Implementation of rx_streamer API method
+ bool recv_async_msg(
+ uhd::async_metadata_t& /*async_metadata*/, double /*timeout = 0.1*/)
+ {
+ // TODO: implement me
+ return false;
+ }
+
+protected:
+ //! Configures scaling factor for conversion
+ void set_scale_factor(const size_t chan, const double scale_factor)
+ {
+ _converters[chan]->set_scalar(scale_factor);
+ }
+
+ //! Configures sample rate for conversion of timestamp
+ void set_samp_rate(const double rate)
+ {
+ _samp_rate = rate;
+ }
+
+ //! Configures tick rate for conversion of timestamp
+ void set_tick_rate(const double rate)
+ {
+ _zero_copy_streamer.set_tick_rate(rate);
+ }
+
+private:
+ //! Converter and associated item sizes
+ struct convert_info
+ {
+ size_t bytes_per_otw_item;
+ size_t bytes_per_cpu_item;
+ size_t otw_item_bit_width;
+ };
+
+ //! Convert samples for one channel and sends a packet
+ size_t _send_one_packet(const uhd::tx_streamer::buffs_type& buffs,
+ const size_t buffer_offset_in_samps,
+ const size_t num_samples,
+ const tx_metadata_t& metadata,
+ const int32_t timeout_ms)
+ {
+ if (!_zero_copy_streamer.get_send_buffs(
+ _out_buffs, num_samples, metadata, timeout_ms)) {
+ return 0;
+ }
+
+ size_t byte_offset = buffer_offset_in_samps * _convert_info.bytes_per_cpu_item;
+
+ for (size_t i = 0; i < get_num_channels(); i++) {
+ const void* input_ptr = static_cast<const uint8_t*>(buffs[i]) + byte_offset;
+ _converters[i]->conv(input_ptr, _out_buffs[i], num_samples);
+
+ _zero_copy_streamer.release_send_buff(i);
+ }
+
+ return num_samples;
+ }
+
+ //! Create converters and initialize _bytes_per_cpu_item
+ void _setup_converters(const size_t num_chans, const uhd::stream_args_t stream_args)
+ {
+ // Note to code archaeologists: In the past, we had to also specify the
+ // endianness here, but that is no longer necessary because we can make
+ // the wire endianness match the host endianness.
+ convert::id_type id;
+ id.input_format = stream_args.cpu_format;
+ id.num_inputs = 1;
+ id.output_format = stream_args.otw_format + "_chdr";
+ id.num_outputs = 1;
+
+ auto starts_with = [](const std::string& s, const std::string v) {
+ return s.find(v) == 0;
+ };
+
+ const bool otw_is_complex = starts_with(stream_args.otw_format, "fc")
+ || starts_with(stream_args.otw_format, "sc");
+
+ convert_info info;
+ info.bytes_per_otw_item = convert::get_bytes_per_item(id.output_format);
+ info.bytes_per_cpu_item = convert::get_bytes_per_item(id.input_format);
+
+ if (otw_is_complex) {
+ info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2;
+ } else {
+ info.otw_item_bit_width = info.bytes_per_otw_item * 8;
+ }
+
+ _convert_info = info;
+
+ for (size_t i = 0; i < num_chans; i++) {
+ _converters.push_back(convert::get_converter(id)());
+ _converters.back()->set_scalar(32767.0);
+ }
+ }
+
+ // Converter item sizes
+ convert_info _convert_info;
+
+ // Converters
+ std::vector<uhd::convert::converter::sptr> _converters;
+
+ // Manages frame buffers and packet info
+ tx_streamer_zero_copy<transport_t> _zero_copy_streamer;
+
+ // Buffer used to handle send calls with no data
+ std::vector<const void*> _zero_buffs;
+
+ const uint64_t _zero = 0;
+
+ // Container for buffer pointers used in send method
+ std::vector<void*> _out_buffs;
+
+ // Sample rate used to calculate metadata time_spec_t
+ double _samp_rate = 1.0;
+
+ // Maximum number of samples per packet
+ size_t _spp = std::numeric_limits<std::size_t>::max();
+
+ // Metadata cache for send calls with no data
+ detail::tx_metadata_cache _metadata_cache;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_TRANSPORT_TX_STREAMER_IMPL_HPP */
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
new file mode 100644
index 000000000..1b6f55238
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
@@ -0,0 +1,147 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP
+#define INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/stream.hpp>
+#include <uhd/types/metadata.hpp>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Implementation of rx streamer manipulation of frame buffers and packet info.
+ * This class is part of tx_streamer_impl, split into a separate unit as it is
+ * a mostly self-contained portion of the streamer logic.
+ */
+template <typename transport_t>
+class tx_streamer_zero_copy
+{
+public:
+ //! Constructor
+ tx_streamer_zero_copy(const size_t num_chans)
+ : _xports(num_chans), _frame_buffs(num_chans)
+ {
+ }
+
+ //! Connect a new channel to the streamer
+ void connect_channel(const size_t port, typename transport_t::uptr xport)
+ {
+ if (port >= get_num_channels()) {
+ throw uhd::index_error(
+ "Port number indexes beyond the number of streamer ports");
+ }
+
+ if (_xports[port]) {
+ throw uhd::runtime_error(
+ "Streamer port number is already connected to a port");
+ }
+
+ _xports[port] = std::move(xport);
+ }
+
+ //! Returns number of channels handled by this streamer
+ size_t get_num_channels() const
+ {
+ return _xports.size();
+ }
+
+ //! Configures tick rate for conversion of timestamp
+ void set_tick_rate(const double rate)
+ {
+ _tick_rate = rate;
+ }
+
+ //! Configures the size of each sample
+ void set_bytes_per_item(const size_t bpi)
+ {
+ _bytes_per_item = bpi;
+ }
+
+ /*!
+ * Gets a set of frame buffers, one per channel.
+ *
+ * \param buffs returns a pointer to the buffer data
+ * \param nsamps_per_buff the number of samples that will be written to each buffer
+ * \param metadata the metadata to write to the packet header
+ * \param timeout_ms timeout in milliseconds
+ * \return true if the operation was sucessful, false if timeout occurs
+ */
+ UHD_FORCE_INLINE bool get_send_buffs(std::vector<void*>& buffs,
+ const size_t nsamps_per_buff,
+ const tx_metadata_t& metadata,
+ const int32_t timeout_ms)
+ {
+ // Try to get a buffer per channel
+ for (; _next_buff_to_get < _xports.size(); _next_buff_to_get++) {
+ _frame_buffs[_next_buff_to_get].first =
+ _xports[_next_buff_to_get]->get_send_buff(timeout_ms);
+
+ if (!_frame_buffs[_next_buff_to_get].first) {
+ return false;
+ }
+ }
+
+ // Got all the buffers, start from index 0 next call
+ _next_buff_to_get = 0;
+
+ // Store portions of metadata we care about
+ typename transport_t::packet_info_t info;
+ info.has_tsf = metadata.has_time_spec;
+
+ if (metadata.has_time_spec) {
+ info.tsf = metadata.time_spec.to_ticks(_tick_rate);
+ }
+
+ info.payload_bytes = nsamps_per_buff * _bytes_per_item;
+ info.eob = metadata.end_of_burst;
+
+ // Write packet header
+ for (size_t i = 0; i < buffs.size(); i++) {
+ std::tie(buffs[i], _frame_buffs[i].second) =
+ _xports[i]->write_packet_header(_frame_buffs[i].first, info);
+ }
+
+ return true;
+ }
+
+ /*!
+ * Send the packet for the specified channel
+ *
+ * \param channel the channel for which to release the packet
+ */
+ UHD_FORCE_INLINE void release_send_buff(const size_t channel)
+ {
+ _frame_buffs[channel].first->set_packet_size(_frame_buffs[channel].second);
+ _xports[channel]->release_send_buff(std::move(_frame_buffs[channel].first));
+
+ _frame_buffs[channel].first = nullptr;
+ _frame_buffs[channel].second = 0;
+ }
+
+private:
+ // Transports for each channel
+ std::vector<typename transport_t::uptr> _xports;
+
+ // Container to hold frame buffers for each channel and their packet sizes
+ std::vector<std::pair<typename transport_t::buff_t::uptr, size_t>> _frame_buffs;
+
+ // Rate used in conversion of timestamp to time_spec_t
+ double _tick_rate = 1.0;
+
+ // Size of a sample on the device
+ size_t _bytes_per_item = 0;
+
+ // Next channel from which to get a buffer, stored as a member to
+ // allow the streamer to continue where it stopped due to timeouts.
+ size_t _next_buff_to_get = 0;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP */
diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt
index dfef4f90f..963458fe6 100644
--- a/host/lib/rfnoc/CMakeLists.txt
+++ b/host/lib/rfnoc/CMakeLists.txt
@@ -53,6 +53,8 @@ LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/tick_node_ctrl.cpp
${CMAKE_CURRENT_SOURCE_DIR}/tx_stream_terminator.cpp
${CMAKE_CURRENT_SOURCE_DIR}/wb_iface_adapter.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_rx_streamer.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_tx_streamer.cpp
# Default block control classes:
${CMAKE_CURRENT_SOURCE_DIR}/block_control.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ddc_block_control.cpp
diff --git a/host/lib/rfnoc/chdr_packet.cpp b/host/lib/rfnoc/chdr_packet.cpp
index 653181c04..d4b7494e2 100644
--- a/host/lib/rfnoc/chdr_packet.cpp
+++ b/host/lib/rfnoc/chdr_packet.cpp
@@ -31,7 +31,7 @@ public:
{
assert(pkt_buff);
_pkt_buff = const_cast<uint64_t*>(reinterpret_cast<const uint64_t*>(pkt_buff));
- _compute_mdata_offset();
+ _mdata_offset = _compute_mdata_offset(get_chdr_header());
}
virtual void refresh(void* pkt_buff, chdr_header& header, uint64_t timestamp = 0)
@@ -42,7 +42,7 @@ public:
if (_has_timestamp(header)) {
_pkt_buff[1] = u64_from_host(timestamp);
}
- _compute_mdata_offset();
+ _mdata_offset = _compute_mdata_offset(get_chdr_header());
}
virtual void update_payload_size(size_t payload_size_bytes)
@@ -115,19 +115,27 @@ public:
+ (chdr_w_stride * (_mdata_offset + get_chdr_header().get_num_mdata())));
}
+ virtual size_t calculate_payload_offset(const packet_type_t pkt_type,
+ const uint8_t num_mdata = 0) const
+ {
+ chdr_header header;
+ header.set_pkt_type(pkt_type);
+ return (_compute_mdata_offset(header) + num_mdata) * chdr_w_bytes;
+ }
+
private:
inline bool _has_timestamp(const chdr_header& header) const
{
return (header.get_pkt_type() == PKT_TYPE_DATA_WITH_TS);
}
- inline void _compute_mdata_offset() const
+ inline size_t _compute_mdata_offset(const chdr_header& header) const
{
// The metadata offset depends on the chdr_w and whether we have a timestamp
if (chdr_w == 64) {
- _mdata_offset = _has_timestamp(get_chdr_header()) ? 2 : 1;
+ return _has_timestamp(header) ? 2 : 1;
} else {
- _mdata_offset = 1;
+ return 1;
}
}
diff --git a/host/lib/rfnoc/graph_stream_manager.cpp b/host/lib/rfnoc/graph_stream_manager.cpp
index f2024786a..2db68db04 100644
--- a/host/lib/rfnoc/graph_stream_manager.cpp
+++ b/host/lib/rfnoc/graph_stream_manager.cpp
@@ -8,7 +8,9 @@
#include <uhd/utils/log.hpp>
#include <uhdlib/rfnoc/graph_stream_manager.hpp>
#include <uhdlib/rfnoc/link_stream_manager.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
#include <boost/format.hpp>
+#include <boost/make_shared.hpp>
#include <map>
#include <set>
@@ -175,6 +177,38 @@ public:
"specified source endpoint");
}
+ chdr_rx_data_xport::uptr create_device_to_host_data_stream(
+ const sep_addr_t src_addr,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
+ const device_addr_t& xport_args)
+ {
+ // TODO: choose a route
+ const device_id_t via_device = NULL_DEVICE_ID;
+
+ return _link_mgrs.at(_check_dst_and_find_src(src_addr, via_device))
+ ->create_device_to_host_data_stream(src_addr,
+ pyld_buff_fmt,
+ mdata_buff_fmt,
+ xport_args);
+ }
+
+ virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
+ sep_addr_t dst_addr,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
+ const device_addr_t& xport_args)
+ {
+ // TODO: choose a route
+ const device_id_t via_device = NULL_DEVICE_ID;
+
+ return _link_mgrs.at(_check_dst_and_find_src(dst_addr, via_device))
+ ->create_host_to_device_data_stream(dst_addr,
+ pyld_buff_fmt,
+ mdata_buff_fmt,
+ xport_args);
+ }
+
private:
device_id_t _check_dst_and_find_src(sep_addr_t dst_addr, device_id_t via_device) const
{
diff --git a/host/lib/rfnoc/link_stream_manager.cpp b/host/lib/rfnoc/link_stream_manager.cpp
index 4fe183529..6855162de 100644
--- a/host/lib/rfnoc/link_stream_manager.cpp
+++ b/host/lib/rfnoc/link_stream_manager.cpp
@@ -203,86 +203,71 @@ public:
false,
STREAM_SETUP_TIMEOUT);
- // Compute FC frequency and headroom based on buff parameters
- stream_buff_params_t fc_freq{
- static_cast<uint64_t>(std::ceil(double(buff_params.bytes) * fc_freq_ratio)),
- static_cast<uint32_t>(
- std::ceil(double(buff_params.packets) * fc_freq_ratio))};
- stream_buff_params_t fc_headroom{
- static_cast<uint64_t>(
- std::ceil(double(buff_params.bytes) * fc_headroom_ratio)),
- static_cast<uint32_t>(
- std::ceil(double(buff_params.packets) * fc_headroom_ratio))};
-
// Reconfigure flow control using the new frequency and headroom
return _mgmt_portal->config_remote_stream(*_ctrl_xport,
dst_epid,
src_epid,
lossy_xport,
- fc_freq,
- fc_headroom,
+ _get_buff_params_ratio(buff_params, fc_freq_ratio),
+ _get_buff_params_ratio(buff_params, fc_headroom_ratio),
reset,
STREAM_SETUP_TIMEOUT);
}
virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
const sep_addr_t dst_addr,
- const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
- const double fc_freq_ratio,
- const double fc_headroom_ratio,
const device_addr_t& xport_args)
{
- // Create a new source endpoint and EPID
- sep_addr_t sw_epid_addr(_my_device_id, SEP_INST_DATA_BASE + (_data_ep_inst++));
- sep_id_t src_epid = _epid_alloc->allocate_epid(sw_epid_addr);
- _allocated_epids.insert(src_epid);
_ensure_ep_is_reachable(dst_addr);
- // Generate a new destination EPID instance
+ // Generate a new destination (device) EPID instance
sep_id_t dst_epid = _epid_alloc->allocate_epid(dst_addr);
+ _mgmt_portal->initialize_endpoint(*_ctrl_xport, dst_addr, dst_epid);
- // Create the data transport that we will return to the client
- chdr_rx_data_xport::uptr xport = _mb_iface.make_rx_data_transport(
- _my_device_id, src_epid, dst_epid, xport_args);
-
- chdr_ctrl_xport::sptr mgmt_xport =
- _mb_iface.make_ctrl_transport(_my_device_id, src_epid);
-
- // Create new temporary management portal with the transports used for this stream
- // TODO: This is a bit excessive. Maybe we can pair down the functionality of the
- // portal just for route setup purposes. Whatever we do, we *must* use xport in it
- // though otherwise the transport will not behave correctly.
- mgmt_portal::uptr data_mgmt_portal =
- mgmt_portal::make(*mgmt_xport, _pkt_factory, sw_epid_addr, src_epid);
-
- // Setup a route to the EPID
- data_mgmt_portal->initialize_endpoint(*mgmt_xport, dst_addr, dst_epid);
- data_mgmt_portal->setup_local_route(*mgmt_xport, dst_epid);
if (!_mgmt_portal->get_endpoint_info(dst_epid).has_data) {
throw uhd::rfnoc_error("Downstream endpoint does not support data traffic");
}
- // TODO: Implement data transport setup logic here
-
+ // Create a new destination (host) endpoint and EPID
+ sep_addr_t sw_epid_addr(_my_device_id, SEP_INST_DATA_BASE + (_data_ep_inst++));
+ sep_id_t src_epid = _epid_alloc->allocate_epid(sw_epid_addr);
+ _allocated_epids.insert(src_epid);
- // Delete the portal when done
- data_mgmt_portal.reset();
- return xport;
+ return _mb_iface.make_tx_data_transport({sw_epid_addr, dst_addr},
+ {src_epid, dst_epid},
+ pyld_buff_fmt,
+ mdata_buff_fmt,
+ xport_args);
}
- virtual chdr_tx_data_xport::uptr create_device_to_host_data_stream(
- const sep_addr_t src_addr,
- const bool lossy_xport,
+ virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream(
+ sep_addr_t src_addr,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
- const double fc_freq_ratio,
- const double fc_headroom_ratio,
const device_addr_t& xport_args)
{
- // TODO: Implement me
- return chdr_tx_data_xport::uptr();
+ _ensure_ep_is_reachable(src_addr);
+
+ // Generate a new source (device) EPID instance
+ sep_id_t src_epid = _epid_alloc->allocate_epid(src_addr);
+ _mgmt_portal->initialize_endpoint(*_ctrl_xport, src_addr, src_epid);
+
+ if (!_mgmt_portal->get_endpoint_info(src_epid).has_data) {
+ throw uhd::rfnoc_error("Downstream endpoint does not support data traffic");
+ }
+
+ // Create a new destination (host) endpoint and EPID
+ sep_addr_t sw_epid_addr(_my_device_id, SEP_INST_DATA_BASE + (_data_ep_inst++));
+ sep_id_t dst_epid = _epid_alloc->allocate_epid(sw_epid_addr);
+ _allocated_epids.insert(dst_epid);
+
+ return _mb_iface.make_rx_data_transport({src_addr, sw_epid_addr},
+ {src_epid, dst_epid},
+ pyld_buff_fmt,
+ mdata_buff_fmt,
+ xport_args);
}
private:
@@ -295,7 +280,14 @@ private:
throw uhd::routing_error("Specified endpoint is not reachable");
}
- // A reference to the packet factor
+ stream_buff_params_t _get_buff_params_ratio(
+ const stream_buff_params_t& buff_params, const double ratio)
+ {
+ return {static_cast<uint64_t>(std::ceil(double(buff_params.bytes) * ratio)),
+ static_cast<uint32_t>(std::ceil(double(buff_params.packets) * ratio))};
+ }
+
+ // A reference to the packet factory
const chdr::chdr_packet_factory& _pkt_factory;
// The device address of this software endpoint
const device_id_t _my_device_id;
diff --git a/host/lib/rfnoc/mgmt_portal.cpp b/host/lib/rfnoc/mgmt_portal.cpp
index b490e0baf..a8b72cbdf 100644
--- a/host/lib/rfnoc/mgmt_portal.cpp
+++ b/host/lib/rfnoc/mgmt_portal.cpp
@@ -794,12 +794,11 @@ private: // Functions
mgmt_hop_t& hop)
{
// Validate flow control parameters
- if (fc_freq.bytes >= (uint64_t(1) << 40)
- || fc_freq.packets >= (uint64_t(1) << 24)) {
+ if (fc_freq.bytes > MAX_FC_FREQ_BYTES || fc_freq.packets > MAX_FC_FREQ_PKTS) {
throw uhd::value_error("Flow control frequency parameters out of bounds");
}
- if (fc_headroom.bytes >= (uint64_t(1) << 16)
- || fc_headroom.packets >= (uint64_t(1) << 8)) {
+ if (fc_headroom.bytes > MAX_FC_HEADROOM_BYTES
+ || fc_headroom.packets > MAX_FC_HEADROOM_PKTS) {
throw uhd::value_error("Flow control headroom parameters out of bounds");
}
@@ -992,7 +991,8 @@ private: // Functions
auto send_buff = xport.get_send_buff(timeout * 1000);
if (not send_buff) {
- UHD_LOG_ERROR("RFNOC::MGMT", "Timed out getting send buff for management transaction");
+ UHD_LOG_ERROR(
+ "RFNOC::MGMT", "Timed out getting send buff for management transaction");
throw uhd::io_error("Timed out getting send buff for management transaction");
}
_send_pkt->refresh(send_buff->data(), header, payload);
diff --git a/host/lib/rfnoc/rfnoc_graph.cpp b/host/lib/rfnoc/rfnoc_graph.cpp
index dd3dd7b90..4bf35cff1 100644
--- a/host/lib/rfnoc/rfnoc_graph.cpp
+++ b/host/lib/rfnoc/rfnoc_graph.cpp
@@ -15,12 +15,18 @@
#include <uhdlib/rfnoc/graph.hpp>
#include <uhdlib/rfnoc/graph_stream_manager.hpp>
#include <uhdlib/rfnoc/rfnoc_device.hpp>
+#include <uhdlib/rfnoc/rfnoc_rx_streamer.hpp>
+#include <uhdlib/rfnoc/rfnoc_tx_streamer.hpp>
#include <uhdlib/utils/narrow.hpp>
+#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp> // FIXME remove when rfnoc_device is ready
#include <memory>
using namespace uhd::rfnoc;
+namespace {
+const std::string LOG_ID("RFNOC::GRAPH");
+}
class rfnoc_graph_impl : public rfnoc_graph
{
@@ -33,6 +39,7 @@ public:
, _graph(std::make_unique<uhd::rfnoc::detail::graph_t>())
{
setup_graph(dev_addr);
+ _init_sep_map();
_init_static_connections();
}
@@ -76,31 +83,136 @@ public:
}
if (!has_block(dst_blk)) {
throw uhd::lookup_error(
- std::string("Cannot connect blocks, source block not found: ")
- + src_blk.to_string());
+ std::string("Cannot connect blocks, destination block not found: ")
+ + dst_blk.to_string());
}
+ auto edge_type = _physical_connect(src_blk, src_port, dst_blk, dst_port);
_connect(get_block(src_blk),
src_port,
get_block(dst_blk),
dst_port,
+ edge_type,
skip_property_propagation);
- _physical_connect(src_blk, src_port, dst_blk, dst_port);
}
- void connect(uhd::tx_streamer& /*streamer*/,
- size_t /*strm_port*/,
- const block_id_t& /*dst_blk*/,
- size_t /*dst_port*/)
+ void connect(uhd::tx_streamer::sptr streamer,
+ size_t strm_port,
+ const block_id_t& dst_blk,
+ size_t dst_port)
{
- throw uhd::not_implemented_error("");
+ // Verify the streamer was created by us
+ auto rfnoc_streamer = boost::dynamic_pointer_cast<rfnoc_tx_streamer>(streamer);
+ if (!rfnoc_streamer) {
+ throw uhd::type_error("Streamer is not rfnoc capable");
+ }
+
+ // Verify src_blk even exists in this graph
+ if (!has_block(dst_blk)) {
+ throw uhd::lookup_error(
+ std::string("Cannot connect block to streamer, source block not found: ")
+ + dst_blk.to_string());
+ }
+
+ // Verify src_blk has an SEP upstream
+ graph_edge_t dst_static_edge = _assert_edge(
+ _get_static_edge(
+ [dst_blk_id = dst_blk.to_string(), dst_port](const graph_edge_t& edge) {
+ return edge.dst_blockid == dst_blk_id && edge.dst_port == dst_port;
+ }),
+ dst_blk.to_string());
+ if (block_id_t(dst_static_edge.src_blockid).get_block_name() != NODE_ID_SEP) {
+ const std::string err_msg =
+ dst_blk.to_string() + ":" + std::to_string(dst_port)
+ + " is not connected to an SEP! Routing impossible.";
+ UHD_LOG_ERROR(LOG_ID, err_msg);
+ throw uhd::routing_error(err_msg);
+ }
+
+ // Now get the name and address of the SEP
+ const std::string sep_block_id = dst_static_edge.src_blockid;
+ const sep_addr_t sep_addr = _sep_map.at(sep_block_id);
+
+ const sw_buff_t pyld_fmt =
+ bits_to_sw_buff(rfnoc_streamer->get_otw_item_comp_bit_width());
+ const sw_buff_t mdata_fmt = BUFF_U64;
+
+ auto xport = _gsm->create_host_to_device_data_stream(sep_addr,
+ pyld_fmt,
+ mdata_fmt,
+ rfnoc_streamer->get_stream_args().args);
+
+ rfnoc_streamer->connect_channel(strm_port, std::move(xport));
+
+ //// If this worked, then also connect the streamer in the BGL graph
+ auto dst = get_block(dst_blk);
+ graph_edge_t edge_info(strm_port, dst_port, graph_edge_t::TX_STREAM, true);
+ _graph->connect(rfnoc_streamer.get(), dst.get(), edge_info);
}
- void connect(const block_id_t& /*src_blk*/,
- size_t /*src_port*/,
- uhd::rx_streamer& /*streamer*/,
- size_t /*strm_port*/)
+ void connect(const block_id_t& src_blk,
+ size_t src_port,
+ uhd::rx_streamer::sptr streamer,
+ size_t strm_port)
{
- throw uhd::not_implemented_error("");
+ // Verify the streamer was created by us
+ auto rfnoc_streamer = boost::dynamic_pointer_cast<rfnoc_rx_streamer>(streamer);
+ if (!rfnoc_streamer) {
+ throw uhd::type_error("Streamer is not rfnoc capable");
+ }
+
+ // Verify src_blk even exists in this graph
+ if (!has_block(src_blk)) {
+ throw uhd::lookup_error(
+ std::string("Cannot connect block to streamer, source block not found: ")
+ + src_blk.to_string());
+ }
+
+ // Verify src_blk has an SEP downstream
+ graph_edge_t src_static_edge = _assert_edge(
+ _get_static_edge(
+ [src_blk_id = src_blk.to_string(), src_port](const graph_edge_t& edge) {
+ return edge.src_blockid == src_blk_id && edge.src_port == src_port;
+ }),
+ src_blk.to_string());
+ if (block_id_t(src_static_edge.dst_blockid).get_block_name() != NODE_ID_SEP) {
+ const std::string err_msg =
+ src_blk.to_string() + ":" + std::to_string(src_port)
+ + " is not connected to an SEP! Routing impossible.";
+ UHD_LOG_ERROR(LOG_ID, err_msg);
+ throw uhd::routing_error(err_msg);
+ }
+
+ // Now get the name and address of the SEP
+ const std::string sep_block_id = src_static_edge.dst_blockid;
+ const sep_addr_t sep_addr = _sep_map.at(sep_block_id);
+
+ const sw_buff_t pyld_fmt =
+ bits_to_sw_buff(rfnoc_streamer->get_otw_item_comp_bit_width());
+ const sw_buff_t mdata_fmt = BUFF_U64;
+
+ auto xport = _gsm->create_device_to_host_data_stream(sep_addr,
+ pyld_fmt,
+ mdata_fmt,
+ rfnoc_streamer->get_stream_args().args);
+
+ rfnoc_streamer->connect_channel(strm_port, std::move(xport));
+
+ // If this worked, then also connect the streamer in the BGL graph
+ auto src = get_block(src_blk);
+ graph_edge_t edge_info(src_port, strm_port, graph_edge_t::RX_STREAM, true);
+ _graph->connect(src.get(), rfnoc_streamer.get(), edge_info);
+ }
+
+ uhd::rx_streamer::sptr create_rx_streamer(
+ const size_t num_chans, const uhd::stream_args_t& args)
+ {
+ return boost::make_shared<rfnoc_rx_streamer>(num_chans, args);
+ }
+
+ uhd::tx_streamer::sptr create_tx_streamer(
+ const size_t num_chans, const uhd::stream_args_t& args)
+ {
+ return boost::make_shared<rfnoc_tx_streamer>(num_chans, args);
}
std::shared_ptr<mb_controller> get_mb_controller(const size_t mb_index = 0)
@@ -152,7 +264,7 @@ private:
throw uhd::key_error(std::string("Found no RFNoC devices for ----->\n")
+ dev_addr.to_pp_string());
}
- _tree = _device->get_tree();
+ _tree = _device->get_tree();
_num_mboards = _tree->list("/mboards").size();
for (size_t i = 0; i < _num_mboards; ++i) {
_mb_controllers.emplace(i, _device->get_mb_controller(i));
@@ -170,7 +282,8 @@ private:
try {
_gsm = graph_stream_manager::make(pkt_factory, epid_alloc, links);
} catch (uhd::io_error& ex) {
- UHD_LOG_ERROR("RFNOC::GRAPH", "IO Error during GSM initialization. " << ex.what());
+ UHD_LOG_ERROR(
+ "RFNOC::GRAPH", "IO Error during GSM initialization. " << ex.what());
throw;
}
@@ -187,6 +300,9 @@ private:
_gsm->connect_host_to_device(ctrl_sep_addr);
// Grab and stash the Client Zero for this mboard
detail::client_zero::sptr mb_cz = _gsm->get_client_zero(ctrl_sep_addr);
+ // Client zero port numbers are based on the control xbar numbers,
+ // which have the client 0 interface first, followed by stream
+ // endpoints, and then the blocks.
_client_zeros.emplace(mb_idx, mb_cz);
const size_t num_blocks = mb_cz->get_num_blocks();
@@ -204,7 +320,7 @@ private:
// Iterate through and register each of the blocks in this mboard
for (size_t portno = 0; portno < num_blocks; ++portno) {
- auto noc_id = mb_cz->get_noc_id(portno + first_block_port);
+ auto noc_id = mb_cz->get_noc_id(portno + first_block_port);
auto block_factory_info = factory::get_block_factory(noc_id);
auto block_info = mb_cz->get_block_info(portno + first_block_port);
block_id_t block_id(mb_idx,
@@ -222,24 +338,25 @@ private:
// iface object through the mb_iface
auto ctrlport_clk_iface =
mb.get_clock_iface(block_factory_info.ctrlport_clk);
- auto tb_clk_iface = (block_factory_info.timebase_clk == CLOCK_KEY_GRAPH) ?
- std::make_shared<clock_iface>(CLOCK_KEY_GRAPH) :
- mb.get_clock_iface(block_factory_info.timebase_clk);
+ auto tb_clk_iface =
+ (block_factory_info.timebase_clk == CLOCK_KEY_GRAPH)
+ ? std::make_shared<clock_iface>(CLOCK_KEY_GRAPH)
+ : mb.get_clock_iface(block_factory_info.timebase_clk);
// A "graph" clock is always "running"
if (block_factory_info.timebase_clk == CLOCK_KEY_GRAPH) {
tb_clk_iface->set_running(true);
}
- auto block_reg_iface = _gsm->get_block_register_iface(ctrl_sep_addr,
+ auto block_reg_iface = _gsm->get_block_register_iface(ctrl_sep_addr,
portno,
*ctrlport_clk_iface.get(),
*tb_clk_iface.get());
- auto make_args_uptr = std::make_unique<noc_block_base::make_args_t>();
+ auto make_args_uptr = std::make_unique<noc_block_base::make_args_t>();
make_args_uptr->noc_id = noc_id;
- make_args_uptr->block_id = block_id;
- make_args_uptr->num_input_ports = block_info.num_inputs;
- make_args_uptr->num_output_ports = block_info.num_outputs;
- make_args_uptr->reg_iface = block_reg_iface;
- make_args_uptr->tb_clk_iface = tb_clk_iface;
+ make_args_uptr->block_id = block_id;
+ make_args_uptr->num_input_ports = block_info.num_inputs;
+ make_args_uptr->num_output_ports = block_info.num_outputs;
+ make_args_uptr->reg_iface = block_reg_iface;
+ make_args_uptr->tb_clk_iface = tb_clk_iface;
make_args_uptr->ctrlport_clk_iface = ctrlport_clk_iface;
make_args_uptr->mb_control = (factory::has_requested_mb_access(noc_id)
? _mb_controllers.at(mb_idx)
@@ -262,40 +379,43 @@ private:
_block_registry->init_props();
}
+ void _init_sep_map()
+ {
+ for (size_t mb_idx = 0; mb_idx < get_num_mboards(); ++mb_idx) {
+ auto remote_device_id = _device->get_mb_iface(mb_idx).get_remote_device_id();
+ auto& cz = _client_zeros.at(mb_idx);
+ for (size_t sep_idx = 0; sep_idx < cz->get_num_stream_endpoints();
+ ++sep_idx) {
+ // Register ID in _port_block_map
+ block_id_t id(mb_idx, NODE_ID_SEP, sep_idx);
+ _port_block_map.insert({{mb_idx, sep_idx + 1}, id});
+ _sep_map.insert({id.to_string(), sep_addr_t(remote_device_id, sep_idx)});
+ }
+ }
+ }
+
void _init_static_connections()
{
UHD_LOG_TRACE("RFNOC::GRAPH", "Identifying static connections...");
for (auto& kv_cz : _client_zeros) {
- auto& adjacency_list = kv_cz.second->get_adjacency_list();
- const size_t first_block_port = 1 + kv_cz.second->get_num_stream_endpoints();
-
+ auto& adjacency_list = kv_cz.second->get_adjacency_list();
for (auto& edge : adjacency_list) {
// Assemble edge
auto graph_edge = graph_edge_t();
- if (edge.src_blk_index < first_block_port) {
- block_id_t id(kv_cz.first, NODE_ID_SEP, edge.src_blk_index - 1);
- _port_block_map.insert({{kv_cz.first, edge.src_blk_index}, id});
- graph_edge.src_blockid = id.to_string();
- } else {
- graph_edge.src_blockid =
- _port_block_map.at({kv_cz.first, edge.src_blk_index});
- }
- if (edge.dst_blk_index < first_block_port) {
- block_id_t id(kv_cz.first, NODE_ID_SEP, edge.dst_blk_index - 1);
- _port_block_map.insert({{kv_cz.first, edge.dst_blk_index}, id});
- graph_edge.dst_blockid = id.to_string();
- } else {
- graph_edge.dst_blockid =
- _port_block_map.at({kv_cz.first, edge.dst_blk_index});
- }
+ UHD_ASSERT_THROW(
+ _port_block_map.count({kv_cz.first, edge.src_blk_index}));
+ graph_edge.src_blockid =
+ _port_block_map.at({kv_cz.first, edge.src_blk_index});
+ UHD_ASSERT_THROW(
+ _port_block_map.count({kv_cz.first, edge.dst_blk_index}));
+ graph_edge.dst_blockid =
+ _port_block_map.at({kv_cz.first, edge.dst_blk_index});
graph_edge.src_port = edge.src_blk_port;
graph_edge.dst_port = edge.dst_blk_port;
graph_edge.edge = graph_edge_t::edge_t::STATIC;
_static_edges.push_back(graph_edge);
- UHD_LOG_TRACE("RFNOC::GRAPH",
- "Static connection: "
- << graph_edge.src_blockid << ":" << graph_edge.src_port << " -> "
- << graph_edge.dst_blockid << ":" << graph_edge.dst_port);
+ UHD_LOG_TRACE(
+ "RFNOC::GRAPH", "Static connection: " << graph_edge.to_string());
}
}
}
@@ -312,214 +432,98 @@ private:
size_t src_port,
std::shared_ptr<node_t> dst_blk,
size_t dst_port,
+ graph_edge_t::edge_t edge_type,
bool skip_property_propagation)
{
graph_edge_t edge_info(
- src_port, dst_port, graph_edge_t::DYNAMIC, not skip_property_propagation);
+ src_port, dst_port, edge_type, not skip_property_propagation);
edge_info.src_blockid = src_blk->get_unique_id();
edge_info.dst_blockid = dst_blk->get_unique_id();
_graph->connect(src_blk.get(), dst_blk.get(), edge_info);
}
- /*! Helper method to find a stream endpoint connected to a block
- *
- * \param blk_id the block connected to the stream endpoint
- * \param port the port connected to the stream endpoint
- * \param blk_is_src true if the block is a data source, false if it is a
- * destination
- * \return the address of the stream endpoint, or boost::none if it is not
- * directly connected to a stream endpoint
- */
- boost::optional<sep_addr_t> _get_adjacent_sep(
- const block_id_t& blk_id, const size_t port, const bool blk_is_src) const
- {
- const std::string block_id_str = get_block(blk_id)->get_block_id().to_string();
- UHD_LOG_TRACE("RFNOC::GRAPH",
- "Finding SEP for " << (blk_is_src ? "source" : "dst") << " block "
- << block_id_str << ":" << port);
- // TODO: This is an attempt to simplify the algo, but it turns out to be
- // as many lines as before:
- //auto edge_predicate = [blk_is_src, block_id_str](const graph_edge_t edge) {
- //if (blk_is_src) {
- //return edge.src_blockid == block_id_str;
- //}
- //return edge.dst_blockid == block_id_str;
- //};
-
- //auto blk_edge_it =
- //std::find_if(_static_edges.cbegin(), _static_edges.cend(), edge_predicate);
- //if (blk_edge_it == _static_edges.cend()) {
- //return boost::none;
- //}
-
- //const std::string sep_block_id = blk_is_src ?
- //blk_edge_it->dst_blockid : blk_edge_it->src_blockid;
- //UHD_LOG_TRACE("RFNOC::GRAPH",
- //"Found SEP: " << sep_block_id);
-
- //auto port_map_result = std::find_if(_port_block_map.cbegin(),
- //_port_block_map.cend,
- //[sep_block_id](std::pair<std::pair<size_t, size_t>, block_id_t> port_block) {
- //return port_block.second == sep_block_id;
- //});
- //if (port_map_result == _port_block_map.cend()) {
- //throw uhd::lookup_error(
- //std::string("SEP `") + sep_block_id + "' not found in port/block map!");
- //}
- //const auto dev = _device->get_mb_iface(mb_idx).get_remote_device_id();
- //const sep_inst_t sep_inst = blk_is_src ?
- //edge.dst_blk_index - 1 : edge.src_blk_index - 1;
- //return sep_addr_t(dev, sep_inst);
-
- const auto& info = _xbar_block_config.at(block_id_str);
- const auto mb_idx = blk_id.get_device_no();
- const auto cz = _client_zeros.at(mb_idx);
-
- const size_t first_block_port = 1 + cz->get_num_stream_endpoints();
-
- for (const auto& edge : cz->get_adjacency_list()) {
- const auto edge_blk_idx = blk_is_src ? edge.src_blk_index
- : edge.dst_blk_index;
- const auto edge_blk_port = blk_is_src ? edge.src_blk_port : edge.dst_blk_port;
-
- if ((edge_blk_idx == info.xbar_port + first_block_port)
- and (edge_blk_port == port)) {
- UHD_LOGGER_DEBUG("RFNOC::GRAPH")
- << boost::format("Block found in adjacency list. %d:%d->%d:%d")
- % edge.src_blk_index
- % static_cast<unsigned int>(edge.src_blk_port)
- % edge.dst_blk_index
- % static_cast<unsigned int>(edge.dst_blk_port);
-
- // Check that the block is connected to a stream endpoint. The
- // minus one here is because index zero is client 0.
- const sep_inst_t sep_inst = blk_is_src ?
- edge.dst_blk_index - 1 : edge.src_blk_index - 1;
-
- if (sep_inst < cz->get_num_stream_endpoints()) {
- const auto dev = _device->get_mb_iface(mb_idx).get_remote_device_id();
- return sep_addr_t(dev, sep_inst);
- } else {
- // Block is connected to another block
- return boost::none;
- }
- }
- }
- return boost::none;
- }
-
/*! Internal physical connection helper
*
* Make the connections in the physical device
*
- * \throws connect_disallowed_on_src
- * if the source port is statically connected to a *different* block
- * \throws connect_disallowed_on_dst
- * if the destination port is statically connected to a *different* block
+ * \throws uhd::routing_error
+ * if the blocks are statically connected to something else
*/
- void _physical_connect(const block_id_t& src_blk,
+ graph_edge_t::edge_t _physical_connect(const block_id_t& src_blk,
size_t src_port,
const block_id_t& dst_blk,
size_t dst_port)
{
- auto src_blk_ctrl = get_block(src_blk);
- auto dst_blk_ctrl = get_block(dst_blk);
-
- /*
- * Start by determining if the connection can be made
- * Get the adjacency list and check if the connection is in it already
- */
- // Read the adjacency list for the source and destination blocks
- auto src_mb_idx = src_blk.get_device_no();
- auto src_cz = _gsm->get_client_zero(
- sep_addr_t(_device->get_mb_iface(src_mb_idx).get_remote_device_id(), 0));
- std::vector<detail::client_zero::edge_def_t>& adj_list =
- src_cz->get_adjacency_list();
- // Check the src_blk
- auto src_blk_xbar_info =
- _xbar_block_config.at(src_blk_ctrl->get_block_id().to_string());
- // This "xbar_port" starts at the first block, so we need to add the client zero
- // and stream endpoint ports
- const auto src_xbar_port =
- src_blk_xbar_info.xbar_port + src_cz->get_num_stream_endpoints() + 1;
- // We can also find out which stream endpoint the src block is connected to
- sep_inst_t src_sep;
- for (detail::client_zero::edge_def_t edge : adj_list) {
- if ((edge.src_blk_index == src_xbar_port)
- and (edge.src_blk_port == src_port)) {
- UHD_LOGGER_DEBUG("RFNOC::GRAPH")
- << boost::format("Block found in adjacency list. %d:%d->%d:%d")
- % edge.src_blk_index
- % static_cast<unsigned int>(edge.src_blk_port)
- % edge.dst_blk_index
- % static_cast<unsigned int>(edge.dst_blk_port);
- if (edge.dst_blk_index <= src_cz->get_num_stream_endpoints()) {
- src_sep =
- edge.dst_blk_index - 1 /* minus 1 because port 0 is client zero*/;
- } else {
- // TODO connect_disallowed_on_src?
- // TODO put more info in exception
- throw uhd::routing_error(
- "Unable to connect to statically connected source port");
- }
- }
+ const std::string src_blk_info =
+ src_blk.to_string() + ":" + std::to_string(src_port);
+ const std::string dst_blk_info =
+ dst_blk.to_string() + ":" + std::to_string(dst_port);
+
+ // Find the static edge for src_blk:src_port
+ graph_edge_t src_static_edge = _assert_edge(
+ _get_static_edge(
+ [src_blk_id = src_blk.to_string(), src_port](const graph_edge_t& edge) {
+ return edge.src_blockid == src_blk_id && edge.src_port == src_port;
+ }),
+ src_blk_info);
+
+ // Now see if it's already connected to the destination
+ if (src_static_edge.dst_blockid == dst_blk.to_string()
+ && src_static_edge.dst_port == dst_port) {
+ UHD_LOG_TRACE(LOG_ID,
+ "Blocks " << src_blk_info << " and " << dst_blk_info
+ << " are already statically connected, no physical connection "
+ "required.");
+ return graph_edge_t::STATIC;
}
- // Read the dst adjacency list if its different
- // TODO they may be on the same mboard, which would make this redundant
- auto dst_mb_idx = dst_blk.get_device_no();
- auto dst_cz = _gsm->get_client_zero(
- sep_addr_t(_device->get_mb_iface(dst_mb_idx).get_remote_device_id(), 0));
- adj_list = dst_cz->get_adjacency_list();
- // Check the dst blk
- auto dst_blk_xbar_info =
- _xbar_block_config.at(dst_blk_ctrl->get_block_id().to_string());
- // This "xbar_port" starts at the first block, so we need to add the client zero
- // and stream endpoint ports
- const auto dst_xbar_port =
- dst_blk_xbar_info.xbar_port + dst_cz->get_num_stream_endpoints() + 1;
- // We can also find out which stream endpoint the dst block is connected to
- sep_inst_t dst_sep;
- for (detail::client_zero::edge_def_t edge : adj_list) {
- if ((edge.dst_blk_index == dst_xbar_port)
- and (edge.dst_blk_port == dst_port)) {
- UHD_LOGGER_DEBUG("RFNOC::GRAPH")
- << boost::format("Block found in adjacency list. %d:%d->%d:%d")
- % edge.src_blk_index
- % static_cast<unsigned int>(edge.src_blk_port)
- % edge.dst_blk_index
- % static_cast<unsigned int>(edge.dst_blk_port);
- if (edge.src_blk_index <= dst_cz->get_num_stream_endpoints()) {
- dst_sep =
- edge.src_blk_index - 1 /* minus 1 because port 0 is client zero*/;
- } else {
- // TODO connect_disallowed_on_dst?
- // TODO put more info in exception
- throw uhd::routing_error(
- "Unable to connect to statically connected destination port");
- }
- }
+ // If they're not statically connected, the source *must* be connected
+ // to an SEP, or this route is impossible
+ if (block_id_t(src_static_edge.dst_blockid).get_block_name() != NODE_ID_SEP) {
+ const std::string err_msg =
+ src_blk_info + " is neither statically connected to " + dst_blk_info
+ + " nor to an SEP! Routing impossible.";
+ UHD_LOG_ERROR(LOG_ID, err_msg);
+ throw uhd::routing_error(err_msg);
}
- /* TODO: we checked if either port is used in a static connection (and its not if
- * we've made it this far). We also need to check something else, but I can't
- * remember what...
- */
-
- // At this point, we know the attempted connection is valid, so let's go ahead and
- // make it
- sep_addr_t src_sep_addr(
- _device->get_mb_iface(src_mb_idx).get_remote_device_id(), src_sep);
- sep_addr_t dst_sep_addr(
- _device->get_mb_iface(dst_mb_idx).get_remote_device_id(), dst_sep);
+ // OK, now we know which source SEP we have
+ const std::string src_sep_info = src_static_edge.dst_blockid;
+ const sep_addr_t src_sep_addr = _sep_map.at(src_sep_info);
+
+ // Now find the static edge for the destination SEP
+ auto dst_static_edge = _assert_edge(
+ _get_static_edge(
+ [dst_blk_id = dst_blk.to_string(), dst_port](const graph_edge_t& edge) {
+ return edge.dst_blockid == dst_blk_id && edge.dst_port == dst_port;
+ }),
+ dst_blk_info);
+
+ // If they're not statically connected, the source *must* be connected
+ // to an SEP, or this route is impossible
+ if (block_id_t(dst_static_edge.src_blockid).get_block_name() != NODE_ID_SEP) {
+ const std::string err_msg =
+ dst_blk_info + " is neither statically connected to " + src_blk_info
+ + " nor to an SEP! Routing impossible.";
+ UHD_LOG_ERROR(LOG_ID, err_msg);
+ throw uhd::routing_error(err_msg);
+ }
+
+ // OK, now we know which destination SEP we have
+ const std::string dst_sep_info = dst_static_edge.src_blockid;
+ const sep_addr_t dst_sep_addr = _sep_map.at(dst_sep_info);
+
+ // Now all we need to do is dynamically connect those SEPs
auto strm_info = _gsm->create_device_to_device_data_stream(
dst_sep_addr, src_sep_addr, false, 0.1, 0.0, false);
- UHD_LOGGER_INFO("RFNOC::GRAPH")
+ UHD_LOGGER_DEBUG(LOG_ID)
<< boost::format("Data stream between EPID %d and EPID %d established "
"where downstream buffer can hold %lu bytes and %u packets")
% std::get<0>(strm_info).first % std::get<0>(strm_info).second
% std::get<1>(strm_info).bytes % std::get<1>(strm_info).packets;
+
+ return graph_edge_t::DYNAMIC;
}
//! Flush and reset each connected port on the mboard
@@ -541,6 +545,35 @@ private:
mb_cz->reset_ctrl(block_portno);
}
}
+
+ /*! Find the static edge that matches \p pred
+ *
+ * \throws uhd::assertion_error if the edge can't be found. So be careful!
+ */
+ template <typename UnaryPredicate>
+ boost::optional<graph_edge_t> _get_static_edge(UnaryPredicate&& pred)
+ {
+ auto edge_it = std::find_if(_static_edges.cbegin(), _static_edges.cend(), pred);
+ if (edge_it == _static_edges.cend()) {
+ return boost::none;
+ }
+ return *edge_it;
+ }
+
+ /*! Make sure an optional edge info is valid, or throw.
+ */
+ graph_edge_t _assert_edge(
+ boost::optional<graph_edge_t> edge_o, const std::string& blk_info)
+ {
+ if (!bool(edge_o)) {
+ const std::string err_msg = std::string("Cannot connect block ") + blk_info
+ + ", port is unconnected in the FPGA!";
+ UHD_LOG_ERROR("RFNOC::GRAPH", err_msg);
+ throw uhd::routing_error(err_msg);
+ }
+ return edge_o.get();
+ }
+
/**************************************************************************
* Attributes
*************************************************************************/
@@ -580,6 +613,9 @@ private:
// or SEP
std::map<std::pair<size_t, size_t>, block_id_t> _port_block_map;
+ //! Map SEP block ID (e.g. 0/SEP#0) onto a sep_addr_t
+ std::unordered_map<std::string, sep_addr_t> _sep_map;
+
//! List of statically connected edges. Includes SEPs too!
std::vector<graph_edge_t> _static_edges;
diff --git a/host/lib/rfnoc/rfnoc_rx_streamer.cpp b/host/lib/rfnoc/rfnoc_rx_streamer.cpp
new file mode 100644
index 000000000..4340faff0
--- /dev/null
+++ b/host/lib/rfnoc/rfnoc_rx_streamer.cpp
@@ -0,0 +1,141 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/rfnoc/defaults.hpp>
+#include <uhdlib/rfnoc/node_accessor.hpp>
+#include <uhdlib/rfnoc/rfnoc_rx_streamer.hpp>
+#include <atomic>
+
+using namespace uhd;
+using namespace uhd::rfnoc;
+
+const std::string STREAMER_ID = "RxStreamer";
+static std::atomic<uint64_t> streamer_inst_ctr;
+
+rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans,
+ const uhd::stream_args_t stream_args)
+ : rx_streamer_impl<chdr_rx_data_xport>(num_chans, stream_args)
+ , _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++))
+ , _stream_args(stream_args)
+{
+ // No block to which to forward properties or actions
+ set_prop_forwarding_policy(forwarding_policy_t::DROP);
+ set_action_forwarding_policy(forwarding_policy_t::DROP);
+
+ // Initialize properties
+ _scaling_in.reserve(num_chans);
+ _samp_rate_in.reserve(num_chans);
+ _tick_rate_in.reserve(num_chans);
+ _type_in.reserve(num_chans);
+
+ for (size_t i = 0; i < num_chans; i++) {
+ _register_props(i, stream_args.otw_format);
+ }
+ node_accessor_t node_accessor{};
+ node_accessor.init_props(this);
+}
+
+std::string rfnoc_rx_streamer::get_unique_id() const
+{
+ return _unique_id;
+}
+
+size_t rfnoc_rx_streamer::get_num_input_ports() const
+{
+ return get_num_channels();
+}
+
+size_t rfnoc_rx_streamer::get_num_output_ports() const
+{
+ return 0;
+}
+
+void rfnoc_rx_streamer::issue_stream_cmd(const stream_cmd_t& stream_cmd)
+{
+ if (get_num_channels() > 1 and stream_cmd.stream_now
+ and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS) {
+ throw uhd::runtime_error(
+ "Invalid recv stream command - stream now on multiple channels in a "
+ "single streamer will fail to time align.");
+ }
+
+ auto cmd = stream_cmd_action_info::make(stream_cmd.stream_mode);
+ cmd->stream_cmd = stream_cmd;
+
+ for (size_t i = 0; i < get_num_channels(); i++) {
+ const res_source_info info(res_source_info::INPUT_EDGE, i);
+ post_action(info, cmd);
+ }
+}
+
+const uhd::stream_args_t& rfnoc_rx_streamer::get_stream_args() const
+{
+ return _stream_args;
+}
+
+bool rfnoc_rx_streamer::check_topology(
+ const std::vector<size_t>& connected_inputs,
+ const std::vector<size_t>& connected_outputs)
+{
+ // Check that all channels are connected
+ if (connected_inputs.size() != get_num_input_ports()) {
+ return false;
+ }
+
+ // Call base class to check that connections are valid
+ return node_t::check_topology(connected_inputs, connected_outputs);
+}
+
+void rfnoc_rx_streamer::_register_props(const size_t chan,
+ const std::string& otw_format)
+{
+ // Create actual properties and store them
+ _scaling_in.push_back(property_t<double>(
+ PROP_KEY_SCALING, {res_source_info::INPUT_EDGE, chan}));
+ _samp_rate_in.push_back(
+ property_t<double>(PROP_KEY_SAMP_RATE, {res_source_info::INPUT_EDGE, chan}));
+ _tick_rate_in.push_back(property_t<double>(
+ PROP_KEY_TICK_RATE, {res_source_info::INPUT_EDGE, chan}));
+ _type_in.emplace_back(property_t<std::string>(
+ PROP_KEY_TYPE, otw_format, {res_source_info::INPUT_EDGE, chan}));
+
+ // Give us some shorthands for the rest of this function
+ property_t<double>* scaling_in = &_scaling_in.back();
+ property_t<double>* samp_rate_in = &_samp_rate_in.back();
+ property_t<double>* tick_rate_in = &_tick_rate_in.back();
+ property_t<std::string>* type_in = &_type_in.back();
+
+ // Register them
+ register_property(scaling_in);
+ register_property(samp_rate_in);
+ register_property(tick_rate_in);
+ register_property(type_in);
+
+ // Add resolvers
+ add_property_resolver({scaling_in}, {},
+ [&scaling_in = *scaling_in, chan, this]() {
+ RFNOC_LOG_TRACE("Calling resolver for `scaling_in'@" << chan);
+ if (scaling_in.is_valid()) {
+ this->set_scale_factor(chan, scaling_in.get() / 32767.0);
+ }
+ });
+
+ add_property_resolver({samp_rate_in}, {},
+ [&samp_rate_in = *samp_rate_in, chan, this]() {
+ RFNOC_LOG_TRACE("Calling resolver for `samp_rate_in'@" << chan);
+ if (samp_rate_in.is_valid()) {
+ this->set_samp_rate(samp_rate_in.get());
+ }
+ });
+
+ add_property_resolver({tick_rate_in}, {},
+ [&tick_rate_in = *tick_rate_in, chan, this]() {
+ RFNOC_LOG_TRACE("Calling resolver for `tick_rate_in'@" << chan);
+ if (tick_rate_in.is_valid()) {
+ this->set_tick_rate(tick_rate_in.get());
+ }
+ });
+}
diff --git a/host/lib/rfnoc/rfnoc_tx_streamer.cpp b/host/lib/rfnoc/rfnoc_tx_streamer.cpp
new file mode 100644
index 000000000..82feeaf1f
--- /dev/null
+++ b/host/lib/rfnoc/rfnoc_tx_streamer.cpp
@@ -0,0 +1,124 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/rfnoc/defaults.hpp>
+#include <uhdlib/rfnoc/node_accessor.hpp>
+#include <uhdlib/rfnoc/rfnoc_tx_streamer.hpp>
+#include <atomic>
+
+using namespace uhd;
+using namespace uhd::rfnoc;
+
+const std::string STREAMER_ID = "TxStreamer";
+static std::atomic<uint64_t> streamer_inst_ctr;
+
+rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans,
+ const uhd::stream_args_t stream_args)
+ : tx_streamer_impl<chdr_tx_data_xport>(num_chans, stream_args)
+ , _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++))
+ , _stream_args(stream_args)
+{
+ // No block to which to forward properties or actions
+ set_prop_forwarding_policy(forwarding_policy_t::DROP);
+ set_action_forwarding_policy(forwarding_policy_t::DROP);
+
+ // Initialize properties
+ _scaling_out.reserve(num_chans);
+ _samp_rate_out.reserve(num_chans);
+ _tick_rate_out.reserve(num_chans);
+ _type_out.reserve(num_chans);
+
+ for (size_t i = 0; i < num_chans; i++) {
+ _register_props(i, stream_args.otw_format);
+ }
+
+ node_accessor_t node_accessor;
+ node_accessor.init_props(this);
+}
+
+std::string rfnoc_tx_streamer::get_unique_id() const
+{
+ return _unique_id;
+}
+
+size_t rfnoc_tx_streamer::get_num_input_ports() const
+{
+ return 0;
+}
+
+size_t rfnoc_tx_streamer::get_num_output_ports() const
+{
+ return get_num_channels();
+}
+
+const uhd::stream_args_t& rfnoc_tx_streamer::get_stream_args() const
+{
+ return _stream_args;
+}
+
+bool rfnoc_tx_streamer::check_topology(
+ const std::vector<size_t>& connected_inputs,
+ const std::vector<size_t>& connected_outputs)
+{
+ // Check that all channels are connected
+ if (connected_outputs.size() != get_num_output_ports()) {
+ return false;
+ }
+
+ // Call base class to check that connections are valid
+ return node_t::check_topology(connected_inputs, connected_outputs);
+}
+
+void rfnoc_tx_streamer::_register_props(const size_t chan,
+ const std::string& otw_format)
+{
+ // Create actual properties and store them
+ _scaling_out.push_back(property_t<double>(
+ PROP_KEY_SCALING, {res_source_info::OUTPUT_EDGE, chan}));
+ _samp_rate_out.push_back(property_t<double>(
+ PROP_KEY_SAMP_RATE, {res_source_info::OUTPUT_EDGE, chan}));
+ _tick_rate_out.push_back(property_t<double>(
+ PROP_KEY_TICK_RATE, {res_source_info::OUTPUT_EDGE, chan}));
+ _type_out.emplace_back(property_t<std::string>(
+ PROP_KEY_TYPE, otw_format, {res_source_info::OUTPUT_EDGE, chan}));
+
+ // Give us some shorthands for the rest of this function
+ property_t<double>* scaling_out = &_scaling_out.back();
+ property_t<double>* samp_rate_out = &_samp_rate_out.back();
+ property_t<double>* tick_rate_out = &_tick_rate_out.back();
+ property_t<std::string>* type_out = &_type_out.back();
+
+ // Register them
+ register_property(scaling_out);
+ register_property(samp_rate_out);
+ register_property(tick_rate_out);
+ register_property(type_out);
+
+ // Add resolvers
+ add_property_resolver({scaling_out}, {},
+ [&scaling_out = *scaling_out, chan, this]() {
+ RFNOC_LOG_TRACE("Calling resolver for `scaling_out'@" << chan);
+ if (scaling_out.is_valid()) {
+ this->set_scale_factor(chan, 32767.0 / scaling_out.get());
+ }
+ });
+
+ add_property_resolver({samp_rate_out}, {},
+ [&samp_rate_out = *samp_rate_out, chan, this]() {
+ RFNOC_LOG_TRACE("Calling resolver for `samp_rate_out'@" << chan);
+ if (samp_rate_out.is_valid()) {
+ this->set_samp_rate(samp_rate_out.get());
+ }
+ });
+
+ add_property_resolver({tick_rate_out}, {},
+ [&tick_rate_out = *tick_rate_out, chan, this]() {
+ RFNOC_LOG_TRACE("Calling resolver for `tick_rate_out'@" << chan);
+ if (tick_rate_out.is_valid()) {
+ this->set_tick_rate(tick_rate_out.get());
+ }
+ });
+}