aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/include
diff options
context:
space:
mode:
authorCiro Nishiguchi <ciro.nishiguchi@ni.com>2019-05-23 20:38:07 -0500
committerMartin Braun <martin.braun@ettus.com>2019-11-26 11:49:29 -0800
commit75a090543b8fb8e7c875387eee6d3fe7227e4450 (patch)
tree2904b48607cc07158aa6b068ada35ab56c4da516 /host/lib/include
parentd8e9705bc6c34b8d015b56a76955ee2f15426bd8 (diff)
downloaduhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.tar.gz
uhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.tar.bz2
uhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.zip
rfnoc: add rx and tx transports, and amend rfnoc_graph
transports: Transports build on the I/O service and implement flow control and sequence number checking. The rx streamer subclass extends the streamer implementation to connect it to the rfnoc graph. It receives configuration values from property propagation and configures the streamer accordingly. It also implements the issue_stream_cmd rx_streamer API method. Add implementation of rx streamer creation and a method to connect it to an rfnoc block. rfnoc_graph: Cache more connection info, clarify contract Summary of changes: - rfnoc_graph stores more information about static connections at the beginning. Some search algorithms are replaced by simpler lookups. - The contract for connect() was clarified. It is required to call connect, even for static connections.
Diffstat (limited to 'host/lib/include')
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_packet.hpp9
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp481
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp550
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_types.hpp2
-rw-r--r--host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp32
-rw-r--r--host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp13
-rw-r--r--host/lib/include/uhdlib/rfnoc/mb_iface.hpp44
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp21
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp95
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp90
-rw-r--r--host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp130
-rw-r--r--host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp99
-rw-r--r--host/lib/include/uhdlib/transport/get_aligned_buffs.hpp175
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_impl.hpp341
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp207
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_impl.hpp307
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp147
17 files changed, 2707 insertions, 36 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp b/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp
index 770c6cf6f..cc729de6c 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp
@@ -114,6 +114,15 @@ public:
*/
virtual void* get_payload_ptr() = 0;
+ /*! Return the payload offset in bytes for a given type and num_mdata
+ *
+ * \param pkt_type The packet type for calculation
+ * \param num_mdata The number of metadata words for calculation
+ * \return The offset of the payload in a packet with the given params
+ */
+ virtual size_t calculate_payload_offset(const packet_type_t pkt_type,
+ const uint8_t num_mdata = 0) const = 0;
+
//! Shortcut to return the const metadata pointer cast as a specific type
template <typename data_t>
inline const data_t* get_mdata_const_ptr_as() const
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp
new file mode 100644
index 000000000..69dceebe1
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp
@@ -0,0 +1,481 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP
+#define INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP
+
+#include <uhd/config.hpp>
+#include <uhdlib/rfnoc/chdr_packet.hpp>
+#include <uhdlib/rfnoc/chdr_types.hpp>
+#include <uhdlib/rfnoc/mgmt_portal.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+#include <uhdlib/rfnoc/rx_flow_ctrl_state.hpp>
+#include <uhdlib/transport/io_service.hpp>
+#include <uhdlib/transport/link_if.hpp>
+#include <memory>
+
+namespace uhd { namespace rfnoc {
+
+namespace detail {
+
+/*!
+ * Utility class to send rx flow control responses
+ */
+class rx_flow_ctrl_sender
+{
+public:
+ //! Constructor
+ rx_flow_ctrl_sender(
+ const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids)
+ : _dst_epid(sep_ids.first)
+ {
+ _fc_packet = pkt_factory.make_strs();
+ _fc_strs_pyld.src_epid = sep_ids.second;
+ }
+
+ /*! Configure buffer capacity
+ * \param recv_capacity The buffer capacity of the receive link
+ */
+ void set_capacity(const stream_buff_params_t& recv_capacity)
+ {
+ _fc_strs_pyld.capacity_bytes = recv_capacity.bytes;
+ _fc_strs_pyld.capacity_pkts = recv_capacity.packets;
+ }
+
+ /*! Send a flow control response packet
+ *
+ * \param send_link the link to use to send the packet
+ * \param counts transfer counts for packet contents
+ */
+ void send_strs(transport::send_link_if* send_link, const stream_buff_params_t& counts)
+ {
+ auto buff = send_link->get_send_buff(0);
+ if (!buff) {
+ throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer");
+ }
+
+ chdr::chdr_header header;
+ header.set_seq_num(_fc_seq_num++);
+ header.set_dst_epid(_dst_epid);
+
+ chdr::strs_payload fc_payload(_fc_strs_pyld);
+ fc_payload.xfer_count_bytes = counts.bytes;
+ fc_payload.xfer_count_pkts = counts.packets;
+
+ _fc_packet->refresh(buff->data(), header, fc_payload);
+ const size_t size = header.get_length();
+
+ buff->set_packet_size(size);
+ send_link->release_send_buff(std::move(buff));
+ }
+
+private:
+ // Endpoint ID for recipient of flow control response
+ const sep_id_t _dst_epid;
+
+ // Packet for writing flow control info
+ chdr::chdr_strs_packet::uptr _fc_packet;
+
+ // Pre-configured strs payload to hold values that don't change
+ chdr::strs_payload _fc_strs_pyld;
+
+ // Sequence number for flow control packets
+ uint16_t _fc_seq_num = 0;
+};
+} // namespace detail
+
+/*!
+ * Flow-controlled transport for RX chdr data
+ *
+ * This transport provides the streamer an interface to read RX data packets.
+ * The transport implements flow control and sequence number checking.
+ *
+ * The transport uses I/O services to provide options for work scheduling. I/O
+ * services allow the I/O work to be offloaded to a worker thread or to be
+ * performed in the same thread as the streamer API calls.
+ *
+ * For an rx transport, the device sends data packets, and the host sends strs
+ * packets letting the device know that buffer space in the host has been freed.
+ * For lossy links, the device also sends strc packets to resynchronize the
+ * transfer counts between host and device, to correct for any dropped packets
+ * in the link.
+ */
+class chdr_rx_data_xport
+{
+public:
+ using uptr = std::unique_ptr<chdr_rx_data_xport>;
+ using buff_t = transport::frame_buff;
+
+ //! Values extracted from received RX data packets
+ struct packet_info_t
+ {
+ bool eob = false;
+ bool has_tsf = false;
+ uint64_t tsf = 0;
+ size_t payload_bytes = 0;
+ const void* payload = nullptr;
+ };
+
+ /*! Constructor
+ *
+ * \param io_srv The service that will schedule the xport I/O
+ * \param recv_link The recv link, already attached to the I/O service
+ * \param send_link The send link, already attached to the I/O service
+ * \param pkt_factory Factory to create packets with the desired chdr_w and endianness
+ * \param addrs Source and destination addresses
+ * \param epids Source and destination endpoint IDs
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
+ * \param num_recv_frames Num frames to reserve from the recv link
+ * \param recv_capacity Total capacity of the recv link
+ * \param fc_freq Frequency of flow control status messages
+ * \param fc_headroom Headroom for flow control status messages
+ * \param lossy_xport Whether the xport is lossy, for flow control configuration
+ */
+ chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const uhd::rfnoc::sep_addr_pair_t& addrs,
+ const uhd::rfnoc::sep_id_pair_t& epids,
+ const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
+ const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
+ const size_t num_recv_frames,
+ const stream_buff_params_t& recv_capacity,
+ const stream_buff_params_t& fc_freq,
+ const stream_buff_params_t& fc_headroom,
+ const bool lossy_xport)
+ : _fc_state(epids), _fc_sender(pkt_factory, epids), _epid(epids.second)
+ {
+ const sep_addr_t remote_sep_addr = addrs.first;
+ const sep_addr_t local_sep_addr = addrs.second;
+ const sep_id_t remote_epid = epids.first;
+ const sep_id_t local_epid = epids.second;
+
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Creating rx xport with local epid=" << local_epid
+ << ", remote epid=" << remote_epid);
+
+ _recv_packet = pkt_factory.make_generic();
+ _fc_sender.set_capacity(recv_capacity);
+
+ // Calculate max payload size
+ const size_t pyld_offset =
+ _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS);
+ _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset;
+
+ // Make data transport
+ auto recv_cb = [this](buff_t::uptr& buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link) {
+ return this->_recv_callback(buff, recv_link, send_link);
+ };
+
+ auto fc_cb = [this](buff_t::uptr buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link) {
+ this->_fc_callback(std::move(buff), recv_link, send_link);
+ };
+
+ // Needs just a single send frame for responses
+ _recv_io = io_srv->make_recv_client(recv_link,
+ num_recv_frames,
+ recv_cb,
+ send_link,
+ /* num_send_frames*/ 1,
+ fc_cb);
+
+ // Create a control transport with the rx data links to send mgmt packets
+ // needed to setup the stream
+ // Piggyback on frames from the recv_io_if
+ auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv,
+ send_link,
+ recv_link,
+ local_epid,
+ 0, // num_send_frames
+ 0); // num_recv_frames
+
+ // Create new temporary management portal with the transports used for this stream
+ // TODO: This is a bit excessive. Maybe we can pare down the functionality of the
+ // portal just for route setup purposes. Whatever we do, we *must* use xport in it
+ // though otherwise the transport will not behave correctly.
+ auto data_mgmt_portal = uhd::rfnoc::mgmt::mgmt_portal::make(
+ *ctrl_xport, pkt_factory, local_sep_addr, local_epid);
+
+ // Setup a route to the EPID
+ // Note that this may be gratuitous--The endpoint may already have been set up
+ data_mgmt_portal->initialize_endpoint(*ctrl_xport, remote_sep_addr, remote_epid);
+ data_mgmt_portal->setup_local_route(*ctrl_xport, remote_epid);
+
+ // Initialize flow control - management portal sends a stream command
+ // containing its requested flow control frequency, the rx transport
+ // responds with a stream status containing its buffer capacity.
+ data_mgmt_portal->config_local_rx_stream_start(*ctrl_xport,
+ remote_epid,
+ lossy_xport,
+ pyld_buff_fmt,
+ mdata_buff_fmt,
+ fc_freq,
+ fc_headroom);
+
+ data_mgmt_portal->config_local_rx_stream_commit(*ctrl_xport, remote_epid);
+
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Stream endpoint was configured with:"
+ << std::endl
+ << "capacity bytes=" << recv_capacity.bytes
+ << ", packets=" << recv_capacity.packets << std::endl
+ << "fc headroom bytes=" << fc_headroom.bytes
+ << ", packets=" << fc_headroom.packets << std::endl
+ << "fc frequency bytes=" << fc_freq.bytes
+ << ", packets=" << fc_freq.packets);
+
+ // We no longer need the control xport and mgmt_portal, release them so
+ // the control xport is no longer connected to the I/O service.
+ data_mgmt_portal.reset();
+ ctrl_xport.reset();
+ }
+
+ /*! Returns maximum number of payload bytes
+ *
+ * \return maximum payload bytes per packet
+ */
+ size_t get_max_payload_size() const
+ {
+ return _max_payload_size;
+ }
+
+ /*!
+ * Gets an RX frame buffer containing a recv packet
+ *
+ * \param timeout_ms timeout in milliseconds
+ * \return returns a tuple containing:
+ * - a frame_buff, or null if timeout occurs
+ * - info struct corresponding to the packet
+ * - whether the packet was out of sequence
+ */
+ std::tuple<typename buff_t::uptr, packet_info_t, bool> get_recv_buff(
+ const int32_t timeout_ms)
+ {
+ buff_t::uptr buff = _recv_io->get_recv_buff(timeout_ms);
+
+ if (!buff) {
+ return std::make_tuple(typename buff_t::uptr(), packet_info_t(), false);
+ }
+
+ auto info = _read_data_packet_info(buff);
+ bool seq_error = _is_out_of_sequence(std::get<1>(info));
+
+ return std::make_tuple(std::move(buff), std::get<0>(info), seq_error);
+ }
+
+ /*!
+ * Releases an RX frame buffer
+ *
+ * \param buff the frame buffer to release
+ */
+ void release_recv_buff(typename buff_t::uptr buff)
+ {
+ _recv_io->release_recv_buff(std::move(buff));
+ }
+
+private:
+ /*!
+ * Recv callback for I/O service
+ *
+ * The I/O service invokes this callback when it reads a packet from the
+ * recv link.
+ *
+ * \param buff the frame buffer containing the packet data
+ * \param recv_link the recv link from which buff was read
+ * \param send_link the send link for flow control messages
+ */
+ bool _recv_callback(buff_t::uptr& buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link)
+ {
+ _recv_packet->refresh(buff->data());
+ const auto header = _recv_packet->get_chdr_header();
+ const auto type = header.get_pkt_type();
+ const auto dst_epid = header.get_dst_epid();
+ const auto packet_size = buff->packet_size();
+
+ if (dst_epid != _epid) {
+ return false;
+ }
+
+ if (type == chdr::PKT_TYPE_STRC) {
+ chdr::strc_payload strc;
+ strc.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(),
+ _recv_packet->get_payload_size() / sizeof(uint64_t),
+ _recv_packet->conv_to_host<uint64_t>());
+
+ const stream_buff_params_t strc_counts = {
+ strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)};
+
+ if (strc.op_code == chdr::STRC_RESYNC) {
+ // Resynchronize before updating fc_state, the strc payload
+ // contains counts before the strc packet itself
+ _fc_state.resynchronize(strc_counts);
+
+ // Update state that we received a packet
+ _fc_state.data_received(packet_size);
+
+ recv_link->release_recv_buff(std::move(buff));
+ buff = buff_t::uptr();
+ _fc_state.xfer_done(packet_size);
+ _send_fc_response(send_link);
+ } else if (strc.op_code == chdr::STRC_INIT) {
+ _fc_state.initialize(
+ {strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)});
+
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Received strc init with fc freq"
+ << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts);
+
+ // Make sure flow control was initialized
+ assert(_fc_state.get_fc_freq().bytes > 0);
+ assert(_fc_state.get_fc_freq().packets > 0);
+
+ // Send a strs response to configure flow control on the sender
+ _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts());
+
+ // Reset counts, since mgmt_portal will do it to FPGA
+ _fc_state.reset_counts();
+
+ recv_link->release_recv_buff(std::move(buff));
+ buff = buff_t::uptr();
+ } else {
+ throw uhd::value_error("Unexpected opcode value in STRC packet.");
+ }
+
+ // For stream commands, we return true (packet was destined to this
+ // client) but release the buffer. The I/O service won't queue this
+ // packet in the recv_io_if.
+ return true;
+
+ } else if (type == chdr::PKT_TYPE_DATA_NO_TS
+ || type == chdr::PKT_TYPE_DATA_WITH_TS) {
+ // Update state that we received a packet
+ _fc_state.data_received(packet_size);
+
+ // If this is a data packet, just claim it by returning true. The
+ // I/O service will queue this packet in the recv_io_if.
+ return true;
+
+ } else {
+ return false;
+ }
+ }
+
+ /*!
+ * Flow control callback for I/O service
+ *
+ * The I/O service invokes this callback when a packet needs to be released
+ * to the recv link.
+ *
+ * \param buff the frame buffer containing the packet data
+ * \param recv_link the recv link to which to release the buffer
+ * \param send_link the send link for flow control messages
+ */
+ void _fc_callback(buff_t::uptr buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link)
+ {
+ const size_t packet_size = buff->packet_size();
+ recv_link->release_recv_buff(std::move(buff));
+ _fc_state.xfer_done(packet_size);
+ _send_fc_response(send_link);
+ }
+
+ /*!
+ * Sends a flow control response packet if necessary.
+ *
+ * \param send_link the send link for flow control messages
+ */
+ void _send_fc_response(transport::send_link_if* send_link)
+ {
+ if (_fc_state.fc_resp_due()) {
+ _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts());
+ _fc_state.fc_resp_sent();
+ }
+ }
+
+ /*!
+ * Checks if the sequence number is out of sequence, prints 'D' if it is
+ * and returns result of check.
+ *
+ * \return true if a sequence error occurred
+ */
+ UHD_FORCE_INLINE bool _is_out_of_sequence(uint16_t seq_num)
+ {
+ const uint16_t expected_packet_count = _data_seq_num;
+ _data_seq_num = seq_num + 1;
+
+ if (expected_packet_count != seq_num) {
+ UHD_LOG_FASTPATH("D");
+ return true;
+ }
+ return false;
+ }
+
+ /*!
+ * Reads packet header and returns information in a struct.
+ *
+ * \return a tuple containing the packet info and packet sequence number
+ */
+ std::tuple<packet_info_t, uint16_t> _read_data_packet_info(buff_t::uptr& buff)
+ {
+ const void* data = buff->data();
+ _recv_packet->refresh(data);
+ const auto header = _recv_packet->get_chdr_header();
+ const auto optional_time = _recv_packet->get_timestamp();
+
+ packet_info_t info;
+ info.eob = header.get_eob();
+ info.has_tsf = optional_time.is_initialized();
+ info.tsf = optional_time ? *optional_time : 0;
+ info.payload_bytes = _recv_packet->get_payload_size();
+ info.payload = _recv_packet->get_payload_const_ptr();
+
+ const uint8_t* pkt_end =
+ reinterpret_cast<uint8_t*>(buff->data()) + buff->packet_size();
+ const size_t pyld_pkt_len =
+ pkt_end - reinterpret_cast<const uint8_t*>(info.payload);
+
+ if (pyld_pkt_len < info.payload_bytes) {
+ _recv_io->release_recv_buff(std::move(buff));
+ throw uhd::value_error("Bad CHDR header or invalid packet length.");
+ }
+
+ return std::make_tuple(info, header.get_seq_num());
+ }
+
+ // Interface to the I/O service
+ transport::recv_io_if::sptr _recv_io;
+
+ // Flow control state
+ rx_flow_ctrl_state _fc_state;
+
+ // Maximum data payload in bytes
+ size_t _max_payload_size = 0;
+
+ // Sequence number for data packets
+ uint16_t _data_seq_num = 0;
+
+ // Packet for received data
+ chdr::chdr_packet::uptr _recv_packet;
+
+ // Handles sending of strs flow control response packets
+ detail::rx_flow_ctrl_sender _fc_sender;
+
+ // Local / Sink EPID
+ sep_id_t _epid;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP */
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
new file mode 100644
index 000000000..62b811bf5
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
@@ -0,0 +1,550 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP
+#define INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP
+
+#include <uhdlib/rfnoc/chdr_packet.hpp>
+#include <uhdlib/rfnoc/chdr_types.hpp>
+#include <uhdlib/rfnoc/mgmt_portal.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+#include <uhdlib/rfnoc/tx_flow_ctrl_state.hpp>
+#include <uhdlib/transport/io_service.hpp>
+#include <uhdlib/transport/link_if.hpp>
+#include <memory>
+
+namespace uhd { namespace rfnoc {
+
+namespace detail {
+
+/*!
+ * Utility class to send tx flow control messages
+ */
+class tx_flow_ctrl_sender
+{
+public:
+ //! Constructor
+ tx_flow_ctrl_sender(
+ const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids)
+ : _dst_epid(sep_ids.second)
+ {
+ _fc_packet = pkt_factory.make_strc();
+ _fc_strc_pyld.src_epid = sep_ids.first;
+ _fc_strc_pyld.op_code = chdr::STRC_RESYNC;
+ }
+
+ /*!
+ * Sends a flow control resync packet
+ *
+ * Sends a strc packet with the resync opcode to make the device transfer
+ * counts match those of the host, to correct for dropped packets.
+ *
+ * \param send_link the link to use to send the packet
+ * \param counts transfer counts for packet contents
+ */
+ size_t send_strc_resync(
+ transport::send_link_if* send_link, const stream_buff_params_t& counts)
+ {
+ auto buff = send_link->get_send_buff(0);
+ if (!buff) {
+ throw uhd::runtime_error("tx_flowctrl timed out getting a send buffer");
+ }
+
+ chdr::chdr_header header;
+ header.set_seq_num(_fc_seq_num++);
+ header.set_dst_epid(_dst_epid);
+
+ chdr::strc_payload fc_payload(_fc_strc_pyld);
+ fc_payload.num_bytes = counts.bytes;
+ fc_payload.num_pkts = counts.packets;
+
+ _fc_packet->refresh(buff->data(), header, fc_payload);
+ const size_t size = header.get_length();
+
+ buff->set_packet_size(size);
+ send_link->release_send_buff(std::move(buff));
+ return size;
+ }
+
+private:
+ // Endpoint ID for recipient of flow control response
+ const sep_id_t _dst_epid;
+
+ // Packet for writing flow control info
+ chdr::chdr_strc_packet::uptr _fc_packet;
+
+ // Pre-configured strc payload to hold values that don't change
+ chdr::strc_payload _fc_strc_pyld;
+
+ // Sequence number for flow control packets
+ uint16_t _fc_seq_num = 0;
+};
+} // namespace detail
+
+/*!
+ * Flow-controlled transport for TX chdr data
+ *
+ * This transport provides the streamer an interface to send TX data packets.
+ * The transport implements flow control and keeps track of sequence numbers.
+ *
+ * The transport uses I/O services to provide options for work scheduling. I/O
+ * services allow the I/O work to be offloaded to a worker thread or to be
+ * performed in the same thread as the streamer API calls.
+ *
+ * For a tx transport, the host sends data packets, and the device sends strs
+ * packets letting the host know that buffer space in the device stream endpoint
+ * has been freed. For lossy links, the host also sends strc packets to
+ * resynchronize the transfer counts between host and device, to correct for
+ * any dropped packets in the link.
+ */
+class chdr_tx_data_xport
+{
+public:
+ using uptr = std::unique_ptr<chdr_tx_data_xport>;
+ using buff_t = transport::frame_buff;
+
+ //! Information about data packet
+ struct packet_info_t
+ {
+ bool eob = false;
+ bool has_tsf = false;
+ uint64_t tsf = 0;
+ size_t payload_bytes = 0;
+ };
+
+ /*! Constructor
+ *
+ * \param io_srv The service that will schedule the xport I/O
+ * \param recv_link The recv link, already attached to the I/O service
+ * \param send_link The send link, already attached to the I/O service
+ * \param pkt_factory Factory to create packets with the desired chdr_w and endianness
+ * \param addrs Source and destination addresses
+ * \param epids Source and destination endpoint IDs
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
+ * \param num_send_frames Num frames to reserve from the send link
+ * \param fc_freq_ratio Ratio to use to configure the device fc frequency
+ * \param fc_headroom_ratio Ratio to use to configure the device fc headroom
+ */
+ chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const uhd::rfnoc::sep_addr_pair_t& addrs,
+ const uhd::rfnoc::sep_id_pair_t& epids,
+ const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
+ const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
+ const size_t num_send_frames,
+ const double fc_freq_ratio,
+ const double fc_headroom_ratio)
+ : _fc_sender(pkt_factory, epids), _epid(epids.first)
+ {
+ const sep_addr_t remote_sep_addr = addrs.second;
+ const sep_addr_t local_sep_addr = addrs.first;
+ const sep_id_t remote_epid = epids.second;
+ const sep_id_t local_epid = epids.first;
+
+ UHD_LOG_TRACE("XPORT::TX_DATA_XPORT",
+ "Creating tx xport with local epid=" << local_epid
+ << ", remote epid=" << remote_epid);
+
+ _send_header.set_dst_epid(epids.second);
+ _send_packet = pkt_factory.make_generic();
+ _recv_packet = pkt_factory.make_generic();
+
+ // Calculate max payload size
+ const size_t pyld_offset =
+ _send_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS);
+ _max_payload_size = send_link->get_send_frame_size() - pyld_offset;
+
+ _configure_sep(io_srv,
+ recv_link,
+ send_link,
+ pkt_factory,
+ local_sep_addr,
+ local_epid,
+ remote_sep_addr,
+ remote_epid,
+ pyld_buff_fmt,
+ mdata_buff_fmt);
+
+ _initialize_flow_ctrl(io_srv,
+ recv_link,
+ send_link,
+ pkt_factory,
+ epids,
+ fc_freq_ratio,
+ fc_headroom_ratio);
+
+ // Now create the send I/O we will use for data
+ auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) {
+ this->_send_callback(buff, send_link);
+ };
+
+ auto recv_cb = [this](buff_t::uptr& buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link) {
+ return this->_recv_callback(buff, recv_link, send_link);
+ };
+
+ // Needs just a single recv frame for strs packets
+ _send_io = io_srv->make_send_client(send_link,
+ num_send_frames,
+ send_cb,
+ recv_link,
+ /* num_recv_frames */ 1,
+ recv_cb);
+ }
+
+ /*! Returns maximum number of payload bytes
+ *
+ * \return maximum number of payload bytes
+ */
+ size_t get_max_payload_size() const
+ {
+ return _max_payload_size;
+ }
+
+ /*!
+ * Gets a TX frame buffer
+ *
+ * \param timeout_ms timeout in milliseconds
+ * \return the frame buffer, or nullptr if timeout occurs
+ */
+ buff_t::uptr get_send_buff(const int32_t timeout_ms)
+ {
+ return _send_io->get_send_buff(timeout_ms);
+ }
+
+ /*!
+ * Sends a TX data packet
+ *
+ * \param buff the frame buffer containing the packet to send
+ */
+ void release_send_buff(buff_t::uptr buff)
+ {
+ _send_io->release_send_buff(std::move(buff));
+ }
+
+ /*!
+ * Writes header into frame buffer and returns payload pointer
+ *
+ * \param buff Frame buffer to write header into
+ * \param info Information to include in the header
+ * \return A pointer to the payload data area and the packet size in bytes
+ */
+ std::pair<void*, size_t> write_packet_header(buff_t::uptr& buff,
+ const packet_info_t& info)
+ {
+ uint64_t tsf = 0;
+
+ if (info.has_tsf) {
+ _send_header.set_pkt_type(chdr::PKT_TYPE_DATA_WITH_TS);
+ tsf = info.tsf;
+ } else {
+ _send_header.set_pkt_type(chdr::PKT_TYPE_DATA_NO_TS);
+ }
+
+ _send_header.set_eob(info.eob);
+ _send_header.set_seq_num(_data_seq_num++);
+
+ _send_packet->refresh(buff->data(), _send_header, tsf);
+ _send_packet->update_payload_size(info.payload_bytes);
+
+ return std::make_pair(
+ _send_packet->get_payload_ptr(),
+ _send_packet->get_chdr_header().get_length());
+ }
+
+private:
+ /*!
+ * Recv callback for I/O service
+ *
+ * The I/O service invokes this callback when it reads a packet from the
+ * recv link.
+ *
+ * \param buff the frame buffer containing the packet data
+ * \param recv_link the recv link from which buff was read
+ * \param send_link the send link for flow control messages
+ */
+ bool _recv_callback(buff_t::uptr& buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* /*send_link*/)
+ {
+ _recv_packet->refresh(buff->data());
+ const auto header = _recv_packet->get_chdr_header();
+ const auto type = header.get_pkt_type();
+ const auto dst_epid = header.get_dst_epid();
+
+ if (dst_epid != _epid) {
+ return false;
+ }
+
+ if (type == chdr::PKT_TYPE_STRS) {
+ chdr::strs_payload strs;
+ strs.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(),
+ _recv_packet->get_payload_size() / sizeof(uint64_t),
+ _recv_packet->conv_to_host<uint64_t>());
+
+ _fc_state.update_dest_recv_count({strs.xfer_count_bytes,
+ static_cast<uint32_t>(strs.xfer_count_pkts)});
+
+ // TODO: check strs status here and push into async msg queue
+
+ // Packet belongs to this transport, release buff and return true
+ recv_link->release_recv_buff(std::move(buff));
+ buff = nullptr;
+ return true;
+ } else {
+ UHD_THROW_INVALID_CODE_PATH();
+ }
+ }
+
+ /*!
+ * Send callback for I/O service
+ *
+ * The I/O service invokes this callback when it is requested to release
+ * a send buffer to the send link.
+ *
+ * \param buff the frame buffer to release
+ * \param send_link the send link for flow control messages
+ */
+ void _send_callback(buff_t::uptr& buff, transport::send_link_if* send_link)
+ {
+ const size_t packet_size = buff->packet_size();
+
+ if (_fc_state.dest_has_space(packet_size)) {
+ send_link->release_send_buff(std::move(buff));
+ buff = nullptr;
+
+ _fc_state.data_sent(packet_size);
+
+ if (_fc_state.get_fc_resync_req_pending()
+ && _fc_state.dest_has_space(chdr::strc_payload::PACKET_SIZE)) {
+ const auto& xfer_counts = _fc_state.get_xfer_counts();
+ const size_t strc_size =
+ _fc_sender.send_strc_resync(send_link, xfer_counts);
+ _fc_state.clear_fc_resync_req_pending();
+ _fc_state.data_sent(strc_size);
+ }
+ }
+ }
+
+ /*!
+ * Configures the stream endpoint using mgmt_portal
+ */
+ void _configure_sep(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const uhd::rfnoc::sep_addr_t& local_sep_addr,
+ const uhd::rfnoc::sep_id_t& local_epid,
+ const uhd::rfnoc::sep_addr_t& remote_sep_addr,
+ const uhd::rfnoc::sep_id_t& remote_epid,
+ const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
+ const uhd::rfnoc::sw_buff_t mdata_buff_fmt)
+ {
+ // Create a control transport with the tx data links to send mgmt packets
+ // needed to setup the stream. Only need one frame for this.
+ auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv,
+ send_link,
+ recv_link,
+ local_epid,
+ 1, // num_send_frames
+ 1); // num_recv_frames
+
+ // Create new temporary management portal with the transports used for this stream
+ // TODO: This is a bit excessive. Maybe we can pare down the functionality of the
+ // portal just for route setup purposes. Whatever we do, we *must* use xport in it
+ // though otherwise the transport will not behave correctly.
+ auto data_mgmt_portal = uhd::rfnoc::mgmt::mgmt_portal::make(
+ *ctrl_xport, pkt_factory, local_sep_addr, local_epid);
+
+ // Setup a route to the EPID
+ data_mgmt_portal->initialize_endpoint(*ctrl_xport, remote_sep_addr, remote_epid);
+ data_mgmt_portal->setup_local_route(*ctrl_xport, remote_epid);
+
+ data_mgmt_portal->config_local_tx_stream(
+ *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt);
+
+ // We no longer need the control xport and mgmt_portal, release them so
+ // the control xport is no longer connected to the I/O service.
+ data_mgmt_portal.reset();
+ ctrl_xport.reset();
+ }
+
+ /*!
+ * Initializes flow control
+ *
+ * To initialize flow control, we need to send an init strc packet, then
+ * receive a strs containing the stream endpoint ingress buffer size. We
+ * then repeat this (now that we know the buffer size) to configure the flow
+ * control frequency. To avoid having this logic within the data packet
+ * processing flow, we use temporary send and recv I/O instances with
+ * simple callbacks here.
+ */
+ void _initialize_flow_ctrl(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const sep_id_pair_t sep_ids,
+ const double fc_freq_ratio,
+ const double fc_headroom_ratio)
+ {
+ // No flow control at initialization, just release all send buffs
+ auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) {
+ send_link->release_send_buff(std::move(buff));
+ buff = nullptr;
+ };
+
+ // For recv, just queue strs packets for recv_io to read
+ auto recv_cb = [this](buff_t::uptr& buff,
+ transport::recv_link_if* /*recv_link*/,
+ transport::send_link_if* /*send_link*/) {
+ _recv_packet->refresh(buff->data());
+ const auto header = _recv_packet->get_chdr_header();
+ const auto type = header.get_pkt_type();
+ const auto dst_epid = header.get_dst_epid();
+
+ return (dst_epid == _epid && type == chdr::PKT_TYPE_STRS);
+ };
+
+ // No flow control at initialization, just release all recv buffs
+ auto fc_cb = [this](buff_t::uptr buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* /*send_link*/) {
+ recv_link->release_recv_buff(std::move(buff));
+ };
+
+ auto send_io = io_srv->make_send_client(send_link,
+ 1, // num_send_frames
+ send_cb,
+ nullptr,
+ 0, // num_recv_frames
+ nullptr);
+
+ auto recv_io = io_srv->make_recv_client(recv_link,
+ 1, // num_recv_frames
+ recv_cb,
+ nullptr,
+ 0, // num_send_frames
+ fc_cb);
+
+ chdr::chdr_strc_packet::uptr strc_packet = pkt_factory.make_strc();
+ chdr::chdr_packet::uptr& recv_packet = _recv_packet;
+
+ // Function to send a strc init
+ auto send_strc_init = [&send_io, sep_ids, &strc_packet](
+ const stream_buff_params_t fc_freq = {0, 0}) {
+ transport::frame_buff::uptr buff = send_io->get_send_buff(0);
+
+ if (!buff) {
+ throw uhd::runtime_error(
+ "tx xport timed out getting a send buffer for strc init");
+ }
+
+ chdr::chdr_header header;
+ header.set_seq_num(0);
+ header.set_dst_epid(sep_ids.second);
+
+ chdr::strc_payload strc_pyld;
+ strc_pyld.src_epid = sep_ids.first;
+ strc_pyld.op_code = chdr::STRC_INIT;
+ strc_pyld.num_bytes = fc_freq.bytes;
+ strc_pyld.num_pkts = fc_freq.packets;
+ strc_packet->refresh(buff->data(), header, strc_pyld);
+
+ const size_t size = header.get_length();
+ buff->set_packet_size(size);
+ send_io->release_send_buff(std::move(buff));
+ };
+
+ // Function to receive a strs, returns buffer capacity
+ auto recv_strs = [&recv_io, &recv_packet]() -> stream_buff_params_t {
+ transport::frame_buff::uptr buff = recv_io->get_recv_buff(200);
+
+ if (!buff) {
+ throw uhd::runtime_error(
+                "tx xport timed out waiting for a strs packet during initialization");
+ }
+
+ recv_packet->refresh(buff->data());
+ UHD_ASSERT_THROW(
+ recv_packet->get_chdr_header().get_pkt_type() == chdr::PKT_TYPE_STRS);
+ chdr::strs_payload strs;
+ strs.deserialize(recv_packet->get_payload_const_ptr_as<uint64_t>(),
+ recv_packet->get_payload_size() / sizeof(uint64_t),
+ recv_packet->conv_to_host<uint64_t>());
+
+ recv_io->release_recv_buff(std::move(buff));
+
+ return {strs.capacity_bytes,
+ static_cast<uint32_t>(strs.capacity_pkts)};
+ };
+
+ // Send a strc init to get the buffer size
+ send_strc_init();
+ stream_buff_params_t capacity = recv_strs();
+ _fc_state.set_dest_capacity(capacity);
+
+ UHD_LOG_TRACE("XPORT::TX_DATA_XPORT",
+ "Received strs initializing buffer capacity to "
+ << capacity.bytes << " bytes");
+
+ // Calculate the requested fc_freq parameters
+ uhd::rfnoc::stream_buff_params_t fc_freq = {
+ static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_freq_ratio)),
+ static_cast<uint32_t>(
+ std::ceil(double(capacity.packets) * fc_freq_ratio))};
+
+ const size_t headroom_bytes =
+ static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_headroom_ratio));
+ const size_t headroom_packets = static_cast<uint32_t>(
+ std::ceil(double(capacity.packets) * fc_headroom_ratio));
+
+ fc_freq.bytes -= headroom_bytes;
+ fc_freq.packets -= headroom_packets;
+
+ // Send a strc init to configure fc freq
+ send_strc_init(fc_freq);
+ recv_strs();
+
+ // Release temporary I/O service interfaces to disconnect from it
+ send_io.reset();
+ recv_io.reset();
+ }
+
+ // Interface to the I/O service
+ transport::send_io_if::sptr _send_io;
+
+ // Flow control state
+ tx_flow_ctrl_state _fc_state;
+
+ // Maximum data payload in bytes
+ size_t _max_payload_size = 0;
+
+ // Sequence number for data packets
+ uint16_t _data_seq_num = 0;
+
+ // Header to write into send packets
+ chdr::chdr_header _send_header;
+
+ // Packet for send data
+ chdr::chdr_packet::uptr _send_packet;
+
+ // Packet to receive strs messages
+ chdr::chdr_packet::uptr _recv_packet;
+
+ // Handles sending of strc flow control ack packets
+ detail::tx_flow_ctrl_sender _fc_sender;
+
+ // Local / Source EPID
+ sep_id_t _epid;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP */
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_types.hpp b/host/lib/include/uhdlib/rfnoc/chdr_types.hpp
index 62b24ab61..1f14ea7d0 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_types.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_types.hpp
@@ -482,6 +482,8 @@ public: // Members
uint64_t num_pkts = 0;
//! Number of bytes to use for operation (64 bits)
uint64_t num_bytes = 0;
+ //! Size of a strc packet (including header)
+ static constexpr size_t PACKET_SIZE = 24;
public: // Functions
strc_payload() = default;
diff --git a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp
index 120b0e0f8..28fa8ec7c 100644
--- a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp
+++ b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp
@@ -7,11 +7,14 @@
#ifndef INCLUDED_LIBUHD_RFNOC_GRAPH_STREAM_MANAGER_HPP
#define INCLUDED_LIBUHD_RFNOC_GRAPH_STREAM_MANAGER_HPP
+#include <uhd/stream.hpp>
#include <uhdlib/rfnoc/chdr_packet.hpp>
#include <uhdlib/rfnoc/client_zero.hpp>
#include <uhdlib/rfnoc/ctrlport_endpoint.hpp>
#include <uhdlib/rfnoc/epid_allocator.hpp>
#include <uhdlib/rfnoc/mb_iface.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
+#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>
#include <functional>
#include <memory>
#include <set>
@@ -84,6 +87,7 @@ public:
virtual detail::client_zero::sptr get_client_zero(
sep_addr_t dst_addr, device_id_t via_device = NULL_DEVICE_ID) const = 0;
+
/*! Configure a flow controlled data stream from the endpoint with ID src_epid to the
* endpoint with ID dst_epid
*
@@ -102,7 +106,33 @@ public:
const double fc_headroom_ratio,
const bool reset = false) = 0;
- // TODO: Implement functions to get graph-wide streamers
+ /*! \brief Create a data stream going from the device to the host
+ *
+ * \param dst_addr The address of the destination stream endpoint
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
+ * \param xport_args The transport arguments
+     * \return A transport instance
+ */
+ virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream(
+ sep_addr_t dst_addr,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
+ const device_addr_t& xport_args) = 0;
+
+ /*! \brief Create a data stream going from the host to the device
+ *
+ * \param dst_addr The address of the destination stream endpoint
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
+ * \param xport_args The transport arguments
+     * \return A transport instance
+ */
+ virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
+ sep_addr_t dst_addr,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
+ const device_addr_t& xport_args) = 0;
/*!
* \brief Create a graph_stream_manager and return a unique_ptr to it
diff --git a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
index 79121a498..72f1cf1c7 100644
--- a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
+++ b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
@@ -11,6 +11,7 @@
#include <uhdlib/rfnoc/ctrlport_endpoint.hpp>
#include <uhdlib/rfnoc/epid_allocator.hpp>
#include <uhdlib/rfnoc/mb_iface.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
#include <functional>
#include <memory>
#include <set>
@@ -112,41 +113,29 @@ public:
/*! \brief Create a data stream going from the host to the device
*
* \param dst_addr The address of the destination stream endpoint
- * \param lossy_xport Is the transport lossy?
* \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
* \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
- * \param fc_freq_ratio Flow control response frequency as a ratio of the buff params
- * \param fc_headroom_ratio Flow control headroom as a ratio of the buff params
* \param xport_args The transport arguments
* \return An transport instance
*/
virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
const sep_addr_t dst_addr,
- const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
- const double fc_freq_ratio,
- const double fc_headroom_ratio,
const device_addr_t& xport_args) = 0;
/*! \brief Create a data stream going from the device to the host
*
* \param dst_addr The address of the destination stream endpoint
- * \param lossy_xport Is the transport lossy?
* \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
* \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
- * \param fc_freq_ratio Flow control response frequency as a ratio of the buff params
- * \param fc_headroom_ratio Flow control headroom as a ratio of the buff params
* \param xport_args The transport arguments
* \return An transport instance
*/
virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream(
const sep_addr_t src_addr,
- const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
- const double fc_freq_ratio,
- const double fc_headroom_ratio,
const device_addr_t& xport_args) = 0;
static uptr make(const chdr::chdr_packet_factory& pkt_factory,
diff --git a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
index 0a2790a34..33a0e3df0 100644
--- a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
+++ b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
@@ -8,21 +8,14 @@
#define INCLUDED_LIBUHD_MB_IFACE_HPP
#include <uhdlib/rfnoc/chdr_ctrl_xport.hpp>
-#include <uhdlib/rfnoc/rfnoc_common.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
+#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>
#include <uhdlib/rfnoc/clock_iface.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
#include <memory>
namespace uhd { namespace rfnoc {
-// FIXME: Update this
-class chdr_rx_data_xport
-{
-public:
- using uptr = std::unique_ptr<chdr_rx_data_xport>;
-};
-
-using chdr_tx_data_xport = chdr_rx_data_xport;
-
/*! Motherboard (backchannel) interface
*
* In RFNoC devices, the RFNoC subystem needs a backchannel interface to talk to
@@ -70,7 +63,8 @@ public:
/*! Return a reference to a clock iface
*/
- virtual std::shared_ptr<clock_iface> get_clock_iface(const std::string& clock_name) = 0;
+ virtual std::shared_ptr<clock_iface> get_clock_iface(
+ const std::string& clock_name) = 0;
/*! Create a control transport
*
@@ -89,30 +83,34 @@ public:
*
* This is typically called once per streaming channel.
*
- * \param local_device_id ID for the host transport adapter to use
- * \param local_epid Host (sink) streaming endpoint ID
- * \param remote_epid Remote device (source) streaming endpoint ID
+ * \param addrs Address of the device and host stream endpoints
+ * \param epids Endpoint IDs of the device and host stream endpoints
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
* \param xport_args Transport configuration args
* \return A CHDR RX data transport
*/
- virtual chdr_rx_data_xport::uptr make_rx_data_transport(device_id_t local_device_id,
- const sep_id_t& local_epid,
- const sep_id_t& remote_epid,
+ virtual chdr_rx_data_xport::uptr make_rx_data_transport(const sep_addr_pair_t& addrs,
+ const sep_id_pair_t& epids,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
const device_addr_t& xport_args) = 0;
/*! Create an TX data transport
*
* This is typically called once per streaming channel.
*
- * \param local_device_id ID for the host transport adapter to use
- * \param local_epid Host (source) streaming endpoint ID
- * \param remote_epid Remote device (sink) streaming endpoint ID
+ * \param addrs Address of the host and device stream endpoints
+ * \param epids Endpoint IDs of the host and device stream endpoints
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
* \param xport_args Transport configuration args
* \return A CHDR TX data transport
*/
- virtual chdr_tx_data_xport::uptr make_tx_data_transport(device_id_t local_device_id,
- const sep_id_t& local_epid,
- const sep_id_t& remote_epid,
+ virtual chdr_tx_data_xport::uptr make_tx_data_transport(const sep_addr_pair_t& addrs,
+ const sep_id_pair_t& epids,
+ const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
+ const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
const device_addr_t& xport_args) = 0;
};
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
index 7ec1b7bb2..bc56fd311 100644
--- a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
@@ -41,6 +41,8 @@ using device_id_t = uint16_t;
using sep_inst_t = uint16_t;
//! Stream Endpoint Physical Address Type
using sep_addr_t = std::pair<device_id_t, sep_inst_t>;
+//! Stream Endpoint Physical Address Type (first = source, second = destination)
+using sep_addr_pair_t = std::pair<sep_addr_t, sep_addr_t>;
//! Stream Endpoint ID Type
using sep_id_t = uint16_t;
//! Stream Endpoint pair Type (first = source, second = destination)
@@ -65,6 +67,19 @@ struct stream_buff_params_t
//! The data type of the buffer used to capture/generate data
enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 };
+//! Conversion from number of bits to sw_buff
+constexpr sw_buff_t bits_to_sw_buff(size_t bits)
+{
+ if (bits <= 8) {
+ return BUFF_U8;
+ } else if (bits <= 16) {
+ return BUFF_U16;
+ } else if (bits <= 32) {
+ return BUFF_U32;
+ } else {
+ return BUFF_U64;
+ }
+}
//----------------------------------------------
// Constants
@@ -72,6 +87,12 @@ enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 };
constexpr uint16_t RFNOC_PROTO_VER = 0x0100;
+constexpr uint64_t MAX_FC_CAPACITY_BYTES = (uint64_t(1) << 40) - 1;
+constexpr uint32_t MAX_FC_CAPACITY_PKTS = (uint32_t(1) << 24) - 1;
+constexpr uint64_t MAX_FC_FREQ_BYTES = (uint64_t(1) << 40) - 1;
+constexpr uint32_t MAX_FC_FREQ_PKTS = (uint32_t(1) << 24) - 1;
+constexpr uint64_t MAX_FC_HEADROOM_BYTES = (uint64_t(1) << 16) - 1;
+constexpr uint32_t MAX_FC_HEADROOM_PKTS = (uint32_t(1) << 8) - 1;
}} // namespace uhd::rfnoc
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
new file mode 100644
index 000000000..6ced60d19
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
@@ -0,0 +1,95 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP
+#define INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP
+
+#include <uhd/rfnoc/node.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
+#include <uhdlib/transport/rx_streamer_impl.hpp>
+#include <string>
+
+namespace uhd { namespace rfnoc {
+
+/*!
+ * Extends the streamer_impl to be an rfnoc node so it can connect to the
+ * graph. Configures the streamer conversion and rate parameters with values
+ * received through property propagation.
+ */
+class rfnoc_rx_streamer : public node_t,
+ public transport::rx_streamer_impl<chdr_rx_data_xport>
+{
+public:
+ /*! Constructor
+ *
+ * \param num_ports The number of ports
+ * \param stream_args Arguments to aid the construction of the streamer
+ */
+ rfnoc_rx_streamer(const size_t num_ports, const uhd::stream_args_t stream_args);
+
+ /*! Returns a unique identifier string for this node. In every RFNoC graph,
+ * no two nodes cannot have the same ID. Returns a string in the form of
+ * "RxStreamer#0".
+ *
+ * \returns The unique ID as a string
+ */
+ std::string get_unique_id() const;
+
+ /*! Returns the number of input ports for this block.
+ *
+     * \return The number of ports
+ */
+ size_t get_num_input_ports() const;
+
+ /*! Returns the number of output ports for this block.
+ *
+ * Always returns 0 for this block.
+ *
+     * \return The number of ports
+ */
+ size_t get_num_output_ports() const;
+
+ /*! Implementation of rx_streamer API method
+ *
+ * \param stream_cmd the stream command to issue
+ */
+ void issue_stream_cmd(const stream_cmd_t& stream_cmd);
+
+ /*! Returns stream args provided at creation
+ *
+ * \return stream args provided when streamer is created
+ */
+ const uhd::stream_args_t& get_stream_args() const;
+
+ /*! Check that all streamer ports are connected to blocks
+ *
+ * Overrides node_t to ensure there are no unconnected ports.
+ *
+ * \param connected_inputs A list of input ports that are connected
+ * \param connected_outputs A list of output ports that are connected
+ * \returns true if the block can deal with this configuration
+ */
+ bool check_topology(const std::vector<size_t>& connected_inputs,
+ const std::vector<size_t>& connected_outputs);
+private:
+ void _register_props(const size_t chan, const std::string& otw_format);
+
+ // Properties
+ std::vector<property_t<double>> _scaling_in;
+ std::vector<property_t<double>> _samp_rate_in;
+ std::vector<property_t<double>> _tick_rate_in;
+ std::vector<property_t<std::string>> _type_in;
+
+ // Streamer unique ID
+ const std::string _unique_id;
+
+ // Stream args provided at construction
+ const uhd::stream_args_t _stream_args;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP */
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp
new file mode 100644
index 000000000..4acee45cc
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp
@@ -0,0 +1,90 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP
+#define INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP
+
+#include <uhd/rfnoc/node.hpp>
+#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>
+#include <uhdlib/transport/tx_streamer_impl.hpp>
+#include <string>
+
+namespace uhd { namespace rfnoc {
+
+/*!
+ * Extends the streamer_impl to be an rfnoc node so it can connect to the
+ * graph. Configures the streamer conversion and rate parameters with values
+ * received through property propagation.
+ */
+class rfnoc_tx_streamer : public node_t,
+ public transport::tx_streamer_impl<chdr_tx_data_xport>
+{
+public:
+ /*! Constructor
+ *
+     * \param num_chans The number of channels
+ * \param stream_args Arguments to aid the construction of the streamer
+ */
+ rfnoc_tx_streamer(const size_t num_chans, const uhd::stream_args_t stream_args);
+
+ /*! Returns a unique identifier string for this node. In every RFNoC graph,
+     * no two nodes can have the same ID. Returns a string in the form of
+ * "TxStreamer#0".
+ *
+ * \returns The unique ID as a string
+ */
+ std::string get_unique_id() const;
+
+ /*! Returns the number of input ports for this block.
+ *
+ * Always returns 0 for this block.
+ *
+     * \return The number of ports
+ */
+ size_t get_num_input_ports() const;
+
+ /*! Returns the number of output ports for this block.
+ *
+ * \return noc_id The number of ports
+ */
+ size_t get_num_output_ports() const;
+
+ /*! Returns stream args provided at creation
+ *
+ * \return stream args provided when streamer is created
+ */
+ const uhd::stream_args_t& get_stream_args() const;
+
+ /*! Check that all streamer ports are connected to blocks
+ *
+ * Overrides node_t to ensure there are no unconnected ports.
+ *
+ * \param connected_inputs A list of input ports that are connected
+ * \param connected_outputs A list of output ports that are connected
+ * \returns true if the block can deal with this configuration
+ */
+ bool check_topology(const std::vector<size_t>& connected_inputs,
+ const std::vector<size_t>& connected_outputs);
+
+private:
+ void _register_props(const size_t chan, const std::string& otw_format);
+
+ // Properties
+ std::vector<property_t<double>> _scaling_out;
+ std::vector<property_t<double>> _samp_rate_out;
+ std::vector<property_t<double>> _tick_rate_out;
+ std::vector<property_t<std::string>> _type_out;
+
+ // Streamer unique ID
+ const std::string _unique_id;
+
+ // Stream args provided at construction
+ const uhd::stream_args_t _stream_args;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP */
diff --git a/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp
new file mode 100644
index 000000000..937baf982
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp
@@ -0,0 +1,130 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP
+#define INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP
+
+#include <uhd/utils/log.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+
+namespace uhd { namespace rfnoc {
+
+//! Class to manage rx flow control state
+class rx_flow_ctrl_state
+{
+public:
+ //! Constructor
+ rx_flow_ctrl_state(const rfnoc::sep_id_pair_t epids) : _epids(epids) {}
+
+ //! Initialize frequency parameters
+ void initialize(const stream_buff_params_t fc_freq)
+ {
+ _fc_freq = fc_freq;
+ }
+
+ //! Resynchronize with transfer counts from the sender
+ void resynchronize(const stream_buff_params_t counts)
+ {
+ if (_recv_counts.bytes != counts.bytes
+ || _recv_counts.packets != counts.packets) {
+ // If there is a discrepancy between the amount of data sent by
+ // the device and received by the transport, adjust the counts
+ // of data received and transferred to include the dropped data.
+ auto bytes_dropped = counts.bytes - _recv_counts.bytes;
+ auto pkts_dropped = counts.packets - _recv_counts.packets;
+ _xfer_counts.bytes += bytes_dropped;
+ _xfer_counts.packets += pkts_dropped;
+
+ UHD_LOGGER_DEBUG("rx_flow_ctrl_state")
+ << "oh noes: bytes_sent=" << counts.bytes
+ << " bytes_received=" << _recv_counts.bytes
+ << " pkts_sent=" << counts.packets
+ << " pkts_received=" << _recv_counts.packets
+ << " src_epid=" << _epids.first << " dst_epid=" << _epids.second
+ << std::endl;
+
+ _recv_counts = counts;
+ }
+ }
+
+ //! Reset the transfer counts (happens during init)
+ void reset_counts()
+ {
+ UHD_LOGGER_TRACE("rx_flow_ctrl_state")
+ << "Resetting transfer counts" << std::endl;
+ _recv_counts = {0, 0};
+ _xfer_counts = {0, 0};
+ }
+
+ //! Update state when data is received
+ void data_received(const size_t bytes)
+ {
+ _recv_counts.bytes += bytes;
+ _recv_counts.packets++;
+ }
+
+ //! Update state when transfer is complete (buffer space freed)
+ void xfer_done(const size_t bytes)
+ {
+ _xfer_counts.bytes += bytes;
+ _xfer_counts.packets++;
+ }
+
+ //! Returns whether a flow control response is needed
+ bool fc_resp_due() const
+ {
+ stream_buff_params_t accum_counts = {
+ _xfer_counts.bytes - _last_fc_resp_counts.bytes,
+ _xfer_counts.packets - _last_fc_resp_counts.packets};
+
+ return accum_counts.bytes >= _fc_freq.bytes
+ || accum_counts.packets >= _fc_freq.packets;
+ }
+
+ //! Update state after flow control response was sent
+ void fc_resp_sent()
+ {
+ _last_fc_resp_counts = _xfer_counts;
+ }
+
+ //! Returns counts for completed transfers
+ stream_buff_params_t get_xfer_counts() const
+ {
+ return _xfer_counts;
+ }
+
+ //! Returns counts for completed transfers
+ stream_buff_params_t get_recv_counts() const
+ {
+ return _recv_counts;
+ }
+
+ //! Returns configured flow control frequency
+ stream_buff_params_t get_fc_freq() const
+ {
+ return _fc_freq;
+ }
+
+private:
+ // Counts for data received, including any data still in use
+ stream_buff_params_t _recv_counts{0, 0};
+
+ // Counts for data read and whose buffer space is ok to reuse
+ stream_buff_params_t _xfer_counts{0, 0};
+
+ // Counts sent in last flow control response
+ stream_buff_params_t _last_fc_resp_counts{0, 0};
+
+ // Frequency of flow control responses
+ stream_buff_params_t _fc_freq{0, 0};
+
+ // Endpoint ID for log messages
+ const sep_id_pair_t _epids;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP */
diff --git a/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp
new file mode 100644
index 000000000..65fc1b093
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp
@@ -0,0 +1,99 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP
+#define INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP
+
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+
+namespace uhd { namespace rfnoc {
+
+//! Class to manage tx flow control state
+class tx_flow_ctrl_state
+{
+public:
+ //! Updates destination capacity
+ void set_dest_capacity(const stream_buff_params_t& capacity)
+ {
+ _dest_capacity = capacity;
+ }
+
+ //! Updates destination received count
+ void update_dest_recv_count(const stream_buff_params_t& recv_count)
+ {
+ _recv_counts = recv_count;
+ }
+
+ /*! Returns whether the destination has buffer space for the requested
+ * packet size
+ */
+ bool dest_has_space(const size_t packet_size)
+ {
+ // The stream endpoint only cares about bytes, the packet count is not
+ // important to determine the space available.
+ const auto buffer_fullness = _xfer_counts.bytes - _recv_counts.bytes;
+ const auto space_available = _dest_capacity.bytes - buffer_fullness;
+ return space_available >= packet_size;
+ }
+
+ //! Increments transfer count with amount of data sent
+ void data_sent(const size_t packet_size)
+ {
+ _xfer_counts.bytes += packet_size;
+ _xfer_counts.packets++;
+
+ // Request an fc resync after we have transferred a number of bytes >=
+ // to the destination capacity. There is no strict requirement on how
+ // often we need to send this, as it is only needed to correct for
+ // dropped packets. One buffer's worth of bytes is probably a good
+ // cadence.
+ if (_xfer_counts.bytes - _last_fc_resync_bytes >= _dest_capacity.bytes) {
+ _fc_resync_req = true;
+ }
+ }
+
+ /*! Returns whether an fc resync request is pending. The policy we use
+ * here is to send an fc resync every time we send a number of bytes
+ * equal to the destination buffer capacity.
+ */
+ bool get_fc_resync_req_pending() const
+ {
+ return _fc_resync_req;
+ }
+
+ //! Clears fc resync request pending status
+ void clear_fc_resync_req_pending()
+ {
+ _fc_resync_req = false;
+ _last_fc_resync_bytes = _xfer_counts.bytes;
+ }
+
+ //! Returns counts for packets sent
+ stream_buff_params_t get_xfer_counts() const
+ {
+ return _xfer_counts;
+ }
+
+private:
+ // Counts for data sent
+ stream_buff_params_t _xfer_counts{0, 0};
+
+ // Counts for data received by the destination
+ stream_buff_params_t _recv_counts{0, 0};
+
+ // Buffer size at the destination
+ stream_buff_params_t _dest_capacity{0, 0};
+
+ // Counts sent in last flow control resync
+ size_t _last_fc_resync_bytes = 0;
+
+ // Track when to send ack packets
+ bool _fc_resync_req = false;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP */
diff --git a/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp
new file mode 100644
index 000000000..662be6d2d
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp
@@ -0,0 +1,175 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP
+#define INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP
+
+#include <uhd/exception.hpp>
+#include <uhd/utils/log.hpp>
+#include <boost/dynamic_bitset.hpp>
+#include <boost/format.hpp>
+
+namespace uhd { namespace transport {
+
+// Number of iterations that get_aligned_buffs will attempt to time align
+// packets before returning an alignment failure. get_aligned_buffs increments
+// the iteration count when it finds a timestamp that is larger than the
+// timestamps on channels it has already aligned and thus has to restart
+// aligning timestamps on all channels to the new timestamp.
+constexpr size_t ALIGNMENT_FAILURE_THRESHOLD = 1000;
+
+/*!
+ * Implementation of rx time alignment. This method reads packets from the
+ * transports for each channel and discards any packets whose tsf does not
+ * match those of other channels due to dropped packets. Packets that do not
+ * have a tsf are not checked for alignment and never dropped.
+ */
+template <typename transport_t>
+class get_aligned_buffs
+{
+public:
+ enum alignment_result_t {
+ SUCCESS,
+ TIMEOUT,
+ SEQUENCE_ERROR,
+ ALIGNMENT_FAILURE,
+ BAD_PACKET
+ };
+
+ get_aligned_buffs(std::vector<typename transport_t::uptr>& xports,
+ std::vector<typename transport_t::buff_t::uptr>& frame_buffs,
+ std::vector<typename transport_t::packet_info_t>& infos)
+ : _xports(xports)
+ , _frame_buffs(frame_buffs)
+ , _infos(infos)
+ , _prev_tsf(_xports.size(), 0)
+ , _channels_to_align(_xports.size())
+ {
+ }
+
+ alignment_result_t operator()(const int32_t timeout_ms)
+ {
+ // Clear state
+ _channels_to_align.set();
+ bool time_valid = false;
+ uint64_t tsf = 0;
+ size_t iterations = 0;
+
+ while (_channels_to_align.any()) {
+ const size_t chan = _channels_to_align.find_first();
+ auto& xport = _xports[chan];
+ auto& info = _infos[chan];
+ auto& frame_buff = _frame_buffs[chan];
+ bool seq_error = false;
+
+ // Receive a data packet for the channel if we don't have one. A
+ // packet may already be there if the previous call was interrupted
+ // by an error.
+ if (!frame_buff) {
+ try {
+ std::tie(frame_buff, info, seq_error) =
+ xport->get_recv_buff(timeout_ms);
+ } catch (const uhd::value_error& e) {
+ // Bad packet
+ UHD_LOGGER_ERROR("STREAMER")
+ << boost::format(
+ "The receive transport caught a value exception.\n%s")
+ % e.what();
+ return BAD_PACKET;
+ }
+ }
+
+ if (!frame_buff) {
+ return TIMEOUT;
+ }
+
+ if (info.has_tsf) {
+ const bool time_out_of_order = _prev_tsf[chan] > info.tsf;
+ _prev_tsf[chan] = info.tsf;
+
+ // If the user changes the device time while streaming, we can
+ // receive a packet that comes before the previous packet in
+ // time. This would cause the alignment logic to discard future
+ // received packets. Therefore, when this occurs, we reset the
+ // info to restart the alignment.
+ if (time_out_of_order) {
+ time_valid = false;
+ }
+
+ // Check if the time is larger than packets received for other
+ // channels, and if so, use this time to align all channels
+ if (!time_valid || info.tsf > tsf) {
+ // If we haven't found a set of aligned packets after many
+ // iterations, return an alignment failure
+ if (iterations++ > ALIGNMENT_FAILURE_THRESHOLD) {
+ UHD_LOGGER_ERROR("STREAMER")
+ << "The rx streamer failed to time-align packets.";
+ return ALIGNMENT_FAILURE;
+ }
+
+ // Release buffers for channels aligned previously. Keep
+ // buffers that don't have a tsf since we don't need to
+ // align those.
+ for (size_t i = 0; i < _xports.size(); i++) {
+ if (!_channels_to_align.test(i) && _infos[i].has_tsf) {
+ _xports[i]->release_recv_buff(std::move(_frame_buffs[i]));
+ _frame_buffs[i] = nullptr;
+ }
+ }
+
+ // Mark only this channel as aligned and save its tsf
+ _channels_to_align.set();
+ _channels_to_align.reset(chan);
+ time_valid = true;
+ tsf = info.tsf;
+ }
+
+ // Check if the time matches that of other aligned channels
+ else if (info.tsf == tsf) {
+ _channels_to_align.reset(chan);
+ }
+
+ // Otherwise, time is smaller than other channels, release the buffer
+ else {
+ _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan]));
+ _frame_buffs[chan] = nullptr;
+ }
+ } else {
+ // Packet doesn't have a tsf, just mark it as aligned
+ _channels_to_align.reset(chan);
+ }
+
+ // If this packet had a sequence error, stop to return the error.
+ // Keep the packet for the next call to get_aligned_buffs.
+ if (seq_error) {
+ return SEQUENCE_ERROR;
+ }
+ }
+
+ // All channels aligned
+ return SUCCESS;
+ }
+
+private:
+ // Transports for each channel
+ std::vector<typename transport_t::uptr>& _xports;
+
+ // Storage for buffers resulting from alignment
+ std::vector<typename transport_t::buff_t::uptr>& _frame_buffs;
+
+ // Packet info corresponding to aligned buffers
+ std::vector<typename transport_t::packet_info_t>& _infos;
+
+ // Time of previous packet for each channel
+ std::vector<uint64_t> _prev_tsf;
+
+ // Keeps track of channels that are aligned
+ boost::dynamic_bitset<> _channels_to_align;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP */
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
new file mode 100644
index 000000000..d66e867bc
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
@@ -0,0 +1,341 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP
+#define INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/convert.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/stream.hpp>
+#include <uhd/types/endianness.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/rx_streamer_zero_copy.hpp>
+#include <limits>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+namespace detail {
+
+/*!
+ * Cache of metadata for error handling
+ *
+ * If a recv call reads data from multiple packets, and an error occurs in the
+ * second or later packets, recv stops short of the num samps requested and
+ * returns no error. The error is cached for the next call to recv.
+ *
+ * Timeout errors are an exception. Timeouts that occur in the second or later
+ * packets of a recv call stop the recv method but the error is not returned in
+ * the next call. The user can check for this condition since fewer samples are
+ * returned than the number requested.
+ */
+class rx_metadata_cache
+{
+public:
+    //! Stores metadata in the cache, ignoring timeout errors
+    UHD_FORCE_INLINE void store(const rx_metadata_t& metadata)
+    {
+        // Timeouts are deliberately not cached (see class description):
+        // a timeout in a later packet only shortens the recv call, it is
+        // not reported by the next call.
+        if (metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) {
+            _metadata_cache = metadata;
+            _cached_metadata = true;
+        }
+    }
+
+    //! Checks for cached metadata. If present, copies it into \p metadata,
+    //! clears the cache, and returns true; returns false otherwise.
+    UHD_FORCE_INLINE bool check(rx_metadata_t& metadata)
+    {
+        if (_cached_metadata) {
+            metadata = _metadata_cache;
+            _cached_metadata = false;
+            return true;
+        }
+        return false;
+    }
+
+private:
+    // Whether there is a cached metadata object
+    bool _cached_metadata = false;
+
+    // Cached metadata value
+    uhd::rx_metadata_t _metadata_cache;
+};
+
+} // namespace detail
+
+/*!
+ * Implementation of rx streamer API
+ */
+template <typename transport_t>
+class rx_streamer_impl : public rx_streamer
+{
+public:
+    //! Constructor
+    //
+    // \param num_ports Number of channels handled by this streamer
+    // \param stream_args Stream args; both cpu_format and otw_format are
+    //        required
+    // \throws uhd::value_error if either format is missing
+    rx_streamer_impl(const size_t num_ports, const uhd::stream_args_t stream_args)
+        : _zero_copy_streamer(num_ports)
+        , _in_buffs(num_ports)
+    {
+        if (stream_args.cpu_format.empty()) {
+            throw uhd::value_error("[rx_stream] Must provide a cpu_format!");
+        }
+        if (stream_args.otw_format.empty()) {
+            throw uhd::value_error("[rx_stream] Must provide a otw_format!");
+        }
+        _setup_converters(num_ports, stream_args);
+        _zero_copy_streamer.set_samp_rate(_samp_rate);
+        _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item);
+    }
+
+    //! Connect a new channel to the streamer
+    void connect_channel(const size_t channel, typename transport_t::uptr xport)
+    {
+        const size_t max_pyld_size = xport->get_max_payload_size();
+        _zero_copy_streamer.connect_channel(channel, std::move(xport));
+
+        // Set spp based on the transport frame size; the effective spp is
+        // the minimum over all connected channels
+        const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item;
+        _spp = std::min(_spp, xport_spp);
+    }
+
+    //! Implementation of rx_streamer API method
+    size_t get_num_channels() const
+    {
+        return _zero_copy_streamer.get_num_channels();
+    }
+
+    //! Implementation of rx_streamer API method
+    size_t get_max_num_samps() const
+    {
+        return _spp;
+    }
+
+    /*! Get width of each over-the-wire item component. For complex items,
+     * returns the width of one component only (real or imaginary).
+     */
+    size_t get_otw_item_comp_bit_width() const
+    {
+        return _convert_info.otw_item_bit_width;
+    }
+
+    //! Implementation of rx_streamer API method
+    //
+    // Returns the metadata of the first packet received. If a later packet
+    // in the same call produces an error, the call stops short and the
+    // error is cached for the next call (see detail::rx_metadata_cache).
+    UHD_INLINE size_t recv(const uhd::rx_streamer::buffs_type& buffs,
+        const size_t nsamps_per_buff,
+        uhd::rx_metadata_t& metadata,
+        const double timeout,
+        const bool one_packet)
+    {
+        // Report any error cached by a previous recv call first
+        if (_error_metadata_cache.check(metadata)) {
+            return 0;
+        }
+
+        const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
+
+        size_t total_samps_recv =
+            _recv_one_packet(buffs, nsamps_per_buff, metadata, timeout_ms);
+
+        if (one_packet or metadata.end_of_burst) {
+            return total_samps_recv;
+        }
+
+        // First set of packets recv had an error, return immediately
+        if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) {
+            return total_samps_recv;
+        }
+
+        // Loop until buffer is filled or error code. This method returns the
+        // metadata from the first packet received, with the exception of
+        // end-of-burst.
+        uhd::rx_metadata_t loop_metadata;
+
+        while (total_samps_recv < nsamps_per_buff) {
+            size_t num_samps = _recv_one_packet(buffs,
+                nsamps_per_buff - total_samps_recv,
+                loop_metadata,
+                timeout_ms,
+                total_samps_recv * _convert_info.bytes_per_cpu_item);
+
+            // If metadata had an error code set, store for next call and return
+            if (loop_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) {
+                _error_metadata_cache.store(loop_metadata);
+                break;
+            }
+
+            total_samps_recv += num_samps;
+
+            // Return immediately if end of burst
+            if (loop_metadata.end_of_burst) {
+                metadata.end_of_burst = true;
+                break;
+            }
+        }
+
+        return total_samps_recv;
+    }
+
+protected:
+    //! Configures scaling factor for conversion
+    void set_scale_factor(const size_t chan, const double scale_factor)
+    {
+        _converters[chan]->set_scalar(scale_factor);
+    }
+
+    //! Configures sample rate for conversion of timestamp
+    void set_samp_rate(const double rate)
+    {
+        _samp_rate = rate;
+        _zero_copy_streamer.set_samp_rate(rate);
+    }
+
+    //! Configures tick rate for conversion of timestamp
+    void set_tick_rate(const double rate)
+    {
+        _zero_copy_streamer.set_tick_rate(rate);
+    }
+
+private:
+    //! Converter and associated item sizes
+    struct convert_info
+    {
+        size_t bytes_per_otw_item;
+        size_t bytes_per_cpu_item;
+        size_t otw_item_bit_width;
+    };
+
+    //! Receive a single packet
+    //
+    // Reads samples from the current set of frame buffers, fetching a new
+    // aligned set if the current one is exhausted. If the caller requests
+    // fewer samples than remain, the rest is kept as a fragment for the
+    // next call.
+    //
+    // \param buffs Destination buffers, one per channel
+    // \param nsamps_per_buff Maximum number of samples to copy per channel
+    // \param metadata Returns the metadata for the samples read
+    // \param timeout_ms Timeout in milliseconds
+    // \param buffer_offset_bytes Byte offset into the destination buffers
+    // \return Number of samples read, or 0 on timeout/error
+    UHD_FORCE_INLINE size_t _recv_one_packet(const uhd::rx_streamer::buffs_type& buffs,
+        const size_t nsamps_per_buff,
+        uhd::rx_metadata_t& metadata,
+        const int32_t timeout_ms,
+        const size_t buffer_offset_bytes = 0)
+    {
+        if (_buff_samps_remaining == 0) {
+            // Current set of buffers has expired, get the next one
+            _buff_samps_remaining =
+                _zero_copy_streamer.get_recv_buffs(_in_buffs, metadata, timeout_ms);
+            _fragment_offset_in_samps = 0;
+        } else {
+            // There are samples still left in the current set of buffers.
+            // Reuse the fragment's metadata and advance its time spec by
+            // the number of samples already consumed from this packet.
+            metadata = _last_fragment_metadata;
+            metadata.time_spec += time_spec_t::from_ticks(
+                _fragment_offset_in_samps - metadata.fragment_offset, _samp_rate);
+        }
+
+        if (_buff_samps_remaining != 0) {
+            const size_t num_samps = std::min(nsamps_per_buff, _buff_samps_remaining);
+
+            // Convert samples to the streamer's output format
+            for (size_t i = 0; i < get_num_channels(); i++) {
+                char* b = reinterpret_cast<char*>(buffs[i]);
+                const uhd::rx_streamer::buffs_type out_buffs(b + buffer_offset_bytes);
+                _convert_to_out_buff(out_buffs, i, num_samps);
+            }
+
+            _buff_samps_remaining -= num_samps;
+
+            // Write the fragment flags and offset
+            metadata.more_fragments = _buff_samps_remaining != 0;
+            metadata.fragment_offset = _fragment_offset_in_samps;
+
+            // Save the metadata so the remaining fragment can reproduce it
+            // on the next call
+            if (metadata.more_fragments) {
+                _fragment_offset_in_samps += num_samps;
+                _last_fragment_metadata = metadata;
+            }
+
+            return num_samps;
+        } else {
+            return 0;
+        }
+    }
+
+    //! Convert samples for one channel into its buffer
+    //
+    // Also advances the source pointer for the channel and releases the
+    // frame buffer once its last sample has been consumed.
+    UHD_FORCE_INLINE void _convert_to_out_buff(
+        const uhd::rx_streamer::buffs_type& out_buffs,
+        const size_t chan,
+        const size_t num_samps)
+    {
+        const char* buffer_ptr = reinterpret_cast<const char*>(_in_buffs[chan]);
+
+        _converters[chan]->conv(buffer_ptr, out_buffs, num_samps);
+
+        // Advance the pointer for the source buffer
+        _in_buffs[chan] =
+            buffer_ptr + num_samps * _convert_info.bytes_per_otw_item;
+
+        // This read consumed the last samples in the frame buffer, so
+        // return it to the transport
+        if (_buff_samps_remaining == num_samps) {
+            _zero_copy_streamer.release_recv_buff(chan);
+        }
+    }
+
+    //! Create converters and initialize _convert_info
+    void _setup_converters(const size_t num_ports,
+        const uhd::stream_args_t stream_args)
+    {
+        // Note to code archaeologists: In the past, we had to also specify the
+        // endianness here, but that is no longer necessary because we can make
+        // the wire endianness match the host endianness.
+        convert::id_type id;
+        id.input_format = stream_args.otw_format + "_chdr";
+        id.num_inputs = 1;
+        id.output_format = stream_args.cpu_format;
+        id.num_outputs = 1;
+
+        auto starts_with = [](const std::string& s, const std::string v) {
+            return s.find(v) == 0;
+        };
+
+        const bool otw_is_complex = starts_with(stream_args.otw_format, "fc")
+            || starts_with(stream_args.otw_format, "sc");
+
+        convert_info info;
+        info.bytes_per_otw_item = convert::get_bytes_per_item(id.input_format);
+        info.bytes_per_cpu_item = convert::get_bytes_per_item(id.output_format);
+
+        // For complex items, the bit width refers to one component only
+        if (otw_is_complex) {
+            info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2;
+        } else {
+            info.otw_item_bit_width = info.bytes_per_otw_item * 8;
+        }
+
+        _convert_info = info;
+
+        // One converter per channel; the default scale factor may be
+        // overridden later via set_scale_factor
+        for (size_t i = 0; i < num_ports; i++) {
+            _converters.push_back(convert::get_converter(id)());
+            _converters.back()->set_scalar(1 / 32767.0);
+        }
+    }
+
+    // Converter and item sizes
+    convert_info _convert_info;
+
+    // Converters
+    std::vector<uhd::convert::converter::sptr> _converters;
+
+    // Implementation of frame buffer management and packet info
+    rx_streamer_zero_copy<transport_t> _zero_copy_streamer;
+
+    // Container for buffer pointers used in recv method
+    std::vector<const void*> _in_buffs;
+
+    // Sample rate used to calculate metadata time_spec_t
+    double _samp_rate = 1.0;
+
+    // Maximum number of samples per packet
+    size_t _spp = std::numeric_limits<std::size_t>::max();
+
+    // Num samps remaining in buffer currently held by zero copy streamer
+    size_t _buff_samps_remaining = 0;
+
+    // Metadata cache for error handling
+    detail::rx_metadata_cache _error_metadata_cache;
+
+    // Fragment (partially read packet) information
+    size_t _fragment_offset_in_samps = 0;
+    rx_metadata_t _last_fragment_metadata;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP */
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
new file mode 100644
index 000000000..36f568f2d
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
@@ -0,0 +1,207 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP
+#define INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/types/metadata.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/get_aligned_buffs.hpp>
+#include <boost/format.hpp>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Implementation of rx streamer manipulation of frame buffers and packet info.
+ * This class is part of rx_streamer_impl, split into a separate unit as it is
+ * a mostly self-contained portion of the streamer logic.
+ */
+template <typename transport_t>
+class rx_streamer_zero_copy
+{
+public:
+    //! Constructor
+    //
+    // \param num_ports Number of channels handled by this streamer
+    rx_streamer_zero_copy(const size_t num_ports)
+        : _xports(num_ports)
+        , _frame_buffs(num_ports)
+        , _infos(num_ports)
+        , _get_aligned_buffs(_xports, _frame_buffs, _infos)
+    {
+    }
+
+    //! Destructor: returns any in-flight frame buffers to their transports
+    ~rx_streamer_zero_copy()
+    {
+        for (size_t i = 0; i < _frame_buffs.size(); i++) {
+            if (_frame_buffs[i]) {
+                _xports[i]->release_recv_buff(std::move(_frame_buffs[i]));
+            }
+        }
+    }
+
+    //! Connect a new channel to the streamer
+    //
+    // \throws uhd::index_error if the port is out of range
+    // \throws uhd::runtime_error if the port is already connected
+    void connect_channel(const size_t port, typename transport_t::uptr xport)
+    {
+        if (port >= get_num_channels()) {
+            throw uhd::index_error(
+                "Port number indexes beyond the number of streamer ports");
+        }
+
+        if (_xports[port]) {
+            throw uhd::runtime_error(
+                "Streamer port number is already connected to a port");
+        }
+
+        _xports[port] = std::move(xport);
+    }
+
+    //! Returns number of channels handled by this streamer
+    size_t get_num_channels() const
+    {
+        return _xports.size();
+    }
+
+    //! Configures tick rate for conversion of timestamp
+    void set_tick_rate(const double rate)
+    {
+        _tick_rate = rate;
+    }
+
+    //! Configures sample rate for conversion of timestamp
+    void set_samp_rate(const double rate)
+    {
+        _samp_rate = rate;
+    }
+
+    //! Configures the size of each sample
+    void set_bytes_per_item(const size_t bpi)
+    {
+        _bytes_per_item = bpi;
+    }
+
+    /*!
+     * Gets a set of time-aligned buffers, one per channel.
+     *
+     * \param buffs returns a pointer to the buffer data
+     * \param metadata returns the metadata corresponding to the buffer
+     * \param timeout_ms timeout in milliseconds
+     * \return the size in samples of each packet, or 0 if timeout
+     */
+    size_t get_recv_buffs(std::vector<const void*>& buffs,
+        rx_metadata_t& metadata,
+        const int32_t timeout_ms)
+    {
+        metadata.reset();
+
+        // Map the alignment result onto the rx_metadata_t error codes
+        switch (_get_aligned_buffs(timeout_ms)) {
+            case get_aligned_buffs_t::SUCCESS:
+                break;
+
+            case get_aligned_buffs_t::BAD_PACKET:
+                metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
+                return 0;
+
+            case get_aligned_buffs_t::TIMEOUT:
+                metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
+                return 0;
+
+            case get_aligned_buffs_t::ALIGNMENT_FAILURE:
+                metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
+                return 0;
+
+            case get_aligned_buffs_t::SEQUENCE_ERROR:
+                // Estimate the time of the dropped data as the time of the
+                // last good packet plus its duration in samples
+                metadata.has_time_spec = _last_read_time_info.has_time_spec;
+                metadata.time_spec =
+                    _last_read_time_info.time_spec
+                    + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate);
+                metadata.out_of_sequence = true;
+                metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+                return 0;
+
+            default:
+                UHD_THROW_INVALID_CODE_PATH();
+        }
+
+        // Get payload pointers for each buffer and aggregate eob. We set eob to
+        // true if any channel has it set, since no more data will be received for
+        // that channel. In most cases, all channels should have the same value.
+        bool eob = false;
+        for (size_t i = 0; i < buffs.size(); i++) {
+            buffs[i] = _infos[i].payload;
+            eob |= _infos[i].eob;
+        }
+
+        // Set the metadata from the buffer information at index zero
+        const auto& info_0 = _infos[0];
+
+        metadata.has_time_spec = info_0.has_tsf;
+        metadata.time_spec = time_spec_t::from_ticks(info_0.tsf, _tick_rate);
+        metadata.start_of_burst = false;
+        metadata.end_of_burst = eob;
+        metadata.error_code = rx_metadata_t::ERROR_CODE_NONE;
+
+        // Done with these packets, save timestamp info for next call
+        _last_read_time_info.has_time_spec = metadata.has_time_spec;
+        _last_read_time_info.time_spec = metadata.time_spec;
+        _last_read_time_info.num_samps = info_0.payload_bytes / _bytes_per_item;
+
+        return _last_read_time_info.num_samps;
+    }
+
+    /*!
+     * Release the packet for the specified channel
+     *
+     * \param channel the channel for which to release the packet
+     */
+    void release_recv_buff(const size_t channel)
+    {
+        _xports[channel]->release_recv_buff(std::move(_frame_buffs[channel]));
+        // Reset to an empty uptr to mark the buffer as no longer in flight
+        _frame_buffs[channel] = typename transport_t::buff_t::uptr();
+    }
+
+private:
+    using get_aligned_buffs_t = get_aligned_buffs<transport_t>;
+
+    // Information recorded by streamer about the last data packet processed,
+    // used to create the metadata when there is a sequence error.
+    struct last_read_time_info_t
+    {
+        size_t num_samps = 0;
+        bool has_time_spec = false;
+        time_spec_t time_spec;
+    };
+
+    // Transports for each channel
+    std::vector<typename transport_t::uptr> _xports;
+
+    // Storage for buffers for each channel while they are in flight (between
+    // calls to get_recv_buff and release_recv_buff).
+    std::vector<typename transport_t::buff_t::uptr> _frame_buffs;
+
+    // Packet info corresponding to the packets in flight
+    std::vector<typename transport_t::packet_info_t> _infos;
+
+    // Tick rate used in conversion of timestamp to time_spec_t
+    double _tick_rate = 1.0;
+
+    // Sample rate used to extrapolate the timestamp on sequence errors
+    double _samp_rate = 1.0;
+
+    // Size of a sample on the device
+    size_t _bytes_per_item = 0;
+
+    // Implementation of packet time alignment
+    get_aligned_buffs_t _get_aligned_buffs;
+
+    // Information about the last data packet processed
+    last_read_time_info_t _last_read_time_info;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP */
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
new file mode 100644
index 000000000..60881dad2
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
@@ -0,0 +1,307 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP
+#define INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/convert.hpp>
+#include <uhd/stream.hpp>
+#include <uhd/types/metadata.hpp>
+#include <uhd/utils/tasks.hpp>
+#include <uhdlib/transport/tx_streamer_zero_copy.hpp>
+#include <limits>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+namespace detail {
+
+/*!
+ * Cache of metadata for send calls with zero samples
+ *
+ * Metadata is cached when we get a send requesting a start of burst with no
+ * samples. It is applied here on the next call to send() that actually has
+ * samples to send.
+ */
+class tx_metadata_cache
+{
+public:
+    //! Stores metadata in the cache
+    UHD_FORCE_INLINE void store(const tx_metadata_t& metadata)
+    {
+        _metadata_cache = metadata;
+        _cached_metadata = true;
+    }
+
+    //! Checks for cached metadata. If present, applies the cached burst
+    //! flags (and time spec, if the caller did not supply one) to
+    //! \p metadata and clears the cache.
+    UHD_FORCE_INLINE void check(tx_metadata_t& metadata)
+    {
+        if (_cached_metadata) {
+            // Only use cached time_spec if metadata does not have one
+            if (!metadata.has_time_spec) {
+                metadata.has_time_spec = _metadata_cache.has_time_spec;
+                metadata.time_spec = _metadata_cache.time_spec;
+            }
+            metadata.start_of_burst = _metadata_cache.start_of_burst;
+            metadata.end_of_burst = _metadata_cache.end_of_burst;
+            _cached_metadata = false;
+        }
+    }
+
+private:
+    // Whether there is a cached metadata object
+    bool _cached_metadata = false;
+
+    // Cached metadata value
+    uhd::tx_metadata_t _metadata_cache;
+};
+
+} // namespace detail
+
+/*!
+ * Implementation of tx streamer API
+ */
+template <typename transport_t>
+class tx_streamer_impl : public tx_streamer
+{
+public:
+    //! Constructor
+    //
+    // \param num_chans Number of channels handled by this streamer
+    // \param stream_args Stream args containing the cpu and otw formats;
+    //        the optional "spp" arg caps the samples per packet
+    tx_streamer_impl(const size_t num_chans, const uhd::stream_args_t stream_args)
+        : _zero_copy_streamer(num_chans)
+        , _zero_buffs(num_chans, &_zero)
+        , _out_buffs(num_chans)
+    {
+        _setup_converters(num_chans, stream_args);
+        _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item);
+        _spp = stream_args.args.cast<size_t>("spp", _spp);
+    }
+
+    //! Connect a new channel to the streamer
+    void connect_channel(const size_t channel, typename transport_t::uptr xport)
+    {
+        const size_t max_pyld_size = xport->get_max_payload_size();
+        _zero_copy_streamer.connect_channel(channel, std::move(xport));
+
+        // Set spp based on the transport frame size; the effective spp is
+        // the minimum over all connected channels and the "spp" stream arg
+        const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item;
+        _spp = std::min(_spp, xport_spp);
+    }
+
+    //! Implementation of tx_streamer API method
+    size_t get_num_channels() const
+    {
+        return _zero_copy_streamer.get_num_channels();
+    }
+
+    //! Implementation of tx_streamer API method
+    size_t get_max_num_samps() const
+    {
+        return _spp;
+    }
+
+
+    /*! Get width of each over-the-wire item component. For complex items,
+     * returns the width of one component only (real or imaginary).
+     */
+    size_t get_otw_item_comp_bit_width() const
+    {
+        return _convert_info.otw_item_bit_width;
+    }
+
+    //! Implementation of tx_streamer API method
+    //
+    // Sends the provided samples, fragmenting into multiple packets when
+    // nsamps_per_buff exceeds the streamer's spp. A start-of-burst request
+    // with zero samples is cached and applied to the next send call.
+    size_t send(const uhd::tx_streamer::buffs_type& buffs,
+        const size_t nsamps_per_buff,
+        const uhd::tx_metadata_t& metadata_,
+        const double timeout)
+    {
+        uhd::tx_metadata_t metadata(metadata_);
+
+        if (nsamps_per_buff == 0 && metadata.start_of_burst) {
+            _metadata_cache.store(metadata);
+            return 0;
+        }
+
+        _metadata_cache.check(metadata);
+
+        const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
+
+        if (nsamps_per_buff == 0) {
+            // Send requests with no samples are handled here, such as end of
+            // burst. Send packets need to have at least one sample based on the
+            // chdr specification, so we use _zero_buffs here.
+            _send_one_packet(_zero_buffs.data(),
+                0, // buffer offset
+                1, // num samples
+                metadata,
+                timeout_ms);
+
+            return 0;
+        } else if (nsamps_per_buff <= _spp) {
+            return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms);
+
+        } else {
+            size_t total_num_samps_sent = 0;
+            const bool eob = metadata.end_of_burst;
+            metadata.end_of_burst = false;
+
+            const size_t num_fragments = (nsamps_per_buff - 1) / _spp;
+            const size_t final_length = ((nsamps_per_buff - 1) % _spp) + 1;
+
+            for (size_t i = 0; i < num_fragments; i++) {
+                const size_t num_samps_sent = _send_one_packet(
+                    buffs, total_num_samps_sent, _spp, metadata, timeout_ms);
+
+                total_num_samps_sent += num_samps_sent;
+
+                if (num_samps_sent == 0) {
+                    return total_num_samps_sent;
+                }
+
+                // Setup timespec for the next fragment
+                if (metadata.has_time_spec) {
+                    metadata.time_spec =
+                        metadata.time_spec
+                        + time_spec_t::from_ticks(num_samps_sent, _samp_rate);
+                }
+
+                metadata.start_of_burst = false;
+            }
+
+            // Send the final fragment with the caller's original end-of-burst
+            metadata.end_of_burst = eob;
+
+            // Bugfix: pass timeout_ms here, not the raw timeout in seconds.
+            // Passing the double `timeout` would implicitly truncate it to an
+            // int32_t millisecond count (e.g. 0.1 s -> 0 ms), making the
+            // final fragment effectively non-blocking.
+            size_t nsamps_sent =
+                total_num_samps_sent
+                + _send_one_packet(
+                    buffs, total_num_samps_sent, final_length, metadata, timeout_ms);
+
+            return nsamps_sent;
+        }
+    }
+
+    //! Implementation of tx_streamer API method
+    bool recv_async_msg(
+        uhd::async_metadata_t& /*async_metadata*/, double /*timeout = 0.1*/)
+    {
+        // TODO: implement me
+        return false;
+    }
+
+protected:
+    //! Configures scaling factor for conversion
+    void set_scale_factor(const size_t chan, const double scale_factor)
+    {
+        _converters[chan]->set_scalar(scale_factor);
+    }
+
+    //! Configures sample rate for conversion of timestamp
+    void set_samp_rate(const double rate)
+    {
+        _samp_rate = rate;
+    }
+
+    //! Configures tick rate for conversion of timestamp
+    void set_tick_rate(const double rate)
+    {
+        _zero_copy_streamer.set_tick_rate(rate);
+    }
+
+private:
+    //! Converter and associated item sizes
+    struct convert_info
+    {
+        size_t bytes_per_otw_item;
+        size_t bytes_per_cpu_item;
+        size_t otw_item_bit_width;
+    };
+
+    //! Converts samples for each channel and sends one packet
+    //
+    // \param buffs Source sample buffers, one per channel
+    // \param buffer_offset_in_samps Offset into the source buffers
+    // \param num_samples Number of samples to send per channel
+    // \param metadata Metadata to write into the packet headers
+    // \param timeout_ms Timeout in milliseconds
+    // \return num_samples on success, 0 on timeout
+    size_t _send_one_packet(const uhd::tx_streamer::buffs_type& buffs,
+        const size_t buffer_offset_in_samps,
+        const size_t num_samples,
+        const tx_metadata_t& metadata,
+        const int32_t timeout_ms)
+    {
+        if (!_zero_copy_streamer.get_send_buffs(
+                _out_buffs, num_samples, metadata, timeout_ms)) {
+            return 0;
+        }
+
+        size_t byte_offset = buffer_offset_in_samps * _convert_info.bytes_per_cpu_item;
+
+        for (size_t i = 0; i < get_num_channels(); i++) {
+            const void* input_ptr = static_cast<const uint8_t*>(buffs[i]) + byte_offset;
+            _converters[i]->conv(input_ptr, _out_buffs[i], num_samples);
+
+            _zero_copy_streamer.release_send_buff(i);
+        }
+
+        return num_samples;
+    }
+
+    //! Create converters and initialize _convert_info
+    void _setup_converters(const size_t num_chans, const uhd::stream_args_t stream_args)
+    {
+        // Note to code archaeologists: In the past, we had to also specify the
+        // endianness here, but that is no longer necessary because we can make
+        // the wire endianness match the host endianness.
+        convert::id_type id;
+        id.input_format = stream_args.cpu_format;
+        id.num_inputs = 1;
+        id.output_format = stream_args.otw_format + "_chdr";
+        id.num_outputs = 1;
+
+        // Needle passed by const reference to avoid a copy per call
+        auto starts_with = [](const std::string& s, const std::string& v) {
+            return s.find(v) == 0;
+        };
+
+        const bool otw_is_complex = starts_with(stream_args.otw_format, "fc")
+            || starts_with(stream_args.otw_format, "sc");
+
+        convert_info info;
+        info.bytes_per_otw_item = convert::get_bytes_per_item(id.output_format);
+        info.bytes_per_cpu_item = convert::get_bytes_per_item(id.input_format);
+
+        // For complex items, the bit width refers to one component only
+        if (otw_is_complex) {
+            info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2;
+        } else {
+            info.otw_item_bit_width = info.bytes_per_otw_item * 8;
+        }
+
+        _convert_info = info;
+
+        // One converter per channel; the default scale factor may be
+        // overridden later via set_scale_factor
+        for (size_t i = 0; i < num_chans; i++) {
+            _converters.push_back(convert::get_converter(id)());
+            _converters.back()->set_scalar(32767.0);
+        }
+    }
+
+    // Converter item sizes
+    convert_info _convert_info;
+
+    // Converters
+    std::vector<uhd::convert::converter::sptr> _converters;
+
+    // Manages frame buffers and packet info
+    tx_streamer_zero_copy<transport_t> _zero_copy_streamer;
+
+    // Buffer used to handle send calls with no data
+    std::vector<const void*> _zero_buffs;
+
+    const uint64_t _zero = 0;
+
+    // Container for buffer pointers used in send method
+    std::vector<void*> _out_buffs;
+
+    // Sample rate used to calculate metadata time_spec_t
+    double _samp_rate = 1.0;
+
+    // Maximum number of samples per packet
+    size_t _spp = std::numeric_limits<std::size_t>::max();
+
+    // Metadata cache for send calls with no data
+    detail::tx_metadata_cache _metadata_cache;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP */
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
new file mode 100644
index 000000000..1b6f55238
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
@@ -0,0 +1,147 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP
+#define INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/stream.hpp>
+#include <uhd/types/metadata.hpp>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Implementation of tx streamer manipulation of frame buffers and packet info.
+ * This class is part of tx_streamer_impl, split into a separate unit as it is
+ * a mostly self-contained portion of the streamer logic.
+ */
+template <typename transport_t>
+class tx_streamer_zero_copy
+{
+public:
+    //! Constructor
+    //
+    // \param num_chans Number of channels handled by this streamer
+    tx_streamer_zero_copy(const size_t num_chans)
+        : _xports(num_chans), _frame_buffs(num_chans)
+    {
+    }
+
+    //! Connect a new channel to the streamer
+    //
+    // \throws uhd::index_error if the port is out of range
+    // \throws uhd::runtime_error if the port is already connected
+    void connect_channel(const size_t port, typename transport_t::uptr xport)
+    {
+        if (port >= get_num_channels()) {
+            throw uhd::index_error(
+                "Port number indexes beyond the number of streamer ports");
+        }
+
+        if (_xports[port]) {
+            throw uhd::runtime_error(
+                "Streamer port number is already connected to a port");
+        }
+
+        _xports[port] = std::move(xport);
+    }
+
+    //! Returns number of channels handled by this streamer
+    size_t get_num_channels() const
+    {
+        return _xports.size();
+    }
+
+    //! Configures tick rate for conversion of timestamp
+    void set_tick_rate(const double rate)
+    {
+        _tick_rate = rate;
+    }
+
+    //! Configures the size of each sample
+    void set_bytes_per_item(const size_t bpi)
+    {
+        _bytes_per_item = bpi;
+    }
+
+    /*!
+     * Gets a set of frame buffers, one per channel.
+     *
+     * On timeout, buffers already acquired are kept and a subsequent call
+     * resumes acquisition from the first channel without a buffer.
+     *
+     * \param buffs returns a pointer to the buffer data
+     * \param nsamps_per_buff the number of samples that will be written to each buffer
+     * \param metadata the metadata to write to the packet header
+     * \param timeout_ms timeout in milliseconds
+     * \return true if the operation was successful, false if timeout occurs
+     */
+    UHD_FORCE_INLINE bool get_send_buffs(std::vector<void*>& buffs,
+        const size_t nsamps_per_buff,
+        const tx_metadata_t& metadata,
+        const int32_t timeout_ms)
+    {
+        // Try to get a buffer per channel. _next_buff_to_get persists
+        // across calls so a timed-out call can resume where it stopped.
+        for (; _next_buff_to_get < _xports.size(); _next_buff_to_get++) {
+            _frame_buffs[_next_buff_to_get].first =
+                _xports[_next_buff_to_get]->get_send_buff(timeout_ms);
+
+            if (!_frame_buffs[_next_buff_to_get].first) {
+                return false;
+            }
+        }
+
+        // Got all the buffers, start from index 0 next call
+        _next_buff_to_get = 0;
+
+        // Store portions of metadata we care about
+        typename transport_t::packet_info_t info;
+        info.has_tsf = metadata.has_time_spec;
+
+        if (metadata.has_time_spec) {
+            info.tsf = metadata.time_spec.to_ticks(_tick_rate);
+        }
+
+        info.payload_bytes = nsamps_per_buff * _bytes_per_item;
+        info.eob = metadata.end_of_burst;
+
+        // Write packet header
+        for (size_t i = 0; i < buffs.size(); i++) {
+            std::tie(buffs[i], _frame_buffs[i].second) =
+                _xports[i]->write_packet_header(_frame_buffs[i].first, info);
+        }
+
+        return true;
+    }
+
+    /*!
+     * Send the packet for the specified channel
+     *
+     * \param channel the channel for which to release the packet
+     */
+    UHD_FORCE_INLINE void release_send_buff(const size_t channel)
+    {
+        // Set the final packet size before handing the buffer back to the
+        // transport for transmission
+        _frame_buffs[channel].first->set_packet_size(_frame_buffs[channel].second);
+        _xports[channel]->release_send_buff(std::move(_frame_buffs[channel].first));
+
+        _frame_buffs[channel].first = nullptr;
+        _frame_buffs[channel].second = 0;
+    }
+
+private:
+    // Transports for each channel
+    std::vector<typename transport_t::uptr> _xports;
+
+    // Container to hold frame buffers for each channel and their packet sizes
+    std::vector<std::pair<typename transport_t::buff_t::uptr, size_t>> _frame_buffs;
+
+    // Rate used in conversion of timestamp to time_spec_t
+    double _tick_rate = 1.0;
+
+    // Size of a sample on the device
+    size_t _bytes_per_item = 0;
+
+    // Next channel from which to get a buffer, stored as a member to
+    // allow the streamer to continue where it stopped due to timeouts.
+    size_t _next_buff_to_get = 0;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP */