aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/include
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/include')
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_packet.hpp9
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp481
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp550
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_types.hpp2
-rw-r--r--host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp32
-rw-r--r--host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp13
-rw-r--r--host/lib/include/uhdlib/rfnoc/mb_iface.hpp44
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp21
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp95
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp90
-rw-r--r--host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp130
-rw-r--r--host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp99
-rw-r--r--host/lib/include/uhdlib/transport/get_aligned_buffs.hpp175
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_impl.hpp341
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp207
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_impl.hpp307
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp147
17 files changed, 2707 insertions, 36 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp b/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp
index 770c6cf6f..cc729de6c 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp
@@ -114,6 +114,15 @@ public:
*/
virtual void* get_payload_ptr() = 0;
+ /*! Return the payload offset in bytes for a given type and num_mdata
+ *
+ * \param pkt_type The packet type for calculation
+ * \param num_mdata The number of metadata words for calculation
+ * \return The offset of the payload in a packet with the given params
+ */
+ virtual size_t calculate_payload_offset(const packet_type_t pkt_type,
+ const uint8_t num_mdata = 0) const = 0;
+
//! Shortcut to return the const metadata pointer cast as a specific type
template <typename data_t>
inline const data_t* get_mdata_const_ptr_as() const
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp
new file mode 100644
index 000000000..69dceebe1
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp
@@ -0,0 +1,481 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP
+#define INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP
+
+#include <uhd/config.hpp>
+#include <uhdlib/rfnoc/chdr_packet.hpp>
+#include <uhdlib/rfnoc/chdr_types.hpp>
+#include <uhdlib/rfnoc/mgmt_portal.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+#include <uhdlib/rfnoc/rx_flow_ctrl_state.hpp>
+#include <uhdlib/transport/io_service.hpp>
+#include <uhdlib/transport/link_if.hpp>
+#include <memory>
+
+namespace uhd { namespace rfnoc {
+
+namespace detail {
+
+/*!
+ * Utility class to send rx flow control responses
+ */
+class rx_flow_ctrl_sender
+{
+public:
+ //! Constructor
+ rx_flow_ctrl_sender(
+ const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids)
+ : _dst_epid(sep_ids.first)
+ {
+ _fc_packet = pkt_factory.make_strs();
+ _fc_strs_pyld.src_epid = sep_ids.second;
+ }
+
+ /*! Configure buffer capacity
+ * \param recv_capacity The buffer capacity of the receive link
+ */
+ void set_capacity(const stream_buff_params_t& recv_capacity)
+ {
+ _fc_strs_pyld.capacity_bytes = recv_capacity.bytes;
+ _fc_strs_pyld.capacity_pkts = recv_capacity.packets;
+ }
+
+ /*! Send a flow control response packet
+ *
+ * \param send_link the link to use to send the packet
+ * \counts transfer counts for packet contents
+ */
+ void send_strs(transport::send_link_if* send_link, const stream_buff_params_t& counts)
+ {
+ auto buff = send_link->get_send_buff(0);
+ if (!buff) {
+ throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer");
+ }
+
+ chdr::chdr_header header;
+ header.set_seq_num(_fc_seq_num++);
+ header.set_dst_epid(_dst_epid);
+
+ chdr::strs_payload fc_payload(_fc_strs_pyld);
+ fc_payload.xfer_count_bytes = counts.bytes;
+ fc_payload.xfer_count_pkts = counts.packets;
+
+ _fc_packet->refresh(buff->data(), header, fc_payload);
+ const size_t size = header.get_length();
+
+ buff->set_packet_size(size);
+ send_link->release_send_buff(std::move(buff));
+ }
+
+private:
+ // Endpoint ID for recipient of flow control response
+ const sep_id_t _dst_epid;
+
+ // Packet for writing flow control info
+ chdr::chdr_strs_packet::uptr _fc_packet;
+
+ // Pre-configured strs payload to hold values that don't change
+ chdr::strs_payload _fc_strs_pyld;
+
+ // Sequence number for flow control packets
+ uint16_t _fc_seq_num = 0;
+};
+} // namespace detail
+
+/*!
+ * Flow-controlled transport for RX chdr data
+ *
+ * This transport provides the streamer an interface to read RX data packets.
+ * The transport implements flow control and sequence number checking.
+ *
+ * The transport uses I/O services to provide options for work scheduling. I/O
+ * services allow the I/O work to be offloaded to a worker thread or to be
+ * performed in the same thread as the streamer API calls.
+ *
+ * For an rx transport, the device sends data packets, and the host sends strs
+ * packets letting the device know that buffer space in the host has been freed.
+ * For lossy links, the device also sends strc packets to resynchronize the
+ * transfer counts between host and device, to correct for any dropped packets
+ * in the link.
+ */
+class chdr_rx_data_xport
+{
+public:
+ using uptr = std::unique_ptr<chdr_rx_data_xport>;
+ using buff_t = transport::frame_buff;
+
+ //! Values extracted from received RX data packets
+ struct packet_info_t
+ {
+ bool eob = false;
+ bool has_tsf = false;
+ uint64_t tsf = 0;
+ size_t payload_bytes = 0;
+ const void* payload = nullptr;
+ };
+
+ /*! Constructor
+ *
+ * \param io_srv The service that will schedule the xport I/O
+ * \param recv_link The recv link, already attached to the I/O service
+ * \param send_link The send link, already attached to the I/O service
+ * \param pkt_factory Factory to create packets with the desired chdr_w and endianness
+ * \param addrs Source and destination addresses
+ * \param epids Source and destination endpoint IDs
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
+ * \param num_recv_frames Num frames to reserve from the recv link
+ * \param recv_capacity Total capacity of the recv link
+ * \param fc_freq Frequency of flow control status messages
+ * \param fc_headroom Headroom for flow control status messages
+ * \param lossy_xport Whether the xport is lossy, for flow control configuration
+ */
+ chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const uhd::rfnoc::sep_addr_pair_t& addrs,
+ const uhd::rfnoc::sep_id_pair_t& epids,
+ const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
+ const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
+ const size_t num_recv_frames,
+ const stream_buff_params_t& recv_capacity,
+ const stream_buff_params_t& fc_freq,
+ const stream_buff_params_t& fc_headroom,
+ const bool lossy_xport)
+ : _fc_state(epids), _fc_sender(pkt_factory, epids), _epid(epids.second)
+ {
+ const sep_addr_t remote_sep_addr = addrs.first;
+ const sep_addr_t local_sep_addr = addrs.second;
+ const sep_id_t remote_epid = epids.first;
+ const sep_id_t local_epid = epids.second;
+
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Creating rx xport with local epid=" << local_epid
+ << ", remote epid=" << remote_epid);
+
+ _recv_packet = pkt_factory.make_generic();
+ _fc_sender.set_capacity(recv_capacity);
+
+ // Calculate max payload size
+ const size_t pyld_offset =
+ _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS);
+ _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset;
+
+ // Make data transport
+ auto recv_cb = [this](buff_t::uptr& buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link) {
+ return this->_recv_callback(buff, recv_link, send_link);
+ };
+
+ auto fc_cb = [this](buff_t::uptr buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link) {
+ this->_fc_callback(std::move(buff), recv_link, send_link);
+ };
+
+ // Needs just a single send frame for responses
+ _recv_io = io_srv->make_recv_client(recv_link,
+ num_recv_frames,
+ recv_cb,
+ send_link,
+ /* num_send_frames*/ 1,
+ fc_cb);
+
+ // Create a control transport with the rx data links to send mgmt packets
+ // needed to setup the stream
+ // Piggyback on frames from the recv_io_if
+ auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv,
+ send_link,
+ recv_link,
+ local_epid,
+ 0, // num_send_frames
+ 0); // num_recv_frames
+
+ // Create new temporary management portal with the transports used for this stream
+ // TODO: This is a bit excessive. Maybe we can pare down the functionality of the
+ // portal just for route setup purposes. Whatever we do, we *must* use xport in it
+ // though otherwise the transport will not behave correctly.
+ auto data_mgmt_portal = uhd::rfnoc::mgmt::mgmt_portal::make(
+ *ctrl_xport, pkt_factory, local_sep_addr, local_epid);
+
+ // Setup a route to the EPID
+ // Note that this may be gratuitous--The endpoint may already have been set up
+ data_mgmt_portal->initialize_endpoint(*ctrl_xport, remote_sep_addr, remote_epid);
+ data_mgmt_portal->setup_local_route(*ctrl_xport, remote_epid);
+
+ // Initialize flow control - management portal sends a stream command
+ // containing its requested flow control frequency, the rx transport
+ // responds with a stream status containing its buffer capacity.
+ data_mgmt_portal->config_local_rx_stream_start(*ctrl_xport,
+ remote_epid,
+ lossy_xport,
+ pyld_buff_fmt,
+ mdata_buff_fmt,
+ fc_freq,
+ fc_headroom);
+
+ data_mgmt_portal->config_local_rx_stream_commit(*ctrl_xport, remote_epid);
+
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Stream endpoint was configured with:"
+ << std::endl
+ << "capacity bytes=" << recv_capacity.bytes
+ << ", packets=" << recv_capacity.packets << std::endl
+ << "fc headroom bytes=" << fc_headroom.bytes
+ << ", packets=" << fc_headroom.packets << std::endl
+ << "fc frequency bytes=" << fc_freq.bytes
+ << ", packets=" << fc_freq.packets);
+
+ // We no longer need the control xport and mgmt_portal, release them so
+ // the control xport is no longer connected to the I/O service.
+ data_mgmt_portal.reset();
+ ctrl_xport.reset();
+ }
+
+ /*! Returns maximum number payload bytes
+ *
+ * \return maximum payload bytes per packet
+ */
+ size_t get_max_payload_size() const
+ {
+ return _max_payload_size;
+ }
+
+ /*!
+ * Gets an RX frame buffer containing a recv packet
+ *
+ * \param timeout_ms timeout in milliseconds
+ * \return returns a tuple containing:
+ * - a frame_buff, or null if timeout occurs
+ * - info struct corresponding to the packet
+ * - whether the packet was out of sequence
+ */
+ std::tuple<typename buff_t::uptr, packet_info_t, bool> get_recv_buff(
+ const int32_t timeout_ms)
+ {
+ buff_t::uptr buff = _recv_io->get_recv_buff(timeout_ms);
+
+ if (!buff) {
+ return std::make_tuple(typename buff_t::uptr(), packet_info_t(), false);
+ }
+
+ auto info = _read_data_packet_info(buff);
+ bool seq_error = _is_out_of_sequence(std::get<1>(info));
+
+ return std::make_tuple(std::move(buff), std::get<0>(info), seq_error);
+ }
+
+ /*!
+ * Releases an RX frame buffer
+ *
+ * \param buff the frame buffer to release
+ */
+ void release_recv_buff(typename buff_t::uptr buff)
+ {
+ _recv_io->release_recv_buff(std::move(buff));
+ }
+
+private:
+ /*!
+ * Recv callback for I/O service
+ *
+ * The I/O service invokes this callback when it reads a packet from the
+ * recv link.
+ *
+ * \param buff the frame buffer containing the packet data
+ * \param recv_link the recv link from which buff was read
+ * \param send_link the send link for flow control messages
+ */
+ bool _recv_callback(buff_t::uptr& buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link)
+ {
+ _recv_packet->refresh(buff->data());
+ const auto header = _recv_packet->get_chdr_header();
+ const auto type = header.get_pkt_type();
+ const auto dst_epid = header.get_dst_epid();
+ const auto packet_size = buff->packet_size();
+
+ if (dst_epid != _epid) {
+ return false;
+ }
+
+ if (type == chdr::PKT_TYPE_STRC) {
+ chdr::strc_payload strc;
+ strc.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(),
+ _recv_packet->get_payload_size() / sizeof(uint64_t),
+ _recv_packet->conv_to_host<uint64_t>());
+
+ const stream_buff_params_t strc_counts = {
+ strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)};
+
+ if (strc.op_code == chdr::STRC_RESYNC) {
+ // Resynchronize before updating fc_state, the strc payload
+ // contains counts before the strc packet itself
+ _fc_state.resynchronize(strc_counts);
+
+ // Update state that we received a packet
+ _fc_state.data_received(packet_size);
+
+ recv_link->release_recv_buff(std::move(buff));
+ buff = buff_t::uptr();
+ _fc_state.xfer_done(packet_size);
+ _send_fc_response(send_link);
+ } else if (strc.op_code == chdr::STRC_INIT) {
+ _fc_state.initialize(
+ {strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)});
+
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Received strc init with fc freq"
+ << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts);
+
+ // Make sure flow control was initialized
+ assert(_fc_state.get_fc_freq().bytes > 0);
+ assert(_fc_state.get_fc_freq().packets > 0);
+
+ // Send a strs response to configure flow control on the sender
+ _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts());
+
+ // Reset counts, since mgmt_portal will do it to FPGA
+ _fc_state.reset_counts();
+
+ recv_link->release_recv_buff(std::move(buff));
+ buff = buff_t::uptr();
+ } else {
+ throw uhd::value_error("Unexpected opcode value in STRC packet.");
+ }
+
+ // For stream commands, we return true (packet was destined to this
+ // client) but release the buffer. The I/O service won't queue this
+ // packet in the recv_io_if.
+ return true;
+
+ } else if (type == chdr::PKT_TYPE_DATA_NO_TS
+ || type == chdr::PKT_TYPE_DATA_WITH_TS) {
+ // Update state that we received a packet
+ _fc_state.data_received(packet_size);
+
+ // If this is a data packet, just claim it by returning true. The
+ // I/O service will queue this packet in the recv_io_if.
+ return true;
+
+ } else {
+ return false;
+ }
+ }
+
+ /*!
+ * Flow control callback for I/O service
+ *
+ * The I/O service invokes this callback when a packet needs to be released
+ * to the recv link.
+ *
+ * \param buff the frame buffer containing the packet data
+ * \param recv_link the recv link to which to release the buffer
+ * \param send_link the send link for flow control messages
+ */
+ void _fc_callback(buff_t::uptr buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link)
+ {
+ const size_t packet_size = buff->packet_size();
+ recv_link->release_recv_buff(std::move(buff));
+ _fc_state.xfer_done(packet_size);
+ _send_fc_response(send_link);
+ }
+
+ /*!
+ * Sends a flow control response packet if necessary.
+ *
+ * \param send_link the send link for flow control messages
+ */
+ void _send_fc_response(transport::send_link_if* send_link)
+ {
+ if (_fc_state.fc_resp_due()) {
+ _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts());
+ _fc_state.fc_resp_sent();
+ }
+ }
+
+ /*!
+ * Checks if the sequence number is out of sequence, prints 'D' if it is
+ * and returns result of check.
+ *
+ * \return true if a sequence error occurred
+ */
+ UHD_FORCE_INLINE bool _is_out_of_sequence(uint16_t seq_num)
+ {
+ const uint16_t expected_packet_count = _data_seq_num;
+ _data_seq_num = seq_num + 1;
+
+ if (expected_packet_count != seq_num) {
+ UHD_LOG_FASTPATH("D");
+ return true;
+ }
+ return false;
+ }
+
+ /*!
+ * Reads packet header and returns information in a struct.
+ *
+ * \return a tuple containing the packet info and packet sequence number
+ */
+ std::tuple<packet_info_t, uint16_t> _read_data_packet_info(buff_t::uptr& buff)
+ {
+ const void* data = buff->data();
+ _recv_packet->refresh(data);
+ const auto header = _recv_packet->get_chdr_header();
+ const auto optional_time = _recv_packet->get_timestamp();
+
+ packet_info_t info;
+ info.eob = header.get_eob();
+ info.has_tsf = optional_time.is_initialized();
+ info.tsf = optional_time ? *optional_time : 0;
+ info.payload_bytes = _recv_packet->get_payload_size();
+ info.payload = _recv_packet->get_payload_const_ptr();
+
+ const uint8_t* pkt_end =
+ reinterpret_cast<uint8_t*>(buff->data()) + buff->packet_size();
+ const size_t pyld_pkt_len =
+ pkt_end - reinterpret_cast<const uint8_t*>(info.payload);
+
+ if (pyld_pkt_len < info.payload_bytes) {
+ _recv_io->release_recv_buff(std::move(buff));
+ throw uhd::value_error("Bad CHDR header or invalid packet length.");
+ }
+
+ return std::make_tuple(info, header.get_seq_num());
+ }
+
+ // Interface to the I/O service
+ transport::recv_io_if::sptr _recv_io;
+
+ // Flow control state
+ rx_flow_ctrl_state _fc_state;
+
+ // Maximum data payload in bytes
+ size_t _max_payload_size = 0;
+
+ // Sequence number for data packets
+ uint16_t _data_seq_num = 0;
+
+ // Packet for received data
+ chdr::chdr_packet::uptr _recv_packet;
+
+ // Handles sending of strs flow control response packets
+ detail::rx_flow_ctrl_sender _fc_sender;
+
+ // Local / Sink EPID
+ sep_id_t _epid;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP */
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
new file mode 100644
index 000000000..62b811bf5
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
@@ -0,0 +1,550 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP
+#define INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP
+
+#include <uhdlib/rfnoc/chdr_packet.hpp>
+#include <uhdlib/rfnoc/chdr_types.hpp>
+#include <uhdlib/rfnoc/mgmt_portal.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+#include <uhdlib/rfnoc/tx_flow_ctrl_state.hpp>
+#include <uhdlib/transport/io_service.hpp>
+#include <uhdlib/transport/link_if.hpp>
+#include <memory>
+
+namespace uhd { namespace rfnoc {
+
+namespace detail {
+
+/*!
+ * Utility class to send tx flow control messages
+ */
+class tx_flow_ctrl_sender
+{
+public:
+ //! Constructor
+ tx_flow_ctrl_sender(
+ const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids)
+ : _dst_epid(sep_ids.second)
+ {
+ _fc_packet = pkt_factory.make_strc();
+ _fc_strc_pyld.src_epid = sep_ids.first;
+ _fc_strc_pyld.op_code = chdr::STRC_RESYNC;
+ }
+
+ /*!
+ * Sends a flow control resync packet
+ *
+ * Sends a strc packet with the resync opcode to make the device transfer
+ * counts match those of the host, to correct for dropped packets.
+ *
+ * \param send_link the link to use to send the packet
+ * \counts transfer counts for packet contents
+ */
+ size_t send_strc_resync(
+ transport::send_link_if* send_link, const stream_buff_params_t& counts)
+ {
+ auto buff = send_link->get_send_buff(0);
+ if (!buff) {
+ throw uhd::runtime_error("tx_flowctrl timed out getting a send buffer");
+ }
+
+ chdr::chdr_header header;
+ header.set_seq_num(_fc_seq_num++);
+ header.set_dst_epid(_dst_epid);
+
+ chdr::strc_payload fc_payload(_fc_strc_pyld);
+ fc_payload.num_bytes = counts.bytes;
+ fc_payload.num_pkts = counts.packets;
+
+ _fc_packet->refresh(buff->data(), header, fc_payload);
+ const size_t size = header.get_length();
+
+ buff->set_packet_size(size);
+ send_link->release_send_buff(std::move(buff));
+ return size;
+ }
+
+private:
+ // Endpoint ID for recipient of flow control response
+ const sep_id_t _dst_epid;
+
+ // Packet for writing flow control info
+ chdr::chdr_strc_packet::uptr _fc_packet;
+
+ // Pre-configured strc payload to hold values that don't change
+ chdr::strc_payload _fc_strc_pyld;
+
+ // Sequence number for flow control packets
+ uint16_t _fc_seq_num = 0;
+};
+} // namespace detail
+
+/*!
+ * Flow-controlled transport for TX chdr data
+ *
+ * This transport provides the streamer an interface to send TX data packets.
+ * The transport implements flow control and keeps track of sequence numbers.
+ *
+ * The transport uses I/O services to provide options for work scheduling. I/O
+ * services allow the I/O work to be offloaded to a worker thread or to be
+ * performed in the same thread as the streamer API calls.
+ *
+ * For a tx transport, the host sends data packets, and the device sends strs
+ * packets letting the host know that buffer space in the device stream endpoint
+ * has been freed. For lossy links, the host also sends strc packets to
+ * resynchronize the transfer counts between host and device, to correct for
+ * any dropped packets in the link.
+ */
+class chdr_tx_data_xport
+{
+public:
+ using uptr = std::unique_ptr<chdr_tx_data_xport>;
+ using buff_t = transport::frame_buff;
+
+ //! Information about data packet
+ struct packet_info_t
+ {
+ bool eob = false;
+ bool has_tsf = false;
+ uint64_t tsf = 0;
+ size_t payload_bytes = 0;
+ };
+
+ /*! Constructor
+ *
+ * \param io_srv The service that will schedule the xport I/O
+ * \param recv_link The recv link, already attached to the I/O service
+ * \param send_link The send link, already attached to the I/O service
+ * \param pkt_factory Factory to create packets with the desired chdr_w and endianness
+ * \param addrs Source and destination addresses
+ * \param epids Source and destination endpoint IDs
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
+ * \param num_send_frames Num frames to reserve from the send link
+ * \param fc_freq_ratio Ratio to use to configure the device fc frequency
+ * \param fc_headroom_ratio Ratio to use to configure the device fc headroom
+ */
+ chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const uhd::rfnoc::sep_addr_pair_t& addrs,
+ const uhd::rfnoc::sep_id_pair_t& epids,
+ const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
+ const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
+ const size_t num_send_frames,
+ const double fc_freq_ratio,
+ const double fc_headroom_ratio)
+ : _fc_sender(pkt_factory, epids), _epid(epids.first)
+ {
+ const sep_addr_t remote_sep_addr = addrs.second;
+ const sep_addr_t local_sep_addr = addrs.first;
+ const sep_id_t remote_epid = epids.second;
+ const sep_id_t local_epid = epids.first;
+
+ UHD_LOG_TRACE("XPORT::TX_DATA_XPORT",
+ "Creating tx xport with local epid=" << local_epid
+ << ", remote epid=" << remote_epid);
+
+ _send_header.set_dst_epid(epids.second);
+ _send_packet = pkt_factory.make_generic();
+ _recv_packet = pkt_factory.make_generic();
+
+ // Calculate max payload size
+ const size_t pyld_offset =
+ _send_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS);
+ _max_payload_size = send_link->get_send_frame_size() - pyld_offset;
+
+ _configure_sep(io_srv,
+ recv_link,
+ send_link,
+ pkt_factory,
+ local_sep_addr,
+ local_epid,
+ remote_sep_addr,
+ remote_epid,
+ pyld_buff_fmt,
+ mdata_buff_fmt);
+
+ _initialize_flow_ctrl(io_srv,
+ recv_link,
+ send_link,
+ pkt_factory,
+ epids,
+ fc_freq_ratio,
+ fc_headroom_ratio);
+
+ // Now create the send I/O we will use for data
+ auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) {
+ this->_send_callback(buff, send_link);
+ };
+
+ auto recv_cb = [this](buff_t::uptr& buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link) {
+ return this->_recv_callback(buff, recv_link, send_link);
+ };
+
+ // Needs just a single recv frame for strs packets
+ _send_io = io_srv->make_send_client(send_link,
+ num_send_frames,
+ send_cb,
+ recv_link,
+ /* num_recv_frames */ 1,
+ recv_cb);
+ }
+
+ /*! Returns maximum number of payload bytes
+ *
+ * \return maximum number of payload bytes
+ */
+ size_t get_max_payload_size() const
+ {
+ return _max_payload_size;
+ }
+
+ /*!
+ * Gets a TX frame buffer
+ *
+ * \param timeout_ms timeout in milliseconds
+ * \return the frame buffer, or nullptr if timeout occurs
+ */
+ buff_t::uptr get_send_buff(const int32_t timeout_ms)
+ {
+ return _send_io->get_send_buff(timeout_ms);
+ }
+
+ /*!
+ * Sends a TX data packet
+ *
+ * \param buff the frame buffer containing the packet to send
+ */
+ void release_send_buff(buff_t::uptr buff)
+ {
+ _send_io->release_send_buff(std::move(buff));
+ }
+
+ /*!
+ * Writes header into frame buffer and returns payload pointer
+ *
+ * \param buff Frame buffer to write header into
+ * \param info Information to include in the header
+ * \return A pointer to the payload data area and the packet size in bytes
+ */
+ std::pair<void*, size_t> write_packet_header(buff_t::uptr& buff,
+ const packet_info_t& info)
+ {
+ uint64_t tsf = 0;
+
+ if (info.has_tsf) {
+ _send_header.set_pkt_type(chdr::PKT_TYPE_DATA_WITH_TS);
+ tsf = info.tsf;
+ } else {
+ _send_header.set_pkt_type(chdr::PKT_TYPE_DATA_NO_TS);
+ }
+
+ _send_header.set_eob(info.eob);
+ _send_header.set_seq_num(_data_seq_num++);
+
+ _send_packet->refresh(buff->data(), _send_header, tsf);
+ _send_packet->update_payload_size(info.payload_bytes);
+
+ return std::make_pair(
+ _send_packet->get_payload_ptr(),
+ _send_packet->get_chdr_header().get_length());
+ }
+
+private:
+ /*!
+ * Recv callback for I/O service
+ *
+ * The I/O service invokes this callback when it reads a packet from the
+ * recv link.
+ *
+ * \param buff the frame buffer containing the packet data
+ * \param recv_link the recv link from which buff was read
+ * \param send_link the send link for flow control messages
+ */
+ bool _recv_callback(buff_t::uptr& buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* /*send_link*/)
+ {
+ _recv_packet->refresh(buff->data());
+ const auto header = _recv_packet->get_chdr_header();
+ const auto type = header.get_pkt_type();
+ const auto dst_epid = header.get_dst_epid();
+
+ if (dst_epid != _epid) {
+ return false;
+ }
+
+ if (type == chdr::PKT_TYPE_STRS) {
+ chdr::strs_payload strs;
+ strs.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(),
+ _recv_packet->get_payload_size() / sizeof(uint64_t),
+ _recv_packet->conv_to_host<uint64_t>());
+
+ _fc_state.update_dest_recv_count({strs.xfer_count_bytes,
+ static_cast<uint32_t>(strs.xfer_count_pkts)});
+
+ // TODO: check strs status here and push into async msg queue
+
+ // Packet belongs to this transport, release buff and return true
+ recv_link->release_recv_buff(std::move(buff));
+ buff = nullptr;
+ return true;
+ } else {
+ UHD_THROW_INVALID_CODE_PATH();
+ }
+ }
+
+ /*!
+ * Send callback for I/O service
+ *
+ * The I/O service invokes this callback when it is requested to release
+ * a send buffer to the send link.
+ *
+ * \param buff the frame buffer to release
+ * \param send_link the send link for flow control messages
+ */
+ void _send_callback(buff_t::uptr& buff, transport::send_link_if* send_link)
+ {
+ const size_t packet_size = buff->packet_size();
+
+ if (_fc_state.dest_has_space(packet_size)) {
+ send_link->release_send_buff(std::move(buff));
+ buff = nullptr;
+
+ _fc_state.data_sent(packet_size);
+
+ if (_fc_state.get_fc_resync_req_pending()
+ && _fc_state.dest_has_space(chdr::strc_payload::PACKET_SIZE)) {
+ const auto& xfer_counts = _fc_state.get_xfer_counts();
+ const size_t strc_size =
+ _fc_sender.send_strc_resync(send_link, xfer_counts);
+ _fc_state.clear_fc_resync_req_pending();
+ _fc_state.data_sent(strc_size);
+ }
+ }
+ }
+
+ /*!
+ * Configures the stream endpoint using mgmt_portal
+ */
+ void _configure_sep(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const uhd::rfnoc::sep_addr_t& local_sep_addr,
+ const uhd::rfnoc::sep_id_t& local_epid,
+ const uhd::rfnoc::sep_addr_t& remote_sep_addr,
+ const uhd::rfnoc::sep_id_t& remote_epid,
+ const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
+ const uhd::rfnoc::sw_buff_t mdata_buff_fmt)
+ {
+ // Create a control transport with the tx data links to send mgmt packets
+ // needed to setup the stream. Only need one frame for this.
+ auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv,
+ send_link,
+ recv_link,
+ local_epid,
+ 1, // num_send_frames
+ 1); // num_recv_frames
+
+ // Create new temporary management portal with the transports used for this stream
+ // TODO: This is a bit excessive. Maybe we can pare down the functionality of the
+ // portal just for route setup purposes. Whatever we do, we *must* use xport in it
+ // though otherwise the transport will not behave correctly.
+ auto data_mgmt_portal = uhd::rfnoc::mgmt::mgmt_portal::make(
+ *ctrl_xport, pkt_factory, local_sep_addr, local_epid);
+
+ // Setup a route to the EPID
+ data_mgmt_portal->initialize_endpoint(*ctrl_xport, remote_sep_addr, remote_epid);
+ data_mgmt_portal->setup_local_route(*ctrl_xport, remote_epid);
+
+ data_mgmt_portal->config_local_tx_stream(
+ *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt);
+
+ // We no longer need the control xport and mgmt_portal, release them so
+ // the control xport is no longer connected to the I/O service.
+ data_mgmt_portal.reset();
+ ctrl_xport.reset();
+ }
+
+ /*!
+ * Initializes flow control
+ *
+ * To initialize flow control, we need to send an init strc packet, then
+ * receive a strs containing the stream endpoint ingress buffer size. We
+ * then repeat this (now that we know the buffer size) to configure the flow
+ * control frequency. To avoid having this logic within the data packet
+ * processing flow, we use temporary send and recv I/O instances with
+ * simple callbacks here.
+ */
+ void _initialize_flow_ctrl(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const sep_id_pair_t sep_ids,
+ const double fc_freq_ratio,
+ const double fc_headroom_ratio)
+ {
+ // No flow control at initialization, just release all send buffs
+ auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) {
+ send_link->release_send_buff(std::move(buff));
+ buff = nullptr;
+ };
+
+ // For recv, just queue strs packets for recv_io to read
+ auto recv_cb = [this](buff_t::uptr& buff,
+ transport::recv_link_if* /*recv_link*/,
+ transport::send_link_if* /*send_link*/) {
+ _recv_packet->refresh(buff->data());
+ const auto header = _recv_packet->get_chdr_header();
+ const auto type = header.get_pkt_type();
+ const auto dst_epid = header.get_dst_epid();
+
+ return (dst_epid == _epid && type == chdr::PKT_TYPE_STRS);
+ };
+
+ // No flow control at initialization, just release all recv buffs
+ auto fc_cb = [this](buff_t::uptr buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* /*send_link*/) {
+ recv_link->release_recv_buff(std::move(buff));
+ };
+
+ auto send_io = io_srv->make_send_client(send_link,
+ 1, // num_send_frames
+ send_cb,
+ nullptr,
+ 0, // num_recv_frames
+ nullptr);
+
+ auto recv_io = io_srv->make_recv_client(recv_link,
+ 1, // num_recv_frames
+ recv_cb,
+ nullptr,
+ 0, // num_send_frames
+ fc_cb);
+
+ chdr::chdr_strc_packet::uptr strc_packet = pkt_factory.make_strc();
+ chdr::chdr_packet::uptr& recv_packet = _recv_packet;
+
+ // Function to send a strc init
+ auto send_strc_init = [&send_io, sep_ids, &strc_packet](
+ const stream_buff_params_t fc_freq = {0, 0}) {
+ transport::frame_buff::uptr buff = send_io->get_send_buff(0);
+
+ if (!buff) {
+ throw uhd::runtime_error(
+ "tx xport timed out getting a send buffer for strc init");
+ }
+
+ chdr::chdr_header header;
+ header.set_seq_num(0);
+ header.set_dst_epid(sep_ids.second);
+
+ chdr::strc_payload strc_pyld;
+ strc_pyld.src_epid = sep_ids.first;
+ strc_pyld.op_code = chdr::STRC_INIT;
+ strc_pyld.num_bytes = fc_freq.bytes;
+ strc_pyld.num_pkts = fc_freq.packets;
+ strc_packet->refresh(buff->data(), header, strc_pyld);
+
+ const size_t size = header.get_length();
+ buff->set_packet_size(size);
+ send_io->release_send_buff(std::move(buff));
+ };
+
+ // Function to receive a strs, returns buffer capacity
+ auto recv_strs = [&recv_io, &recv_packet]() -> stream_buff_params_t {
+ transport::frame_buff::uptr buff = recv_io->get_recv_buff(200);
+
+ if (!buff) {
+ throw uhd::runtime_error(
+ "tx xport timed out wating for a strs packet during initialization");
+ }
+
+ recv_packet->refresh(buff->data());
+ UHD_ASSERT_THROW(
+ recv_packet->get_chdr_header().get_pkt_type() == chdr::PKT_TYPE_STRS);
+ chdr::strs_payload strs;
+ strs.deserialize(recv_packet->get_payload_const_ptr_as<uint64_t>(),
+ recv_packet->get_payload_size() / sizeof(uint64_t),
+ recv_packet->conv_to_host<uint64_t>());
+
+ recv_io->release_recv_buff(std::move(buff));
+
+ return {strs.capacity_bytes,
+ static_cast<uint32_t>(strs.capacity_pkts)};
+ };
+
+ // Send a strc init to get the buffer size
+ send_strc_init();
+ stream_buff_params_t capacity = recv_strs();
+ _fc_state.set_dest_capacity(capacity);
+
+ UHD_LOG_TRACE("XPORT::TX_DATA_XPORT",
+ "Received strs initializing buffer capacity to "
+ << capacity.bytes << " bytes");
+
+ // Calculate the requested fc_freq parameters
+ uhd::rfnoc::stream_buff_params_t fc_freq = {
+ static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_freq_ratio)),
+ static_cast<uint32_t>(
+ std::ceil(double(capacity.packets) * fc_freq_ratio))};
+
+ const size_t headroom_bytes =
+ static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_headroom_ratio));
+ const size_t headroom_packets = static_cast<uint32_t>(
+ std::ceil(double(capacity.packets) * fc_headroom_ratio));
+
+ fc_freq.bytes -= headroom_bytes;
+ fc_freq.packets -= headroom_packets;
+
+ // Send a strc init to configure fc freq
+ send_strc_init(fc_freq);
+ recv_strs();
+
+ // Release temporary I/O service interfaces to disconnect from it
+ send_io.reset();
+ recv_io.reset();
+ }
+
+ // Interface to the I/O service
+ transport::send_io_if::sptr _send_io;
+
+ // Flow control state
+ tx_flow_ctrl_state _fc_state;
+
+ // Maximum data payload in bytes
+ size_t _max_payload_size = 0;
+
+ // Sequence number for data packets
+ uint16_t _data_seq_num = 0;
+
+ // Header to write into send packets
+ chdr::chdr_header _send_header;
+
+ // Packet for send data
+ chdr::chdr_packet::uptr _send_packet;
+
+ // Packet to receive strs messages
+ chdr::chdr_packet::uptr _recv_packet;
+
+ // Handles sending of strc flow control ack packets
+ detail::tx_flow_ctrl_sender _fc_sender;
+
+ // Local / Source EPID
+ sep_id_t _epid;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP */
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_types.hpp b/host/lib/include/uhdlib/rfnoc/chdr_types.hpp
index 62b24ab61..1f14ea7d0 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_types.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_types.hpp
@@ -482,6 +482,8 @@ public: // Members
uint64_t num_pkts = 0;
//! Number of bytes to use for operation (64 bits)
uint64_t num_bytes = 0;
+ //! Size of a strc packet (including header)
+ static constexpr size_t PACKET_SIZE = 24;
public: // Functions
strc_payload() = default;
diff --git a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp
index 120b0e0f8..28fa8ec7c 100644
--- a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp
+++ b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp
@@ -7,11 +7,14 @@
#ifndef INCLUDED_LIBUHD_RFNOC_GRAPH_STREAM_MANAGER_HPP
#define INCLUDED_LIBUHD_RFNOC_GRAPH_STREAM_MANAGER_HPP
+#include <uhd/stream.hpp>
#include <uhdlib/rfnoc/chdr_packet.hpp>
#include <uhdlib/rfnoc/client_zero.hpp>
#include <uhdlib/rfnoc/ctrlport_endpoint.hpp>
#include <uhdlib/rfnoc/epid_allocator.hpp>
#include <uhdlib/rfnoc/mb_iface.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
+#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>
#include <functional>
#include <memory>
#include <set>
@@ -84,6 +87,7 @@ public:
virtual detail::client_zero::sptr get_client_zero(
sep_addr_t dst_addr, device_id_t via_device = NULL_DEVICE_ID) const = 0;
+
/*! Configure a flow controlled data stream from the endpoint with ID src_epid to the
* endpoint with ID dst_epid
*
@@ -102,7 +106,33 @@ public:
const double fc_headroom_ratio,
const bool reset = false) = 0;
- // TODO: Implement functions to get graph-wide streamers
+ /*! \brief Create a data stream going from the device to the host
+ *
+ * \param dst_addr The address of the destination stream endpoint
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
+ * \param xport_args The transport arguments
+ * \return An transport instance
+ */
+ virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream(
+ sep_addr_t dst_addr,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
+ const device_addr_t& xport_args) = 0;
+
+ /*! \brief Create a data stream going from the host to the device
+ *
+ * \param dst_addr The address of the destination stream endpoint
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
+ * \param xport_args The transport arguments
+ * \return An transport instance
+ */
+ virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
+ sep_addr_t dst_addr,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
+ const device_addr_t& xport_args) = 0;
/*!
* \brief Create a graph_stream_manager and return a unique_ptr to it
diff --git a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
index 79121a498..72f1cf1c7 100644
--- a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
+++ b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
@@ -11,6 +11,7 @@
#include <uhdlib/rfnoc/ctrlport_endpoint.hpp>
#include <uhdlib/rfnoc/epid_allocator.hpp>
#include <uhdlib/rfnoc/mb_iface.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
#include <functional>
#include <memory>
#include <set>
@@ -112,41 +113,29 @@ public:
/*! \brief Create a data stream going from the host to the device
*
* \param dst_addr The address of the destination stream endpoint
- * \param lossy_xport Is the transport lossy?
* \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
* \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
- * \param fc_freq_ratio Flow control response frequency as a ratio of the buff params
- * \param fc_headroom_ratio Flow control headroom as a ratio of the buff params
* \param xport_args The transport arguments
* \return An transport instance
*/
virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
const sep_addr_t dst_addr,
- const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
- const double fc_freq_ratio,
- const double fc_headroom_ratio,
const device_addr_t& xport_args) = 0;
/*! \brief Create a data stream going from the device to the host
*
* \param dst_addr The address of the destination stream endpoint
- * \param lossy_xport Is the transport lossy?
* \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
* \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
- * \param fc_freq_ratio Flow control response frequency as a ratio of the buff params
- * \param fc_headroom_ratio Flow control headroom as a ratio of the buff params
* \param xport_args The transport arguments
* \return An transport instance
*/
virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream(
const sep_addr_t src_addr,
- const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
- const double fc_freq_ratio,
- const double fc_headroom_ratio,
const device_addr_t& xport_args) = 0;
static uptr make(const chdr::chdr_packet_factory& pkt_factory,
diff --git a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
index 0a2790a34..33a0e3df0 100644
--- a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
+++ b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
@@ -8,21 +8,14 @@
#define INCLUDED_LIBUHD_MB_IFACE_HPP
#include <uhdlib/rfnoc/chdr_ctrl_xport.hpp>
-#include <uhdlib/rfnoc/rfnoc_common.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
+#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>
#include <uhdlib/rfnoc/clock_iface.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
#include <memory>
namespace uhd { namespace rfnoc {
-// FIXME: Update this
-class chdr_rx_data_xport
-{
-public:
- using uptr = std::unique_ptr<chdr_rx_data_xport>;
-};
-
-using chdr_tx_data_xport = chdr_rx_data_xport;
-
/*! Motherboard (backchannel) interface
*
* In RFNoC devices, the RFNoC subystem needs a backchannel interface to talk to
@@ -70,7 +63,8 @@ public:
/*! Return a reference to a clock iface
*/
- virtual std::shared_ptr<clock_iface> get_clock_iface(const std::string& clock_name) = 0;
+ virtual std::shared_ptr<clock_iface> get_clock_iface(
+ const std::string& clock_name) = 0;
/*! Create a control transport
*
@@ -89,30 +83,34 @@ public:
*
* This is typically called once per streaming channel.
*
- * \param local_device_id ID for the host transport adapter to use
- * \param local_epid Host (sink) streaming endpoint ID
- * \param remote_epid Remote device (source) streaming endpoint ID
+ * \param addrs Address of the device and host stream endpoints
+ * \param epids Endpoint IDs of the device and host stream endpoints
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
* \param xport_args Transport configuration args
* \return A CHDR RX data transport
*/
- virtual chdr_rx_data_xport::uptr make_rx_data_transport(device_id_t local_device_id,
- const sep_id_t& local_epid,
- const sep_id_t& remote_epid,
+ virtual chdr_rx_data_xport::uptr make_rx_data_transport(const sep_addr_pair_t& addrs,
+ const sep_id_pair_t& epids,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
const device_addr_t& xport_args) = 0;
/*! Create an TX data transport
*
* This is typically called once per streaming channel.
*
- * \param local_device_id ID for the host transport adapter to use
- * \param local_epid Host (source) streaming endpoint ID
- * \param remote_epid Remote device (sink) streaming endpoint ID
+ * \param addrs Address of the host and device stream endpoints
+ * \param epids Endpoint IDs of the host and device stream endpoints
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
* \param xport_args Transport configuration args
* \return A CHDR TX data transport
*/
- virtual chdr_tx_data_xport::uptr make_tx_data_transport(device_id_t local_device_id,
- const sep_id_t& local_epid,
- const sep_id_t& remote_epid,
+ virtual chdr_tx_data_xport::uptr make_tx_data_transport(const sep_addr_pair_t& addrs,
+ const sep_id_pair_t& epids,
+ const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
+ const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
const device_addr_t& xport_args) = 0;
};
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
index 7ec1b7bb2..bc56fd311 100644
--- a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
@@ -41,6 +41,8 @@ using device_id_t = uint16_t;
using sep_inst_t = uint16_t;
//! Stream Endpoint Physical Address Type
using sep_addr_t = std::pair<device_id_t, sep_inst_t>;
+//! Stream Endpoint Physical Address Type (first = source, second = destination)
+using sep_addr_pair_t = std::pair<sep_addr_t, sep_addr_t>;
//! Stream Endpoint ID Type
using sep_id_t = uint16_t;
//! Stream Endpoint pair Type (first = source, second = destination)
@@ -65,6 +67,19 @@ struct stream_buff_params_t
//! The data type of the buffer used to capture/generate data
enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 };
+//! Conversion from number of bits to sw_buff
+constexpr sw_buff_t bits_to_sw_buff(size_t bits)
+{
+ if (bits <= 8) {
+ return BUFF_U8;
+ } else if (bits <= 16) {
+ return BUFF_U16;
+ } else if (bits <= 32) {
+ return BUFF_U32;
+ } else {
+ return BUFF_U64;
+ }
+}
//----------------------------------------------
// Constants
@@ -72,6 +87,12 @@ enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 };
constexpr uint16_t RFNOC_PROTO_VER = 0x0100;
+constexpr uint64_t MAX_FC_CAPACITY_BYTES = (uint64_t(1) << 40) - 1;
+constexpr uint32_t MAX_FC_CAPACITY_PKTS = (uint32_t(1) << 24) - 1;
+constexpr uint64_t MAX_FC_FREQ_BYTES = (uint64_t(1) << 40) - 1;
+constexpr uint32_t MAX_FC_FREQ_PKTS = (uint32_t(1) << 24) - 1;
+constexpr uint64_t MAX_FC_HEADROOM_BYTES = (uint64_t(1) << 16) - 1;
+constexpr uint32_t MAX_FC_HEADROOM_PKTS = (uint32_t(1) << 8) - 1;
}} // namespace uhd::rfnoc
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
new file mode 100644
index 000000000..6ced60d19
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
@@ -0,0 +1,95 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP
+#define INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP
+
+#include <uhd/rfnoc/node.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
+#include <uhdlib/transport/rx_streamer_impl.hpp>
+#include <string>
+
+namespace uhd { namespace rfnoc {
+
+/*!
+ * Extends the streamer_impl to be an rfnoc node so it can connect to the
+ * graph. Configures the streamer conversion and rate parameters with values
+ * received through property propagation.
+ */
+class rfnoc_rx_streamer : public node_t,
+ public transport::rx_streamer_impl<chdr_rx_data_xport>
+{
+public:
+ /*! Constructor
+ *
+ * \param num_ports The number of ports
+ * \param stream_args Arguments to aid the construction of the streamer
+ */
+ rfnoc_rx_streamer(const size_t num_ports, const uhd::stream_args_t stream_args);
+
+ /*! Returns a unique identifier string for this node. In every RFNoC graph,
+ * no two nodes cannot have the same ID. Returns a string in the form of
+ * "RxStreamer#0".
+ *
+ * \returns The unique ID as a string
+ */
+ std::string get_unique_id() const;
+
+ /*! Returns the number of input ports for this block.
+ *
+ * \return noc_id The number of ports
+ */
+ size_t get_num_input_ports() const;
+
+ /*! Returns the number of output ports for this block.
+ *
+ * Always returns 0 for this block.
+ *
+ * \return noc_id The number of ports
+ */
+ size_t get_num_output_ports() const;
+
+ /*! Implementation of rx_streamer API method
+ *
+ * \param stream_cmd the stream command to issue
+ */
+ void issue_stream_cmd(const stream_cmd_t& stream_cmd);
+
+ /*! Returns stream args provided at creation
+ *
+ * \return stream args provided when streamer is created
+ */
+ const uhd::stream_args_t& get_stream_args() const;
+
+ /*! Check that all streamer ports are connected to blocks
+ *
+ * Overrides node_t to ensure there are no unconnected ports.
+ *
+ * \param connected_inputs A list of input ports that are connected
+ * \param connected_outputs A list of output ports that are connected
+ * \returns true if the block can deal with this configuration
+ */
+ bool check_topology(const std::vector<size_t>& connected_inputs,
+ const std::vector<size_t>& connected_outputs);
+private:
+ void _register_props(const size_t chan, const std::string& otw_format);
+
+ // Properties
+ std::vector<property_t<double>> _scaling_in;
+ std::vector<property_t<double>> _samp_rate_in;
+ std::vector<property_t<double>> _tick_rate_in;
+ std::vector<property_t<std::string>> _type_in;
+
+ // Streamer unique ID
+ const std::string _unique_id;
+
+ // Stream args provided at construction
+ const uhd::stream_args_t _stream_args;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP */
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp
new file mode 100644
index 000000000..4acee45cc
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp
@@ -0,0 +1,90 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP
+#define INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP
+
+#include <uhd/rfnoc/node.hpp>
+#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>
+#include <uhdlib/transport/tx_streamer_impl.hpp>
+#include <string>
+
+namespace uhd { namespace rfnoc {
+
+/*!
+ * Extends the streamer_impl to be an rfnoc node so it can connect to the
+ * graph. Configures the streamer conversion and rate parameters with values
+ * received through property propagation.
+ */
+class rfnoc_tx_streamer : public node_t,
+ public transport::tx_streamer_impl<chdr_tx_data_xport>
+{
+public:
+ /*! Constructor
+ *
+ * \param num_ports The number of ports
+ * \param stream_args Arguments to aid the construction of the streamer
+ */
+ rfnoc_tx_streamer(const size_t num_chans, const uhd::stream_args_t stream_args);
+
+ /*! Returns a unique identifier string for this node. In every RFNoC graph,
+ * no two nodes cannot have the same ID. Returns a string in the form of
+ * "TxStreamer#0".
+ *
+ * \returns The unique ID as a string
+ */
+ std::string get_unique_id() const;
+
+ /*! Returns the number of input ports for this block.
+ *
+ * Always returns 0 for this block.
+ *
+ * \return noc_id The number of ports
+ */
+ size_t get_num_input_ports() const;
+
+ /*! Returns the number of output ports for this block.
+ *
+ * \return noc_id The number of ports
+ */
+ size_t get_num_output_ports() const;
+
+ /*! Returns stream args provided at creation
+ *
+ * \return stream args provided when streamer is created
+ */
+ const uhd::stream_args_t& get_stream_args() const;
+
+ /*! Check that all streamer ports are connected to blocks
+ *
+ * Overrides node_t to ensure there are no unconnected ports.
+ *
+ * \param connected_inputs A list of input ports that are connected
+ * \param connected_outputs A list of output ports that are connected
+ * \returns true if the block can deal with this configuration
+ */
+ bool check_topology(const std::vector<size_t>& connected_inputs,
+ const std::vector<size_t>& connected_outputs);
+
+private:
+ void _register_props(const size_t chan, const std::string& otw_format);
+
+ // Properties
+ std::vector<property_t<double>> _scaling_out;
+ std::vector<property_t<double>> _samp_rate_out;
+ std::vector<property_t<double>> _tick_rate_out;
+ std::vector<property_t<std::string>> _type_out;
+
+ // Streamer unique ID
+ const std::string _unique_id;
+
+ // Stream args provided at construction
+ const uhd::stream_args_t _stream_args;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP */
diff --git a/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp
new file mode 100644
index 000000000..937baf982
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp
@@ -0,0 +1,130 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP
+#define INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP
+
+#include <uhd/utils/log.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+
+namespace uhd { namespace rfnoc {
+
+//! Class to manage rx flow control state
+class rx_flow_ctrl_state
+{
+public:
+ //! Constructor
+ rx_flow_ctrl_state(const rfnoc::sep_id_pair_t epids) : _epids(epids) {}
+
+ //! Initialize frequency parameters
+ void initialize(const stream_buff_params_t fc_freq)
+ {
+ _fc_freq = fc_freq;
+ }
+
+ //! Resynchronize with transfer counts from the sender
+ void resynchronize(const stream_buff_params_t counts)
+ {
+ if (_recv_counts.bytes != counts.bytes
+ || _recv_counts.packets != counts.packets) {
+ // If there is a discrepancy between the amount of data sent by
+ // the device and received by the transport, adjust the counts
+ // of data received and transferred to include the dropped data.
+ auto bytes_dropped = counts.bytes - _recv_counts.bytes;
+ auto pkts_dropped = counts.packets - _recv_counts.packets;
+ _xfer_counts.bytes += bytes_dropped;
+ _xfer_counts.packets += pkts_dropped;
+
+ UHD_LOGGER_DEBUG("rx_flow_ctrl_state")
+ << "oh noes: bytes_sent=" << counts.bytes
+ << " bytes_received=" << _recv_counts.bytes
+ << " pkts_sent=" << counts.packets
+ << " pkts_received=" << _recv_counts.packets
+ << " src_epid=" << _epids.first << " dst_epid=" << _epids.second
+ << std::endl;
+
+ _recv_counts = counts;
+ }
+ }
+
+ //! Reset the transfer counts (happens during init)
+ void reset_counts()
+ {
+ UHD_LOGGER_TRACE("rx_flow_ctrl_state")
+ << "Resetting transfer counts" << std::endl;
+ _recv_counts = {0, 0};
+ _xfer_counts = {0, 0};
+ }
+
+ //! Update state when data is received
+ void data_received(const size_t bytes)
+ {
+ _recv_counts.bytes += bytes;
+ _recv_counts.packets++;
+ }
+
+ //! Update state when transfer is complete (buffer space freed)
+ void xfer_done(const size_t bytes)
+ {
+ _xfer_counts.bytes += bytes;
+ _xfer_counts.packets++;
+ }
+
+ //! Returns whether a flow control response is needed
+ bool fc_resp_due() const
+ {
+ stream_buff_params_t accum_counts = {
+ _xfer_counts.bytes - _last_fc_resp_counts.bytes,
+ _xfer_counts.packets - _last_fc_resp_counts.packets};
+
+ return accum_counts.bytes >= _fc_freq.bytes
+ || accum_counts.packets >= _fc_freq.packets;
+ }
+
+ //! Update state after flow control response was sent
+ void fc_resp_sent()
+ {
+ _last_fc_resp_counts = _xfer_counts;
+ }
+
+ //! Returns counts for completed transfers
+ stream_buff_params_t get_xfer_counts() const
+ {
+ return _xfer_counts;
+ }
+
+ //! Returns counts for completed transfers
+ stream_buff_params_t get_recv_counts() const
+ {
+ return _recv_counts;
+ }
+
+ //! Returns configured flow control frequency
+ stream_buff_params_t get_fc_freq() const
+ {
+ return _fc_freq;
+ }
+
+private:
+ // Counts for data received, including any data still in use
+ stream_buff_params_t _recv_counts{0, 0};
+
+ // Counts for data read and whose buffer space is ok to reuse
+ stream_buff_params_t _xfer_counts{0, 0};
+
+ // Counts sent in last flow control response
+ stream_buff_params_t _last_fc_resp_counts{0, 0};
+
+ // Frequency of flow control responses
+ stream_buff_params_t _fc_freq{0, 0};
+
+ // Endpoint ID for log messages
+ const sep_id_pair_t _epids;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP */
diff --git a/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp
new file mode 100644
index 000000000..65fc1b093
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp
@@ -0,0 +1,99 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP
+#define INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP
+
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+
+namespace uhd { namespace rfnoc {
+
+//! Class to manage tx flow control state
+class tx_flow_ctrl_state
+{
+public:
+ //! Updates destination capacity
+ void set_dest_capacity(const stream_buff_params_t& capacity)
+ {
+ _dest_capacity = capacity;
+ }
+
+ //! Updates destination received count
+ void update_dest_recv_count(const stream_buff_params_t& recv_count)
+ {
+ _recv_counts = recv_count;
+ }
+
+ /*! Returns whether the destination has buffer space for the requested
+ * packet size
+ */
+ bool dest_has_space(const size_t packet_size)
+ {
+ // The stream endpoint only cares about bytes, the packet count is not
+ // important to determine the space available.
+ const auto buffer_fullness = _xfer_counts.bytes - _recv_counts.bytes;
+ const auto space_available = _dest_capacity.bytes - buffer_fullness;
+ return space_available >= packet_size;
+ }
+
+ //! Increments transfer count with amount of data sent
+ void data_sent(const size_t packet_size)
+ {
+ _xfer_counts.bytes += packet_size;
+ _xfer_counts.packets++;
+
+ // Request an fc resync after we have transferred a number of bytes >=
+ // to the destination capacity. There is no strict requirement on how
+ // often we need to send this, as it is only needed to correct for
+ // dropped packets. One buffer's worth of bytes is probably a good
+ // cadence.
+ if (_xfer_counts.bytes - _last_fc_resync_bytes >= _dest_capacity.bytes) {
+ _fc_resync_req = true;
+ }
+ }
+
+ /*! Returns whether an fc resync request is pending. The policy we use
+ * here is to send an fc resync every time we send a number of bytes
+ * equal to the destination buffer capacity.
+ */
+ bool get_fc_resync_req_pending() const
+ {
+ return _fc_resync_req;
+ }
+
+ //! Clears fc resync request pending status
+ void clear_fc_resync_req_pending()
+ {
+ _fc_resync_req = false;
+ _last_fc_resync_bytes = _xfer_counts.bytes;
+ }
+
+ //! Returns counts for packets sent
+ stream_buff_params_t get_xfer_counts() const
+ {
+ return _xfer_counts;
+ }
+
+private:
+ // Counts for data sent
+ stream_buff_params_t _xfer_counts{0, 0};
+
+ // Counts for data received by the destination
+ stream_buff_params_t _recv_counts{0, 0};
+
+ // Buffer size at the destination
+ stream_buff_params_t _dest_capacity{0, 0};
+
+ // Counts sent in last flow control resync
+ size_t _last_fc_resync_bytes = 0;
+
+ // Track when to send ack packets
+ bool _fc_resync_req = false;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP */
diff --git a/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp
new file mode 100644
index 000000000..662be6d2d
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp
@@ -0,0 +1,175 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP
+#define INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP
+
+#include <uhd/exception.hpp>
+#include <uhd/utils/log.hpp>
+#include <boost/dynamic_bitset.hpp>
+#include <boost/format.hpp>
+
+namespace uhd { namespace transport {
+
+// Number of iterations that get_aligned_buffs will attempt to time align
+// packets before returning an alignment failure. get_aligned_buffs increments
+// the iteration count when it finds a timestamp that is larger than the
+// timestamps on channels it has already aligned and thus has to restart
+// aligning timestamps on all channels to the new timestamp.
+constexpr size_t ALIGNMENT_FAILURE_THRESHOLD = 1000;
+
+/*!
+ * Implementation of rx time alignment. This method reads packets from the
+ * transports for each channel and discards any packets whose tsf does not
+ * match those of other channels due to dropped packets. Packets that do not
+ * have a tsf are not checked for alignment and never dropped.
+ */
+template <typename transport_t>
+class get_aligned_buffs
+{
+public:
+ enum alignment_result_t {
+ SUCCESS,
+ TIMEOUT,
+ SEQUENCE_ERROR,
+ ALIGNMENT_FAILURE,
+ BAD_PACKET
+ };
+
+ get_aligned_buffs(std::vector<typename transport_t::uptr>& xports,
+ std::vector<typename transport_t::buff_t::uptr>& frame_buffs,
+ std::vector<typename transport_t::packet_info_t>& infos)
+ : _xports(xports)
+ , _frame_buffs(frame_buffs)
+ , _infos(infos)
+ , _prev_tsf(_xports.size(), 0)
+ , _channels_to_align(_xports.size())
+ {
+ }
+
+ alignment_result_t operator()(const int32_t timeout_ms)
+ {
+ // Clear state
+ _channels_to_align.set();
+ bool time_valid = false;
+ uint64_t tsf = 0;
+ size_t iterations = 0;
+
+ while (_channels_to_align.any()) {
+ const size_t chan = _channels_to_align.find_first();
+ auto& xport = _xports[chan];
+ auto& info = _infos[chan];
+ auto& frame_buff = _frame_buffs[chan];
+ bool seq_error = false;
+
+ // Receive a data packet for the channel if we don't have one. A
+ // packet may already be there if the previous call was interrupted
+ // by an error.
+ if (!frame_buff) {
+ try {
+ std::tie(frame_buff, info, seq_error) =
+ xport->get_recv_buff(timeout_ms);
+ } catch (const uhd::value_error& e) {
+ // Bad packet
+ UHD_LOGGER_ERROR("STREAMER")
+ << boost::format(
+ "The receive transport caught a value exception.\n%s")
+ % e.what();
+ return BAD_PACKET;
+ }
+ }
+
+ if (!frame_buff) {
+ return TIMEOUT;
+ }
+
+ if (info.has_tsf) {
+ const bool time_out_of_order = _prev_tsf[chan] > info.tsf;
+ _prev_tsf[chan] = info.tsf;
+
+ // If the user changes the device time while streaming, we can
+ // receive a packet that comes before the previous packet in
+ // time. This would cause the alignment logic to discard future
+ // received packets. Therefore, when this occurs, we reset the
+ // info to restart the alignment.
+ if (time_out_of_order) {
+ time_valid = false;
+ }
+
+ // Check if the time is larger than packets received for other
+ // channels, and if so, use this time to align all channels
+ if (!time_valid || info.tsf > tsf) {
+ // If we haven't found a set of aligned packets after many
+ // iterations, return an alignment failure
+ if (iterations++ > ALIGNMENT_FAILURE_THRESHOLD) {
+ UHD_LOGGER_ERROR("STREAMER")
+ << "The rx streamer failed to time-align packets.";
+ return ALIGNMENT_FAILURE;
+ }
+
+ // Release buffers for channels aligned previously. Keep
+ // buffers that don't have a tsf since we don't need to
+ // align those.
+ for (size_t i = 0; i < _xports.size(); i++) {
+ if (!_channels_to_align.test(i) && _infos[i].has_tsf) {
+ _xports[i]->release_recv_buff(std::move(_frame_buffs[i]));
+ _frame_buffs[i] = nullptr;
+ }
+ }
+
+ // Mark only this channel as aligned and save its tsf
+ _channels_to_align.set();
+ _channels_to_align.reset(chan);
+ time_valid = true;
+ tsf = info.tsf;
+ }
+
+ // Check if the time matches that of other aligned channels
+ else if (info.tsf == tsf) {
+ _channels_to_align.reset(chan);
+ }
+
+ // Otherwise, time is smaller than other channels, release the buffer
+ else {
+ _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan]));
+ _frame_buffs[chan] = nullptr;
+ }
+ } else {
+ // Packet doesn't have a tsf, just mark it as aligned
+ _channels_to_align.reset(chan);
+ }
+
+ // If this packet had a sequence error, stop to return the error.
+ // Keep the packet for the next call to get_aligned_buffs.
+ if (seq_error) {
+ return SEQUENCE_ERROR;
+ }
+ }
+
+ // All channels aligned
+ return SUCCESS;
+ }
+
+private:
+ // Transports for each channel
+ std::vector<typename transport_t::uptr>& _xports;
+
+ // Storage for buffers resulting from alignment
+ std::vector<typename transport_t::buff_t::uptr>& _frame_buffs;
+
+ // Packet info corresponding to aligned buffers
+ std::vector<typename transport_t::packet_info_t>& _infos;
+
+ // Time of previous packet for each channel
+ std::vector<uint64_t> _prev_tsf;
+
+ // Keeps track of channels that are aligned
+ boost::dynamic_bitset<> _channels_to_align;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP */
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
new file mode 100644
index 000000000..d66e867bc
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
@@ -0,0 +1,341 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP
+#define INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/convert.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/stream.hpp>
+#include <uhd/types/endianness.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/rx_streamer_zero_copy.hpp>
+#include <limits>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+namespace detail {
+
+/*!
+ * Cache of metadata for error handling
+ *
+ * If a recv call reads data from multiple packets, and an error occurs in the
+ * second or later packets, recv stops short of the num samps requested and
+ * returns no error. The error is cached for the next call to recv.
+ *
+ * Timeout errors are an exception. Timeouts that occur in the second or later
+ * packets of a recv call stop the recv method but the error is not returned in
+ * the next call. The user can check for this condition since fewer samples are
+ * returned than the number requested.
+ */
+class rx_metadata_cache
+{
+public:
+ //! Stores metadata in the cache, ignoring timeout errors
+ UHD_FORCE_INLINE void store(const rx_metadata_t& metadata)
+ {
+ if (metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) {
+ _metadata_cache = metadata;
+ _cached_metadata = true;
+ }
+ }
+
+ //! Checks for cached metadata
+ UHD_FORCE_INLINE bool check(rx_metadata_t& metadata)
+ {
+ if (_cached_metadata) {
+ metadata = _metadata_cache;
+ _cached_metadata = false;
+ return true;
+ }
+ return false;
+ }
+
+private:
+ // Whether there is a cached metadata object
+ bool _cached_metadata = false;
+
+ // Cached metadata value
+ uhd::rx_metadata_t _metadata_cache;
+};
+
+} // namespace detail
+
+/*!
+ * Implementation of rx streamer API
+ */
+template <typename transport_t>
+class rx_streamer_impl : public rx_streamer
+{
+public:
+ //! Constructor
+ rx_streamer_impl(const size_t num_ports, const uhd::stream_args_t stream_args)
+ : _zero_copy_streamer(num_ports)
+ , _in_buffs(num_ports)
+ {
+ if (stream_args.cpu_format.empty()) {
+ throw uhd::value_error("[rx_stream] Must provide a cpu_format!");
+ }
+ if (stream_args.otw_format.empty()) {
+ throw uhd::value_error("[rx_stream] Must provide a otw_format!");
+ }
+ _setup_converters(num_ports, stream_args);
+ _zero_copy_streamer.set_samp_rate(_samp_rate);
+ _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item);
+ }
+
+ //! Connect a new channel to the streamer
+ void connect_channel(const size_t channel, typename transport_t::uptr xport)
+ {
+ const size_t max_pyld_size = xport->get_max_payload_size();
+ _zero_copy_streamer.connect_channel(channel, std::move(xport));
+
+ // Set spp based on the transport frame size
+ const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item;
+ _spp = std::min(_spp, xport_spp);
+ }
+
+ //! Implementation of rx_streamer API method
+ size_t get_num_channels() const
+ {
+ return _zero_copy_streamer.get_num_channels();
+ }
+
+ //! Implementation of rx_streamer API method
+ size_t get_max_num_samps() const
+ {
+ return _spp;
+ }
+
+ /*! Get width of each over-the-wire item component. For complex items,
+ * returns the width of one component only (real or imaginary).
+ */
+ size_t get_otw_item_comp_bit_width() const
+ {
+ return _convert_info.otw_item_bit_width;
+ }
+
+ //! Implementation of rx_streamer API method
+ UHD_INLINE size_t recv(const uhd::rx_streamer::buffs_type& buffs,
+ const size_t nsamps_per_buff,
+ uhd::rx_metadata_t& metadata,
+ const double timeout,
+ const bool one_packet)
+ {
+ if (_error_metadata_cache.check(metadata)) {
+ return 0;
+ }
+
+ const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
+
+ size_t total_samps_recv =
+ _recv_one_packet(buffs, nsamps_per_buff, metadata, timeout_ms);
+
+ if (one_packet or metadata.end_of_burst) {
+ return total_samps_recv;
+ }
+
+ // First set of packets recv had an error, return immediately
+ if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) {
+ return total_samps_recv;
+ }
+
+ // Loop until buffer is filled or error code. This method returns the
+ // metadata from the first packet received, with the exception of
+ // end-of-burst.
+ uhd::rx_metadata_t loop_metadata;
+
+ while (total_samps_recv < nsamps_per_buff) {
+ size_t num_samps = _recv_one_packet(buffs,
+ nsamps_per_buff - total_samps_recv,
+ loop_metadata,
+ timeout_ms,
+ total_samps_recv * _convert_info.bytes_per_cpu_item);
+
+ // If metadata had an error code set, store for next call and return
+ if (loop_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) {
+ _error_metadata_cache.store(loop_metadata);
+ break;
+ }
+
+ total_samps_recv += num_samps;
+
+ // Return immediately if end of burst
+ if (loop_metadata.end_of_burst) {
+ metadata.end_of_burst = true;
+ break;
+ }
+ }
+
+ return total_samps_recv;
+ }
+
+protected:
+ //! Configures scaling factor for conversion
+ void set_scale_factor(const size_t chan, const double scale_factor)
+ {
+ _converters[chan]->set_scalar(scale_factor);
+ }
+
+ //! Configures sample rate for conversion of timestamp
+ void set_samp_rate(const double rate)
+ {
+ _samp_rate = rate;
+ _zero_copy_streamer.set_samp_rate(rate);
+ }
+
+ //! Configures tick rate for conversion of timestamp
+ void set_tick_rate(const double rate)
+ {
+ _zero_copy_streamer.set_tick_rate(rate);
+ }
+
+private:
+ //! Converter and associated item sizes
+ struct convert_info
+ {
+ size_t bytes_per_otw_item;
+ size_t bytes_per_cpu_item;
+ size_t otw_item_bit_width;
+ };
+
+ //! Receive a single packet
+ UHD_FORCE_INLINE size_t _recv_one_packet(const uhd::rx_streamer::buffs_type& buffs,
+ const size_t nsamps_per_buff,
+ uhd::rx_metadata_t& metadata,
+ const int32_t timeout_ms,
+ const size_t buffer_offset_bytes = 0)
+ {
+ if (_buff_samps_remaining == 0) {
+ // Current set of buffers has expired, get the next one
+ _buff_samps_remaining =
+ _zero_copy_streamer.get_recv_buffs(_in_buffs, metadata, timeout_ms);
+ _fragment_offset_in_samps = 0;
+ } else {
+ // There are samples still left in the current set of buffers
+ metadata = _last_fragment_metadata;
+ metadata.time_spec += time_spec_t::from_ticks(
+ _fragment_offset_in_samps - metadata.fragment_offset, _samp_rate);
+ }
+
+ if (_buff_samps_remaining != 0) {
+ const size_t num_samps = std::min(nsamps_per_buff, _buff_samps_remaining);
+
+ // Convert samples to the streamer's output format
+ for (size_t i = 0; i < get_num_channels(); i++) {
+ char* b = reinterpret_cast<char*>(buffs[i]);
+ const uhd::rx_streamer::buffs_type out_buffs(b + buffer_offset_bytes);
+ _convert_to_out_buff(out_buffs, i, num_samps);
+ }
+
+ _buff_samps_remaining -= num_samps;
+
+ // Write the fragment flags and offset
+ metadata.more_fragments = _buff_samps_remaining != 0;
+ metadata.fragment_offset = _fragment_offset_in_samps;
+
+ if (metadata.more_fragments) {
+ _fragment_offset_in_samps += num_samps;
+ _last_fragment_metadata = metadata;
+ }
+
+ return num_samps;
+ } else {
+ return 0;
+ }
+ }
+
+ //! Convert samples for one channel into its buffer
+ UHD_FORCE_INLINE void _convert_to_out_buff(
+ const uhd::rx_streamer::buffs_type& out_buffs,
+ const size_t chan,
+ const size_t num_samps)
+ {
+ const char* buffer_ptr = reinterpret_cast<const char*>(_in_buffs[chan]);
+
+ _converters[chan]->conv(buffer_ptr, out_buffs, num_samps);
+
+ // Advance the pointer for the source buffer
+ _in_buffs[chan] =
+ buffer_ptr + num_samps * _convert_info.bytes_per_otw_item;
+
+ if (_buff_samps_remaining == num_samps) {
+ _zero_copy_streamer.release_recv_buff(chan);
+ }
+ }
+
+ //! Create converters and initialize _convert_info
+ void _setup_converters(const size_t num_ports,
+ const uhd::stream_args_t stream_args)
+ {
+ // Note to code archaeologists: In the past, we had to also specify the
+ // endianness here, but that is no longer necessary because we can make
+ // the wire endianness match the host endianness.
+ convert::id_type id;
+ id.input_format = stream_args.otw_format + "_chdr";
+ id.num_inputs = 1;
+ id.output_format = stream_args.cpu_format;
+ id.num_outputs = 1;
+
+ auto starts_with = [](const std::string& s, const std::string v) {
+ return s.find(v) == 0;
+ };
+
+ const bool otw_is_complex = starts_with(stream_args.otw_format, "fc")
+ || starts_with(stream_args.otw_format, "sc");
+
+ convert_info info;
+ info.bytes_per_otw_item = convert::get_bytes_per_item(id.input_format);
+ info.bytes_per_cpu_item = convert::get_bytes_per_item(id.output_format);
+
+ if (otw_is_complex) {
+ info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2;
+ } else {
+ info.otw_item_bit_width = info.bytes_per_otw_item * 8;
+ }
+
+ _convert_info = info;
+
+ for (size_t i = 0; i < num_ports; i++) {
+ _converters.push_back(convert::get_converter(id)());
+ _converters.back()->set_scalar(1 / 32767.0);
+ }
+ }
+
+ // Converter and item sizes
+ convert_info _convert_info;
+
+ // Converters
+ std::vector<uhd::convert::converter::sptr> _converters;
+
+ // Implementation of frame buffer management and packet info
+ rx_streamer_zero_copy<transport_t> _zero_copy_streamer;
+
+ // Container for buffer pointers used in recv method
+ std::vector<const void*> _in_buffs;
+
+ // Sample rate used to calculate metadata time_spec_t
+ double _samp_rate = 1.0;
+
+ // Maximum number of samples per packet
+ size_t _spp = std::numeric_limits<std::size_t>::max();
+
+ // Num samps remaining in buffer currently held by zero copy streamer
+ size_t _buff_samps_remaining = 0;
+
+ // Metadata cache for error handling
+ detail::rx_metadata_cache _error_metadata_cache;
+
+ // Fragment (partially read packet) information
+ size_t _fragment_offset_in_samps = 0;
+ rx_metadata_t _last_fragment_metadata;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP */
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
new file mode 100644
index 000000000..36f568f2d
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
@@ -0,0 +1,207 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP
+#define INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/types/metadata.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/get_aligned_buffs.hpp>
+#include <boost/format.hpp>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Implementation of rx streamer manipulation of frame buffers and packet info.
+ * This class is part of rx_streamer_impl, split into a separate unit as it is
+ * a mostly self-contained portion of the streamer logic.
+ */
+template <typename transport_t>
+class rx_streamer_zero_copy
+{
+public:
+ //! Constructor
+ rx_streamer_zero_copy(const size_t num_ports)
+ : _xports(num_ports)
+ , _frame_buffs(num_ports)
+ , _infos(num_ports)
+ , _get_aligned_buffs(_xports, _frame_buffs, _infos)
+ {
+ }
+
+ ~rx_streamer_zero_copy()
+ {
+ for (size_t i = 0; i < _frame_buffs.size(); i++) {
+ if (_frame_buffs[i]) {
+ _xports[i]->release_recv_buff(std::move(_frame_buffs[i]));
+ }
+ }
+ }
+
+ //! Connect a new channel to the streamer
+ void connect_channel(const size_t port, typename transport_t::uptr xport)
+ {
+ if (port >= get_num_channels()) {
+ throw uhd::index_error(
+ "Port number indexes beyond the number of streamer ports");
+ }
+
+ if (_xports[port]) {
+ throw uhd::runtime_error(
+ "Streamer port number is already connected to a port");
+ }
+
+ _xports[port] = std::move(xport);
+ }
+
+ //! Returns number of channels handled by this streamer
+ size_t get_num_channels() const
+ {
+ return _xports.size();
+ }
+
+ //! Configures tick rate for conversion of timestamp
+ void set_tick_rate(const double rate)
+ {
+ _tick_rate = rate;
+ }
+
+ //! Configures sample rate for conversion of timestamp
+ void set_samp_rate(const double rate)
+ {
+ _samp_rate = rate;
+ }
+
+ //! Configures the size of each sample
+ void set_bytes_per_item(const size_t bpi)
+ {
+ _bytes_per_item = bpi;
+ }
+
+ /*!
+ * Gets a set of time-aligned buffers, one per channel.
+ *
+ * \param buffs returns a pointer to the buffer data
+ * \param metadata returns the metadata corresponding to the buffer
+ * \param timeout_ms timeout in milliseconds
+ * \return the size in samples of each packet, or 0 if timeout
+ */
+ size_t get_recv_buffs(std::vector<const void*>& buffs,
+ rx_metadata_t& metadata,
+ const int32_t timeout_ms)
+ {
+ metadata.reset();
+
+ switch (_get_aligned_buffs(timeout_ms)) {
+ case get_aligned_buffs_t::SUCCESS:
+ break;
+
+ case get_aligned_buffs_t::BAD_PACKET:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
+ return 0;
+
+ case get_aligned_buffs_t::TIMEOUT:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
+ return 0;
+
+ case get_aligned_buffs_t::ALIGNMENT_FAILURE:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
+ return 0;
+
+ case get_aligned_buffs_t::SEQUENCE_ERROR:
+ metadata.has_time_spec = _last_read_time_info.has_time_spec;
+ metadata.time_spec =
+ _last_read_time_info.time_spec
+ + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate);
+ metadata.out_of_sequence = true;
+ metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ return 0;
+
+ default:
+ UHD_THROW_INVALID_CODE_PATH();
+ }
+
+ // Get payload pointers for each buffer and aggregate eob. We set eob to
+ // true if any channel has it set, since no more data will be received for
+ // that channel. In most cases, all channels should have the same value.
+ bool eob = false;
+ for (size_t i = 0; i < buffs.size(); i++) {
+ buffs[i] = _infos[i].payload;
+ eob |= _infos[i].eob;
+ }
+
+ // Set the metadata from the buffer information at index zero
+ const auto& info_0 = _infos[0];
+
+ metadata.has_time_spec = info_0.has_tsf;
+ metadata.time_spec = time_spec_t::from_ticks(info_0.tsf, _tick_rate);
+ metadata.start_of_burst = false;
+ metadata.end_of_burst = eob;
+ metadata.error_code = rx_metadata_t::ERROR_CODE_NONE;
+
+ // Done with these packets, save timestamp info for next call
+ _last_read_time_info.has_time_spec = metadata.has_time_spec;
+ _last_read_time_info.time_spec = metadata.time_spec;
+ _last_read_time_info.num_samps = info_0.payload_bytes / _bytes_per_item;
+
+ return _last_read_time_info.num_samps;
+ }
+
+ /*!
+ * Release the packet for the specified channel
+ *
+ * \param channel the channel for which to release the packet
+ */
+ void release_recv_buff(const size_t channel)
+ {
+ _xports[channel]->release_recv_buff(std::move(_frame_buffs[channel]));
+ _frame_buffs[channel] = typename transport_t::buff_t::uptr();
+ }
+
+private:
+ using get_aligned_buffs_t = get_aligned_buffs<transport_t>;
+
+ // Information recorded by streamer about the last data packet processed,
+ // used to create the metadata when there is a sequence error.
+ struct last_read_time_info_t
+ {
+ size_t num_samps = 0;
+ bool has_time_spec = false;
+ time_spec_t time_spec;
+ };
+
+ // Transports for each channel
+ std::vector<typename transport_t::uptr> _xports;
+
+ // Storage for buffers for each channel while they are in flight (between
+ // calls to get_recv_buff and release_recv_buff).
+ std::vector<typename transport_t::buff_t::uptr> _frame_buffs;
+
+ // Packet info corresponding to the packets in flight
+ std::vector<typename transport_t::packet_info_t> _infos;
+
+ // Rate used in conversion of timestamp to time_spec_t
+ double _tick_rate = 1.0;
+
+ // Rate used in conversion of timestamp to time_spec_t
+ double _samp_rate = 1.0;
+
+ // Size of a sample on the device
+ size_t _bytes_per_item = 0;
+
+ // Implementation of packet time alignment
+ get_aligned_buffs_t _get_aligned_buffs;
+
+ // Information about the last data packet processed
+ last_read_time_info_t _last_read_time_info;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP */
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
new file mode 100644
index 000000000..60881dad2
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
@@ -0,0 +1,307 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP
+#define INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/convert.hpp>
+#include <uhd/stream.hpp>
+#include <uhd/types/metadata.hpp>
+#include <uhd/utils/tasks.hpp>
+#include <uhdlib/transport/tx_streamer_zero_copy.hpp>
+#include <limits>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+namespace detail {
+
+/*!
+ * Cache of metadata for send calls with zero samples
+ *
+ * Metadata is cached when we get a send requesting a start of burst with no
+ * samples. It is applied here on the next call to send() that actually has
+ * samples to send.
+ */
+class tx_metadata_cache
+{
+public:
+ //! Stores metadata in the cache
+ UHD_FORCE_INLINE void store(const tx_metadata_t& metadata)
+ {
+ _metadata_cache = metadata;
+ _cached_metadata = true;
+ }
+
+ //! Checks for cached metadata
+ UHD_FORCE_INLINE void check(tx_metadata_t& metadata)
+ {
+ if (_cached_metadata) {
+ // Only use cached time_spec if metadata does not have one
+ if (!metadata.has_time_spec) {
+ metadata.has_time_spec = _metadata_cache.has_time_spec;
+ metadata.time_spec = _metadata_cache.time_spec;
+ }
+ metadata.start_of_burst = _metadata_cache.start_of_burst;
+ metadata.end_of_burst = _metadata_cache.end_of_burst;
+ _cached_metadata = false;
+ }
+ }
+
+private:
+ // Whether there is a cached metadata object
+ bool _cached_metadata = false;
+
+ // Cached metadata value
+ uhd::tx_metadata_t _metadata_cache;
+};
+
+} // namespace detail
+
+/*!
+ * Implementation of tx streamer API
+ */
+template <typename transport_t>
+class tx_streamer_impl : public tx_streamer
+{
+public:
+ tx_streamer_impl(const size_t num_chans, const uhd::stream_args_t stream_args)
+ : _zero_copy_streamer(num_chans)
+ , _zero_buffs(num_chans, &_zero)
+ , _out_buffs(num_chans)
+ {
+ _setup_converters(num_chans, stream_args);
+ _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item);
+ _spp = stream_args.args.cast<size_t>("spp", _spp);
+ }
+
+ void connect_channel(const size_t channel, typename transport_t::uptr xport)
+ {
+ const size_t max_pyld_size = xport->get_max_payload_size();
+ _zero_copy_streamer.connect_channel(channel, std::move(xport));
+
+ // Set spp based on the transport frame size
+ const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item;
+ _spp = std::min(_spp, xport_spp);
+ }
+
+ size_t get_num_channels() const
+ {
+ return _zero_copy_streamer.get_num_channels();
+ }
+
+ size_t get_max_num_samps() const
+ {
+ return _spp;
+ }
+
+
+ /*! Get width of each over-the-wire item component. For complex items,
+ * returns the width of one component only (real or imaginary).
+ */
+ size_t get_otw_item_comp_bit_width() const
+ {
+ return _convert_info.otw_item_bit_width;
+ }
+
+ size_t send(const uhd::tx_streamer::buffs_type& buffs,
+ const size_t nsamps_per_buff,
+ const uhd::tx_metadata_t& metadata_,
+ const double timeout)
+ {
+ uhd::tx_metadata_t metadata(metadata_);
+
+ if (nsamps_per_buff == 0 && metadata.start_of_burst) {
+ _metadata_cache.store(metadata);
+ return 0;
+ }
+
+ _metadata_cache.check(metadata);
+
+ const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
+
+ if (nsamps_per_buff == 0) {
+ // Send requests with no samples are handled here, such as end of
+ // burst. Send packets need to have at least one sample based on the
+ // chdr specification, so we use _zero_buffs here.
+ _send_one_packet(_zero_buffs.data(),
+ 0, // buffer offset
+ 1, // num samples
+ metadata,
+ timeout_ms);
+
+ return 0;
+ } else if (nsamps_per_buff <= _spp) {
+ return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms);
+
+ } else {
+ size_t total_num_samps_sent = 0;
+ const bool eob = metadata.end_of_burst;
+ metadata.end_of_burst = false;
+
+ const size_t num_fragments = (nsamps_per_buff - 1) / _spp;
+ const size_t final_length = ((nsamps_per_buff - 1) % _spp) + 1;
+
+ for (size_t i = 0; i < num_fragments; i++) {
+ const size_t num_samps_sent = _send_one_packet(
+ buffs, total_num_samps_sent, _spp, metadata, timeout_ms);
+
+ total_num_samps_sent += num_samps_sent;
+
+ if (num_samps_sent == 0) {
+ return total_num_samps_sent;
+ }
+
+ // Setup timespec for the next fragment
+ if (metadata.has_time_spec) {
+ metadata.time_spec =
+ metadata.time_spec
+ + time_spec_t::from_ticks(num_samps_sent, _samp_rate);
+ }
+
+ metadata.start_of_burst = false;
+ }
+
+ // Send the final fragment
+ metadata.end_of_burst = eob;
+
+ size_t nsamps_sent =
+ total_num_samps_sent
+ + _send_one_packet(
+ buffs, total_num_samps_sent, final_length, metadata, timeout);
+
+ return nsamps_sent;
+ }
+ }
+
+ //! Implementation of rx_streamer API method
+ bool recv_async_msg(
+ uhd::async_metadata_t& /*async_metadata*/, double /*timeout = 0.1*/)
+ {
+ // TODO: implement me
+ return false;
+ }
+
+protected:
+ //! Configures scaling factor for conversion
+ void set_scale_factor(const size_t chan, const double scale_factor)
+ {
+ _converters[chan]->set_scalar(scale_factor);
+ }
+
+ //! Configures sample rate for conversion of timestamp
+ void set_samp_rate(const double rate)
+ {
+ _samp_rate = rate;
+ }
+
+ //! Configures tick rate for conversion of timestamp
+ void set_tick_rate(const double rate)
+ {
+ _zero_copy_streamer.set_tick_rate(rate);
+ }
+
+private:
+ //! Converter and associated item sizes
+ struct convert_info
+ {
+ size_t bytes_per_otw_item;
+ size_t bytes_per_cpu_item;
+ size_t otw_item_bit_width;
+ };
+
+ //! Convert samples for one channel and sends a packet
+ size_t _send_one_packet(const uhd::tx_streamer::buffs_type& buffs,
+ const size_t buffer_offset_in_samps,
+ const size_t num_samples,
+ const tx_metadata_t& metadata,
+ const int32_t timeout_ms)
+ {
+ if (!_zero_copy_streamer.get_send_buffs(
+ _out_buffs, num_samples, metadata, timeout_ms)) {
+ return 0;
+ }
+
+ size_t byte_offset = buffer_offset_in_samps * _convert_info.bytes_per_cpu_item;
+
+ for (size_t i = 0; i < get_num_channels(); i++) {
+ const void* input_ptr = static_cast<const uint8_t*>(buffs[i]) + byte_offset;
+ _converters[i]->conv(input_ptr, _out_buffs[i], num_samples);
+
+ _zero_copy_streamer.release_send_buff(i);
+ }
+
+ return num_samples;
+ }
+
+ //! Create converters and initialize _bytes_per_cpu_item
+ void _setup_converters(const size_t num_chans, const uhd::stream_args_t stream_args)
+ {
+ // Note to code archaeologists: In the past, we had to also specify the
+ // endianness here, but that is no longer necessary because we can make
+ // the wire endianness match the host endianness.
+ convert::id_type id;
+ id.input_format = stream_args.cpu_format;
+ id.num_inputs = 1;
+ id.output_format = stream_args.otw_format + "_chdr";
+ id.num_outputs = 1;
+
+ auto starts_with = [](const std::string& s, const std::string v) {
+ return s.find(v) == 0;
+ };
+
+ const bool otw_is_complex = starts_with(stream_args.otw_format, "fc")
+ || starts_with(stream_args.otw_format, "sc");
+
+ convert_info info;
+ info.bytes_per_otw_item = convert::get_bytes_per_item(id.output_format);
+ info.bytes_per_cpu_item = convert::get_bytes_per_item(id.input_format);
+
+ if (otw_is_complex) {
+ info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2;
+ } else {
+ info.otw_item_bit_width = info.bytes_per_otw_item * 8;
+ }
+
+ _convert_info = info;
+
+ for (size_t i = 0; i < num_chans; i++) {
+ _converters.push_back(convert::get_converter(id)());
+ _converters.back()->set_scalar(32767.0);
+ }
+ }
+
+ // Converter item sizes
+ convert_info _convert_info;
+
+ // Converters
+ std::vector<uhd::convert::converter::sptr> _converters;
+
+ // Manages frame buffers and packet info
+ tx_streamer_zero_copy<transport_t> _zero_copy_streamer;
+
+ // Buffer used to handle send calls with no data
+ std::vector<const void*> _zero_buffs;
+
+ const uint64_t _zero = 0;
+
+ // Container for buffer pointers used in send method
+ std::vector<void*> _out_buffs;
+
+ // Sample rate used to calculate metadata time_spec_t
+ double _samp_rate = 1.0;
+
+ // Maximum number of samples per packet
+ size_t _spp = std::numeric_limits<std::size_t>::max();
+
+ // Metadata cache for send calls with no data
+ detail::tx_metadata_cache _metadata_cache;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_TRANSPORT_TX_STREAMER_IMPL_HPP */
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
new file mode 100644
index 000000000..1b6f55238
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
@@ -0,0 +1,147 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP
+#define INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/stream.hpp>
+#include <uhd/types/metadata.hpp>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Implementation of rx streamer manipulation of frame buffers and packet info.
+ * This class is part of tx_streamer_impl, split into a separate unit as it is
+ * a mostly self-contained portion of the streamer logic.
+ */
+template <typename transport_t>
+class tx_streamer_zero_copy
+{
+public:
+ //! Constructor
+ tx_streamer_zero_copy(const size_t num_chans)
+ : _xports(num_chans), _frame_buffs(num_chans)
+ {
+ }
+
+ //! Connect a new channel to the streamer
+ void connect_channel(const size_t port, typename transport_t::uptr xport)
+ {
+ if (port >= get_num_channels()) {
+ throw uhd::index_error(
+ "Port number indexes beyond the number of streamer ports");
+ }
+
+ if (_xports[port]) {
+ throw uhd::runtime_error(
+ "Streamer port number is already connected to a port");
+ }
+
+ _xports[port] = std::move(xport);
+ }
+
+ //! Returns number of channels handled by this streamer
+ size_t get_num_channels() const
+ {
+ return _xports.size();
+ }
+
+ //! Configures tick rate for conversion of timestamp
+ void set_tick_rate(const double rate)
+ {
+ _tick_rate = rate;
+ }
+
+ //! Configures the size of each sample
+ void set_bytes_per_item(const size_t bpi)
+ {
+ _bytes_per_item = bpi;
+ }
+
+ /*!
+ * Gets a set of frame buffers, one per channel.
+ *
+ * \param buffs returns a pointer to the buffer data
+ * \param nsamps_per_buff the number of samples that will be written to each buffer
+ * \param metadata the metadata to write to the packet header
+ * \param timeout_ms timeout in milliseconds
+ * \return true if the operation was sucessful, false if timeout occurs
+ */
+ UHD_FORCE_INLINE bool get_send_buffs(std::vector<void*>& buffs,
+ const size_t nsamps_per_buff,
+ const tx_metadata_t& metadata,
+ const int32_t timeout_ms)
+ {
+ // Try to get a buffer per channel
+ for (; _next_buff_to_get < _xports.size(); _next_buff_to_get++) {
+ _frame_buffs[_next_buff_to_get].first =
+ _xports[_next_buff_to_get]->get_send_buff(timeout_ms);
+
+ if (!_frame_buffs[_next_buff_to_get].first) {
+ return false;
+ }
+ }
+
+ // Got all the buffers, start from index 0 next call
+ _next_buff_to_get = 0;
+
+ // Store portions of metadata we care about
+ typename transport_t::packet_info_t info;
+ info.has_tsf = metadata.has_time_spec;
+
+ if (metadata.has_time_spec) {
+ info.tsf = metadata.time_spec.to_ticks(_tick_rate);
+ }
+
+ info.payload_bytes = nsamps_per_buff * _bytes_per_item;
+ info.eob = metadata.end_of_burst;
+
+ // Write packet header
+ for (size_t i = 0; i < buffs.size(); i++) {
+ std::tie(buffs[i], _frame_buffs[i].second) =
+ _xports[i]->write_packet_header(_frame_buffs[i].first, info);
+ }
+
+ return true;
+ }
+
+ /*!
+ * Send the packet for the specified channel
+ *
+ * \param channel the channel for which to release the packet
+ */
+ UHD_FORCE_INLINE void release_send_buff(const size_t channel)
+ {
+ _frame_buffs[channel].first->set_packet_size(_frame_buffs[channel].second);
+ _xports[channel]->release_send_buff(std::move(_frame_buffs[channel].first));
+
+ _frame_buffs[channel].first = nullptr;
+ _frame_buffs[channel].second = 0;
+ }
+
+private:
+ // Transports for each channel
+ std::vector<typename transport_t::uptr> _xports;
+
+ // Container to hold frame buffers for each channel and their packet sizes
+ std::vector<std::pair<typename transport_t::buff_t::uptr, size_t>> _frame_buffs;
+
+ // Rate used in conversion of timestamp to time_spec_t
+ double _tick_rate = 1.0;
+
+ // Size of a sample on the device
+ size_t _bytes_per_item = 0;
+
+ // Next channel from which to get a buffer, stored as a member to
+ // allow the streamer to continue where it stopped due to timeouts.
+ size_t _next_buff_to_get = 0;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP */