-rw-r--r--  host/include/uhd/rfnoc_graph.hpp | 21
-rw-r--r--  host/lib/include/uhdlib/rfnoc/chdr_packet.hpp | 9
-rw-r--r--  host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp | 481
-rw-r--r--  host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp | 550
-rw-r--r--  host/lib/include/uhdlib/rfnoc/chdr_types.hpp | 2
-rw-r--r--  host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp | 32
-rw-r--r--  host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp | 13
-rw-r--r--  host/lib/include/uhdlib/rfnoc/mb_iface.hpp | 44
-rw-r--r--  host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp | 21
-rw-r--r--  host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp | 95
-rw-r--r--  host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp | 90
-rw-r--r--  host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp | 130
-rw-r--r--  host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp | 99
-rw-r--r--  host/lib/include/uhdlib/transport/get_aligned_buffs.hpp | 175
-rw-r--r--  host/lib/include/uhdlib/transport/rx_streamer_impl.hpp | 341
-rw-r--r--  host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp | 207
-rw-r--r--  host/lib/include/uhdlib/transport/tx_streamer_impl.hpp | 307
-rw-r--r--  host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp | 147
-rw-r--r--  host/lib/rfnoc/CMakeLists.txt | 2
-rw-r--r--  host/lib/rfnoc/chdr_packet.cpp | 18
-rw-r--r--  host/lib/rfnoc/graph_stream_manager.cpp | 34
-rw-r--r--  host/lib/rfnoc/link_stream_manager.cpp | 94
-rw-r--r--  host/lib/rfnoc/mgmt_portal.cpp | 10
-rw-r--r--  host/lib/rfnoc/rfnoc_graph.cpp | 494
-rw-r--r--  host/lib/rfnoc/rfnoc_rx_streamer.cpp | 141
-rw-r--r--  host/lib/rfnoc/rfnoc_tx_streamer.cpp | 124
-rw-r--r--  host/tests/CMakeLists.txt | 2
-rw-r--r--  host/tests/common/mock_link.hpp | 8
-rw-r--r--  host/tests/rfnoc_chdr_test.cpp | 46
-rw-r--r--  host/tests/rx_streamer_test.cpp | 744
-rw-r--r--  host/tests/tx_streamer_test.cpp | 393
31 files changed, 4540 insertions(+), 334 deletions(-)
diff --git a/host/include/uhd/rfnoc_graph.hpp b/host/include/uhd/rfnoc_graph.hpp
index c13939ac9..08d5fc095 100644
--- a/host/include/uhd/rfnoc_graph.hpp
+++ b/host/include/uhd/rfnoc_graph.hpp
@@ -159,16 +159,17 @@ public:
/*! Connect a RFNOC block with block ID \p src_block to another with block ID \p
* dst_block.
*
+ * Note that you must also call this for statically connected blocks if you
+ * wish to use them.
+ *
* \param src_blk The block ID of the source block to connect.
* \param src_port The port of the source block to connect.
* \param dst_blk The block ID of the destination block to connect to.
* \param dst_port The port of the destination block to connect to.
* \param skip_property_propagation Skip property propagation for this edge
*
- * \throws connect_disallowed_on_src
- * if the source port is statically connected to a *different* block
- * \throws connect_disallowed_on_dst
- * if the destination port is statically connected to a *different* block
+ * \throws uhd::routing_error if the source or destination ports are
+ * statically connected to a *different* block
*/
virtual void connect(const block_id_t& src_blk,
size_t src_port,
@@ -186,7 +187,7 @@ public:
* \throws connect_disallowed_on_dst
* if the destination port is statically connected to a *different* block
*/
- virtual void connect(uhd::tx_streamer& streamer,
+ virtual void connect(uhd::tx_streamer::sptr streamer,
size_t strm_port,
const block_id_t& dst_blk,
size_t dst_port) = 0;
@@ -203,7 +204,7 @@ public:
*/
virtual void connect(const block_id_t& src_blk,
size_t src_port,
- uhd::rx_streamer& streamer,
+ uhd::rx_streamer::sptr streamer,
size_t strm_port) = 0;
/*! Enumerate all the possible static connections in the graph
@@ -244,10 +245,12 @@ public:
* start using it. If a different streamer is already connected
* to the intended source then that call may fail.
*
+ * \param num_ports Number of ports that will be connected to the streamer
* \param args Arguments to aid the construction of the streamer
* \return a shared pointer to a new streamer
*/
- //virtual rx_streamer::sptr create_rx_streamer(const stream_args_t& args) = 0;
+ virtual rx_streamer::sptr create_rx_streamer(const size_t num_ports,
+ const stream_args_t& args) = 0;
/*! Create a new transmit streamer from the streamer arguments
* The created streamer is still not connected to anything yet.
@@ -255,10 +258,12 @@ public:
* start using it. If a different streamer is already connected
* to the intended sink then that call may fail.
*
+ * \param num_ports Number of ports that will be connected to the streamer
* \param args Arguments to aid the construction of the streamer
* \return a shared pointer to a new streamer
*/
- //virtual tx_streamer::sptr create_tx_streamer(const stream_args_t& args) = 0;
+ virtual tx_streamer::sptr create_tx_streamer(const size_t num_ports,
+ const stream_args_t& args) = 0;
/**************************************************************************
* Hardware Control
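
For reference, a minimal sketch of how a caller might drive the new streamer API after this change; the rfnoc_graph::make() factory, the device address, and the radio block ID "0/Radio#0" are placeholders, not part of this patch:

    // Sketch only; assumes a graph instance and a radio block named "0/Radio#0".
    auto graph = uhd::rfnoc::rfnoc_graph::make(uhd::device_addr_t("addr=192.168.10.2"));
    uhd::stream_args_t args("fc32", "sc16");
    auto rx_stream = graph->create_rx_streamer(1 /* num_ports */, args);
    // Streamers must be connected explicitly, even to statically connected blocks.
    graph->connect(uhd::rfnoc::block_id_t("0/Radio#0"), 0, rx_stream, 0);
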
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp b/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp
index 770c6cf6f..cc729de6c 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp
@@ -114,6 +114,15 @@ public:
*/
virtual void* get_payload_ptr() = 0;
+ /*! Return the payload offset in bytes for a given type and num_mdata
+ *
+ * \param pkt_type The packet type for calculation
+ * \param num_mdata The number of metadata words for calculation
+ * \return The offset of the payload in a packet with the given params
+ */
+ virtual size_t calculate_payload_offset(const packet_type_t pkt_type,
+ const uint8_t num_mdata = 0) const = 0;
+
//! Shortcut to return the const metadata pointer cast as a specific type
template <typename data_t>
inline const data_t* get_mdata_const_ptr_as() const
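
A hedged sketch of how the new accessor is meant to be used, mirroring the data transports below; `pkt` and `frame_size_bytes` are placeholders:

    // pkt is a chdr_packet::uptr obtained from a chdr_packet_factory.
    const size_t pyld_offset =
        pkt->calculate_payload_offset(uhd::rfnoc::chdr::PKT_TYPE_DATA_WITH_TS);
    // Maximum payload that fits in one link frame of frame_size_bytes.
    const size_t max_payload = frame_size_bytes - pyld_offset;
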
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp
new file mode 100644
index 000000000..69dceebe1
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp
@@ -0,0 +1,481 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP
+#define INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP
+
+#include <uhd/config.hpp>
+#include <uhdlib/rfnoc/chdr_packet.hpp>
+#include <uhdlib/rfnoc/chdr_types.hpp>
+#include <uhdlib/rfnoc/mgmt_portal.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+#include <uhdlib/rfnoc/rx_flow_ctrl_state.hpp>
+#include <uhdlib/transport/io_service.hpp>
+#include <uhdlib/transport/link_if.hpp>
+#include <memory>
+
+namespace uhd { namespace rfnoc {
+
+namespace detail {
+
+/*!
+ * Utility class to send rx flow control responses
+ */
+class rx_flow_ctrl_sender
+{
+public:
+ //! Constructor
+ rx_flow_ctrl_sender(
+ const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids)
+ : _dst_epid(sep_ids.first)
+ {
+ _fc_packet = pkt_factory.make_strs();
+ _fc_strs_pyld.src_epid = sep_ids.second;
+ }
+
+ /*! Configure buffer capacity
+ * \param recv_capacity The buffer capacity of the receive link
+ */
+ void set_capacity(const stream_buff_params_t& recv_capacity)
+ {
+ _fc_strs_pyld.capacity_bytes = recv_capacity.bytes;
+ _fc_strs_pyld.capacity_pkts = recv_capacity.packets;
+ }
+
+ /*! Send a flow control response packet
+ *
+ * \param send_link the link to use to send the packet
+ * \param counts Transfer counts for packet contents
+ */
+ void send_strs(transport::send_link_if* send_link, const stream_buff_params_t& counts)
+ {
+ auto buff = send_link->get_send_buff(0);
+ if (!buff) {
+ throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer");
+ }
+
+ chdr::chdr_header header;
+ header.set_seq_num(_fc_seq_num++);
+ header.set_dst_epid(_dst_epid);
+
+ chdr::strs_payload fc_payload(_fc_strs_pyld);
+ fc_payload.xfer_count_bytes = counts.bytes;
+ fc_payload.xfer_count_pkts = counts.packets;
+
+ _fc_packet->refresh(buff->data(), header, fc_payload);
+ const size_t size = header.get_length();
+
+ buff->set_packet_size(size);
+ send_link->release_send_buff(std::move(buff));
+ }
+
+private:
+ // Endpoint ID for recipient of flow control response
+ const sep_id_t _dst_epid;
+
+ // Packet for writing flow control info
+ chdr::chdr_strs_packet::uptr _fc_packet;
+
+ // Pre-configured strs payload to hold values that don't change
+ chdr::strs_payload _fc_strs_pyld;
+
+ // Sequence number for flow control packets
+ uint16_t _fc_seq_num = 0;
+};
+} // namespace detail
+
+/*!
+ * Flow-controlled transport for RX chdr data
+ *
+ * This transport provides the streamer an interface to read RX data packets.
+ * The transport implements flow control and sequence number checking.
+ *
+ * The transport uses I/O services to provide options for work scheduling. I/O
+ * services allow the I/O work to be offloaded to a worker thread or to be
+ * performed in the same thread as the streamer API calls.
+ *
+ * For an rx transport, the device sends data packets, and the host sends strs
+ * packets letting the device know that buffer space in the host has been freed.
+ * For lossy links, the device also sends strc packets to resynchronize the
+ * transfer counts between host and device, to correct for any dropped packets
+ * in the link.
+ */
+class chdr_rx_data_xport
+{
+public:
+ using uptr = std::unique_ptr<chdr_rx_data_xport>;
+ using buff_t = transport::frame_buff;
+
+ //! Values extracted from received RX data packets
+ struct packet_info_t
+ {
+ bool eob = false;
+ bool has_tsf = false;
+ uint64_t tsf = 0;
+ size_t payload_bytes = 0;
+ const void* payload = nullptr;
+ };
+
+ /*! Constructor
+ *
+ * \param io_srv The service that will schedule the xport I/O
+ * \param recv_link The recv link, already attached to the I/O service
+ * \param send_link The send link, already attached to the I/O service
+ * \param pkt_factory Factory to create packets with the desired chdr_w and endianness
+ * \param addrs Source and destination addresses
+ * \param epids Source and destination endpoint IDs
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
+ * \param num_recv_frames Num frames to reserve from the recv link
+ * \param recv_capacity Total capacity of the recv link
+ * \param fc_freq Frequency of flow control status messages
+ * \param fc_headroom Headroom for flow control status messages
+ * \param lossy_xport Whether the xport is lossy, for flow control configuration
+ */
+ chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const uhd::rfnoc::sep_addr_pair_t& addrs,
+ const uhd::rfnoc::sep_id_pair_t& epids,
+ const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
+ const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
+ const size_t num_recv_frames,
+ const stream_buff_params_t& recv_capacity,
+ const stream_buff_params_t& fc_freq,
+ const stream_buff_params_t& fc_headroom,
+ const bool lossy_xport)
+ : _fc_state(epids), _fc_sender(pkt_factory, epids), _epid(epids.second)
+ {
+ const sep_addr_t remote_sep_addr = addrs.first;
+ const sep_addr_t local_sep_addr = addrs.second;
+ const sep_id_t remote_epid = epids.first;
+ const sep_id_t local_epid = epids.second;
+
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Creating rx xport with local epid=" << local_epid
+ << ", remote epid=" << remote_epid);
+
+ _recv_packet = pkt_factory.make_generic();
+ _fc_sender.set_capacity(recv_capacity);
+
+ // Calculate max payload size
+ const size_t pyld_offset =
+ _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS);
+ _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset;
+
+ // Make data transport
+ auto recv_cb = [this](buff_t::uptr& buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link) {
+ return this->_recv_callback(buff, recv_link, send_link);
+ };
+
+ auto fc_cb = [this](buff_t::uptr buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link) {
+ this->_fc_callback(std::move(buff), recv_link, send_link);
+ };
+
+ // Needs just a single send frame for responses
+ _recv_io = io_srv->make_recv_client(recv_link,
+ num_recv_frames,
+ recv_cb,
+ send_link,
+ /* num_send_frames*/ 1,
+ fc_cb);
+
+ // Create a control transport with the rx data links to send mgmt packets
+ // needed to setup the stream
+ // Piggyback on frames from the recv_io_if
+ auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv,
+ send_link,
+ recv_link,
+ local_epid,
+ 0, // num_send_frames
+ 0); // num_recv_frames
+
+ // Create new temporary management portal with the transports used for this stream
+ // TODO: This is a bit excessive. Maybe we can pare down the functionality of the
+ // portal to just route setup. Whatever we do, we *must* use this xport with it,
+ // otherwise the transport will not behave correctly.
+ auto data_mgmt_portal = uhd::rfnoc::mgmt::mgmt_portal::make(
+ *ctrl_xport, pkt_factory, local_sep_addr, local_epid);
+
+ // Setup a route to the EPID
+ // Note that this may be gratuitous--The endpoint may already have been set up
+ data_mgmt_portal->initialize_endpoint(*ctrl_xport, remote_sep_addr, remote_epid);
+ data_mgmt_portal->setup_local_route(*ctrl_xport, remote_epid);
+
+ // Initialize flow control - management portal sends a stream command
+ // containing its requested flow control frequency, the rx transport
+ // responds with a stream status containing its buffer capacity.
+ data_mgmt_portal->config_local_rx_stream_start(*ctrl_xport,
+ remote_epid,
+ lossy_xport,
+ pyld_buff_fmt,
+ mdata_buff_fmt,
+ fc_freq,
+ fc_headroom);
+
+ data_mgmt_portal->config_local_rx_stream_commit(*ctrl_xport, remote_epid);
+
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Stream endpoint was configured with:"
+ << std::endl
+ << "capacity bytes=" << recv_capacity.bytes
+ << ", packets=" << recv_capacity.packets << std::endl
+ << "fc headroom bytes=" << fc_headroom.bytes
+ << ", packets=" << fc_headroom.packets << std::endl
+ << "fc frequency bytes=" << fc_freq.bytes
+ << ", packets=" << fc_freq.packets);
+
+ // We no longer need the control xport and mgmt_portal, release them so
+ // the control xport is no longer connected to the I/O service.
+ data_mgmt_portal.reset();
+ ctrl_xport.reset();
+ }
+
+ /*! Returns the maximum number of payload bytes
+ *
+ * \return maximum payload bytes per packet
+ */
+ size_t get_max_payload_size() const
+ {
+ return _max_payload_size;
+ }
+
+ /*!
+ * Gets an RX frame buffer containing a recv packet
+ *
+ * \param timeout_ms timeout in milliseconds
+ * \return A tuple containing:
+ * - a frame_buff, or null if timeout occurs
+ * - info struct corresponding to the packet
+ * - whether the packet was out of sequence
+ */
+ std::tuple<typename buff_t::uptr, packet_info_t, bool> get_recv_buff(
+ const int32_t timeout_ms)
+ {
+ buff_t::uptr buff = _recv_io->get_recv_buff(timeout_ms);
+
+ if (!buff) {
+ return std::make_tuple(typename buff_t::uptr(), packet_info_t(), false);
+ }
+
+ auto info = _read_data_packet_info(buff);
+ bool seq_error = _is_out_of_sequence(std::get<1>(info));
+
+ return std::make_tuple(std::move(buff), std::get<0>(info), seq_error);
+ }
+
+ /*!
+ * Releases an RX frame buffer
+ *
+ * \param buff the frame buffer to release
+ */
+ void release_recv_buff(typename buff_t::uptr buff)
+ {
+ _recv_io->release_recv_buff(std::move(buff));
+ }
+
+private:
+ /*!
+ * Recv callback for I/O service
+ *
+ * The I/O service invokes this callback when it reads a packet from the
+ * recv link.
+ *
+ * \param buff the frame buffer containing the packet data
+ * \param recv_link the recv link from which buff was read
+ * \param send_link the send link for flow control messages
+ */
+ bool _recv_callback(buff_t::uptr& buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link)
+ {
+ _recv_packet->refresh(buff->data());
+ const auto header = _recv_packet->get_chdr_header();
+ const auto type = header.get_pkt_type();
+ const auto dst_epid = header.get_dst_epid();
+ const auto packet_size = buff->packet_size();
+
+ if (dst_epid != _epid) {
+ return false;
+ }
+
+ if (type == chdr::PKT_TYPE_STRC) {
+ chdr::strc_payload strc;
+ strc.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(),
+ _recv_packet->get_payload_size() / sizeof(uint64_t),
+ _recv_packet->conv_to_host<uint64_t>());
+
+ const stream_buff_params_t strc_counts = {
+ strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)};
+
+ if (strc.op_code == chdr::STRC_RESYNC) {
+ // Resynchronize before updating fc_state, the strc payload
+ // contains counts before the strc packet itself
+ _fc_state.resynchronize(strc_counts);
+
+ // Update state that we received a packet
+ _fc_state.data_received(packet_size);
+
+ recv_link->release_recv_buff(std::move(buff));
+ buff = buff_t::uptr();
+ _fc_state.xfer_done(packet_size);
+ _send_fc_response(send_link);
+ } else if (strc.op_code == chdr::STRC_INIT) {
+ _fc_state.initialize(
+ {strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)});
+
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Received strc init with fc freq"
+ << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts);
+
+ // Make sure flow control was initialized
+ assert(_fc_state.get_fc_freq().bytes > 0);
+ assert(_fc_state.get_fc_freq().packets > 0);
+
+ // Send a strs response to configure flow control on the sender
+ _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts());
+
+ // Reset counts, since mgmt_portal will do it to FPGA
+ _fc_state.reset_counts();
+
+ recv_link->release_recv_buff(std::move(buff));
+ buff = buff_t::uptr();
+ } else {
+ throw uhd::value_error("Unexpected opcode value in STRC packet.");
+ }
+
+ // For stream commands, we return true (packet was destined to this
+ // client) but release the buffer. The I/O service won't queue this
+ // packet in the recv_io_if.
+ return true;
+
+ } else if (type == chdr::PKT_TYPE_DATA_NO_TS
+ || type == chdr::PKT_TYPE_DATA_WITH_TS) {
+ // Update state that we received a packet
+ _fc_state.data_received(packet_size);
+
+ // If this is a data packet, just claim it by returning true. The
+ // I/O service will queue this packet in the recv_io_if.
+ return true;
+
+ } else {
+ return false;
+ }
+ }
+
+ /*!
+ * Flow control callback for I/O service
+ *
+ * The I/O service invokes this callback when a packet needs to be released
+ * to the recv link.
+ *
+ * \param buff the frame buffer containing the packet data
+ * \param recv_link the recv link to which to release the buffer
+ * \param send_link the send link for flow control messages
+ */
+ void _fc_callback(buff_t::uptr buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link)
+ {
+ const size_t packet_size = buff->packet_size();
+ recv_link->release_recv_buff(std::move(buff));
+ _fc_state.xfer_done(packet_size);
+ _send_fc_response(send_link);
+ }
+
+ /*!
+ * Sends a flow control response packet if necessary.
+ *
+ * \param send_link the send link for flow control messages
+ */
+ void _send_fc_response(transport::send_link_if* send_link)
+ {
+ if (_fc_state.fc_resp_due()) {
+ _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts());
+ _fc_state.fc_resp_sent();
+ }
+ }
+
+ /*!
+ * Checks whether the sequence number is out of sequence, prints 'D' to the
+ * fastpath log if it is, and returns the result of the check.
+ *
+ * \return true if a sequence error occurred
+ */
+ UHD_FORCE_INLINE bool _is_out_of_sequence(uint16_t seq_num)
+ {
+ const uint16_t expected_packet_count = _data_seq_num;
+ _data_seq_num = seq_num + 1;
+
+ if (expected_packet_count != seq_num) {
+ UHD_LOG_FASTPATH("D");
+ return true;
+ }
+ return false;
+ }
+
+ /*!
+ * Reads packet header and returns information in a struct.
+ *
+ * \return a tuple containing the packet info and packet sequence number
+ */
+ std::tuple<packet_info_t, uint16_t> _read_data_packet_info(buff_t::uptr& buff)
+ {
+ const void* data = buff->data();
+ _recv_packet->refresh(data);
+ const auto header = _recv_packet->get_chdr_header();
+ const auto optional_time = _recv_packet->get_timestamp();
+
+ packet_info_t info;
+ info.eob = header.get_eob();
+ info.has_tsf = optional_time.is_initialized();
+ info.tsf = optional_time ? *optional_time : 0;
+ info.payload_bytes = _recv_packet->get_payload_size();
+ info.payload = _recv_packet->get_payload_const_ptr();
+
+ const uint8_t* pkt_end =
+ reinterpret_cast<uint8_t*>(buff->data()) + buff->packet_size();
+ const size_t pyld_pkt_len =
+ pkt_end - reinterpret_cast<const uint8_t*>(info.payload);
+
+ if (pyld_pkt_len < info.payload_bytes) {
+ _recv_io->release_recv_buff(std::move(buff));
+ throw uhd::value_error("Bad CHDR header or invalid packet length.");
+ }
+
+ return std::make_tuple(info, header.get_seq_num());
+ }
+
+ // Interface to the I/O service
+ transport::recv_io_if::sptr _recv_io;
+
+ // Flow control state
+ rx_flow_ctrl_state _fc_state;
+
+ // Maximum data payload in bytes
+ size_t _max_payload_size = 0;
+
+ // Sequence number for data packets
+ uint16_t _data_seq_num = 0;
+
+ // Packet for received data
+ chdr::chdr_packet::uptr _recv_packet;
+
+ // Handles sending of strs flow control response packets
+ detail::rx_flow_ctrl_sender _fc_sender;
+
+ // Local / Sink EPID
+ sep_id_t _epid;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP */
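
A condensed sketch of the receive path a streamer might build on top of this transport; error handling and sample conversion are omitted, and `xport` and `process_samples()` are placeholders:

    auto result = xport->get_recv_buff(100 /* timeout_ms */);
    auto& buff          = std::get<0>(result);
    const auto& info    = std::get<1>(result);
    const bool seq_error = std::get<2>(result);
    if (buff) {
        if (seq_error) {
            // A dropped packet was detected on this channel ('D' in the fastpath log).
        }
        // info.payload points into the frame buffer; consume it before releasing.
        process_samples(info.payload, info.payload_bytes, info.has_tsf, info.tsf);
        xport->release_recv_buff(std::move(buff));
    }
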
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
new file mode 100644
index 000000000..62b811bf5
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
@@ -0,0 +1,550 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP
+#define INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP
+
+#include <uhdlib/rfnoc/chdr_packet.hpp>
+#include <uhdlib/rfnoc/chdr_types.hpp>
+#include <uhdlib/rfnoc/mgmt_portal.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+#include <uhdlib/rfnoc/tx_flow_ctrl_state.hpp>
+#include <uhdlib/transport/io_service.hpp>
+#include <uhdlib/transport/link_if.hpp>
+#include <memory>
+
+namespace uhd { namespace rfnoc {
+
+namespace detail {
+
+/*!
+ * Utility class to send tx flow control messages
+ */
+class tx_flow_ctrl_sender
+{
+public:
+ //! Constructor
+ tx_flow_ctrl_sender(
+ const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids)
+ : _dst_epid(sep_ids.second)
+ {
+ _fc_packet = pkt_factory.make_strc();
+ _fc_strc_pyld.src_epid = sep_ids.first;
+ _fc_strc_pyld.op_code = chdr::STRC_RESYNC;
+ }
+
+ /*!
+ * Sends a flow control resync packet
+ *
+ * Sends a strc packet with the resync opcode to make the device transfer
+ * counts match those of the host, to correct for dropped packets.
+ *
+ * \param send_link the link to use to send the packet
+ * \param counts Transfer counts for packet contents
+ */
+ size_t send_strc_resync(
+ transport::send_link_if* send_link, const stream_buff_params_t& counts)
+ {
+ auto buff = send_link->get_send_buff(0);
+ if (!buff) {
+ throw uhd::runtime_error("tx_flowctrl timed out getting a send buffer");
+ }
+
+ chdr::chdr_header header;
+ header.set_seq_num(_fc_seq_num++);
+ header.set_dst_epid(_dst_epid);
+
+ chdr::strc_payload fc_payload(_fc_strc_pyld);
+ fc_payload.num_bytes = counts.bytes;
+ fc_payload.num_pkts = counts.packets;
+
+ _fc_packet->refresh(buff->data(), header, fc_payload);
+ const size_t size = header.get_length();
+
+ buff->set_packet_size(size);
+ send_link->release_send_buff(std::move(buff));
+ return size;
+ }
+
+private:
+ // Endpoint ID for recipient of flow control response
+ const sep_id_t _dst_epid;
+
+ // Packet for writing flow control info
+ chdr::chdr_strc_packet::uptr _fc_packet;
+
+ // Pre-configured strc payload to hold values that don't change
+ chdr::strc_payload _fc_strc_pyld;
+
+ // Sequence number for flow control packets
+ uint16_t _fc_seq_num = 0;
+};
+} // namespace detail
+
+/*!
+ * Flow-controlled transport for TX chdr data
+ *
+ * This transport provides the streamer an interface to send TX data packets.
+ * The transport implements flow control and keeps track of sequence numbers.
+ *
+ * The transport uses I/O services to provide options for work scheduling. I/O
+ * services allow the I/O work to be offloaded to a worker thread or to be
+ * performed in the same thread as the streamer API calls.
+ *
+ * For a tx transport, the host sends data packets, and the device sends strs
+ * packets letting the host know that buffer space in the device stream endpoint
+ * has been freed. For lossy links, the host also sends strc packets to
+ * resynchronize the transfer counts between host and device, to correct for
+ * any dropped packets in the link.
+ */
+class chdr_tx_data_xport
+{
+public:
+ using uptr = std::unique_ptr<chdr_tx_data_xport>;
+ using buff_t = transport::frame_buff;
+
+ //! Information about data packet
+ struct packet_info_t
+ {
+ bool eob = false;
+ bool has_tsf = false;
+ uint64_t tsf = 0;
+ size_t payload_bytes = 0;
+ };
+
+ /*! Constructor
+ *
+ * \param io_srv The service that will schedule the xport I/O
+ * \param recv_link The recv link, already attached to the I/O service
+ * \param send_link The send link, already attached to the I/O service
+ * \param pkt_factory Factory to create packets with the desired chdr_w and endianness
+ * \param addrs Source and destination addresses
+ * \param epids Source and destination endpoint IDs
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
+ * \param num_send_frames Num frames to reserve from the send link
+ * \param fc_freq_ratio Ratio to use to configure the device fc frequency
+ * \param fc_headroom_ratio Ratio to use to configure the device fc headroom
+ */
+ chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const uhd::rfnoc::sep_addr_pair_t& addrs,
+ const uhd::rfnoc::sep_id_pair_t& epids,
+ const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
+ const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
+ const size_t num_send_frames,
+ const double fc_freq_ratio,
+ const double fc_headroom_ratio)
+ : _fc_sender(pkt_factory, epids), _epid(epids.first)
+ {
+ const sep_addr_t remote_sep_addr = addrs.second;
+ const sep_addr_t local_sep_addr = addrs.first;
+ const sep_id_t remote_epid = epids.second;
+ const sep_id_t local_epid = epids.first;
+
+ UHD_LOG_TRACE("XPORT::TX_DATA_XPORT",
+ "Creating tx xport with local epid=" << local_epid
+ << ", remote epid=" << remote_epid);
+
+ _send_header.set_dst_epid(epids.second);
+ _send_packet = pkt_factory.make_generic();
+ _recv_packet = pkt_factory.make_generic();
+
+ // Calculate max payload size
+ const size_t pyld_offset =
+ _send_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS);
+ _max_payload_size = send_link->get_send_frame_size() - pyld_offset;
+
+ _configure_sep(io_srv,
+ recv_link,
+ send_link,
+ pkt_factory,
+ local_sep_addr,
+ local_epid,
+ remote_sep_addr,
+ remote_epid,
+ pyld_buff_fmt,
+ mdata_buff_fmt);
+
+ _initialize_flow_ctrl(io_srv,
+ recv_link,
+ send_link,
+ pkt_factory,
+ epids,
+ fc_freq_ratio,
+ fc_headroom_ratio);
+
+ // Now create the send I/O we will use for data
+ auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) {
+ this->_send_callback(buff, send_link);
+ };
+
+ auto recv_cb = [this](buff_t::uptr& buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* send_link) {
+ return this->_recv_callback(buff, recv_link, send_link);
+ };
+
+ // Needs just a single recv frame for strs packets
+ _send_io = io_srv->make_send_client(send_link,
+ num_send_frames,
+ send_cb,
+ recv_link,
+ /* num_recv_frames */ 1,
+ recv_cb);
+ }
+
+ /*! Returns maximum number of payload bytes
+ *
+ * \return maximum number of payload bytes
+ */
+ size_t get_max_payload_size() const
+ {
+ return _max_payload_size;
+ }
+
+ /*!
+ * Gets a TX frame buffer
+ *
+ * \param timeout_ms timeout in milliseconds
+ * \return the frame buffer, or nullptr if timeout occurs
+ */
+ buff_t::uptr get_send_buff(const int32_t timeout_ms)
+ {
+ return _send_io->get_send_buff(timeout_ms);
+ }
+
+ /*!
+ * Sends a TX data packet
+ *
+ * \param buff the frame buffer containing the packet to send
+ */
+ void release_send_buff(buff_t::uptr buff)
+ {
+ _send_io->release_send_buff(std::move(buff));
+ }
+
+ /*!
+ * Writes header into frame buffer and returns payload pointer
+ *
+ * \param buff Frame buffer to write header into
+ * \param info Information to include in the header
+ * \return A pointer to the payload data area and the packet size in bytes
+ */
+ std::pair<void*, size_t> write_packet_header(buff_t::uptr& buff,
+ const packet_info_t& info)
+ {
+ uint64_t tsf = 0;
+
+ if (info.has_tsf) {
+ _send_header.set_pkt_type(chdr::PKT_TYPE_DATA_WITH_TS);
+ tsf = info.tsf;
+ } else {
+ _send_header.set_pkt_type(chdr::PKT_TYPE_DATA_NO_TS);
+ }
+
+ _send_header.set_eob(info.eob);
+ _send_header.set_seq_num(_data_seq_num++);
+
+ _send_packet->refresh(buff->data(), _send_header, tsf);
+ _send_packet->update_payload_size(info.payload_bytes);
+
+ return std::make_pair(
+ _send_packet->get_payload_ptr(),
+ _send_packet->get_chdr_header().get_length());
+ }
+
+private:
+ /*!
+ * Recv callback for I/O service
+ *
+ * The I/O service invokes this callback when it reads a packet from the
+ * recv link.
+ *
+ * \param buff the frame buffer containing the packet data
+ * \param recv_link the recv link from which buff was read
+ * \param send_link the send link for flow control messages
+ */
+ bool _recv_callback(buff_t::uptr& buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* /*send_link*/)
+ {
+ _recv_packet->refresh(buff->data());
+ const auto header = _recv_packet->get_chdr_header();
+ const auto type = header.get_pkt_type();
+ const auto dst_epid = header.get_dst_epid();
+
+ if (dst_epid != _epid) {
+ return false;
+ }
+
+ if (type == chdr::PKT_TYPE_STRS) {
+ chdr::strs_payload strs;
+ strs.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(),
+ _recv_packet->get_payload_size() / sizeof(uint64_t),
+ _recv_packet->conv_to_host<uint64_t>());
+
+ _fc_state.update_dest_recv_count({strs.xfer_count_bytes,
+ static_cast<uint32_t>(strs.xfer_count_pkts)});
+
+ // TODO: check strs status here and push into async msg queue
+
+ // Packet belongs to this transport, release buff and return true
+ recv_link->release_recv_buff(std::move(buff));
+ buff = nullptr;
+ return true;
+ } else {
+ UHD_THROW_INVALID_CODE_PATH();
+ }
+ }
+
+ /*!
+ * Send callback for I/O service
+ *
+ * The I/O service invokes this callback when it is requested to release
+ * a send buffer to the send link.
+ *
+ * \param buff the frame buffer to release
+ * \param send_link the send link for flow control messages
+ */
+ void _send_callback(buff_t::uptr& buff, transport::send_link_if* send_link)
+ {
+ const size_t packet_size = buff->packet_size();
+
+ if (_fc_state.dest_has_space(packet_size)) {
+ send_link->release_send_buff(std::move(buff));
+ buff = nullptr;
+
+ _fc_state.data_sent(packet_size);
+
+ if (_fc_state.get_fc_resync_req_pending()
+ && _fc_state.dest_has_space(chdr::strc_payload::PACKET_SIZE)) {
+ const auto& xfer_counts = _fc_state.get_xfer_counts();
+ const size_t strc_size =
+ _fc_sender.send_strc_resync(send_link, xfer_counts);
+ _fc_state.clear_fc_resync_req_pending();
+ _fc_state.data_sent(strc_size);
+ }
+ }
+ }
+
+ /*!
+ * Configures the stream endpoint using mgmt_portal
+ */
+ void _configure_sep(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const uhd::rfnoc::sep_addr_t& local_sep_addr,
+ const uhd::rfnoc::sep_id_t& local_epid,
+ const uhd::rfnoc::sep_addr_t& remote_sep_addr,
+ const uhd::rfnoc::sep_id_t& remote_epid,
+ const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
+ const uhd::rfnoc::sw_buff_t mdata_buff_fmt)
+ {
+ // Create a control transport with the tx data links to send mgmt packets
+ // needed to setup the stream. Only need one frame for this.
+ auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv,
+ send_link,
+ recv_link,
+ local_epid,
+ 1, // num_send_frames
+ 1); // num_recv_frames
+
+ // Create new temporary management portal with the transports used for this stream
+ // TODO: This is a bit excessive. Maybe we can pare down the functionality of the
+ // portal to just route setup. Whatever we do, we *must* use this xport with it,
+ // otherwise the transport will not behave correctly.
+ auto data_mgmt_portal = uhd::rfnoc::mgmt::mgmt_portal::make(
+ *ctrl_xport, pkt_factory, local_sep_addr, local_epid);
+
+ // Setup a route to the EPID
+ data_mgmt_portal->initialize_endpoint(*ctrl_xport, remote_sep_addr, remote_epid);
+ data_mgmt_portal->setup_local_route(*ctrl_xport, remote_epid);
+
+ data_mgmt_portal->config_local_tx_stream(
+ *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt);
+
+ // We no longer need the control xport and mgmt_portal, release them so
+ // the control xport is no longer connected to the I/O service.
+ data_mgmt_portal.reset();
+ ctrl_xport.reset();
+ }
+
+ /*!
+ * Initializes flow control
+ *
+ * To initialize flow control, we need to send an init strc packet, then
+ * receive a strs containing the stream endpoint ingress buffer size. We
+ * then repeat this (now that we know the buffer size) to configure the flow
+ * control frequency. To avoid having this logic within the data packet
+ * processing flow, we use temporary send and recv I/O instances with
+ * simple callbacks here.
+ */
+ void _initialize_flow_ctrl(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const sep_id_pair_t sep_ids,
+ const double fc_freq_ratio,
+ const double fc_headroom_ratio)
+ {
+ // No flow control at initialization, just release all send buffs
+ auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) {
+ send_link->release_send_buff(std::move(buff));
+ buff = nullptr;
+ };
+
+ // For recv, just queue strs packets for recv_io to read
+ auto recv_cb = [this](buff_t::uptr& buff,
+ transport::recv_link_if* /*recv_link*/,
+ transport::send_link_if* /*send_link*/) {
+ _recv_packet->refresh(buff->data());
+ const auto header = _recv_packet->get_chdr_header();
+ const auto type = header.get_pkt_type();
+ const auto dst_epid = header.get_dst_epid();
+
+ return (dst_epid == _epid && type == chdr::PKT_TYPE_STRS);
+ };
+
+ // No flow control at initialization, just release all recv buffs
+ auto fc_cb = [this](buff_t::uptr buff,
+ transport::recv_link_if* recv_link,
+ transport::send_link_if* /*send_link*/) {
+ recv_link->release_recv_buff(std::move(buff));
+ };
+
+ auto send_io = io_srv->make_send_client(send_link,
+ 1, // num_send_frames
+ send_cb,
+ nullptr,
+ 0, // num_recv_frames
+ nullptr);
+
+ auto recv_io = io_srv->make_recv_client(recv_link,
+ 1, // num_recv_frames
+ recv_cb,
+ nullptr,
+ 0, // num_send_frames
+ fc_cb);
+
+ chdr::chdr_strc_packet::uptr strc_packet = pkt_factory.make_strc();
+ chdr::chdr_packet::uptr& recv_packet = _recv_packet;
+
+ // Function to send a strc init
+ auto send_strc_init = [&send_io, sep_ids, &strc_packet](
+ const stream_buff_params_t fc_freq = {0, 0}) {
+ transport::frame_buff::uptr buff = send_io->get_send_buff(0);
+
+ if (!buff) {
+ throw uhd::runtime_error(
+ "tx xport timed out getting a send buffer for strc init");
+ }
+
+ chdr::chdr_header header;
+ header.set_seq_num(0);
+ header.set_dst_epid(sep_ids.second);
+
+ chdr::strc_payload strc_pyld;
+ strc_pyld.src_epid = sep_ids.first;
+ strc_pyld.op_code = chdr::STRC_INIT;
+ strc_pyld.num_bytes = fc_freq.bytes;
+ strc_pyld.num_pkts = fc_freq.packets;
+ strc_packet->refresh(buff->data(), header, strc_pyld);
+
+ const size_t size = header.get_length();
+ buff->set_packet_size(size);
+ send_io->release_send_buff(std::move(buff));
+ };
+
+ // Function to receive a strs, returns buffer capacity
+ auto recv_strs = [&recv_io, &recv_packet]() -> stream_buff_params_t {
+ transport::frame_buff::uptr buff = recv_io->get_recv_buff(200);
+
+ if (!buff) {
+ throw uhd::runtime_error(
+ "tx xport timed out wating for a strs packet during initialization");
+ }
+
+ recv_packet->refresh(buff->data());
+ UHD_ASSERT_THROW(
+ recv_packet->get_chdr_header().get_pkt_type() == chdr::PKT_TYPE_STRS);
+ chdr::strs_payload strs;
+ strs.deserialize(recv_packet->get_payload_const_ptr_as<uint64_t>(),
+ recv_packet->get_payload_size() / sizeof(uint64_t),
+ recv_packet->conv_to_host<uint64_t>());
+
+ recv_io->release_recv_buff(std::move(buff));
+
+ return {strs.capacity_bytes,
+ static_cast<uint32_t>(strs.capacity_pkts)};
+ };
+
+ // Send a strc init to get the buffer size
+ send_strc_init();
+ stream_buff_params_t capacity = recv_strs();
+ _fc_state.set_dest_capacity(capacity);
+
+ UHD_LOG_TRACE("XPORT::TX_DATA_XPORT",
+ "Received strs initializing buffer capacity to "
+ << capacity.bytes << " bytes");
+
+ // Calculate the requested fc_freq parameters
+ uhd::rfnoc::stream_buff_params_t fc_freq = {
+ static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_freq_ratio)),
+ static_cast<uint32_t>(
+ std::ceil(double(capacity.packets) * fc_freq_ratio))};
+
+ const size_t headroom_bytes =
+ static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_headroom_ratio));
+ const size_t headroom_packets = static_cast<uint32_t>(
+ std::ceil(double(capacity.packets) * fc_headroom_ratio));
+
+ fc_freq.bytes -= headroom_bytes;
+ fc_freq.packets -= headroom_packets;
+
+ // Send a strc init to configure fc freq
+ send_strc_init(fc_freq);
+ recv_strs();
+
+ // Release temporary I/O service interfaces to disconnect from it
+ send_io.reset();
+ recv_io.reset();
+ }
+
+ // Interface to the I/O service
+ transport::send_io_if::sptr _send_io;
+
+ // Flow control state
+ tx_flow_ctrl_state _fc_state;
+
+ // Maximum data payload in bytes
+ size_t _max_payload_size = 0;
+
+ // Sequence number for data packets
+ uint16_t _data_seq_num = 0;
+
+ // Header to write into send packets
+ chdr::chdr_header _send_header;
+
+ // Packet for send data
+ chdr::chdr_packet::uptr _send_packet;
+
+ // Packet to receive strs messages
+ chdr::chdr_packet::uptr _recv_packet;
+
+ // Handles sending of strc flow control ack packets
+ detail::tx_flow_ctrl_sender _fc_sender;
+
+ // Local / Source EPID
+ sep_id_t _epid;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP */
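
Correspondingly, a hedged sketch of the send path; `xport`, `samples`, and `nbytes` are placeholders, and `nbytes` must not exceed get_max_payload_size():

    uhd::rfnoc::chdr_tx_data_xport::packet_info_t info;
    info.eob           = false;
    info.has_tsf       = false;
    info.payload_bytes = nbytes;
    auto buff = xport->get_send_buff(100 /* timeout_ms */);
    if (buff) {
        void* payload;
        size_t packet_size;
        std::tie(payload, packet_size) = xport->write_packet_header(buff, info);
        std::memcpy(payload, samples, nbytes);
        buff->set_packet_size(packet_size);
        xport->release_send_buff(std::move(buff)); // enqueued subject to flow control
    }
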
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_types.hpp b/host/lib/include/uhdlib/rfnoc/chdr_types.hpp
index 62b24ab61..1f14ea7d0 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_types.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_types.hpp
@@ -482,6 +482,8 @@ public: // Members
uint64_t num_pkts = 0;
//! Number of bytes to use for operation (64 bits)
uint64_t num_bytes = 0;
+ //! Size of a strc packet (including header)
+ static constexpr size_t PACKET_SIZE = 24;
public: // Functions
strc_payload() = default;
diff --git a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp
index 120b0e0f8..28fa8ec7c 100644
--- a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp
+++ b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp
@@ -7,11 +7,14 @@
#ifndef INCLUDED_LIBUHD_RFNOC_GRAPH_STREAM_MANAGER_HPP
#define INCLUDED_LIBUHD_RFNOC_GRAPH_STREAM_MANAGER_HPP
+#include <uhd/stream.hpp>
#include <uhdlib/rfnoc/chdr_packet.hpp>
#include <uhdlib/rfnoc/client_zero.hpp>
#include <uhdlib/rfnoc/ctrlport_endpoint.hpp>
#include <uhdlib/rfnoc/epid_allocator.hpp>
#include <uhdlib/rfnoc/mb_iface.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
+#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>
#include <functional>
#include <memory>
#include <set>
@@ -84,6 +87,7 @@ public:
virtual detail::client_zero::sptr get_client_zero(
sep_addr_t dst_addr, device_id_t via_device = NULL_DEVICE_ID) const = 0;
+
/*! Configure a flow controlled data stream from the endpoint with ID src_epid to the
* endpoint with ID dst_epid
*
@@ -102,7 +106,33 @@ public:
const double fc_headroom_ratio,
const bool reset = false) = 0;
- // TODO: Implement functions to get graph-wide streamers
+ /*! \brief Create a data stream going from the device to the host
+ *
+ * \param dst_addr The address of the destination stream endpoint
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
+ * \param xport_args The transport arguments
+ * \return A transport instance
+ */
+ virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream(
+ sep_addr_t dst_addr,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
+ const device_addr_t& xport_args) = 0;
+
+ /*! \brief Create a data stream going from the host to the device
+ *
+ * \param dst_addr The address of the destination stream endpoint
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
+ * \param xport_args The transport arguments
+ * \return A transport instance
+ */
+ virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
+ sep_addr_t dst_addr,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
+ const device_addr_t& xport_args) = 0;
/*!
* \brief Create a graph_stream_manager and return a unique_ptr to it
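
A hedged sketch of calling the new graph-wide entry point; the endpoint address, buffer formats, and the `gsm` handle are placeholders:

    // sep_addr_t is {device_id, sep_instance}; the values here are illustrative.
    uhd::rfnoc::sep_addr_t sep_addr{0 /* device_id */, 1 /* sep_inst */};
    auto rx_xport = gsm->create_device_to_host_data_stream(sep_addr,
        uhd::rfnoc::BUFF_U32,     // payload SW buffer format
        uhd::rfnoc::BUFF_U32,     // metadata SW buffer format
        uhd::device_addr_t(""));  // xport_args
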
diff --git a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
index 79121a498..72f1cf1c7 100644
--- a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
+++ b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
@@ -11,6 +11,7 @@
#include <uhdlib/rfnoc/ctrlport_endpoint.hpp>
#include <uhdlib/rfnoc/epid_allocator.hpp>
#include <uhdlib/rfnoc/mb_iface.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
#include <functional>
#include <memory>
#include <set>
@@ -112,41 +113,29 @@ public:
/*! \brief Create a data stream going from the host to the device
*
* \param dst_addr The address of the destination stream endpoint
- * \param lossy_xport Is the transport lossy?
* \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
* \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
- * \param fc_freq_ratio Flow control response frequency as a ratio of the buff params
- * \param fc_headroom_ratio Flow control headroom as a ratio of the buff params
* \param xport_args The transport arguments
* \return An transport instance
*/
virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
const sep_addr_t dst_addr,
- const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
- const double fc_freq_ratio,
- const double fc_headroom_ratio,
const device_addr_t& xport_args) = 0;
/*! \brief Create a data stream going from the device to the host
*
* \param dst_addr The address of the destination stream endpoint
- * \param lossy_xport Is the transport lossy?
* \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
* \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
- * \param fc_freq_ratio Flow control response frequency as a ratio of the buff params
- * \param fc_headroom_ratio Flow control headroom as a ratio of the buff params
* \param xport_args The transport arguments
* \return An transport instance
*/
virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream(
const sep_addr_t src_addr,
- const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
- const double fc_freq_ratio,
- const double fc_headroom_ratio,
const device_addr_t& xport_args) = 0;
static uptr make(const chdr::chdr_packet_factory& pkt_factory,
diff --git a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
index 0a2790a34..33a0e3df0 100644
--- a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
+++ b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
@@ -8,21 +8,14 @@
#define INCLUDED_LIBUHD_MB_IFACE_HPP
#include <uhdlib/rfnoc/chdr_ctrl_xport.hpp>
-#include <uhdlib/rfnoc/rfnoc_common.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
+#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>
#include <uhdlib/rfnoc/clock_iface.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
#include <memory>
namespace uhd { namespace rfnoc {
-// FIXME: Update this
-class chdr_rx_data_xport
-{
-public:
- using uptr = std::unique_ptr<chdr_rx_data_xport>;
-};
-
-using chdr_tx_data_xport = chdr_rx_data_xport;
-
/*! Motherboard (backchannel) interface
*
* In RFNoC devices, the RFNoC subystem needs a backchannel interface to talk to
@@ -70,7 +63,8 @@ public:
/*! Return a reference to a clock iface
*/
- virtual std::shared_ptr<clock_iface> get_clock_iface(const std::string& clock_name) = 0;
+ virtual std::shared_ptr<clock_iface> get_clock_iface(
+ const std::string& clock_name) = 0;
/*! Create a control transport
*
@@ -89,30 +83,34 @@ public:
*
* This is typically called once per streaming channel.
*
- * \param local_device_id ID for the host transport adapter to use
- * \param local_epid Host (sink) streaming endpoint ID
- * \param remote_epid Remote device (source) streaming endpoint ID
+ * \param addrs Address of the device and host stream endpoints
+ * \param epids Endpoint IDs of the device and host stream endpoints
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
* \param xport_args Transport configuration args
* \return A CHDR RX data transport
*/
- virtual chdr_rx_data_xport::uptr make_rx_data_transport(device_id_t local_device_id,
- const sep_id_t& local_epid,
- const sep_id_t& remote_epid,
+ virtual chdr_rx_data_xport::uptr make_rx_data_transport(const sep_addr_pair_t& addrs,
+ const sep_id_pair_t& epids,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
const device_addr_t& xport_args) = 0;
/*! Create an TX data transport
*
* This is typically called once per streaming channel.
*
- * \param local_device_id ID for the host transport adapter to use
- * \param local_epid Host (source) streaming endpoint ID
- * \param remote_epid Remote device (sink) streaming endpoint ID
+ * \param addrs Address of the host and device stream endpoints
+ * \param epids Endpoint IDs of the host and device stream endpoints
+ * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
+ * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
* \param xport_args Transport configuration args
* \return A CHDR TX data transport
*/
- virtual chdr_tx_data_xport::uptr make_tx_data_transport(device_id_t local_device_id,
- const sep_id_t& local_epid,
- const sep_id_t& remote_epid,
+ virtual chdr_tx_data_xport::uptr make_tx_data_transport(const sep_addr_pair_t& addrs,
+ const sep_id_pair_t& epids,
+ const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
+ const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
const device_addr_t& xport_args) = 0;
};
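
Per rfnoc_common.hpp, both pairs are ordered (source, destination); a hedged sketch of a caller of the new RX signature, with all variables as placeholders:

    // RX: data flows device -> host, so the device endpoint is the source.
    auto rx_xport = mb_if.make_rx_data_transport(
        {device_sep_addr, host_sep_addr}, // addrs
        {device_epid, host_epid},         // epids
        uhd::rfnoc::BUFF_U32,             // payload buffer format
        uhd::rfnoc::BUFF_U32,             // metadata buffer format
        xport_args);
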
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
index 7ec1b7bb2..bc56fd311 100644
--- a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
@@ -41,6 +41,8 @@ using device_id_t = uint16_t;
using sep_inst_t = uint16_t;
//! Stream Endpoint Physical Address Type
using sep_addr_t = std::pair<device_id_t, sep_inst_t>;
+//! Stream Endpoint Physical Address Type (first = source, second = destination)
+using sep_addr_pair_t = std::pair<sep_addr_t, sep_addr_t>;
//! Stream Endpoint ID Type
using sep_id_t = uint16_t;
//! Stream Endpoint pair Type (first = source, second = destination)
@@ -65,6 +67,19 @@ struct stream_buff_params_t
//! The data type of the buffer used to capture/generate data
enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 };
+//! Conversion from number of bits to sw_buff
+constexpr sw_buff_t bits_to_sw_buff(size_t bits)
+{
+ if (bits <= 8) {
+ return BUFF_U8;
+ } else if (bits <= 16) {
+ return BUFF_U16;
+ } else if (bits <= 32) {
+ return BUFF_U32;
+ } else {
+ return BUFF_U64;
+ }
+}
//----------------------------------------------
// Constants
@@ -72,6 +87,12 @@ enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 };
constexpr uint16_t RFNOC_PROTO_VER = 0x0100;
+constexpr uint64_t MAX_FC_CAPACITY_BYTES = (uint64_t(1) << 40) - 1;
+constexpr uint32_t MAX_FC_CAPACITY_PKTS = (uint32_t(1) << 24) - 1;
+constexpr uint64_t MAX_FC_FREQ_BYTES = (uint64_t(1) << 40) - 1;
+constexpr uint32_t MAX_FC_FREQ_PKTS = (uint32_t(1) << 24) - 1;
+constexpr uint64_t MAX_FC_HEADROOM_BYTES = (uint64_t(1) << 16) - 1;
+constexpr uint32_t MAX_FC_HEADROOM_PKTS = (uint32_t(1) << 8) - 1;
}} // namespace uhd::rfnoc
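
Since bits_to_sw_buff() is constexpr, the mapping can be verified at compile time, for example:

    static_assert(uhd::rfnoc::bits_to_sw_buff(8)  == uhd::rfnoc::BUFF_U8,  "8-bit items map to BUFF_U8");
    static_assert(uhd::rfnoc::bits_to_sw_buff(16) == uhd::rfnoc::BUFF_U16, "16-bit items map to BUFF_U16");
    static_assert(uhd::rfnoc::bits_to_sw_buff(32) == uhd::rfnoc::BUFF_U32, "32-bit items map to BUFF_U32");
    static_assert(uhd::rfnoc::bits_to_sw_buff(64) == uhd::rfnoc::BUFF_U64, "64-bit items map to BUFF_U64");
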
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
new file mode 100644
index 000000000..6ced60d19
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
@@ -0,0 +1,95 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP
+#define INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP
+
+#include <uhd/rfnoc/node.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
+#include <uhdlib/transport/rx_streamer_impl.hpp>
+#include <string>
+
+namespace uhd { namespace rfnoc {
+
+/*!
+ * Extends the streamer_impl to be an rfnoc node so it can connect to the
+ * graph. Configures the streamer conversion and rate parameters with values
+ * received through property propagation.
+ */
+class rfnoc_rx_streamer : public node_t,
+ public transport::rx_streamer_impl<chdr_rx_data_xport>
+{
+public:
+ /*! Constructor
+ *
+ * \param num_ports The number of ports
+ * \param stream_args Arguments to aid the construction of the streamer
+ */
+ rfnoc_rx_streamer(const size_t num_ports, const uhd::stream_args_t stream_args);
+
+ /*! Returns a unique identifier string for this node. In every RFNoC graph,
+ * no two nodes can have the same ID. Returns a string in the form of
+ * "RxStreamer#0".
+ *
+ * \returns The unique ID as a string
+ */
+ std::string get_unique_id() const;
+
+ /*! Returns the number of input ports for this block.
+ *
+ * \return noc_id The number of ports
+ */
+ size_t get_num_input_ports() const;
+
+ /*! Returns the number of output ports for this block.
+ *
+ * Always returns 0 for this block.
+ *
+ * \return The number of ports
+ */
+ size_t get_num_output_ports() const;
+
+ /*! Implementation of rx_streamer API method
+ *
+ * \param stream_cmd the stream command to issue
+ */
+ void issue_stream_cmd(const stream_cmd_t& stream_cmd);
+
+ /*! Returns stream args provided at creation
+ *
+ * \return stream args provided when streamer is created
+ */
+ const uhd::stream_args_t& get_stream_args() const;
+
+ /*! Check that all streamer ports are connected to blocks
+ *
+ * Overrides node_t to ensure there are no unconnected ports.
+ *
+ * \param connected_inputs A list of input ports that are connected
+ * \param connected_outputs A list of output ports that are connected
+ * \returns true if the block can deal with this configuration
+ */
+ bool check_topology(const std::vector<size_t>& connected_inputs,
+ const std::vector<size_t>& connected_outputs);
+private:
+ void _register_props(const size_t chan, const std::string& otw_format);
+
+ // Properties
+ std::vector<property_t<double>> _scaling_in;
+ std::vector<property_t<double>> _samp_rate_in;
+ std::vector<property_t<double>> _tick_rate_in;
+ std::vector<property_t<std::string>> _type_in;
+
+ // Streamer unique ID
+ const std::string _unique_id;
+
+ // Stream args provided at construction
+ const uhd::stream_args_t _stream_args;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP */
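
For context, once connected via rfnoc_graph the object is used through the regular uhd::rx_streamer interface; a minimal sketch, with `rx_stream` as a placeholder handle:

    uhd::stream_cmd_t cmd(uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS);
    cmd.stream_now = true;
    rx_stream->issue_stream_cmd(cmd);
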
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp
new file mode 100644
index 000000000..4acee45cc
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp
@@ -0,0 +1,90 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP
+#define INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP
+
+#include <uhd/rfnoc/node.hpp>
+#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>
+#include <uhdlib/transport/tx_streamer_impl.hpp>
+#include <string>
+
+namespace uhd { namespace rfnoc {
+
+/*!
+ * Extends the streamer_impl to be an rfnoc node so it can connect to the
+ * graph. Configures the streamer conversion and rate parameters with values
+ * received through property propagation.
+ */
+class rfnoc_tx_streamer : public node_t,
+ public transport::tx_streamer_impl<chdr_tx_data_xport>
+{
+public:
+ /*! Constructor
+ *
+ * \param num_ports The number of ports
+ * \param stream_args Arguments to aid the construction of the streamer
+ */
+ rfnoc_tx_streamer(const size_t num_ports, const uhd::stream_args_t stream_args);
+
+ /*! Returns a unique identifier string for this node. In every RFNoC graph,
+ * no two nodes can have the same ID. Returns a string in the form of
+ * "TxStreamer#0".
+ *
+ * \returns The unique ID as a string
+ */
+ std::string get_unique_id() const;
+
+ /*! Returns the number of input ports for this block.
+ *
+ * Always returns 0 for this block.
+ *
+ * \return The number of ports
+ */
+ size_t get_num_input_ports() const;
+
+ /*! Returns the number of output ports for this block.
+ *
+ * \return The number of ports
+ */
+ size_t get_num_output_ports() const;
+
+ /*! Returns stream args provided at creation
+ *
+ * \return stream args provided when streamer is created
+ */
+ const uhd::stream_args_t& get_stream_args() const;
+
+ /*! Check that all streamer ports are connected to blocks
+ *
+ * Overrides node_t to ensure there are no unconnected ports.
+ *
+ * \param connected_inputs A list of input ports that are connected
+ * \param connected_outputs A list of output ports that are connected
+ * \returns true if the block can deal with this configuration
+ */
+ bool check_topology(const std::vector<size_t>& connected_inputs,
+ const std::vector<size_t>& connected_outputs);
+
+private:
+ void _register_props(const size_t chan, const std::string& otw_format);
+
+ // Properties
+ std::vector<property_t<double>> _scaling_out;
+ std::vector<property_t<double>> _samp_rate_out;
+ std::vector<property_t<double>> _tick_rate_out;
+ std::vector<property_t<std::string>> _type_out;
+
+ // Streamer unique ID
+ const std::string _unique_id;
+
+ // Stream args provided at construction
+ const uhd::stream_args_t _stream_args;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP */
diff --git a/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp
new file mode 100644
index 000000000..937baf982
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp
@@ -0,0 +1,130 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP
+#define INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP
+
+#include <uhd/utils/log.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+
+namespace uhd { namespace rfnoc {
+
+//! Class to manage rx flow control state
+class rx_flow_ctrl_state
+{
+public:
+ //! Constructor
+ rx_flow_ctrl_state(const rfnoc::sep_id_pair_t epids) : _epids(epids) {}
+
+ //! Initialize frequency parameters
+ void initialize(const stream_buff_params_t fc_freq)
+ {
+ _fc_freq = fc_freq;
+ }
+
+ //! Resynchronize with transfer counts from the sender
+ void resynchronize(const stream_buff_params_t counts)
+ {
+ if (_recv_counts.bytes != counts.bytes
+ || _recv_counts.packets != counts.packets) {
+ // If there is a discrepancy between the amount of data sent by
+ // the device and received by the transport, adjust the counts
+ // of data received and transferred to include the dropped data.
+ auto bytes_dropped = counts.bytes - _recv_counts.bytes;
+ auto pkts_dropped = counts.packets - _recv_counts.packets;
+ _xfer_counts.bytes += bytes_dropped;
+ _xfer_counts.packets += pkts_dropped;
+
+ UHD_LOGGER_DEBUG("rx_flow_ctrl_state")
+ << "oh noes: bytes_sent=" << counts.bytes
+ << " bytes_received=" << _recv_counts.bytes
+ << " pkts_sent=" << counts.packets
+ << " pkts_received=" << _recv_counts.packets
+ << " src_epid=" << _epids.first << " dst_epid=" << _epids.second
+ << std::endl;
+
+ _recv_counts = counts;
+ }
+ }
+
+ //! Reset the transfer counts (happens during init)
+ void reset_counts()
+ {
+ UHD_LOGGER_TRACE("rx_flow_ctrl_state")
+ << "Resetting transfer counts" << std::endl;
+ _recv_counts = {0, 0};
+ _xfer_counts = {0, 0};
+ }
+
+ //! Update state when data is received
+ void data_received(const size_t bytes)
+ {
+ _recv_counts.bytes += bytes;
+ _recv_counts.packets++;
+ }
+
+ //! Update state when transfer is complete (buffer space freed)
+ void xfer_done(const size_t bytes)
+ {
+ _xfer_counts.bytes += bytes;
+ _xfer_counts.packets++;
+ }
+
+ //! Returns whether a flow control response is needed
+ bool fc_resp_due() const
+ {
+ stream_buff_params_t accum_counts = {
+ _xfer_counts.bytes - _last_fc_resp_counts.bytes,
+ _xfer_counts.packets - _last_fc_resp_counts.packets};
+
+ return accum_counts.bytes >= _fc_freq.bytes
+ || accum_counts.packets >= _fc_freq.packets;
+ }
+
+ //! Update state after flow control response was sent
+ void fc_resp_sent()
+ {
+ _last_fc_resp_counts = _xfer_counts;
+ }
+
+ //! Returns counts for completed transfers
+ stream_buff_params_t get_xfer_counts() const
+ {
+ return _xfer_counts;
+ }
+
+    //! Returns counts for data received
+ stream_buff_params_t get_recv_counts() const
+ {
+ return _recv_counts;
+ }
+
+ //! Returns configured flow control frequency
+ stream_buff_params_t get_fc_freq() const
+ {
+ return _fc_freq;
+ }
+
+private:
+ // Counts for data received, including any data still in use
+ stream_buff_params_t _recv_counts{0, 0};
+
+    // Counts for data that has been read and whose buffer space can be reused
+ stream_buff_params_t _xfer_counts{0, 0};
+
+ // Counts sent in last flow control response
+ stream_buff_params_t _last_fc_resp_counts{0, 0};
+
+ // Frequency of flow control responses
+ stream_buff_params_t _fc_freq{0, 0};
+
+    // Source and destination endpoint IDs, used in log messages
+ const sep_id_pair_t _epids;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP */
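The counting scheme above (xfer_done() accumulates freed bytes and packets, fc_resp_due() compares the delta since the last response against the configured frequency) can be exercised in isolation. The following is a standalone sketch, not part of this patch: a plain struct stands in for stream_buff_params_t, and the 8000-byte/16-packet frequency and 1000-byte packets are invented values.

#include <cstdint>
#include <iostream>

struct buff_counts
{
    uint64_t bytes;
    uint32_t packets;
};

int main()
{
    // Assume the graph configured a response every 8000 bytes or 16 packets
    const buff_counts fc_freq{8000, 16};
    buff_counts xfer{0, 0};      // data consumed and freed (xfer_done)
    buff_counts last_resp{0, 0}; // counts at the last flow control response

    for (int pkt = 0; pkt < 32; pkt++) {
        xfer.bytes += 1000; // pretend each packet carries 1000 bytes
        xfer.packets++;

        const bool resp_due = (xfer.bytes - last_resp.bytes) >= fc_freq.bytes
                              || (xfer.packets - last_resp.packets) >= fc_freq.packets;
        if (resp_due) {
            std::cout << "send fc response after packet " << xfer.packets << "\n";
            last_resp = xfer; // equivalent to fc_resp_sent()
        }
    }
    return 0;
}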
diff --git a/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp
new file mode 100644
index 000000000..65fc1b093
--- /dev/null
+++ b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp
@@ -0,0 +1,99 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP
+#define INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP
+
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+
+namespace uhd { namespace rfnoc {
+
+//! Class to manage tx flow control state
+class tx_flow_ctrl_state
+{
+public:
+ //! Updates destination capacity
+ void set_dest_capacity(const stream_buff_params_t& capacity)
+ {
+ _dest_capacity = capacity;
+ }
+
+ //! Updates destination received count
+ void update_dest_recv_count(const stream_buff_params_t& recv_count)
+ {
+ _recv_counts = recv_count;
+ }
+
+ /*! Returns whether the destination has buffer space for the requested
+ * packet size
+ */
+ bool dest_has_space(const size_t packet_size)
+ {
+        // The stream endpoint only cares about bytes; the packet count is not
+        // needed to determine the space available.
+ const auto buffer_fullness = _xfer_counts.bytes - _recv_counts.bytes;
+ const auto space_available = _dest_capacity.bytes - buffer_fullness;
+ return space_available >= packet_size;
+ }
+
+ //! Increments transfer count with amount of data sent
+ void data_sent(const size_t packet_size)
+ {
+ _xfer_counts.bytes += packet_size;
+ _xfer_counts.packets++;
+
+        // Request an fc resync after we have transferred a number of bytes
+        // greater than or equal to the destination capacity. There is no
+        // strict requirement on how often we need to send this, as it is only
+        // needed to correct for dropped packets. One buffer's worth of bytes
+        // is a reasonable cadence.
+ if (_xfer_counts.bytes - _last_fc_resync_bytes >= _dest_capacity.bytes) {
+ _fc_resync_req = true;
+ }
+ }
+
+ /*! Returns whether an fc resync request is pending. The policy we use
+ * here is to send an fc resync every time we send a number of bytes
+ * equal to the destination buffer capacity.
+ */
+ bool get_fc_resync_req_pending() const
+ {
+ return _fc_resync_req;
+ }
+
+ //! Clears fc resync request pending status
+ void clear_fc_resync_req_pending()
+ {
+ _fc_resync_req = false;
+ _last_fc_resync_bytes = _xfer_counts.bytes;
+ }
+
+ //! Returns counts for packets sent
+ stream_buff_params_t get_xfer_counts() const
+ {
+ return _xfer_counts;
+ }
+
+private:
+ // Counts for data sent
+ stream_buff_params_t _xfer_counts{0, 0};
+
+ // Counts for data received by the destination
+ stream_buff_params_t _recv_counts{0, 0};
+
+ // Buffer size at the destination
+ stream_buff_params_t _dest_capacity{0, 0};
+
+ // Counts sent in last flow control resync
+ size_t _last_fc_resync_bytes = 0;
+
+ // Track when to send ack packets
+ bool _fc_resync_req = false;
+};
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP */
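dest_has_space() above gates sending on the difference between bytes sent and bytes the destination has reported as received, compared against the destination capacity. A minimal standalone sketch of that window arithmetic, with invented sizes (not part of this patch):

#include <cassert>
#include <cstdint>

int main()
{
    const uint64_t dest_capacity = 32768; // downstream buffer size in bytes
    const uint64_t packet_size   = 4096;
    uint64_t sent_bytes          = 0; // _xfer_counts.bytes
    uint64_t recvd_bytes         = 0; // _recv_counts.bytes, updated by fc responses

    // Send until the destination buffer would overflow
    int packets_sent = 0;
    while (dest_capacity - (sent_bytes - recvd_bytes) >= packet_size) {
        sent_bytes += packet_size;
        packets_sent++;
    }
    assert(packets_sent == 8); // exactly one buffer's worth fits before backpressure

    // A flow control response reporting consumed data reopens the window
    recvd_bytes = 16384;
    assert(dest_capacity - (sent_bytes - recvd_bytes) >= packet_size);
    return 0;
}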
diff --git a/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp
new file mode 100644
index 000000000..662be6d2d
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp
@@ -0,0 +1,175 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP
+#define INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP
+
+#include <uhd/exception.hpp>
+#include <uhd/utils/log.hpp>
+#include <boost/dynamic_bitset.hpp>
+#include <boost/format.hpp>
+
+namespace uhd { namespace transport {
+
+// Number of iterations that get_aligned_buffs will attempt to time align
+// packets before returning an alignment failure. get_aligned_buffs increments
+// the iteration count when it finds a timestamp that is larger than the
+// timestamps on channels it has already aligned and thus has to restart
+// aligning timestamps on all channels to the new timestamp.
+constexpr size_t ALIGNMENT_FAILURE_THRESHOLD = 1000;
+
+/*!
+ * Implementation of rx time alignment. This class reads packets from the
+ * transports for each channel and discards any packets whose tsf does not
+ * match that of the other channels (e.g., because of dropped packets).
+ * Packets that do not have a tsf are not checked for alignment and are
+ * never dropped.
+ */
+template <typename transport_t>
+class get_aligned_buffs
+{
+public:
+ enum alignment_result_t {
+ SUCCESS,
+ TIMEOUT,
+ SEQUENCE_ERROR,
+ ALIGNMENT_FAILURE,
+ BAD_PACKET
+ };
+
+ get_aligned_buffs(std::vector<typename transport_t::uptr>& xports,
+ std::vector<typename transport_t::buff_t::uptr>& frame_buffs,
+ std::vector<typename transport_t::packet_info_t>& infos)
+ : _xports(xports)
+ , _frame_buffs(frame_buffs)
+ , _infos(infos)
+ , _prev_tsf(_xports.size(), 0)
+ , _channels_to_align(_xports.size())
+ {
+ }
+
+ alignment_result_t operator()(const int32_t timeout_ms)
+ {
+ // Clear state
+ _channels_to_align.set();
+ bool time_valid = false;
+ uint64_t tsf = 0;
+ size_t iterations = 0;
+
+ while (_channels_to_align.any()) {
+ const size_t chan = _channels_to_align.find_first();
+ auto& xport = _xports[chan];
+ auto& info = _infos[chan];
+ auto& frame_buff = _frame_buffs[chan];
+ bool seq_error = false;
+
+ // Receive a data packet for the channel if we don't have one. A
+ // packet may already be there if the previous call was interrupted
+ // by an error.
+ if (!frame_buff) {
+ try {
+ std::tie(frame_buff, info, seq_error) =
+ xport->get_recv_buff(timeout_ms);
+ } catch (const uhd::value_error& e) {
+ // Bad packet
+ UHD_LOGGER_ERROR("STREAMER")
+ << boost::format(
+ "The receive transport caught a value exception.\n%s")
+ % e.what();
+ return BAD_PACKET;
+ }
+ }
+
+ if (!frame_buff) {
+ return TIMEOUT;
+ }
+
+ if (info.has_tsf) {
+ const bool time_out_of_order = _prev_tsf[chan] > info.tsf;
+ _prev_tsf[chan] = info.tsf;
+
+ // If the user changes the device time while streaming, we can
+ // receive a packet that comes before the previous packet in
+ // time. This would cause the alignment logic to discard future
+ // received packets. Therefore, when this occurs, we reset the
+ // info to restart the alignment.
+ if (time_out_of_order) {
+ time_valid = false;
+ }
+
+ // Check if the time is larger than packets received for other
+ // channels, and if so, use this time to align all channels
+ if (!time_valid || info.tsf > tsf) {
+ // If we haven't found a set of aligned packets after many
+ // iterations, return an alignment failure
+ if (iterations++ > ALIGNMENT_FAILURE_THRESHOLD) {
+ UHD_LOGGER_ERROR("STREAMER")
+ << "The rx streamer failed to time-align packets.";
+ return ALIGNMENT_FAILURE;
+ }
+
+ // Release buffers for channels aligned previously. Keep
+ // buffers that don't have a tsf since we don't need to
+ // align those.
+ for (size_t i = 0; i < _xports.size(); i++) {
+ if (!_channels_to_align.test(i) && _infos[i].has_tsf) {
+ _xports[i]->release_recv_buff(std::move(_frame_buffs[i]));
+ _frame_buffs[i] = nullptr;
+ }
+ }
+
+ // Mark only this channel as aligned and save its tsf
+ _channels_to_align.set();
+ _channels_to_align.reset(chan);
+ time_valid = true;
+ tsf = info.tsf;
+ }
+
+ // Check if the time matches that of other aligned channels
+ else if (info.tsf == tsf) {
+ _channels_to_align.reset(chan);
+ }
+
+ // Otherwise, time is smaller than other channels, release the buffer
+ else {
+ _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan]));
+ _frame_buffs[chan] = nullptr;
+ }
+ } else {
+ // Packet doesn't have a tsf, just mark it as aligned
+ _channels_to_align.reset(chan);
+ }
+
+ // If this packet had a sequence error, stop to return the error.
+ // Keep the packet for the next call to get_aligned_buffs.
+ if (seq_error) {
+ return SEQUENCE_ERROR;
+ }
+ }
+
+ // All channels aligned
+ return SUCCESS;
+ }
+
+private:
+ // Transports for each channel
+ std::vector<typename transport_t::uptr>& _xports;
+
+ // Storage for buffers resulting from alignment
+ std::vector<typename transport_t::buff_t::uptr>& _frame_buffs;
+
+ // Packet info corresponding to aligned buffers
+ std::vector<typename transport_t::packet_info_t>& _infos;
+
+ // Time of previous packet for each channel
+ std::vector<uint64_t> _prev_tsf;
+
+ // Keeps track of channels that are aligned
+ boost::dynamic_bitset<> _channels_to_align;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP */
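The policy described above (keep the newest timestamp seen as the alignment target, drop older packets, restart alignment whenever a newer timestamp appears) is easier to see without the transport plumbing. The following standalone sketch is illustration only, not the class above: tsf values live in plain queues, and channel 0 is assumed to have dropped the packet with tsf=100.

#include <cstdint>
#include <deque>
#include <iostream>
#include <vector>

int main()
{
    // Channel 0 dropped the packet with tsf=100; channel 1 still has it
    std::vector<std::deque<uint64_t>> chans = {{200, 300}, {100, 200, 300}};
    std::vector<bool> aligned(chans.size(), false);
    uint64_t target = 0;

    auto all_aligned = [&]() {
        for (bool a : aligned) {
            if (!a)
                return false;
        }
        return true;
    };

    while (!all_aligned()) {
        for (size_t c = 0; c < chans.size(); c++) {
            if (aligned[c]) {
                continue;
            }
            const uint64_t tsf = chans[c].front();
            if (tsf > target) {
                // Newer time: release buffers aligned to the old target
                for (size_t i = 0; i < chans.size(); i++) {
                    if (aligned[i]) {
                        chans[i].pop_front();
                        aligned[i] = false;
                    }
                }
                target     = tsf;
                aligned[c] = true;
            } else if (tsf == target) {
                aligned[c] = true;
            } else {
                chans[c].pop_front(); // stale packet, drop it
            }
        }
    }
    std::cout << "aligned at tsf=" << target << std::endl; // prints 200
    return 0;
}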
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
new file mode 100644
index 000000000..d66e867bc
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
@@ -0,0 +1,341 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP
+#define INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/convert.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/stream.hpp>
+#include <uhd/types/endianness.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/rx_streamer_zero_copy.hpp>
+#include <limits>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+namespace detail {
+
+/*!
+ * Cache of metadata for error handling
+ *
+ * If a recv call reads data from multiple packets, and an error occurs in the
+ * second or later packets, recv stops short of the num samps requested and
+ * returns no error. The error is cached for the next call to recv.
+ *
+ * Timeout errors are an exception. Timeouts that occur in the second or later
+ * packets of a recv call stop the recv method but the error is not returned in
+ * the next call. The user can check for this condition since fewer samples are
+ * returned than the number requested.
+ */
+class rx_metadata_cache
+{
+public:
+ //! Stores metadata in the cache, ignoring timeout errors
+ UHD_FORCE_INLINE void store(const rx_metadata_t& metadata)
+ {
+ if (metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) {
+ _metadata_cache = metadata;
+ _cached_metadata = true;
+ }
+ }
+
+ //! Checks for cached metadata
+ UHD_FORCE_INLINE bool check(rx_metadata_t& metadata)
+ {
+ if (_cached_metadata) {
+ metadata = _metadata_cache;
+ _cached_metadata = false;
+ return true;
+ }
+ return false;
+ }
+
+private:
+ // Whether there is a cached metadata object
+ bool _cached_metadata = false;
+
+ // Cached metadata value
+ uhd::rx_metadata_t _metadata_cache;
+};
+
+} // namespace detail
+
+/*!
+ * Implementation of rx streamer API
+ */
+template <typename transport_t>
+class rx_streamer_impl : public rx_streamer
+{
+public:
+ //! Constructor
+ rx_streamer_impl(const size_t num_ports, const uhd::stream_args_t stream_args)
+ : _zero_copy_streamer(num_ports)
+ , _in_buffs(num_ports)
+ {
+ if (stream_args.cpu_format.empty()) {
+ throw uhd::value_error("[rx_stream] Must provide a cpu_format!");
+ }
+ if (stream_args.otw_format.empty()) {
+ throw uhd::value_error("[rx_stream] Must provide a otw_format!");
+ }
+ _setup_converters(num_ports, stream_args);
+ _zero_copy_streamer.set_samp_rate(_samp_rate);
+ _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item);
+ }
+
+ //! Connect a new channel to the streamer
+ void connect_channel(const size_t channel, typename transport_t::uptr xport)
+ {
+ const size_t max_pyld_size = xport->get_max_payload_size();
+ _zero_copy_streamer.connect_channel(channel, std::move(xport));
+
+ // Set spp based on the transport frame size
+ const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item;
+ _spp = std::min(_spp, xport_spp);
+ }
+
+ //! Implementation of rx_streamer API method
+ size_t get_num_channels() const
+ {
+ return _zero_copy_streamer.get_num_channels();
+ }
+
+ //! Implementation of rx_streamer API method
+ size_t get_max_num_samps() const
+ {
+ return _spp;
+ }
+
+ /*! Get width of each over-the-wire item component. For complex items,
+ * returns the width of one component only (real or imaginary).
+ */
+ size_t get_otw_item_comp_bit_width() const
+ {
+ return _convert_info.otw_item_bit_width;
+ }
+
+ //! Implementation of rx_streamer API method
+ UHD_INLINE size_t recv(const uhd::rx_streamer::buffs_type& buffs,
+ const size_t nsamps_per_buff,
+ uhd::rx_metadata_t& metadata,
+ const double timeout,
+ const bool one_packet)
+ {
+ if (_error_metadata_cache.check(metadata)) {
+ return 0;
+ }
+
+ const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
+
+ size_t total_samps_recv =
+ _recv_one_packet(buffs, nsamps_per_buff, metadata, timeout_ms);
+
+ if (one_packet or metadata.end_of_burst) {
+ return total_samps_recv;
+ }
+
+ // First set of packets recv had an error, return immediately
+ if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) {
+ return total_samps_recv;
+ }
+
+ // Loop until buffer is filled or error code. This method returns the
+ // metadata from the first packet received, with the exception of
+ // end-of-burst.
+ uhd::rx_metadata_t loop_metadata;
+
+ while (total_samps_recv < nsamps_per_buff) {
+ size_t num_samps = _recv_one_packet(buffs,
+ nsamps_per_buff - total_samps_recv,
+ loop_metadata,
+ timeout_ms,
+ total_samps_recv * _convert_info.bytes_per_cpu_item);
+
+ // If metadata had an error code set, store for next call and return
+ if (loop_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) {
+ _error_metadata_cache.store(loop_metadata);
+ break;
+ }
+
+ total_samps_recv += num_samps;
+
+ // Return immediately if end of burst
+ if (loop_metadata.end_of_burst) {
+ metadata.end_of_burst = true;
+ break;
+ }
+ }
+
+ return total_samps_recv;
+ }
+
+protected:
+ //! Configures scaling factor for conversion
+ void set_scale_factor(const size_t chan, const double scale_factor)
+ {
+ _converters[chan]->set_scalar(scale_factor);
+ }
+
+ //! Configures sample rate for conversion of timestamp
+ void set_samp_rate(const double rate)
+ {
+ _samp_rate = rate;
+ _zero_copy_streamer.set_samp_rate(rate);
+ }
+
+ //! Configures tick rate for conversion of timestamp
+ void set_tick_rate(const double rate)
+ {
+ _zero_copy_streamer.set_tick_rate(rate);
+ }
+
+private:
+ //! Converter and associated item sizes
+ struct convert_info
+ {
+ size_t bytes_per_otw_item;
+ size_t bytes_per_cpu_item;
+ size_t otw_item_bit_width;
+ };
+
+ //! Receive a single packet
+ UHD_FORCE_INLINE size_t _recv_one_packet(const uhd::rx_streamer::buffs_type& buffs,
+ const size_t nsamps_per_buff,
+ uhd::rx_metadata_t& metadata,
+ const int32_t timeout_ms,
+ const size_t buffer_offset_bytes = 0)
+ {
+ if (_buff_samps_remaining == 0) {
+ // Current set of buffers has expired, get the next one
+ _buff_samps_remaining =
+ _zero_copy_streamer.get_recv_buffs(_in_buffs, metadata, timeout_ms);
+ _fragment_offset_in_samps = 0;
+ } else {
+ // There are samples still left in the current set of buffers
+ metadata = _last_fragment_metadata;
+ metadata.time_spec += time_spec_t::from_ticks(
+ _fragment_offset_in_samps - metadata.fragment_offset, _samp_rate);
+ }
+
+ if (_buff_samps_remaining != 0) {
+ const size_t num_samps = std::min(nsamps_per_buff, _buff_samps_remaining);
+
+ // Convert samples to the streamer's output format
+ for (size_t i = 0; i < get_num_channels(); i++) {
+ char* b = reinterpret_cast<char*>(buffs[i]);
+ const uhd::rx_streamer::buffs_type out_buffs(b + buffer_offset_bytes);
+ _convert_to_out_buff(out_buffs, i, num_samps);
+ }
+
+ _buff_samps_remaining -= num_samps;
+
+ // Write the fragment flags and offset
+ metadata.more_fragments = _buff_samps_remaining != 0;
+ metadata.fragment_offset = _fragment_offset_in_samps;
+
+ if (metadata.more_fragments) {
+ _fragment_offset_in_samps += num_samps;
+ _last_fragment_metadata = metadata;
+ }
+
+ return num_samps;
+ } else {
+ return 0;
+ }
+ }
+
+ //! Convert samples for one channel into its buffer
+ UHD_FORCE_INLINE void _convert_to_out_buff(
+ const uhd::rx_streamer::buffs_type& out_buffs,
+ const size_t chan,
+ const size_t num_samps)
+ {
+ const char* buffer_ptr = reinterpret_cast<const char*>(_in_buffs[chan]);
+
+ _converters[chan]->conv(buffer_ptr, out_buffs, num_samps);
+
+ // Advance the pointer for the source buffer
+ _in_buffs[chan] =
+ buffer_ptr + num_samps * _convert_info.bytes_per_otw_item;
+
+ if (_buff_samps_remaining == num_samps) {
+ _zero_copy_streamer.release_recv_buff(chan);
+ }
+ }
+
+ //! Create converters and initialize _convert_info
+ void _setup_converters(const size_t num_ports,
+ const uhd::stream_args_t stream_args)
+ {
+ // Note to code archaeologists: In the past, we had to also specify the
+ // endianness here, but that is no longer necessary because we can make
+ // the wire endianness match the host endianness.
+ convert::id_type id;
+ id.input_format = stream_args.otw_format + "_chdr";
+ id.num_inputs = 1;
+ id.output_format = stream_args.cpu_format;
+ id.num_outputs = 1;
+
+ auto starts_with = [](const std::string& s, const std::string v) {
+ return s.find(v) == 0;
+ };
+
+ const bool otw_is_complex = starts_with(stream_args.otw_format, "fc")
+ || starts_with(stream_args.otw_format, "sc");
+
+ convert_info info;
+ info.bytes_per_otw_item = convert::get_bytes_per_item(id.input_format);
+ info.bytes_per_cpu_item = convert::get_bytes_per_item(id.output_format);
+
+ if (otw_is_complex) {
+ info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2;
+ } else {
+ info.otw_item_bit_width = info.bytes_per_otw_item * 8;
+ }
+
+ _convert_info = info;
+
+ for (size_t i = 0; i < num_ports; i++) {
+ _converters.push_back(convert::get_converter(id)());
+ _converters.back()->set_scalar(1 / 32767.0);
+ }
+ }
+
+ // Converter and item sizes
+ convert_info _convert_info;
+
+ // Converters
+ std::vector<uhd::convert::converter::sptr> _converters;
+
+ // Implementation of frame buffer management and packet info
+ rx_streamer_zero_copy<transport_t> _zero_copy_streamer;
+
+ // Container for buffer pointers used in recv method
+ std::vector<const void*> _in_buffs;
+
+ // Sample rate used to calculate metadata time_spec_t
+ double _samp_rate = 1.0;
+
+ // Maximum number of samples per packet
+ size_t _spp = std::numeric_limits<std::size_t>::max();
+
+ // Num samps remaining in buffer currently held by zero copy streamer
+ size_t _buff_samps_remaining = 0;
+
+ // Metadata cache for error handling
+ detail::rx_metadata_cache _error_metadata_cache;
+
+ // Fragment (partially read packet) information
+ size_t _fragment_offset_in_samps = 0;
+ rx_metadata_t _last_fragment_metadata;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP */
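When the caller's buffer holds fewer samples than one packet, _recv_one_packet() above returns the packet in fragments: more_fragments is set, fragment_offset advances, and the reported time_spec is shifted by the offset at the sample rate. Stripped of the transport machinery, the bookkeeping looks like this standalone sketch (sample rate, packet size, and request size are invented, and time is kept as a plain double):

#include <cassert>
#include <cstddef>

int main()
{
    const double samp_rate    = 1e6;  // 1 Msps (invented)
    const size_t packet_samps = 1000; // samples in the received packet
    const size_t user_samps   = 400;  // samples requested per recv() call

    const double packet_time = 2.0; // packet timestamp, in seconds
    size_t fragment_offset   = 0;   // metadata.fragment_offset
    size_t remaining         = packet_samps;

    // Three recv() calls drain one packet: 400 + 400 + 200 samples
    size_t calls = 0;
    while (remaining > 0) {
        const size_t n            = remaining < user_samps ? remaining : user_samps;
        const double frag_time    = packet_time + fragment_offset / samp_rate;
        const bool more_fragments = (remaining - n) != 0;

        if (calls == 1) {
            // The second fragment starts 400 samples into the packet
            assert(more_fragments);
            assert(frag_time > 2.00039 && frag_time < 2.00041);
        }
        fragment_offset += n;
        remaining -= n;
        calls++;
    }
    assert(calls == 3);
    return 0;
}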
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
new file mode 100644
index 000000000..36f568f2d
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
@@ -0,0 +1,207 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP
+#define INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/types/metadata.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/get_aligned_buffs.hpp>
+#include <boost/format.hpp>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Implementation of rx streamer manipulation of frame buffers and packet info.
+ * This class is part of rx_streamer_impl, split into a separate unit as it is
+ * a mostly self-contained portion of the streamer logic.
+ */
+template <typename transport_t>
+class rx_streamer_zero_copy
+{
+public:
+ //! Constructor
+ rx_streamer_zero_copy(const size_t num_ports)
+ : _xports(num_ports)
+ , _frame_buffs(num_ports)
+ , _infos(num_ports)
+ , _get_aligned_buffs(_xports, _frame_buffs, _infos)
+ {
+ }
+
+ ~rx_streamer_zero_copy()
+ {
+ for (size_t i = 0; i < _frame_buffs.size(); i++) {
+ if (_frame_buffs[i]) {
+ _xports[i]->release_recv_buff(std::move(_frame_buffs[i]));
+ }
+ }
+ }
+
+ //! Connect a new channel to the streamer
+ void connect_channel(const size_t port, typename transport_t::uptr xport)
+ {
+ if (port >= get_num_channels()) {
+ throw uhd::index_error(
+ "Port number indexes beyond the number of streamer ports");
+ }
+
+ if (_xports[port]) {
+ throw uhd::runtime_error(
+ "Streamer port number is already connected to a port");
+ }
+
+ _xports[port] = std::move(xport);
+ }
+
+ //! Returns number of channels handled by this streamer
+ size_t get_num_channels() const
+ {
+ return _xports.size();
+ }
+
+ //! Configures tick rate for conversion of timestamp
+ void set_tick_rate(const double rate)
+ {
+ _tick_rate = rate;
+ }
+
+ //! Configures sample rate for conversion of timestamp
+ void set_samp_rate(const double rate)
+ {
+ _samp_rate = rate;
+ }
+
+ //! Configures the size of each sample
+ void set_bytes_per_item(const size_t bpi)
+ {
+ _bytes_per_item = bpi;
+ }
+
+ /*!
+ * Gets a set of time-aligned buffers, one per channel.
+ *
+ * \param buffs returns a pointer to the buffer data
+ * \param metadata returns the metadata corresponding to the buffer
+ * \param timeout_ms timeout in milliseconds
+ * \return the size in samples of each packet, or 0 if timeout
+ */
+ size_t get_recv_buffs(std::vector<const void*>& buffs,
+ rx_metadata_t& metadata,
+ const int32_t timeout_ms)
+ {
+ metadata.reset();
+
+ switch (_get_aligned_buffs(timeout_ms)) {
+ case get_aligned_buffs_t::SUCCESS:
+ break;
+
+ case get_aligned_buffs_t::BAD_PACKET:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
+ return 0;
+
+ case get_aligned_buffs_t::TIMEOUT:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
+ return 0;
+
+ case get_aligned_buffs_t::ALIGNMENT_FAILURE:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
+ return 0;
+
+ case get_aligned_buffs_t::SEQUENCE_ERROR:
+ metadata.has_time_spec = _last_read_time_info.has_time_spec;
+ metadata.time_spec =
+ _last_read_time_info.time_spec
+ + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate);
+ metadata.out_of_sequence = true;
+ metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ return 0;
+
+ default:
+ UHD_THROW_INVALID_CODE_PATH();
+ }
+
+ // Get payload pointers for each buffer and aggregate eob. We set eob to
+ // true if any channel has it set, since no more data will be received for
+ // that channel. In most cases, all channels should have the same value.
+ bool eob = false;
+ for (size_t i = 0; i < buffs.size(); i++) {
+ buffs[i] = _infos[i].payload;
+ eob |= _infos[i].eob;
+ }
+
+ // Set the metadata from the buffer information at index zero
+ const auto& info_0 = _infos[0];
+
+ metadata.has_time_spec = info_0.has_tsf;
+ metadata.time_spec = time_spec_t::from_ticks(info_0.tsf, _tick_rate);
+ metadata.start_of_burst = false;
+ metadata.end_of_burst = eob;
+ metadata.error_code = rx_metadata_t::ERROR_CODE_NONE;
+
+ // Done with these packets, save timestamp info for next call
+ _last_read_time_info.has_time_spec = metadata.has_time_spec;
+ _last_read_time_info.time_spec = metadata.time_spec;
+ _last_read_time_info.num_samps = info_0.payload_bytes / _bytes_per_item;
+
+ return _last_read_time_info.num_samps;
+ }
+
+ /*!
+ * Release the packet for the specified channel
+ *
+ * \param channel the channel for which to release the packet
+ */
+ void release_recv_buff(const size_t channel)
+ {
+ _xports[channel]->release_recv_buff(std::move(_frame_buffs[channel]));
+ _frame_buffs[channel] = typename transport_t::buff_t::uptr();
+ }
+
+private:
+ using get_aligned_buffs_t = get_aligned_buffs<transport_t>;
+
+ // Information recorded by streamer about the last data packet processed,
+ // used to create the metadata when there is a sequence error.
+ struct last_read_time_info_t
+ {
+ size_t num_samps = 0;
+ bool has_time_spec = false;
+ time_spec_t time_spec;
+ };
+
+ // Transports for each channel
+ std::vector<typename transport_t::uptr> _xports;
+
+ // Storage for buffers for each channel while they are in flight (between
+ // calls to get_recv_buff and release_recv_buff).
+ std::vector<typename transport_t::buff_t::uptr> _frame_buffs;
+
+ // Packet info corresponding to the packets in flight
+ std::vector<typename transport_t::packet_info_t> _infos;
+
+ // Rate used in conversion of timestamp to time_spec_t
+ double _tick_rate = 1.0;
+
+ // Rate used in conversion of timestamp to time_spec_t
+ double _samp_rate = 1.0;
+
+ // Size of a sample on the device
+ size_t _bytes_per_item = 0;
+
+ // Implementation of packet time alignment
+ get_aligned_buffs_t _get_aligned_buffs;
+
+ // Information about the last data packet processed
+ last_read_time_info_t _last_read_time_info;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP */
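On a SEQUENCE_ERROR, get_recv_buffs() above reports ERROR_CODE_OVERFLOW with a time_spec extrapolated from the previous packet: its start time plus its length in samples at the sample rate. A standalone sketch of that extrapolation with invented values (seconds as a plain double instead of time_spec_t):

#include <cassert>
#include <cstdint>

int main()
{
    const double samp_rate = 1e6; // 1 Msps (invented)

    // Last good packet started at t = 5.0 s and carried 2000 samples
    const double last_time        = 5.0;
    const uint64_t last_num_samps = 2000;

    // Estimated start time of the data lost to the overflow
    const double overflow_time = last_time + last_num_samps / samp_rate;
    assert(overflow_time > 5.0019 && overflow_time < 5.0021);
    return 0;
}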
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
new file mode 100644
index 000000000..60881dad2
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
@@ -0,0 +1,307 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP
+#define INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/convert.hpp>
+#include <uhd/stream.hpp>
+#include <uhd/types/metadata.hpp>
+#include <uhd/utils/tasks.hpp>
+#include <uhdlib/transport/tx_streamer_zero_copy.hpp>
+#include <limits>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+namespace detail {
+
+/*!
+ * Cache of metadata for send calls with zero samples
+ *
+ * Metadata is cached when we get a send requesting a start of burst with no
+ * samples. It is applied here on the next call to send() that actually has
+ * samples to send.
+ */
+class tx_metadata_cache
+{
+public:
+ //! Stores metadata in the cache
+ UHD_FORCE_INLINE void store(const tx_metadata_t& metadata)
+ {
+ _metadata_cache = metadata;
+ _cached_metadata = true;
+ }
+
+ //! Checks for cached metadata
+ UHD_FORCE_INLINE void check(tx_metadata_t& metadata)
+ {
+ if (_cached_metadata) {
+ // Only use cached time_spec if metadata does not have one
+ if (!metadata.has_time_spec) {
+ metadata.has_time_spec = _metadata_cache.has_time_spec;
+ metadata.time_spec = _metadata_cache.time_spec;
+ }
+ metadata.start_of_burst = _metadata_cache.start_of_burst;
+ metadata.end_of_burst = _metadata_cache.end_of_burst;
+ _cached_metadata = false;
+ }
+ }
+
+private:
+ // Whether there is a cached metadata object
+ bool _cached_metadata = false;
+
+ // Cached metadata value
+ uhd::tx_metadata_t _metadata_cache;
+};
+
+} // namespace detail
+
+/*!
+ * Implementation of tx streamer API
+ */
+template <typename transport_t>
+class tx_streamer_impl : public tx_streamer
+{
+public:
+ tx_streamer_impl(const size_t num_chans, const uhd::stream_args_t stream_args)
+ : _zero_copy_streamer(num_chans)
+ , _zero_buffs(num_chans, &_zero)
+ , _out_buffs(num_chans)
+ {
+ _setup_converters(num_chans, stream_args);
+ _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item);
+ _spp = stream_args.args.cast<size_t>("spp", _spp);
+ }
+
+ void connect_channel(const size_t channel, typename transport_t::uptr xport)
+ {
+ const size_t max_pyld_size = xport->get_max_payload_size();
+ _zero_copy_streamer.connect_channel(channel, std::move(xport));
+
+ // Set spp based on the transport frame size
+ const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item;
+ _spp = std::min(_spp, xport_spp);
+ }
+
+ size_t get_num_channels() const
+ {
+ return _zero_copy_streamer.get_num_channels();
+ }
+
+ size_t get_max_num_samps() const
+ {
+ return _spp;
+ }
+
+
+ /*! Get width of each over-the-wire item component. For complex items,
+ * returns the width of one component only (real or imaginary).
+ */
+ size_t get_otw_item_comp_bit_width() const
+ {
+ return _convert_info.otw_item_bit_width;
+ }
+
+ size_t send(const uhd::tx_streamer::buffs_type& buffs,
+ const size_t nsamps_per_buff,
+ const uhd::tx_metadata_t& metadata_,
+ const double timeout)
+ {
+ uhd::tx_metadata_t metadata(metadata_);
+
+ if (nsamps_per_buff == 0 && metadata.start_of_burst) {
+ _metadata_cache.store(metadata);
+ return 0;
+ }
+
+ _metadata_cache.check(metadata);
+
+ const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
+
+ if (nsamps_per_buff == 0) {
+ // Send requests with no samples are handled here, such as end of
+ // burst. Send packets need to have at least one sample based on the
+ // chdr specification, so we use _zero_buffs here.
+ _send_one_packet(_zero_buffs.data(),
+ 0, // buffer offset
+ 1, // num samples
+ metadata,
+ timeout_ms);
+
+ return 0;
+ } else if (nsamps_per_buff <= _spp) {
+ return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms);
+
+ } else {
+ size_t total_num_samps_sent = 0;
+ const bool eob = metadata.end_of_burst;
+ metadata.end_of_burst = false;
+
+ const size_t num_fragments = (nsamps_per_buff - 1) / _spp;
+ const size_t final_length = ((nsamps_per_buff - 1) % _spp) + 1;
+
+ for (size_t i = 0; i < num_fragments; i++) {
+ const size_t num_samps_sent = _send_one_packet(
+ buffs, total_num_samps_sent, _spp, metadata, timeout_ms);
+
+ total_num_samps_sent += num_samps_sent;
+
+ if (num_samps_sent == 0) {
+ return total_num_samps_sent;
+ }
+
+ // Setup timespec for the next fragment
+ if (metadata.has_time_spec) {
+ metadata.time_spec =
+ metadata.time_spec
+ + time_spec_t::from_ticks(num_samps_sent, _samp_rate);
+ }
+
+ metadata.start_of_burst = false;
+ }
+
+ // Send the final fragment
+ metadata.end_of_burst = eob;
+
+ size_t nsamps_sent =
+ total_num_samps_sent
+ + _send_one_packet(
+                      buffs, total_num_samps_sent, final_length, metadata, timeout_ms);
+
+ return nsamps_sent;
+ }
+ }
+
+    //! Implementation of tx_streamer API method
+ bool recv_async_msg(
+ uhd::async_metadata_t& /*async_metadata*/, double /*timeout = 0.1*/)
+ {
+ // TODO: implement me
+ return false;
+ }
+
+protected:
+ //! Configures scaling factor for conversion
+ void set_scale_factor(const size_t chan, const double scale_factor)
+ {
+ _converters[chan]->set_scalar(scale_factor);
+ }
+
+ //! Configures sample rate for conversion of timestamp
+ void set_samp_rate(const double rate)
+ {
+ _samp_rate = rate;
+ }
+
+ //! Configures tick rate for conversion of timestamp
+ void set_tick_rate(const double rate)
+ {
+ _zero_copy_streamer.set_tick_rate(rate);
+ }
+
+private:
+ //! Converter and associated item sizes
+ struct convert_info
+ {
+ size_t bytes_per_otw_item;
+ size_t bytes_per_cpu_item;
+ size_t otw_item_bit_width;
+ };
+
+    //! Converts samples for all channels and sends one packet per channel
+ size_t _send_one_packet(const uhd::tx_streamer::buffs_type& buffs,
+ const size_t buffer_offset_in_samps,
+ const size_t num_samples,
+ const tx_metadata_t& metadata,
+ const int32_t timeout_ms)
+ {
+ if (!_zero_copy_streamer.get_send_buffs(
+ _out_buffs, num_samples, metadata, timeout_ms)) {
+ return 0;
+ }
+
+ size_t byte_offset = buffer_offset_in_samps * _convert_info.bytes_per_cpu_item;
+
+ for (size_t i = 0; i < get_num_channels(); i++) {
+ const void* input_ptr = static_cast<const uint8_t*>(buffs[i]) + byte_offset;
+ _converters[i]->conv(input_ptr, _out_buffs[i], num_samples);
+
+ _zero_copy_streamer.release_send_buff(i);
+ }
+
+ return num_samples;
+ }
+
+    //! Create converters and initialize _convert_info
+ void _setup_converters(const size_t num_chans, const uhd::stream_args_t stream_args)
+ {
+ // Note to code archaeologists: In the past, we had to also specify the
+ // endianness here, but that is no longer necessary because we can make
+ // the wire endianness match the host endianness.
+ convert::id_type id;
+ id.input_format = stream_args.cpu_format;
+ id.num_inputs = 1;
+ id.output_format = stream_args.otw_format + "_chdr";
+ id.num_outputs = 1;
+
+ auto starts_with = [](const std::string& s, const std::string v) {
+ return s.find(v) == 0;
+ };
+
+ const bool otw_is_complex = starts_with(stream_args.otw_format, "fc")
+ || starts_with(stream_args.otw_format, "sc");
+
+ convert_info info;
+ info.bytes_per_otw_item = convert::get_bytes_per_item(id.output_format);
+ info.bytes_per_cpu_item = convert::get_bytes_per_item(id.input_format);
+
+ if (otw_is_complex) {
+ info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2;
+ } else {
+ info.otw_item_bit_width = info.bytes_per_otw_item * 8;
+ }
+
+ _convert_info = info;
+
+ for (size_t i = 0; i < num_chans; i++) {
+ _converters.push_back(convert::get_converter(id)());
+ _converters.back()->set_scalar(32767.0);
+ }
+ }
+
+ // Converter item sizes
+ convert_info _convert_info;
+
+ // Converters
+ std::vector<uhd::convert::converter::sptr> _converters;
+
+ // Manages frame buffers and packet info
+ tx_streamer_zero_copy<transport_t> _zero_copy_streamer;
+
+ // Buffer used to handle send calls with no data
+ std::vector<const void*> _zero_buffs;
+
+ const uint64_t _zero = 0;
+
+ // Container for buffer pointers used in send method
+ std::vector<void*> _out_buffs;
+
+ // Sample rate used to calculate metadata time_spec_t
+ double _samp_rate = 1.0;
+
+ // Maximum number of samples per packet
+ size_t _spp = std::numeric_limits<std::size_t>::max();
+
+ // Metadata cache for send calls with no data
+ detail::tx_metadata_cache _metadata_cache;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP */
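send() above splits oversized buffers into spp-sized packets using the num_fragments and final_length expressions. Writing both in terms of (nsamps - 1) guarantees the final fragment is never empty, even when nsamps is an exact multiple of spp. A standalone check of that arithmetic (spp and the sample counts are invented):

#include <cassert>
#include <cstddef>

int main()
{
    const size_t spp = 1000; // samples per packet supported by the transport

    // 2500 samples -> two full packets plus a final fragment of 500
    size_t nsamps        = 2500;
    size_t num_fragments = (nsamps - 1) / spp;
    size_t final_length  = ((nsamps - 1) % spp) + 1;
    assert(num_fragments == 2 && final_length == 500);

    // An exact multiple still produces a non-empty final fragment
    nsamps        = 3000;
    num_fragments = (nsamps - 1) / spp;
    final_length  = ((nsamps - 1) % spp) + 1;
    assert(num_fragments == 2 && final_length == 1000);

    return 0;
}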
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
new file mode 100644
index 000000000..1b6f55238
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
@@ -0,0 +1,147 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP
+#define INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/stream.hpp>
+#include <uhd/types/metadata.hpp>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Implementation of tx streamer manipulation of frame buffers and packet info.
+ * This class is part of tx_streamer_impl, split into a separate unit as it is
+ * a mostly self-contained portion of the streamer logic.
+ */
+template <typename transport_t>
+class tx_streamer_zero_copy
+{
+public:
+ //! Constructor
+ tx_streamer_zero_copy(const size_t num_chans)
+ : _xports(num_chans), _frame_buffs(num_chans)
+ {
+ }
+
+ //! Connect a new channel to the streamer
+ void connect_channel(const size_t port, typename transport_t::uptr xport)
+ {
+ if (port >= get_num_channels()) {
+ throw uhd::index_error(
+ "Port number indexes beyond the number of streamer ports");
+ }
+
+ if (_xports[port]) {
+ throw uhd::runtime_error(
+ "Streamer port number is already connected to a port");
+ }
+
+ _xports[port] = std::move(xport);
+ }
+
+ //! Returns number of channels handled by this streamer
+ size_t get_num_channels() const
+ {
+ return _xports.size();
+ }
+
+ //! Configures tick rate for conversion of timestamp
+ void set_tick_rate(const double rate)
+ {
+ _tick_rate = rate;
+ }
+
+ //! Configures the size of each sample
+ void set_bytes_per_item(const size_t bpi)
+ {
+ _bytes_per_item = bpi;
+ }
+
+ /*!
+ * Gets a set of frame buffers, one per channel.
+ *
+ * \param buffs returns a pointer to the buffer data
+ * \param nsamps_per_buff the number of samples that will be written to each buffer
+ * \param metadata the metadata to write to the packet header
+ * \param timeout_ms timeout in milliseconds
+     * \return true if the operation was successful, false if a timeout occurs
+ */
+ UHD_FORCE_INLINE bool get_send_buffs(std::vector<void*>& buffs,
+ const size_t nsamps_per_buff,
+ const tx_metadata_t& metadata,
+ const int32_t timeout_ms)
+ {
+ // Try to get a buffer per channel
+ for (; _next_buff_to_get < _xports.size(); _next_buff_to_get++) {
+ _frame_buffs[_next_buff_to_get].first =
+ _xports[_next_buff_to_get]->get_send_buff(timeout_ms);
+
+ if (!_frame_buffs[_next_buff_to_get].first) {
+ return false;
+ }
+ }
+
+ // Got all the buffers, start from index 0 next call
+ _next_buff_to_get = 0;
+
+ // Store portions of metadata we care about
+ typename transport_t::packet_info_t info;
+ info.has_tsf = metadata.has_time_spec;
+
+ if (metadata.has_time_spec) {
+ info.tsf = metadata.time_spec.to_ticks(_tick_rate);
+ }
+
+ info.payload_bytes = nsamps_per_buff * _bytes_per_item;
+ info.eob = metadata.end_of_burst;
+
+ // Write packet header
+ for (size_t i = 0; i < buffs.size(); i++) {
+ std::tie(buffs[i], _frame_buffs[i].second) =
+ _xports[i]->write_packet_header(_frame_buffs[i].first, info);
+ }
+
+ return true;
+ }
+
+ /*!
+ * Send the packet for the specified channel
+ *
+ * \param channel the channel for which to send the packet
+ */
+ UHD_FORCE_INLINE void release_send_buff(const size_t channel)
+ {
+ _frame_buffs[channel].first->set_packet_size(_frame_buffs[channel].second);
+ _xports[channel]->release_send_buff(std::move(_frame_buffs[channel].first));
+
+ _frame_buffs[channel].first = nullptr;
+ _frame_buffs[channel].second = 0;
+ }
+
+private:
+ // Transports for each channel
+ std::vector<typename transport_t::uptr> _xports;
+
+ // Container to hold frame buffers for each channel and their packet sizes
+ std::vector<std::pair<typename transport_t::buff_t::uptr, size_t>> _frame_buffs;
+
+ // Rate used in conversion of timestamp to time_spec_t
+ double _tick_rate = 1.0;
+
+ // Size of a sample on the device
+ size_t _bytes_per_item = 0;
+
+ // Next channel from which to get a buffer, stored as a member to
+ // allow the streamer to continue where it stopped due to timeouts.
+ size_t _next_buff_to_get = 0;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP */
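get_send_buffs() above keeps _next_buff_to_get as a member so that a timeout on one channel does not forfeit the buffers already acquired for earlier channels; the next call resumes at the channel that timed out. The standalone sketch below models that resume behavior with a fake transport in place of transport_t (illustration only):

#include <cassert>
#include <cstddef>
#include <vector>

// Fake transport: succeeds once attempts_before_success reaches zero
struct fake_xport
{
    int attempts_before_success = 0;
    bool get_send_buff()
    {
        return attempts_before_success-- <= 0;
    }
};

// Mirrors the resume logic: returns true once every channel holds a buffer
bool get_send_buffs(std::vector<fake_xport>& xports, size_t& next_buff_to_get)
{
    for (; next_buff_to_get < xports.size(); next_buff_to_get++) {
        if (!xports[next_buff_to_get].get_send_buff()) {
            return false; // timeout: keep the index for the next call
        }
    }
    next_buff_to_get = 0;
    return true;
}

int main()
{
    // Channel 1 times out on the first attempt only
    std::vector<fake_xport> xports(2);
    xports[1].attempts_before_success = 1;

    size_t next = 0;
    bool ok     = get_send_buffs(xports, next);
    assert(!ok && next == 1); // stopped at channel 1, index retained
    ok = get_send_buffs(xports, next);
    assert(ok && next == 0); // resumed at channel 1 and finished
    return 0;
}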
diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt
index dfef4f90f..963458fe6 100644
--- a/host/lib/rfnoc/CMakeLists.txt
+++ b/host/lib/rfnoc/CMakeLists.txt
@@ -53,6 +53,8 @@ LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/tick_node_ctrl.cpp
${CMAKE_CURRENT_SOURCE_DIR}/tx_stream_terminator.cpp
${CMAKE_CURRENT_SOURCE_DIR}/wb_iface_adapter.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_rx_streamer.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_tx_streamer.cpp
# Default block control classes:
${CMAKE_CURRENT_SOURCE_DIR}/block_control.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ddc_block_control.cpp
diff --git a/host/lib/rfnoc/chdr_packet.cpp b/host/lib/rfnoc/chdr_packet.cpp
index 653181c04..d4b7494e2 100644
--- a/host/lib/rfnoc/chdr_packet.cpp
+++ b/host/lib/rfnoc/chdr_packet.cpp
@@ -31,7 +31,7 @@ public:
{
assert(pkt_buff);
_pkt_buff = const_cast<uint64_t*>(reinterpret_cast<const uint64_t*>(pkt_buff));
- _compute_mdata_offset();
+ _mdata_offset = _compute_mdata_offset(get_chdr_header());
}
virtual void refresh(void* pkt_buff, chdr_header& header, uint64_t timestamp = 0)
@@ -42,7 +42,7 @@ public:
if (_has_timestamp(header)) {
_pkt_buff[1] = u64_from_host(timestamp);
}
- _compute_mdata_offset();
+ _mdata_offset = _compute_mdata_offset(get_chdr_header());
}
virtual void update_payload_size(size_t payload_size_bytes)
@@ -115,19 +115,27 @@ public:
+ (chdr_w_stride * (_mdata_offset + get_chdr_header().get_num_mdata())));
}
+ virtual size_t calculate_payload_offset(const packet_type_t pkt_type,
+ const uint8_t num_mdata = 0) const
+ {
+ chdr_header header;
+ header.set_pkt_type(pkt_type);
+ return (_compute_mdata_offset(header) + num_mdata) * chdr_w_bytes;
+ }
+
private:
inline bool _has_timestamp(const chdr_header& header) const
{
return (header.get_pkt_type() == PKT_TYPE_DATA_WITH_TS);
}
- inline void _compute_mdata_offset() const
+ inline size_t _compute_mdata_offset(const chdr_header& header) const
{
// The metadata offset depends on the chdr_w and whether we have a timestamp
if (chdr_w == 64) {
- _mdata_offset = _has_timestamp(get_chdr_header()) ? 2 : 1;
+ return _has_timestamp(header) ? 2 : 1;
} else {
- _mdata_offset = 1;
+ return 1;
}
}
diff --git a/host/lib/rfnoc/graph_stream_manager.cpp b/host/lib/rfnoc/graph_stream_manager.cpp
index f2024786a..2db68db04 100644
--- a/host/lib/rfnoc/graph_stream_manager.cpp
+++ b/host/lib/rfnoc/graph_stream_manager.cpp
@@ -8,7 +8,9 @@
#include <uhd/utils/log.hpp>
#include <uhdlib/rfnoc/graph_stream_manager.hpp>
#include <uhdlib/rfnoc/link_stream_manager.hpp>
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
#include <boost/format.hpp>
+#include <boost/make_shared.hpp>
#include <map>
#include <set>
@@ -175,6 +177,38 @@ public:
"specified source endpoint");
}
+ chdr_rx_data_xport::uptr create_device_to_host_data_stream(
+ const sep_addr_t src_addr,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
+ const device_addr_t& xport_args)
+ {
+ // TODO: choose a route
+ const device_id_t via_device = NULL_DEVICE_ID;
+
+ return _link_mgrs.at(_check_dst_and_find_src(src_addr, via_device))
+ ->create_device_to_host_data_stream(src_addr,
+ pyld_buff_fmt,
+ mdata_buff_fmt,
+ xport_args);
+ }
+
+ virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
+ sep_addr_t dst_addr,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
+ const device_addr_t& xport_args)
+ {
+ // TODO: choose a route
+ const device_id_t via_device = NULL_DEVICE_ID;
+
+ return _link_mgrs.at(_check_dst_and_find_src(dst_addr, via_device))
+ ->create_host_to_device_data_stream(dst_addr,
+ pyld_buff_fmt,
+ mdata_buff_fmt,
+ xport_args);
+ }
+
private:
device_id_t _check_dst_and_find_src(sep_addr_t dst_addr, device_id_t via_device) const
{
diff --git a/host/lib/rfnoc/link_stream_manager.cpp b/host/lib/rfnoc/link_stream_manager.cpp
index 4fe183529..6855162de 100644
--- a/host/lib/rfnoc/link_stream_manager.cpp
+++ b/host/lib/rfnoc/link_stream_manager.cpp
@@ -203,86 +203,71 @@ public:
false,
STREAM_SETUP_TIMEOUT);
- // Compute FC frequency and headroom based on buff parameters
- stream_buff_params_t fc_freq{
- static_cast<uint64_t>(std::ceil(double(buff_params.bytes) * fc_freq_ratio)),
- static_cast<uint32_t>(
- std::ceil(double(buff_params.packets) * fc_freq_ratio))};
- stream_buff_params_t fc_headroom{
- static_cast<uint64_t>(
- std::ceil(double(buff_params.bytes) * fc_headroom_ratio)),
- static_cast<uint32_t>(
- std::ceil(double(buff_params.packets) * fc_headroom_ratio))};
-
// Reconfigure flow control using the new frequency and headroom
return _mgmt_portal->config_remote_stream(*_ctrl_xport,
dst_epid,
src_epid,
lossy_xport,
- fc_freq,
- fc_headroom,
+ _get_buff_params_ratio(buff_params, fc_freq_ratio),
+ _get_buff_params_ratio(buff_params, fc_headroom_ratio),
reset,
STREAM_SETUP_TIMEOUT);
}
virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
const sep_addr_t dst_addr,
- const bool lossy_xport,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
- const double fc_freq_ratio,
- const double fc_headroom_ratio,
const device_addr_t& xport_args)
{
- // Create a new source endpoint and EPID
- sep_addr_t sw_epid_addr(_my_device_id, SEP_INST_DATA_BASE + (_data_ep_inst++));
- sep_id_t src_epid = _epid_alloc->allocate_epid(sw_epid_addr);
- _allocated_epids.insert(src_epid);
_ensure_ep_is_reachable(dst_addr);
- // Generate a new destination EPID instance
+ // Generate a new destination (device) EPID instance
sep_id_t dst_epid = _epid_alloc->allocate_epid(dst_addr);
+ _mgmt_portal->initialize_endpoint(*_ctrl_xport, dst_addr, dst_epid);
- // Create the data transport that we will return to the client
- chdr_rx_data_xport::uptr xport = _mb_iface.make_rx_data_transport(
- _my_device_id, src_epid, dst_epid, xport_args);
-
- chdr_ctrl_xport::sptr mgmt_xport =
- _mb_iface.make_ctrl_transport(_my_device_id, src_epid);
-
- // Create new temporary management portal with the transports used for this stream
- // TODO: This is a bit excessive. Maybe we can pair down the functionality of the
- // portal just for route setup purposes. Whatever we do, we *must* use xport in it
- // though otherwise the transport will not behave correctly.
- mgmt_portal::uptr data_mgmt_portal =
- mgmt_portal::make(*mgmt_xport, _pkt_factory, sw_epid_addr, src_epid);
-
- // Setup a route to the EPID
- data_mgmt_portal->initialize_endpoint(*mgmt_xport, dst_addr, dst_epid);
- data_mgmt_portal->setup_local_route(*mgmt_xport, dst_epid);
if (!_mgmt_portal->get_endpoint_info(dst_epid).has_data) {
throw uhd::rfnoc_error("Downstream endpoint does not support data traffic");
}
- // TODO: Implement data transport setup logic here
-
+    // Create a new source (host) endpoint and EPID
+ sep_addr_t sw_epid_addr(_my_device_id, SEP_INST_DATA_BASE + (_data_ep_inst++));
+ sep_id_t src_epid = _epid_alloc->allocate_epid(sw_epid_addr);
+ _allocated_epids.insert(src_epid);
- // Delete the portal when done
- data_mgmt_portal.reset();
- return xport;
+ return _mb_iface.make_tx_data_transport({sw_epid_addr, dst_addr},
+ {src_epid, dst_epid},
+ pyld_buff_fmt,
+ mdata_buff_fmt,
+ xport_args);
}
- virtual chdr_tx_data_xport::uptr create_device_to_host_data_stream(
- const sep_addr_t src_addr,
- const bool lossy_xport,
+ virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream(
+ sep_addr_t src_addr,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
- const double fc_freq_ratio,
- const double fc_headroom_ratio,
const device_addr_t& xport_args)
{
- // TODO: Implement me
- return chdr_tx_data_xport::uptr();
+ _ensure_ep_is_reachable(src_addr);
+
+ // Generate a new source (device) EPID instance
+ sep_id_t src_epid = _epid_alloc->allocate_epid(src_addr);
+ _mgmt_portal->initialize_endpoint(*_ctrl_xport, src_addr, src_epid);
+
+ if (!_mgmt_portal->get_endpoint_info(src_epid).has_data) {
+ throw uhd::rfnoc_error("Downstream endpoint does not support data traffic");
+ }
+
+ // Create a new destination (host) endpoint and EPID
+ sep_addr_t sw_epid_addr(_my_device_id, SEP_INST_DATA_BASE + (_data_ep_inst++));
+ sep_id_t dst_epid = _epid_alloc->allocate_epid(sw_epid_addr);
+ _allocated_epids.insert(dst_epid);
+
+ return _mb_iface.make_rx_data_transport({src_addr, sw_epid_addr},
+ {src_epid, dst_epid},
+ pyld_buff_fmt,
+ mdata_buff_fmt,
+ xport_args);
}
private:
@@ -295,7 +280,14 @@ private:
throw uhd::routing_error("Specified endpoint is not reachable");
}
- // A reference to the packet factor
+ stream_buff_params_t _get_buff_params_ratio(
+ const stream_buff_params_t& buff_params, const double ratio)
+ {
+ return {static_cast<uint64_t>(std::ceil(double(buff_params.bytes) * ratio)),
+ static_cast<uint32_t>(std::ceil(double(buff_params.packets) * ratio))};
+ }
+
+ // A reference to the packet factory
const chdr::chdr_packet_factory& _pkt_factory;
// The device address of this software endpoint
const device_id_t _my_device_id;
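The new _get_buff_params_ratio() helper replaces the two duplicated ceil-and-cast expressions that previously computed fc_freq and fc_headroom. Its effect is a rounded-up scaling of the downstream buffer capacity; the standalone sketch below uses an invented capacity and ratios (in the patch, the ratios are supplied by the caller as fc_freq_ratio and fc_headroom_ratio):

#include <cassert>
#include <cmath>
#include <cstdint>

struct buff_params
{
    uint64_t bytes;
    uint32_t packets;
};

// Same arithmetic as _get_buff_params_ratio(): scale and round up
buff_params scale(const buff_params& capacity, const double ratio)
{
    return {static_cast<uint64_t>(std::ceil(double(capacity.bytes) * ratio)),
        static_cast<uint32_t>(std::ceil(double(capacity.packets) * ratio))};
}

int main()
{
    // Invented downstream capacity: 100000 bytes, 33 packets
    const buff_params capacity{100000, 33};

    // e.g. request a flow control response after 1/8 of the buffer is consumed
    const buff_params fc_freq = scale(capacity, 0.125);
    assert(fc_freq.bytes == 12500 && fc_freq.packets == 5); // ceil(33 * 0.125) = 5

    // and keep 1/16 of the capacity as headroom
    const buff_params fc_headroom = scale(capacity, 0.0625);
    assert(fc_headroom.bytes == 6250 && fc_headroom.packets == 3); // ceil(2.0625) = 3
    return 0;
}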
diff --git a/host/lib/rfnoc/mgmt_portal.cpp b/host/lib/rfnoc/mgmt_portal.cpp
index b490e0baf..a8b72cbdf 100644
--- a/host/lib/rfnoc/mgmt_portal.cpp
+++ b/host/lib/rfnoc/mgmt_portal.cpp
@@ -794,12 +794,11 @@ private: // Functions
mgmt_hop_t& hop)
{
// Validate flow control parameters
- if (fc_freq.bytes >= (uint64_t(1) << 40)
- || fc_freq.packets >= (uint64_t(1) << 24)) {
+ if (fc_freq.bytes > MAX_FC_FREQ_BYTES || fc_freq.packets > MAX_FC_FREQ_PKTS) {
throw uhd::value_error("Flow control frequency parameters out of bounds");
}
- if (fc_headroom.bytes >= (uint64_t(1) << 16)
- || fc_headroom.packets >= (uint64_t(1) << 8)) {
+ if (fc_headroom.bytes > MAX_FC_HEADROOM_BYTES
+ || fc_headroom.packets > MAX_FC_HEADROOM_PKTS) {
throw uhd::value_error("Flow control headroom parameters out of bounds");
}
@@ -992,7 +991,8 @@ private: // Functions
auto send_buff = xport.get_send_buff(timeout * 1000);
if (not send_buff) {
- UHD_LOG_ERROR("RFNOC::MGMT", "Timed out getting send buff for management transaction");
+ UHD_LOG_ERROR(
+ "RFNOC::MGMT", "Timed out getting send buff for management transaction");
throw uhd::io_error("Timed out getting send buff for management transaction");
}
_send_pkt->refresh(send_buff->data(), header, payload);
diff --git a/host/lib/rfnoc/rfnoc_graph.cpp b/host/lib/rfnoc/rfnoc_graph.cpp
index dd3dd7b90..4bf35cff1 100644
--- a/host/lib/rfnoc/rfnoc_graph.cpp
+++ b/host/lib/rfnoc/rfnoc_graph.cpp
@@ -15,12 +15,18 @@
#include <uhdlib/rfnoc/graph.hpp>
#include <uhdlib/rfnoc/graph_stream_manager.hpp>
#include <uhdlib/rfnoc/rfnoc_device.hpp>
+#include <uhdlib/rfnoc/rfnoc_rx_streamer.hpp>
+#include <uhdlib/rfnoc/rfnoc_tx_streamer.hpp>
#include <uhdlib/utils/narrow.hpp>
+#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp> // FIXME remove when rfnoc_device is ready
#include <memory>
using namespace uhd::rfnoc;
+namespace {
+const std::string LOG_ID("RFNOC::GRAPH");
+}
class rfnoc_graph_impl : public rfnoc_graph
{
@@ -33,6 +39,7 @@ public:
, _graph(std::make_unique<uhd::rfnoc::detail::graph_t>())
{
setup_graph(dev_addr);
+ _init_sep_map();
_init_static_connections();
}
@@ -76,31 +83,136 @@ public:
}
if (!has_block(dst_blk)) {
throw uhd::lookup_error(
- std::string("Cannot connect blocks, source block not found: ")
- + src_blk.to_string());
+ std::string("Cannot connect blocks, destination block not found: ")
+ + dst_blk.to_string());
}
+ auto edge_type = _physical_connect(src_blk, src_port, dst_blk, dst_port);
_connect(get_block(src_blk),
src_port,
get_block(dst_blk),
dst_port,
+ edge_type,
skip_property_propagation);
- _physical_connect(src_blk, src_port, dst_blk, dst_port);
}
- void connect(uhd::tx_streamer& /*streamer*/,
- size_t /*strm_port*/,
- const block_id_t& /*dst_blk*/,
- size_t /*dst_port*/)
+ void connect(uhd::tx_streamer::sptr streamer,
+ size_t strm_port,
+ const block_id_t& dst_blk,
+ size_t dst_port)
{
- throw uhd::not_implemented_error("");
+ // Verify the streamer was created by us
+ auto rfnoc_streamer = boost::dynamic_pointer_cast<rfnoc_tx_streamer>(streamer);
+ if (!rfnoc_streamer) {
+ throw uhd::type_error("Streamer is not rfnoc capable");
+ }
+
+        // Verify dst_blk exists in this graph
+        if (!has_block(dst_blk)) {
+            throw uhd::lookup_error(
+                std::string("Cannot connect block to streamer, destination block not found: ")
+                + dst_blk.to_string());
+        }
+
+        // Verify dst_blk has an SEP upstream
+ graph_edge_t dst_static_edge = _assert_edge(
+ _get_static_edge(
+ [dst_blk_id = dst_blk.to_string(), dst_port](const graph_edge_t& edge) {
+ return edge.dst_blockid == dst_blk_id && edge.dst_port == dst_port;
+ }),
+ dst_blk.to_string());
+ if (block_id_t(dst_static_edge.src_blockid).get_block_name() != NODE_ID_SEP) {
+ const std::string err_msg =
+ dst_blk.to_string() + ":" + std::to_string(dst_port)
+ + " is not connected to an SEP! Routing impossible.";
+ UHD_LOG_ERROR(LOG_ID, err_msg);
+ throw uhd::routing_error(err_msg);
+ }
+
+ // Now get the name and address of the SEP
+ const std::string sep_block_id = dst_static_edge.src_blockid;
+ const sep_addr_t sep_addr = _sep_map.at(sep_block_id);
+
+ const sw_buff_t pyld_fmt =
+ bits_to_sw_buff(rfnoc_streamer->get_otw_item_comp_bit_width());
+ const sw_buff_t mdata_fmt = BUFF_U64;
+
+ auto xport = _gsm->create_host_to_device_data_stream(sep_addr,
+ pyld_fmt,
+ mdata_fmt,
+ rfnoc_streamer->get_stream_args().args);
+
+ rfnoc_streamer->connect_channel(strm_port, std::move(xport));
+
+        // If this worked, then also connect the streamer in the BGL graph
+ auto dst = get_block(dst_blk);
+ graph_edge_t edge_info(strm_port, dst_port, graph_edge_t::TX_STREAM, true);
+ _graph->connect(rfnoc_streamer.get(), dst.get(), edge_info);
}
- void connect(const block_id_t& /*src_blk*/,
- size_t /*src_port*/,
- uhd::rx_streamer& /*streamer*/,
- size_t /*strm_port*/)
+ void connect(const block_id_t& src_blk,
+ size_t src_port,
+ uhd::rx_streamer::sptr streamer,
+ size_t strm_port)
{
- throw uhd::not_implemented_error("");
+ // Verify the streamer was created by us
+ auto rfnoc_streamer = boost::dynamic_pointer_cast<rfnoc_rx_streamer>(streamer);
+ if (!rfnoc_streamer) {
+ throw uhd::type_error("Streamer is not rfnoc capable");
+ }
+
+        // Verify src_blk exists in this graph
+ if (!has_block(src_blk)) {
+ throw uhd::lookup_error(
+ std::string("Cannot connect block to streamer, source block not found: ")
+ + src_blk.to_string());
+ }
+
+ // Verify src_blk has an SEP downstream
+ graph_edge_t src_static_edge = _assert_edge(
+ _get_static_edge(
+ [src_blk_id = src_blk.to_string(), src_port](const graph_edge_t& edge) {
+ return edge.src_blockid == src_blk_id && edge.src_port == src_port;
+ }),
+ src_blk.to_string());
+ if (block_id_t(src_static_edge.dst_blockid).get_block_name() != NODE_ID_SEP) {
+ const std::string err_msg =
+ src_blk.to_string() + ":" + std::to_string(src_port)
+ + " is not connected to an SEP! Routing impossible.";
+ UHD_LOG_ERROR(LOG_ID, err_msg);
+ throw uhd::routing_error(err_msg);
+ }
+
+ // Now get the name and address of the SEP
+ const std::string sep_block_id = src_static_edge.dst_blockid;
+ const sep_addr_t sep_addr = _sep_map.at(sep_block_id);
+
+ const sw_buff_t pyld_fmt =
+ bits_to_sw_buff(rfnoc_streamer->get_otw_item_comp_bit_width());
+ const sw_buff_t mdata_fmt = BUFF_U64;
+
+ auto xport = _gsm->create_device_to_host_data_stream(sep_addr,
+ pyld_fmt,
+ mdata_fmt,
+ rfnoc_streamer->get_stream_args().args);
+
+ rfnoc_streamer->connect_channel(strm_port, std::move(xport));
+
+ // If this worked, then also connect the streamer in the BGL graph
+ auto src = get_block(src_blk);
+ graph_edge_t edge_info(src_port, strm_port, graph_edge_t::RX_STREAM, true);
+ _graph->connect(src.get(), rfnoc_streamer.get(), edge_info);
+ }
+
+ uhd::rx_streamer::sptr create_rx_streamer(
+ const size_t num_chans, const uhd::stream_args_t& args)
+ {
+ return boost::make_shared<rfnoc_rx_streamer>(num_chans, args);
+ }
+
+ uhd::tx_streamer::sptr create_tx_streamer(
+ const size_t num_chans, const uhd::stream_args_t& args)
+ {
+ return boost::make_shared<rfnoc_tx_streamer>(num_chans, args);
}
std::shared_ptr<mb_controller> get_mb_controller(const size_t mb_index = 0)
@@ -152,7 +264,7 @@ private:
throw uhd::key_error(std::string("Found no RFNoC devices for ----->\n")
+ dev_addr.to_pp_string());
}
- _tree = _device->get_tree();
+ _tree = _device->get_tree();
_num_mboards = _tree->list("/mboards").size();
for (size_t i = 0; i < _num_mboards; ++i) {
_mb_controllers.emplace(i, _device->get_mb_controller(i));
@@ -170,7 +282,8 @@ private:
try {
_gsm = graph_stream_manager::make(pkt_factory, epid_alloc, links);
} catch (uhd::io_error& ex) {
- UHD_LOG_ERROR("RFNOC::GRAPH", "IO Error during GSM initialization. " << ex.what());
+ UHD_LOG_ERROR(
+ "RFNOC::GRAPH", "IO Error during GSM initialization. " << ex.what());
throw;
}
@@ -187,6 +300,9 @@ private:
_gsm->connect_host_to_device(ctrl_sep_addr);
// Grab and stash the Client Zero for this mboard
detail::client_zero::sptr mb_cz = _gsm->get_client_zero(ctrl_sep_addr);
+ // Client zero port numbers are based on the control xbar numbers,
+ // which have the client 0 interface first, followed by stream
+ // endpoints, and then the blocks.
_client_zeros.emplace(mb_idx, mb_cz);
const size_t num_blocks = mb_cz->get_num_blocks();
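The port-numbering comment above can be restated as a small index helper; this is only a sketch of the convention (client zero on control-xbar index 0, then the stream endpoints, then the blocks), not code from this change:

    #include <cstddef>

    // Control-xbar index layout assumed throughout this file:
    //   0                   -> client zero
    //   1 .. num_seps       -> stream endpoints SEP#0 .. SEP#(num_seps - 1)
    //   num_seps + 1 and up -> NoC blocks, in block order
    inline size_t xbar_port_for_block(size_t block_no, size_t num_seps)
    {
        return 1 /* client zero */ + num_seps + block_no;
    }
    inline size_t sep_inst_for_xbar_port(size_t xbar_port)
    {
        return xbar_port - 1; // only meaningful for 1 <= xbar_port <= num_seps
    }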
@@ -204,7 +320,7 @@ private:
// Iterate through and register each of the blocks in this mboard
for (size_t portno = 0; portno < num_blocks; ++portno) {
- auto noc_id = mb_cz->get_noc_id(portno + first_block_port);
+ auto noc_id = mb_cz->get_noc_id(portno + first_block_port);
auto block_factory_info = factory::get_block_factory(noc_id);
auto block_info = mb_cz->get_block_info(portno + first_block_port);
block_id_t block_id(mb_idx,
@@ -222,24 +338,25 @@ private:
// iface object through the mb_iface
auto ctrlport_clk_iface =
mb.get_clock_iface(block_factory_info.ctrlport_clk);
- auto tb_clk_iface = (block_factory_info.timebase_clk == CLOCK_KEY_GRAPH) ?
- std::make_shared<clock_iface>(CLOCK_KEY_GRAPH) :
- mb.get_clock_iface(block_factory_info.timebase_clk);
+ auto tb_clk_iface =
+ (block_factory_info.timebase_clk == CLOCK_KEY_GRAPH)
+ ? std::make_shared<clock_iface>(CLOCK_KEY_GRAPH)
+ : mb.get_clock_iface(block_factory_info.timebase_clk);
// A "graph" clock is always "running"
if (block_factory_info.timebase_clk == CLOCK_KEY_GRAPH) {
tb_clk_iface->set_running(true);
}
- auto block_reg_iface = _gsm->get_block_register_iface(ctrl_sep_addr,
+ auto block_reg_iface = _gsm->get_block_register_iface(ctrl_sep_addr,
portno,
*ctrlport_clk_iface.get(),
*tb_clk_iface.get());
- auto make_args_uptr = std::make_unique<noc_block_base::make_args_t>();
+ auto make_args_uptr = std::make_unique<noc_block_base::make_args_t>();
make_args_uptr->noc_id = noc_id;
- make_args_uptr->block_id = block_id;
- make_args_uptr->num_input_ports = block_info.num_inputs;
- make_args_uptr->num_output_ports = block_info.num_outputs;
- make_args_uptr->reg_iface = block_reg_iface;
- make_args_uptr->tb_clk_iface = tb_clk_iface;
+ make_args_uptr->block_id = block_id;
+ make_args_uptr->num_input_ports = block_info.num_inputs;
+ make_args_uptr->num_output_ports = block_info.num_outputs;
+ make_args_uptr->reg_iface = block_reg_iface;
+ make_args_uptr->tb_clk_iface = tb_clk_iface;
make_args_uptr->ctrlport_clk_iface = ctrlport_clk_iface;
make_args_uptr->mb_control = (factory::has_requested_mb_access(noc_id)
? _mb_controllers.at(mb_idx)
@@ -262,40 +379,43 @@ private:
_block_registry->init_props();
}
+ void _init_sep_map()
+ {
+ for (size_t mb_idx = 0; mb_idx < get_num_mboards(); ++mb_idx) {
+ auto remote_device_id = _device->get_mb_iface(mb_idx).get_remote_device_id();
+ auto& cz = _client_zeros.at(mb_idx);
+ for (size_t sep_idx = 0; sep_idx < cz->get_num_stream_endpoints();
+ ++sep_idx) {
+ // Register ID in _port_block_map
+ block_id_t id(mb_idx, NODE_ID_SEP, sep_idx);
+ _port_block_map.insert({{mb_idx, sep_idx + 1}, id});
+ _sep_map.insert({id.to_string(), sep_addr_t(remote_device_id, sep_idx)});
+ }
+ }
+ }
+
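As an illustration, for a single motherboard with two stream endpoints, _init_sep_map() ends up with entries along these lines (the remote device ID of 2 is a made-up value):

    // {mb_idx, control-xbar port} -> SEP block ID
    std::map<std::pair<size_t, size_t>, block_id_t> port_block_map{
        {{0, 1}, block_id_t(0, NODE_ID_SEP, 0)},  // "0/SEP#0"
        {{0, 2}, block_id_t(0, NODE_ID_SEP, 1)}}; // "0/SEP#1"
    // SEP block ID string -> sep_addr_t{remote device ID, SEP instance}
    std::unordered_map<std::string, sep_addr_t> sep_map{
        {"0/SEP#0", sep_addr_t(2, 0)},
        {"0/SEP#1", sep_addr_t(2, 1)}};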
void _init_static_connections()
{
UHD_LOG_TRACE("RFNOC::GRAPH", "Identifying static connections...");
for (auto& kv_cz : _client_zeros) {
- auto& adjacency_list = kv_cz.second->get_adjacency_list();
- const size_t first_block_port = 1 + kv_cz.second->get_num_stream_endpoints();
-
+ auto& adjacency_list = kv_cz.second->get_adjacency_list();
for (auto& edge : adjacency_list) {
// Assemble edge
auto graph_edge = graph_edge_t();
- if (edge.src_blk_index < first_block_port) {
- block_id_t id(kv_cz.first, NODE_ID_SEP, edge.src_blk_index - 1);
- _port_block_map.insert({{kv_cz.first, edge.src_blk_index}, id});
- graph_edge.src_blockid = id.to_string();
- } else {
- graph_edge.src_blockid =
- _port_block_map.at({kv_cz.first, edge.src_blk_index});
- }
- if (edge.dst_blk_index < first_block_port) {
- block_id_t id(kv_cz.first, NODE_ID_SEP, edge.dst_blk_index - 1);
- _port_block_map.insert({{kv_cz.first, edge.dst_blk_index}, id});
- graph_edge.dst_blockid = id.to_string();
- } else {
- graph_edge.dst_blockid =
- _port_block_map.at({kv_cz.first, edge.dst_blk_index});
- }
+ UHD_ASSERT_THROW(
+ _port_block_map.count({kv_cz.first, edge.src_blk_index}));
+ graph_edge.src_blockid =
+ _port_block_map.at({kv_cz.first, edge.src_blk_index});
+ UHD_ASSERT_THROW(
+ _port_block_map.count({kv_cz.first, edge.dst_blk_index}));
+ graph_edge.dst_blockid =
+ _port_block_map.at({kv_cz.first, edge.dst_blk_index});
graph_edge.src_port = edge.src_blk_port;
graph_edge.dst_port = edge.dst_blk_port;
graph_edge.edge = graph_edge_t::edge_t::STATIC;
_static_edges.push_back(graph_edge);
- UHD_LOG_TRACE("RFNOC::GRAPH",
- "Static connection: "
- << graph_edge.src_blockid << ":" << graph_edge.src_port << " -> "
- << graph_edge.dst_blockid << ":" << graph_edge.dst_port);
+ UHD_LOG_TRACE(
+ "RFNOC::GRAPH", "Static connection: " << graph_edge.to_string());
}
}
}
@@ -312,214 +432,98 @@ private:
size_t src_port,
std::shared_ptr<node_t> dst_blk,
size_t dst_port,
+ graph_edge_t::edge_t edge_type,
bool skip_property_propagation)
{
graph_edge_t edge_info(
- src_port, dst_port, graph_edge_t::DYNAMIC, not skip_property_propagation);
+ src_port, dst_port, edge_type, not skip_property_propagation);
edge_info.src_blockid = src_blk->get_unique_id();
edge_info.dst_blockid = dst_blk->get_unique_id();
_graph->connect(src_blk.get(), dst_blk.get(), edge_info);
}
- /*! Helper method to find a stream endpoint connected to a block
- *
- * \param blk_id the block connected to the stream endpoint
- * \param port the port connected to the stream endpoint
- * \param blk_is_src true if the block is a data source, false if it is a
- * destination
- * \return the address of the stream endpoint, or boost::none if it is not
- * directly connected to a stream endpoint
- */
- boost::optional<sep_addr_t> _get_adjacent_sep(
- const block_id_t& blk_id, const size_t port, const bool blk_is_src) const
- {
- const std::string block_id_str = get_block(blk_id)->get_block_id().to_string();
- UHD_LOG_TRACE("RFNOC::GRAPH",
- "Finding SEP for " << (blk_is_src ? "source" : "dst") << " block "
- << block_id_str << ":" << port);
- // TODO: This is an attempt to simplify the algo, but it turns out to be
- // as many lines as before:
- //auto edge_predicate = [blk_is_src, block_id_str](const graph_edge_t edge) {
- //if (blk_is_src) {
- //return edge.src_blockid == block_id_str;
- //}
- //return edge.dst_blockid == block_id_str;
- //};
-
- //auto blk_edge_it =
- //std::find_if(_static_edges.cbegin(), _static_edges.cend(), edge_predicate);
- //if (blk_edge_it == _static_edges.cend()) {
- //return boost::none;
- //}
-
- //const std::string sep_block_id = blk_is_src ?
- //blk_edge_it->dst_blockid : blk_edge_it->src_blockid;
- //UHD_LOG_TRACE("RFNOC::GRAPH",
- //"Found SEP: " << sep_block_id);
-
- //auto port_map_result = std::find_if(_port_block_map.cbegin(),
- //_port_block_map.cend,
- //[sep_block_id](std::pair<std::pair<size_t, size_t>, block_id_t> port_block) {
- //return port_block.second == sep_block_id;
- //});
- //if (port_map_result == _port_block_map.cend()) {
- //throw uhd::lookup_error(
- //std::string("SEP `") + sep_block_id + "' not found in port/block map!");
- //}
- //const auto dev = _device->get_mb_iface(mb_idx).get_remote_device_id();
- //const sep_inst_t sep_inst = blk_is_src ?
- //edge.dst_blk_index - 1 : edge.src_blk_index - 1;
- //return sep_addr_t(dev, sep_inst);
-
- const auto& info = _xbar_block_config.at(block_id_str);
- const auto mb_idx = blk_id.get_device_no();
- const auto cz = _client_zeros.at(mb_idx);
-
- const size_t first_block_port = 1 + cz->get_num_stream_endpoints();
-
- for (const auto& edge : cz->get_adjacency_list()) {
- const auto edge_blk_idx = blk_is_src ? edge.src_blk_index
- : edge.dst_blk_index;
- const auto edge_blk_port = blk_is_src ? edge.src_blk_port : edge.dst_blk_port;
-
- if ((edge_blk_idx == info.xbar_port + first_block_port)
- and (edge_blk_port == port)) {
- UHD_LOGGER_DEBUG("RFNOC::GRAPH")
- << boost::format("Block found in adjacency list. %d:%d->%d:%d")
- % edge.src_blk_index
- % static_cast<unsigned int>(edge.src_blk_port)
- % edge.dst_blk_index
- % static_cast<unsigned int>(edge.dst_blk_port);
-
- // Check that the block is connected to a stream endpoint. The
- // minus one here is because index zero is client 0.
- const sep_inst_t sep_inst = blk_is_src ?
- edge.dst_blk_index - 1 : edge.src_blk_index - 1;
-
- if (sep_inst < cz->get_num_stream_endpoints()) {
- const auto dev = _device->get_mb_iface(mb_idx).get_remote_device_id();
- return sep_addr_t(dev, sep_inst);
- } else {
- // Block is connected to another block
- return boost::none;
- }
- }
- }
- return boost::none;
- }
-
/*! Internal physical connection helper
*
* Make the connections in the physical device
*
- * \throws connect_disallowed_on_src
- * if the source port is statically connected to a *different* block
- * \throws connect_disallowed_on_dst
- * if the destination port is statically connected to a *different* block
+ * \throws uhd::routing_error
+ * if the blocks are statically connected to something else
*/
- void _physical_connect(const block_id_t& src_blk,
+ graph_edge_t::edge_t _physical_connect(const block_id_t& src_blk,
size_t src_port,
const block_id_t& dst_blk,
size_t dst_port)
{
- auto src_blk_ctrl = get_block(src_blk);
- auto dst_blk_ctrl = get_block(dst_blk);
-
- /*
- * Start by determining if the connection can be made
- * Get the adjacency list and check if the connection is in it already
- */
- // Read the adjacency list for the source and destination blocks
- auto src_mb_idx = src_blk.get_device_no();
- auto src_cz = _gsm->get_client_zero(
- sep_addr_t(_device->get_mb_iface(src_mb_idx).get_remote_device_id(), 0));
- std::vector<detail::client_zero::edge_def_t>& adj_list =
- src_cz->get_adjacency_list();
- // Check the src_blk
- auto src_blk_xbar_info =
- _xbar_block_config.at(src_blk_ctrl->get_block_id().to_string());
- // This "xbar_port" starts at the first block, so we need to add the client zero
- // and stream endpoint ports
- const auto src_xbar_port =
- src_blk_xbar_info.xbar_port + src_cz->get_num_stream_endpoints() + 1;
- // We can also find out which stream endpoint the src block is connected to
- sep_inst_t src_sep;
- for (detail::client_zero::edge_def_t edge : adj_list) {
- if ((edge.src_blk_index == src_xbar_port)
- and (edge.src_blk_port == src_port)) {
- UHD_LOGGER_DEBUG("RFNOC::GRAPH")
- << boost::format("Block found in adjacency list. %d:%d->%d:%d")
- % edge.src_blk_index
- % static_cast<unsigned int>(edge.src_blk_port)
- % edge.dst_blk_index
- % static_cast<unsigned int>(edge.dst_blk_port);
- if (edge.dst_blk_index <= src_cz->get_num_stream_endpoints()) {
- src_sep =
- edge.dst_blk_index - 1 /* minus 1 because port 0 is client zero*/;
- } else {
- // TODO connect_disallowed_on_src?
- // TODO put more info in exception
- throw uhd::routing_error(
- "Unable to connect to statically connected source port");
- }
- }
+ const std::string src_blk_info =
+ src_blk.to_string() + ":" + std::to_string(src_port);
+ const std::string dst_blk_info =
+ dst_blk.to_string() + ":" + std::to_string(dst_port);
+
+ // Find the static edge for src_blk:src_port
+ graph_edge_t src_static_edge = _assert_edge(
+ _get_static_edge(
+ [src_blk_id = src_blk.to_string(), src_port](const graph_edge_t& edge) {
+ return edge.src_blockid == src_blk_id && edge.src_port == src_port;
+ }),
+ src_blk_info);
+
+ // Now see if it's already connected to the destination
+ if (src_static_edge.dst_blockid == dst_blk.to_string()
+ && src_static_edge.dst_port == dst_port) {
+ UHD_LOG_TRACE(LOG_ID,
+ "Blocks " << src_blk_info << " and " << dst_blk_info
+ << " are already statically connected, no physical connection "
+ "required.");
+ return graph_edge_t::STATIC;
}
- // Read the dst adjacency list if its different
- // TODO they may be on the same mboard, which would make this redundant
- auto dst_mb_idx = dst_blk.get_device_no();
- auto dst_cz = _gsm->get_client_zero(
- sep_addr_t(_device->get_mb_iface(dst_mb_idx).get_remote_device_id(), 0));
- adj_list = dst_cz->get_adjacency_list();
- // Check the dst blk
- auto dst_blk_xbar_info =
- _xbar_block_config.at(dst_blk_ctrl->get_block_id().to_string());
- // This "xbar_port" starts at the first block, so we need to add the client zero
- // and stream endpoint ports
- const auto dst_xbar_port =
- dst_blk_xbar_info.xbar_port + dst_cz->get_num_stream_endpoints() + 1;
- // We can also find out which stream endpoint the dst block is connected to
- sep_inst_t dst_sep;
- for (detail::client_zero::edge_def_t edge : adj_list) {
- if ((edge.dst_blk_index == dst_xbar_port)
- and (edge.dst_blk_port == dst_port)) {
- UHD_LOGGER_DEBUG("RFNOC::GRAPH")
- << boost::format("Block found in adjacency list. %d:%d->%d:%d")
- % edge.src_blk_index
- % static_cast<unsigned int>(edge.src_blk_port)
- % edge.dst_blk_index
- % static_cast<unsigned int>(edge.dst_blk_port);
- if (edge.src_blk_index <= dst_cz->get_num_stream_endpoints()) {
- dst_sep =
- edge.src_blk_index - 1 /* minus 1 because port 0 is client zero*/;
- } else {
- // TODO connect_disallowed_on_dst?
- // TODO put more info in exception
- throw uhd::routing_error(
- "Unable to connect to statically connected destination port");
- }
- }
+ // If they're not statically connected, the source *must* be connected
+ // to an SEP, or this route is impossible
+ if (block_id_t(src_static_edge.dst_blockid).get_block_name() != NODE_ID_SEP) {
+ const std::string err_msg =
+ src_blk_info + " is neither statically connected to " + dst_blk_info
+ + " nor to an SEP! Routing impossible.";
+ UHD_LOG_ERROR(LOG_ID, err_msg);
+ throw uhd::routing_error(err_msg);
}
- /* TODO: we checked if either port is used in a static connection (and its not if
- * we've made it this far). We also need to check something else, but I can't
- * remember what...
- */
-
- // At this point, we know the attempted connection is valid, so let's go ahead and
- // make it
- sep_addr_t src_sep_addr(
- _device->get_mb_iface(src_mb_idx).get_remote_device_id(), src_sep);
- sep_addr_t dst_sep_addr(
- _device->get_mb_iface(dst_mb_idx).get_remote_device_id(), dst_sep);
+ // OK, now we know which source SEP we have
+ const std::string src_sep_info = src_static_edge.dst_blockid;
+ const sep_addr_t src_sep_addr = _sep_map.at(src_sep_info);
+
+ // Now find the static edge for the destination SEP
+ auto dst_static_edge = _assert_edge(
+ _get_static_edge(
+ [dst_blk_id = dst_blk.to_string(), dst_port](const graph_edge_t& edge) {
+ return edge.dst_blockid == dst_blk_id && edge.dst_port == dst_port;
+ }),
+ dst_blk_info);
+
+ // If they're not statically connected, the source *must* be connected
+ // to an SEP, or this route is impossible
+ if (block_id_t(dst_static_edge.src_blockid).get_block_name() != NODE_ID_SEP) {
+ const std::string err_msg =
+ dst_blk_info + " is neither statically connected to " + src_blk_info
+ + " nor to an SEP! Routing impossible.";
+ UHD_LOG_ERROR(LOG_ID, err_msg);
+ throw uhd::routing_error(err_msg);
+ }
+
+ // OK, now we know which destination SEP we have
+ const std::string dst_sep_info = dst_static_edge.src_blockid;
+ const sep_addr_t dst_sep_addr = _sep_map.at(dst_sep_info);
+
+ // Now all we need to do is dynamically connect those SEPs
auto strm_info = _gsm->create_device_to_device_data_stream(
dst_sep_addr, src_sep_addr, false, 0.1, 0.0, false);
- UHD_LOGGER_INFO("RFNOC::GRAPH")
+ UHD_LOGGER_DEBUG(LOG_ID)
<< boost::format("Data stream between EPID %d and EPID %d established "
"where downstream buffer can hold %lu bytes and %u packets")
% std::get<0>(strm_info).first % std::get<0>(strm_info).second
% std::get<1>(strm_info).bytes % std::get<1>(strm_info).packets;
+
+ return graph_edge_t::DYNAMIC;
}
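The returned edge type tells the caller whether the blocks were already wired in the FPGA (STATIC) or whether a new SEP-to-SEP stream had to be set up (DYNAMIC). A compressed sketch of how a block-to-block connect() entry point could use it; get_block() and _connect() match helpers visible in this file, but the exact caller is not shown in this excerpt:

    void connect_sketch(const block_id_t& src_blk, size_t src_port,
        const block_id_t& dst_blk, size_t dst_port, bool skip_property_propagation)
    {
        // Physical routing first (throws uhd::routing_error if impossible)...
        const auto edge_type = _physical_connect(src_blk, src_port, dst_blk, dst_port);
        // ...then record the edge in the logical graph so property propagation
        // knows whether the link is static or dynamic.
        _connect(get_block(src_blk), src_port, get_block(dst_blk), dst_port,
            edge_type, skip_property_propagation);
    }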
//! Flush and reset each connected port on the mboard
@@ -541,6 +545,35 @@ private:
mb_cz->reset_ctrl(block_portno);
}
}
+
+ /*! Find the static edge that matches \p pred
+ *
+ * \return the matching edge info, or boost::none if no matching edge was found
+ */
+ template <typename UnaryPredicate>
+ boost::optional<graph_edge_t> _get_static_edge(UnaryPredicate&& pred)
+ {
+ auto edge_it = std::find_if(_static_edges.cbegin(), _static_edges.cend(), pred);
+ if (edge_it == _static_edges.cend()) {
+ return boost::none;
+ }
+ return *edge_it;
+ }
+
+ /*! Make sure an optional edge info is valid, or throw.
+ */
+ graph_edge_t _assert_edge(
+ boost::optional<graph_edge_t> edge_o, const std::string& blk_info)
+ {
+ if (!bool(edge_o)) {
+ const std::string err_msg = std::string("Cannot connect block ") + blk_info
+ + ", port is unconnected in the FPGA!";
+ UHD_LOG_ERROR("RFNOC::GRAPH", err_msg);
+ throw uhd::routing_error(err_msg);
+ }
+ return edge_o.get();
+ }
+
/**************************************************************************
* Attributes
*************************************************************************/
@@ -580,6 +613,9 @@ private:
// or SEP
std::map<std::pair<size_t, size_t>, block_id_t> _port_block_map;
+ //! Map SEP block ID (e.g. 0/SEP#0) onto a sep_addr_t
+ std::unordered_map<std::string, sep_addr_t> _sep_map;
+
//! List of statically connected edges. Includes SEPs too!
std::vector<graph_edge_t> _static_edges;
diff --git a/host/lib/rfnoc/rfnoc_rx_streamer.cpp b/host/lib/rfnoc/rfnoc_rx_streamer.cpp
new file mode 100644
index 000000000..4340faff0
--- /dev/null
+++ b/host/lib/rfnoc/rfnoc_rx_streamer.cpp
@@ -0,0 +1,141 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/rfnoc/defaults.hpp>
+#include <uhdlib/rfnoc/node_accessor.hpp>
+#include <uhdlib/rfnoc/rfnoc_rx_streamer.hpp>
+#include <atomic>
+
+using namespace uhd;
+using namespace uhd::rfnoc;
+
+const std::string STREAMER_ID = "RxStreamer";
+static std::atomic<uint64_t> streamer_inst_ctr;
+
+rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans,
+ const uhd::stream_args_t stream_args)
+ : rx_streamer_impl<chdr_rx_data_xport>(num_chans, stream_args)
+ , _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++))
+ , _stream_args(stream_args)
+{
+ // No block to which to forward properties or actions
+ set_prop_forwarding_policy(forwarding_policy_t::DROP);
+ set_action_forwarding_policy(forwarding_policy_t::DROP);
+
+ // Initialize properties
+ _scaling_in.reserve(num_chans);
+ _samp_rate_in.reserve(num_chans);
+ _tick_rate_in.reserve(num_chans);
+ _type_in.reserve(num_chans);
+
+ for (size_t i = 0; i < num_chans; i++) {
+ _register_props(i, stream_args.otw_format);
+ }
+ node_accessor_t node_accessor{};
+ node_accessor.init_props(this);
+}
+
+std::string rfnoc_rx_streamer::get_unique_id() const
+{
+ return _unique_id;
+}
+
+size_t rfnoc_rx_streamer::get_num_input_ports() const
+{
+ return get_num_channels();
+}
+
+size_t rfnoc_rx_streamer::get_num_output_ports() const
+{
+ return 0;
+}
+
+void rfnoc_rx_streamer::issue_stream_cmd(const stream_cmd_t& stream_cmd)
+{
+ if (get_num_channels() > 1 and stream_cmd.stream_now
+ and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS) {
+ throw uhd::runtime_error(
+ "Invalid recv stream command - stream now on multiple channels in a "
+ "single streamer will fail to time align.");
+ }
+
+ auto cmd = stream_cmd_action_info::make(stream_cmd.stream_mode);
+ cmd->stream_cmd = stream_cmd;
+
+ for (size_t i = 0; i < get_num_channels(); i++) {
+ const res_source_info info(res_source_info::INPUT_EDGE, i);
+ post_action(info, cmd);
+ }
+}
+
+const uhd::stream_args_t& rfnoc_rx_streamer::get_stream_args() const
+{
+ return _stream_args;
+}
+
+bool rfnoc_rx_streamer::check_topology(
+ const std::vector<size_t>& connected_inputs,
+ const std::vector<size_t>& connected_outputs)
+{
+ // Check that all channels are connected
+ if (connected_inputs.size() != get_num_input_ports()) {
+ return false;
+ }
+
+ // Call base class to check that connections are valid
+ return node_t::check_topology(connected_inputs, connected_outputs);
+}
+
+void rfnoc_rx_streamer::_register_props(const size_t chan,
+ const std::string& otw_format)
+{
+ // Create actual properties and store them
+ _scaling_in.push_back(property_t<double>(
+ PROP_KEY_SCALING, {res_source_info::INPUT_EDGE, chan}));
+ _samp_rate_in.push_back(
+ property_t<double>(PROP_KEY_SAMP_RATE, {res_source_info::INPUT_EDGE, chan}));
+ _tick_rate_in.push_back(property_t<double>(
+ PROP_KEY_TICK_RATE, {res_source_info::INPUT_EDGE, chan}));
+ _type_in.emplace_back(property_t<std::string>(
+ PROP_KEY_TYPE, otw_format, {res_source_info::INPUT_EDGE, chan}));
+
+ // Give us some shorthands for the rest of this function
+ property_t<double>* scaling_in = &_scaling_in.back();
+ property_t<double>* samp_rate_in = &_samp_rate_in.back();
+ property_t<double>* tick_rate_in = &_tick_rate_in.back();
+ property_t<std::string>* type_in = &_type_in.back();
+
+ // Register them
+ register_property(scaling_in);
+ register_property(samp_rate_in);
+ register_property(tick_rate_in);
+ register_property(type_in);
+
+ // Add resolvers
+ add_property_resolver({scaling_in}, {},
+ [&scaling_in = *scaling_in, chan, this]() {
+ RFNOC_LOG_TRACE("Calling resolver for `scaling_in'@" << chan);
+ if (scaling_in.is_valid()) {
+ this->set_scale_factor(chan, scaling_in.get() / 32767.0);
+ }
+ });
+
+ add_property_resolver({samp_rate_in}, {},
+ [&samp_rate_in = *samp_rate_in, chan, this]() {
+ RFNOC_LOG_TRACE("Calling resolver for `samp_rate_in'@" << chan);
+ if (samp_rate_in.is_valid()) {
+ this->set_samp_rate(samp_rate_in.get());
+ }
+ });
+
+ add_property_resolver({tick_rate_in}, {},
+ [&tick_rate_in = *tick_rate_in, chan, this]() {
+ RFNOC_LOG_TRACE("Calling resolver for `tick_rate_in'@" << chan);
+ if (tick_rate_in.is_valid()) {
+ this->set_tick_rate(tick_rate_in.get());
+ }
+ });
+}
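The scaling_in resolver above divides by 32767.0, the full-scale magnitude of a 16-bit OTW sample, to derive the per-channel conversion factor. Purely as an arithmetic illustration (the 16383.5 value is hypothetical, not something a specific block reports):

    static double example_rx_scale_factor()
    {
        const double full_scale = 32767.0; // max magnitude of an sc16 sample
        const double scaling_in = 16383.5; // hypothetical upstream property value
        // Mirrors the resolver: full scale -> factor 1.0, half scale -> factor 0.5
        return scaling_in / full_scale;
    }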
diff --git a/host/lib/rfnoc/rfnoc_tx_streamer.cpp b/host/lib/rfnoc/rfnoc_tx_streamer.cpp
new file mode 100644
index 000000000..82feeaf1f
--- /dev/null
+++ b/host/lib/rfnoc/rfnoc_tx_streamer.cpp
@@ -0,0 +1,124 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/rfnoc/defaults.hpp>
+#include <uhdlib/rfnoc/node_accessor.hpp>
+#include <uhdlib/rfnoc/rfnoc_tx_streamer.hpp>
+#include <atomic>
+
+using namespace uhd;
+using namespace uhd::rfnoc;
+
+const std::string STREAMER_ID = "TxStreamer";
+static std::atomic<uint64_t> streamer_inst_ctr;
+
+rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans,
+ const uhd::stream_args_t stream_args)
+ : tx_streamer_impl<chdr_tx_data_xport>(num_chans, stream_args)
+ , _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++))
+ , _stream_args(stream_args)
+{
+ // No block to which to forward properties or actions
+ set_prop_forwarding_policy(forwarding_policy_t::DROP);
+ set_action_forwarding_policy(forwarding_policy_t::DROP);
+
+ // Initialize properties
+ _scaling_out.reserve(num_chans);
+ _samp_rate_out.reserve(num_chans);
+ _tick_rate_out.reserve(num_chans);
+ _type_out.reserve(num_chans);
+
+ for (size_t i = 0; i < num_chans; i++) {
+ _register_props(i, stream_args.otw_format);
+ }
+
+ node_accessor_t node_accessor;
+ node_accessor.init_props(this);
+}
+
+std::string rfnoc_tx_streamer::get_unique_id() const
+{
+ return _unique_id;
+}
+
+size_t rfnoc_tx_streamer::get_num_input_ports() const
+{
+ return 0;
+}
+
+size_t rfnoc_tx_streamer::get_num_output_ports() const
+{
+ return get_num_channels();
+}
+
+const uhd::stream_args_t& rfnoc_tx_streamer::get_stream_args() const
+{
+ return _stream_args;
+}
+
+bool rfnoc_tx_streamer::check_topology(
+ const std::vector<size_t>& connected_inputs,
+ const std::vector<size_t>& connected_outputs)
+{
+ // Check that all channels are connected
+ if (connected_outputs.size() != get_num_output_ports()) {
+ return false;
+ }
+
+ // Call base class to check that connections are valid
+ return node_t::check_topology(connected_inputs, connected_outputs);
+}
+
+void rfnoc_tx_streamer::_register_props(const size_t chan,
+ const std::string& otw_format)
+{
+ // Create actual properties and store them
+ _scaling_out.push_back(property_t<double>(
+ PROP_KEY_SCALING, {res_source_info::OUTPUT_EDGE, chan}));
+ _samp_rate_out.push_back(property_t<double>(
+ PROP_KEY_SAMP_RATE, {res_source_info::OUTPUT_EDGE, chan}));
+ _tick_rate_out.push_back(property_t<double>(
+ PROP_KEY_TICK_RATE, {res_source_info::OUTPUT_EDGE, chan}));
+ _type_out.emplace_back(property_t<std::string>(
+ PROP_KEY_TYPE, otw_format, {res_source_info::OUTPUT_EDGE, chan}));
+
+ // Give us some shorthands for the rest of this function
+ property_t<double>* scaling_out = &_scaling_out.back();
+ property_t<double>* samp_rate_out = &_samp_rate_out.back();
+ property_t<double>* tick_rate_out = &_tick_rate_out.back();
+ property_t<std::string>* type_out = &_type_out.back();
+
+ // Register them
+ register_property(scaling_out);
+ register_property(samp_rate_out);
+ register_property(tick_rate_out);
+ register_property(type_out);
+
+ // Add resolvers
+ add_property_resolver({scaling_out}, {},
+ [&scaling_out = *scaling_out, chan, this]() {
+ RFNOC_LOG_TRACE("Calling resolver for `scaling_out'@" << chan);
+ if (scaling_out.is_valid()) {
+ this->set_scale_factor(chan, 32767.0 / scaling_out.get());
+ }
+ });
+
+ add_property_resolver({samp_rate_out}, {},
+ [&samp_rate_out = *samp_rate_out, chan, this]() {
+ RFNOC_LOG_TRACE("Calling resolver for `samp_rate_out'@" << chan);
+ if (samp_rate_out.is_valid()) {
+ this->set_samp_rate(samp_rate_out.get());
+ }
+ });
+
+ add_property_resolver({tick_rate_out}, {},
+ [&tick_rate_out = *tick_rate_out, chan, this]() {
+ RFNOC_LOG_TRACE("Calling resolver for `tick_rate_out'@" << chan);
+ if (tick_rate_out.is_valid()) {
+ this->set_tick_rate(tick_rate_out.get());
+ }
+ });
+}
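Note that this TX resolver is the reciprocal of the RX one (32767.0 / scaling_out versus scaling_in / 32767.0), so for any common scaling value the two factors multiply out to one. A small sanity check with an arbitrary example value:

    #include <cassert>
    #include <cmath>

    static void check_scale_factor_reciprocity()
    {
        const double s = 12345.0; // arbitrary example scaling value
        const double rx_factor = s / 32767.0;
        const double tx_factor = 32767.0 / s;
        assert(std::abs(rx_factor * tx_factor - 1.0) < 1e-12);
    }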
diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt
index d6ad6d777..1cdb42b96 100644
--- a/host/tests/CMakeLists.txt
+++ b/host/tests/CMakeLists.txt
@@ -55,6 +55,8 @@ set(test_sources
fe_conn_test.cpp
rfnoc_node_test.cpp
link_test.cpp
+ rx_streamer_test.cpp
+ tx_streamer_test.cpp
)
set(benchmark_sources
diff --git a/host/tests/common/mock_link.hpp b/host/tests/common/mock_link.hpp
index 34ea15540..73a65916c 100644
--- a/host/tests/common/mock_link.hpp
+++ b/host/tests/common/mock_link.hpp
@@ -94,6 +94,14 @@ public:
}
/*!
+ * Return the number of packets stored in the mock link.
+ */
+ size_t get_num_packets() const
+ {
+ return _tx_mems.size();
+ }
+
+ /*!
* Retrieve the contents of a packet sent by the link. The link
* stores packets in a queue in the order they were sent.
*/
diff --git a/host/tests/rfnoc_chdr_test.cpp b/host/tests/rfnoc_chdr_test.cpp
index 417ed2f96..1c63d5976 100644
--- a/host/tests/rfnoc_chdr_test.cpp
+++ b/host/tests/rfnoc_chdr_test.cpp
@@ -222,3 +222,49 @@ BOOST_AUTO_TEST_CASE(chdr_strc_packet_no_swap_64)
std::cout << pyld.to_string();
}
}
+
+BOOST_AUTO_TEST_CASE(chdr_generic_packet_calculate_pyld_offset_64)
+{
+ // Check the payload offset calculation for various packet types and mdata sizes
+ auto test_pyld_offset = [](chdr_packet::uptr& pkt,
+ const packet_type_t pkt_type,
+ const size_t num_mdata)
+ {
+ uint64_t buff[MAX_BUF_SIZE_WORDS];
+ chdr_header header;
+ header.set_pkt_type(pkt_type);
+ header.set_num_mdata(num_mdata);
+
+ pkt->refresh(reinterpret_cast<void*>(buff), header, 0);
+
+ const size_t pyld_offset = pkt->calculate_payload_offset(
+ pkt_type, num_mdata);
+
+ void* pyld_ptr = pkt->get_payload_ptr();
+
+ const size_t non_pyld_bytes = static_cast<size_t>(
+ reinterpret_cast<uint8_t*>(pyld_ptr) -
+ reinterpret_cast<uint8_t*>(buff));
+
+ BOOST_CHECK(pyld_offset == non_pyld_bytes);
+ };
+
+ {
+ chdr_packet::uptr pkt = chdr64_be_factory.make_generic();
+ test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 0);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 1);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 2);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 0);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 1);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 2);
+ }
+ {
+ chdr_packet::uptr pkt = chdr256_be_factory.make_generic();
+ test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 0);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 1);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 2);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 0);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 1);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 2);
+ }
+}
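For reference, the offsets exercised here follow from the CHDR packet layout as I understand it; treat this as an assumption rather than something the test asserts. For a 64-bit CHDR width the header is one 8-byte word, data-with-timestamp packets add an 8-byte timestamp word, and each mdata unit is a further 8-byte word; for widths of 128 bits and up the timestamp shares the first word with the header, so only the mdata words follow.

    // Assumed payload offset in bytes for a CHDR width of chdr_w_bytes:
    static size_t assumed_pyld_offset(size_t chdr_w_bytes, bool has_ts, size_t num_mdata)
    {
        const size_t ts_bytes = (chdr_w_bytes == 8 && has_ts) ? 8 : 0;
        return chdr_w_bytes + ts_bytes + num_mdata * chdr_w_bytes;
    }
    // e.g. chdr64, with TS, 2 mdata words: 8 + 8 + 2 * 8 = 32 bytes
    // e.g. chdr256, with TS, 1 mdata word: 32 + 0 + 1 * 32 = 64 bytes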
diff --git a/host/tests/rx_streamer_test.cpp b/host/tests/rx_streamer_test.cpp
new file mode 100644
index 000000000..cd4daf569
--- /dev/null
+++ b/host/tests/rx_streamer_test.cpp
@@ -0,0 +1,744 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include "../common/mock_link.hpp"
+#include <uhdlib/transport/rx_streamer_impl.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/test/unit_test.hpp>
+#include <iostream>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Contents of mock packet header
+ */
+struct mock_header_t
+{
+ bool eob = false;
+ bool has_tsf = false;
+ uint64_t tsf = 0;
+ size_t payload_bytes = 0;
+ bool ignore_seq = true;
+ size_t seq_num = 0;
+};
+
+/*!
+ * Mock rx data xport which doesn't use an I/O service and just interacts
+ * with the link directly.
+ */
+class mock_rx_data_xport
+{
+public:
+ using uptr = std::unique_ptr<mock_rx_data_xport>;
+ using buff_t = uhd::transport::frame_buff;
+
+ //! Values extracted from received RX data packets
+ struct packet_info_t
+ {
+ bool eob = false;
+ bool has_tsf = false;
+ uint64_t tsf = 0;
+ size_t payload_bytes = 0;
+ const void* payload = nullptr;
+ };
+
+ mock_rx_data_xport(mock_recv_link::sptr recv_link) : _recv_link(recv_link) {}
+
+ std::tuple<frame_buff::uptr, packet_info_t, bool> get_recv_buff(
+ const int32_t timeout_ms)
+ {
+ frame_buff::uptr buff = _recv_link->get_recv_buff(timeout_ms);
+ mock_header_t header = *(reinterpret_cast<mock_header_t*>(buff->data()));
+
+ packet_info_t info;
+ info.eob = header.eob;
+ info.has_tsf = header.has_tsf;
+ info.tsf = header.tsf;
+ info.payload_bytes = header.payload_bytes;
+ info.payload = reinterpret_cast<uint8_t*>(buff->data()) + sizeof(mock_header_t);
+
+ const uint8_t* pkt_end =
+ reinterpret_cast<uint8_t*>(buff->data()) + buff->packet_size();
+ const size_t pyld_pkt_len =
+ pkt_end - reinterpret_cast<const uint8_t*>(info.payload);
+
+ if (pyld_pkt_len < info.payload_bytes) {
+ _recv_link->release_recv_buff(std::move(buff));
+ throw uhd::value_error("Bad header or invalid packet length.");
+ }
+
+ const bool seq_match = header.seq_num == _seq_num;
+ const bool seq_error = !header.ignore_seq && !seq_match;
+ _seq_num = header.seq_num + 1;
+
+ return std::make_tuple(std::move(buff), info, seq_error);
+ }
+
+ void release_recv_buff(frame_buff::uptr buff)
+ {
+ _recv_link->release_recv_buff(std::move(buff));
+ }
+
+ size_t get_max_payload_size() const
+ {
+ return _recv_link->get_recv_frame_size() - sizeof(packet_info_t);
+ }
+
+private:
+ mock_recv_link::sptr _recv_link;
+ size_t _seq_num = 0;
+};
+
+/*!
+ * Mock rx streamer for testing
+ */
+class mock_rx_streamer : public rx_streamer_impl<mock_rx_data_xport>
+{
+public:
+ mock_rx_streamer(const size_t num_chans, const uhd::stream_args_t& stream_args)
+ : rx_streamer_impl(num_chans, stream_args)
+ {
+ }
+
+ void issue_stream_cmd(const stream_cmd_t&) {}
+
+ void set_tick_rate(double rate)
+ {
+ rx_streamer_impl::set_tick_rate(rate);
+ }
+
+ void set_samp_rate(double rate)
+ {
+ rx_streamer_impl::set_samp_rate(rate);
+ }
+
+ void set_scale_factor(const size_t chan, const double scale_factor)
+ {
+ rx_streamer_impl::set_scale_factor(chan, scale_factor);
+ }
+};
+
+}} // namespace uhd::transport
+
+using namespace uhd::transport;
+
+using rx_streamer = rx_streamer_impl<mock_rx_data_xport>;
+
+static const double TICK_RATE = 100e6;
+static const double SAMP_RATE = 10e6;
+static const size_t FRAME_SIZE = 1000;
+static const double SCALE_FACTOR = 2;
+
+/*!
+ * Helper functions
+ */
+static std::vector<mock_recv_link::sptr> make_links(const size_t num)
+{
+ const mock_recv_link::link_params params = {FRAME_SIZE, 1};
+
+ std::vector<mock_recv_link::sptr> links;
+
+ for (size_t i = 0; i < num; i++) {
+ links.push_back(std::make_shared<mock_recv_link>(params));
+ }
+
+ return links;
+}
+
+static boost::shared_ptr<mock_rx_streamer> make_rx_streamer(
+ std::vector<mock_recv_link::sptr> recv_links,
+ const std::string& host_format,
+ const std::string& otw_format = "sc16")
+{
+ const uhd::stream_args_t stream_args(host_format, otw_format);
+ auto streamer = boost::make_shared<mock_rx_streamer>(recv_links.size(), stream_args);
+ streamer->set_tick_rate(TICK_RATE);
+ streamer->set_samp_rate(SAMP_RATE);
+
+ for (size_t i = 0; i < recv_links.size(); i++) {
+ mock_rx_data_xport::uptr xport(
+ std::make_unique<mock_rx_data_xport>(recv_links[i]));
+
+ streamer->set_scale_factor(i, SCALE_FACTOR);
+ streamer->connect_channel(i, std::move(xport));
+ }
+
+ return streamer;
+}
+
+static void push_back_recv_packet(mock_recv_link::sptr recv_link,
+ mock_header_t header,
+ size_t num_samps,
+ uint16_t start_data = 0)
+{
+ // Allocate buffer
+ const size_t pyld_bytes = num_samps * sizeof(std::complex<uint16_t>);
+ const size_t buff_len = sizeof(header) + pyld_bytes;
+ boost::shared_array<uint8_t> data(new uint8_t[buff_len]);
+
+ // Write header to buffer
+ header.payload_bytes = pyld_bytes;
+ *(reinterpret_cast<mock_header_t*>(data.get())) = header;
+
+ // Write data to buffer
+ auto data_ptr =
+ reinterpret_cast<std::complex<uint16_t>*>(data.get() + sizeof(header));
+
+ for (size_t i = 0; i < num_samps; i++) {
+ uint16_t val = (start_data + i) * 2;
+ data_ptr[i] = std::complex<uint16_t>(val, val + 1);
+ }
+
+ // Push back buffer for link to recv
+ recv_link->push_back_recv_packet(data, buff_len);
+}
+
+/*!
+ * Tests
+ */
+BOOST_AUTO_TEST_CASE(test_recv_one_channel_one_packet)
+{
+ const size_t NUM_PKTS_TO_TEST = 5;
+ const std::string format("fc32");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 20;
+ std::vector<std::complex<float>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ const bool even_iteration = (i % 2 == 0);
+ const bool odd_iteration = (i % 2 != 0);
+ mock_header_t header;
+ header.eob = even_iteration;
+ header.has_tsf = odd_iteration;
+ header.tsf = i;
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ std::cout << "receiving packet " << i << std::endl;
+
+ size_t num_samps_ret =
+ streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, even_iteration);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, odd_iteration);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i);
+
+ for (size_t j = 0; j < num_samps; j++) {
+ const auto value =
+ std::complex<float>((j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR);
+ BOOST_CHECK_EQUAL(value, buff[j]);
+ }
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_one_channel_multi_packet)
+{
+ const size_t NUM_BUFFS_TO_TEST = 5;
+ const std::string format("fc64");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t spp = streamer->get_max_num_samps();
+ const size_t num_samps = spp * 4;
+ std::vector<std::complex<double>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+
+ for (size_t i = 0; i < NUM_BUFFS_TO_TEST; i++) {
+ mock_header_t header;
+ header.eob = false;
+ header.has_tsf = true;
+ header.tsf = i;
+
+ size_t samps_written = 0;
+ while (samps_written < num_samps) {
+ size_t samps_to_write = std::min(num_samps - samps_written, spp);
+ push_back_recv_packet(recv_links[0], header, samps_to_write, samps_written);
+ samps_written += samps_to_write;
+ }
+
+ std::cout << "receiving packet " << i << std::endl;
+
+ size_t num_samps_ret =
+ streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, false);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, true);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i);
+
+ for (size_t j = 0; j < num_samps; j++) {
+ const auto value =
+ std::complex<double>((j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR);
+ BOOST_CHECK_EQUAL(value, buff[j]);
+ }
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_one_channel_multi_packet_with_eob)
+{
+ // EOB should terminate a multi-packet recv, test that it does
+ const std::string format("sc16");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_packets = 4;
+ const size_t spp = streamer->get_max_num_samps();
+ const size_t num_samps = spp * num_packets;
+ std::vector<std::complex<double>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+
+ // Queue 4 packets, with eob set in every other packet
+ for (size_t i = 0; i < num_packets; i++) {
+ mock_header_t header;
+ header.has_tsf = false;
+ header.eob = (i % 2) != 0;
+ push_back_recv_packet(recv_links[0], header, spp);
+ }
+
+ // Now call recv and check that eob terminates a recv call
+ for (size_t i = 0; i < num_packets / 2; i++) {
+ std::cout << "receiving packet " << i << std::endl;
+
+ size_t num_samps_ret =
+ streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, spp * 2);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, true);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, false);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_two_channel_one_packet)
+{
+ const size_t NUM_PKTS_TO_TEST = 5;
+ const std::string format("sc16");
+
+ const size_t num_chans = 2;
+
+ auto recv_links = make_links(num_chans);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 20;
+
+ std::vector<std::vector<std::complex<uint16_t>>> buffer(num_chans);
+ std::vector<void*> buffers;
+ for (size_t i = 0; i < num_chans; i++) {
+ buffer[i].resize(num_samps);
+ buffers.push_back(&buffer[i].front());
+ }
+
+ uhd::rx_metadata_t metadata;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ const bool even_iteration = (i % 2 == 0);
+ const bool odd_iteration = (i % 2 != 0);
+ mock_header_t header;
+ header.eob = even_iteration;
+ header.has_tsf = odd_iteration;
+ header.tsf = i;
+
+ size_t samps_pushed = 0;
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ push_back_recv_packet(recv_links[ch], header, num_samps, samps_pushed);
+ samps_pushed += num_samps;
+ }
+
+ std::cout << "receiving packet " << i << std::endl;
+
+ size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, even_iteration);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, odd_iteration);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i);
+
+ size_t samps_checked = 0;
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ for (size_t samp = 0; samp < num_samps; samp++) {
+ const size_t n = samps_checked + samp;
+ const auto value = std::complex<uint16_t>((n * 2), (n * 2 + 1));
+ BOOST_CHECK_EQUAL(value, buffer[ch][samp]);
+ }
+ samps_checked += num_samps;
+ }
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_one_channel_packet_fragment)
+{
+ const size_t NUM_PKTS_TO_TEST = 5;
+ const std::string format("fc32");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ // Push back five packets, then read them 1/4 of a packet at a time
+ const size_t spp = streamer->get_max_num_samps();
+ const size_t reads_per_packet = 4;
+ const size_t num_samps = spp / reads_per_packet;
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ mock_header_t header;
+ header.eob = true;
+ header.has_tsf = true;
+ header.tsf = 0;
+ push_back_recv_packet(recv_links[0], header, num_samps * reads_per_packet);
+ }
+
+ std::vector<std::complex<float>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ std::cout << "receiving packet " << i << std::endl;
+
+ size_t total_samps_read = 0;
+ for (size_t j = 0; j < reads_per_packet; j++) {
+ size_t num_samps_ret =
+ streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, true);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, true);
+ BOOST_CHECK_EQUAL(metadata.more_fragments, j != reads_per_packet - 1);
+ BOOST_CHECK_EQUAL(metadata.fragment_offset, total_samps_read);
+
+ const size_t ticks_per_sample = static_cast<size_t>(TICK_RATE / SAMP_RATE);
+ const size_t expected_ticks = ticks_per_sample * total_samps_read;
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), expected_ticks);
+
+ for (size_t samp = 0; samp < num_samps; samp++) {
+ const size_t pkt_idx = samp + total_samps_read;
+ const auto value = std::complex<float>(
+ (pkt_idx * 2) * SCALE_FACTOR, (pkt_idx * 2 + 1) * SCALE_FACTOR);
+ BOOST_CHECK_EQUAL(value, buff[samp]);
+ }
+
+ total_samps_read += num_samps_ret;
+ }
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_seq_error)
+{
+ // Test that when we get a sequence error, the error is returned in the
+ // metadata with a time spec one sample period after that of the last
+ // sample in the previous packet. Also test that the packet that causes
+ // the sequence error is not discarded.
+ const size_t NUM_PKTS_TO_TEST = 2;
+ const std::string format("fc32");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 20;
+ std::vector<std::complex<float>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+ size_t seq_num = 0;
+ size_t tsf = 0;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ mock_header_t header;
+ header.eob = false;
+ header.has_tsf = true;
+ header.ignore_seq = false;
+
+ // Push back three packets but skip a seq_num after the second
+ header.seq_num = seq_num++;
+ header.tsf = tsf;
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ tsf += num_samps;
+ header.seq_num = seq_num++;
+ header.tsf = tsf;
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ seq_num++; // dropped packet
+ tsf += num_samps;
+
+ header.seq_num = seq_num++;
+ header.tsf = tsf;
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ // First two reads should succeed
+ size_t num_samps_ret =
+ streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+
+ num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ size_t prev_tsf = metadata.time_spec.to_ticks(TICK_RATE);
+ size_t expected_tsf = prev_tsf + num_samps * (TICK_RATE / SAMP_RATE);
+
+ // Third read should be a sequence error
+ num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);
+ BOOST_CHECK_EQUAL(metadata.out_of_sequence, true);
+ size_t metadata_tsf = metadata.time_spec.to_ticks(TICK_RATE);
+ BOOST_CHECK_EQUAL(metadata_tsf, expected_tsf);
+
+ // Next read should succeed
+ num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE);
+ BOOST_CHECK_EQUAL(metadata.out_of_sequence, false);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_bad_packet)
+{
+ // Test that when we receive a packet with an invalid chdr header or
+ // length, the streamer returns the correct error in the metadata.
+ auto push_back_bad_packet = [](mock_recv_link::sptr recv_link) {
+ mock_header_t header;
+ header.payload_bytes = 1000;
+
+ // Allocate a buffer that is too small for the payload
+ const size_t buff_len = 100;
+ boost::shared_array<uint8_t> data(new uint8_t[buff_len]);
+
+ // Write header to buffer
+ *(reinterpret_cast<mock_header_t*>(data.get())) = header;
+
+ // Push back buffer for link to recv
+ recv_link->push_back_recv_packet(data, buff_len);
+ };
+
+ const std::string format("fc32");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 20;
+ std::vector<std::complex<float>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+
+ mock_header_t header;
+
+ // Push back a regular packet
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ // Push back a bad packet
+ push_back_bad_packet(recv_links[0]);
+
+ // Push back another regular packet
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ // First read should succeed
+ size_t num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+
+ // Second read should be an error
+ num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_BAD_PACKET);
+
+ // Third read should succeed
+ num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE);
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_multi_channel_no_tsf)
+{
+ // Test that we can receive packets without tsf. Start by pushing
+ // a packet with a tsf followed by a few packets without.
+ const size_t NUM_PKTS_TO_TEST = 6;
+ const std::string format("fc64");
+
+ const size_t num_chans = 10;
+
+ auto recv_links = make_links(num_chans);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 21;
+
+ std::vector<std::vector<std::complex<double>>> buffer(num_chans);
+ std::vector<void*> buffers;
+ for (size_t i = 0; i < num_chans; i++) {
+ buffer[i].resize(num_samps);
+ buffers.push_back(&buffer[i].front());
+ }
+
+ uhd::rx_metadata_t metadata;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ mock_header_t header;
+ header.eob = (i == NUM_PKTS_TO_TEST - 1);
+ header.has_tsf = (i == 0);
+ header.tsf = 500;
+
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+
+ size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, i == NUM_PKTS_TO_TEST - 1);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, i == 0);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_multi_channel_seq_error)
+{
+ // Test that the streamer handles dropped packets correctly by injecting
+ // a sequence error in one channel. The streamer should discard
+ // corresponding packets from all other channels.
+ const std::string format("fc64");
+
+ const size_t num_chans = 100;
+
+ auto recv_links = make_links(num_chans);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 99;
+
+ std::vector<std::vector<std::complex<double>>> buffer(num_chans);
+ std::vector<void*> buffers;
+ for (size_t i = 0; i < num_chans; i++) {
+ buffer[i].resize(num_samps);
+ buffers.push_back(&buffer[i].front());
+ }
+
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ mock_header_t header;
+ header.eob = false;
+ header.has_tsf = true;
+ header.tsf = 0;
+ header.ignore_seq = false;
+ header.seq_num = 0;
+
+ // Drop a packet from an arbitrary channel right at the start
+ if (ch != num_chans / 2) {
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+
+ // Add a regular packet to check that the streamer drops the first one
+ header.seq_num++;
+ header.tsf++;
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+
+ // Drop a packet from the first channel
+ header.seq_num++;
+ header.tsf++;
+ if (ch != 0) {
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+
+ // Add a regular packet
+ header.seq_num++;
+ header.tsf++;
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+
+ // Drop a few packets from the last channel
+ for (size_t j = 0; j < 10; j++) {
+ header.seq_num++;
+ header.tsf++;
+ if (ch != num_chans - 1) {
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+ }
+
+ // Add a regular packet
+ header.seq_num++;
+ header.tsf++;
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+
+ uhd::rx_metadata_t metadata;
+
+ // First recv should result in error
+ size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);
+ BOOST_CHECK_EQUAL(metadata.out_of_sequence, true);
+
+ // Packet with tsf == 1 should be returned next
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 1);
+
+ // Next recv should result in error
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);
+ BOOST_CHECK_EQUAL(metadata.out_of_sequence, true);
+
+ // Packet with tsf == 3 should be returned next
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 3);
+
+ // Next recv should result in error
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);
+ BOOST_CHECK_EQUAL(metadata.out_of_sequence, true);
+
+ // Packet with tsf == 14 should be returned next
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 14);
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_alignment_error)
+{
+ // Test that the alignment procedure returns an alignment error if it can't
+ // time align packets.
+ const std::string format("fc64");
+
+ const size_t num_chans = 4;
+
+ auto recv_links = make_links(num_chans);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 2;
+
+ std::vector<std::vector<std::complex<double>>> buffer(num_chans);
+ std::vector<void*> buffers;
+ for (size_t i = 0; i < num_chans; i++) {
+ buffer[i].resize(num_samps);
+ buffers.push_back(&buffer[i].front());
+ }
+
+ uhd::rx_metadata_t metadata;
+
+ mock_header_t header;
+ header.eob = true;
+ header.has_tsf = true;
+ header.tsf = 500;
+
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+
+ size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, true);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, true);
+
+ for (size_t pkt = 0; pkt < uhd::transport::ALIGNMENT_FAILURE_THRESHOLD; pkt++) {
+ header.tsf = header.tsf + num_samps;
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ if (ch == num_chans - 1) {
+ // Misalign this time stamp
+ header.tsf += 1;
+ }
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+ }
+
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_ALIGNMENT);
+}
diff --git a/host/tests/tx_streamer_test.cpp b/host/tests/tx_streamer_test.cpp
new file mode 100644
index 000000000..cb07cffad
--- /dev/null
+++ b/host/tests/tx_streamer_test.cpp
@@ -0,0 +1,393 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include "../common/mock_link.hpp"
+#include <uhdlib/transport/tx_streamer_impl.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/test/unit_test.hpp>
+#include <iostream>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Mock tx data xport which doesn't use an I/O service and just interacts
+ * with the link directly. The transport copies the packet info struct into
+ * the frame buffer in place of a real packet header.
+ */
+class mock_tx_data_xport
+{
+public:
+ using uptr = std::unique_ptr<mock_tx_data_xport>;
+ using buff_t = uhd::transport::frame_buff;
+
+ struct packet_info_t
+ {
+ bool eob = false;
+ bool has_tsf = false;
+ uint64_t tsf = 0;
+ size_t payload_bytes = 0;
+ };
+
+ mock_tx_data_xport(mock_send_link::sptr send_link) : _send_link(send_link) {}
+
+ buff_t::uptr get_send_buff(const int32_t timeout_ms)
+ {
+ return _send_link->get_send_buff(timeout_ms);
+ }
+
+ std::pair<void*, size_t> write_packet_header(
+ buff_t::uptr& buff, const packet_info_t& info)
+ {
+ uint8_t* data = static_cast<uint8_t*>(buff->data());
+ *(reinterpret_cast<packet_info_t*>(data)) = info;
+ return std::make_pair(data + sizeof(info), sizeof(info) + info.payload_bytes);
+ }
+
+ void release_send_buff(buff_t::uptr buff)
+ {
+ _send_link->release_send_buff(std::move(buff));
+ }
+
+ size_t get_max_payload_size() const
+ {
+ return _send_link->get_send_frame_size() - sizeof(packet_info_t);
+ }
+
+private:
+ mock_send_link::sptr _send_link;
+};
+
+/*!
+ * Mock tx streamer for testing
+ */
+class mock_tx_streamer : public tx_streamer_impl<mock_tx_data_xport>
+{
+public:
+ mock_tx_streamer(const size_t num_chans, const uhd::stream_args_t& stream_args)
+ : tx_streamer_impl(num_chans, stream_args)
+ {
+ }
+
+ void set_tick_rate(double rate)
+ {
+ tx_streamer_impl::set_tick_rate(rate);
+ }
+
+ void set_samp_rate(double rate)
+ {
+ tx_streamer_impl::set_samp_rate(rate);
+ }
+
+ void set_scale_factor(const size_t chan, const double scale_factor)
+ {
+ tx_streamer_impl::set_scale_factor(chan, scale_factor);
+ }
+};
+
+}} // namespace uhd::transport
+
+using namespace uhd::transport;
+
+using tx_streamer = tx_streamer_impl<mock_tx_data_xport>;
+
+static const double TICK_RATE = 100e6;
+static const double SAMP_RATE = 10e6;
+static const size_t FRAME_SIZE = 1000;
+static const double SCALE_FACTOR = 2;
+
+/*!
+ * Helper functions
+ */
+static std::vector<mock_send_link::sptr> make_links(const size_t num)
+{
+ const mock_send_link::link_params params = {FRAME_SIZE, 1};
+
+ std::vector<mock_send_link::sptr> links;
+
+ for (size_t i = 0; i < num; i++) {
+ links.push_back(std::make_shared<mock_send_link>(params));
+ }
+
+ return links;
+}
+
+static boost::shared_ptr<mock_tx_streamer> make_tx_streamer(
+ std::vector<mock_send_link::sptr> send_links, const std::string& format)
+{
+ const uhd::stream_args_t stream_args(format, "sc16");
+ auto streamer = boost::make_shared<mock_tx_streamer>(send_links.size(), stream_args);
+ streamer->set_tick_rate(TICK_RATE);
+ streamer->set_samp_rate(SAMP_RATE);
+
+ for (size_t i = 0; i < send_links.size(); i++) {
+ mock_tx_data_xport::uptr xport(
+ std::make_unique<mock_tx_data_xport>(send_links[i]));
+
+ streamer->set_scale_factor(i, SCALE_FACTOR);
+ streamer->connect_channel(i, std::move(xport));
+ }
+
+ return streamer;
+}
+
+std::tuple<mock_tx_data_xport::packet_info_t, std::complex<uint16_t>*, size_t, boost::shared_array<uint8_t>>
+pop_send_packet(mock_send_link::sptr send_link)
+{
+ auto packet = send_link->pop_send_packet();
+
+ const size_t packet_samps =
+ (packet.second - sizeof(mock_tx_data_xport::packet_info_t))
+ / sizeof(std::complex<uint16_t>);
+
+ uint8_t* buff_ptr = packet.first.get();
+ auto info = *(reinterpret_cast<mock_tx_data_xport::packet_info_t*>(buff_ptr));
+
+ std::complex<uint16_t>* data = reinterpret_cast<std::complex<uint16_t>*>(
+ buff_ptr + sizeof(mock_tx_data_xport::packet_info_t));
+
+ return std::make_tuple(info, data, packet_samps, packet.first);
+}
+
+/*!
+ * Tests
+ */
+BOOST_AUTO_TEST_CASE(test_send_one_channel_one_packet)
+{
+ const size_t NUM_PKTS_TO_TEST = 30;
+ const std::string format("fc32");
+
+ auto send_links = make_links(1);
+ auto streamer = make_tx_streamer(send_links, format);
+
+ // Allocate metadata
+ uhd::tx_metadata_t metadata;
+ metadata.has_time_spec = true;
+ metadata.time_spec = uhd::time_spec_t(0.0);
+
+ // Allocate buffer and write data
+ std::vector<std::complex<float>> buff(20);
+ for (size_t i = 0; i < buff.size(); i++) {
+ buff[i] = std::complex<float>(i * 2, i * 2 + 1);
+ }
+
+ // Send packets and check data
+ size_t num_accum_samps = 0;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ std::cout << "sending packet " << i << std::endl;
+
+ // Vary num_samps for each packet
+ const size_t num_samps = 10 + i % 10;
+ metadata.end_of_burst = (i == NUM_PKTS_TO_TEST - 1);
+ const size_t num_sent = streamer->send(&buff.front(), num_samps, metadata, 1.0);
+ BOOST_CHECK_EQUAL(num_sent, num_samps);
+ metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE);
+
+ mock_tx_data_xport::packet_info_t info;
+ std::complex<uint16_t>* data;
+ size_t packet_samps;
+ boost::shared_array<uint8_t> frame_buff;
+
+ std::tie(info, data, packet_samps, frame_buff) = pop_send_packet(send_links[0]);
+ BOOST_CHECK_EQUAL(num_samps, packet_samps);
+
+ // Check data
+ for (size_t j = 0; j < num_samps; j++) {
+ const std::complex<uint16_t> value(
+ (j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR);
+ BOOST_CHECK_EQUAL(value, data[j]);
+ }
+
+        BOOST_CHECK_EQUAL(
+            num_samps, info.payload_bytes / sizeof(std::complex<uint16_t>));
+ BOOST_CHECK(info.has_tsf);
+ BOOST_CHECK_EQUAL(info.tsf, num_accum_samps * TICK_RATE / SAMP_RATE);
+ BOOST_CHECK_EQUAL(info.eob, i == NUM_PKTS_TO_TEST - 1);
+ num_accum_samps += num_samps;
+ }
+}
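+
+// Timestamp bookkeeping for the test above: the time spec is converted to a
+// tick count, so after N accumulated samples at SAMP_RATE = 10 Msps with
+// TICK_RATE = 100 MHz the expected tsf is N * 100e6 / 10e6 = 10 * N ticks,
+// which is what the info.tsf check verifies.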
+
+BOOST_AUTO_TEST_CASE(test_send_one_channel_multi_packet)
+{
+ const size_t NUM_BUFFS_TO_TEST = 5;
+ const std::string format("fc64");
+
+ auto send_links = make_links(1);
+ auto streamer = make_tx_streamer(send_links, format);
+
+ // Allocate metadata
+ uhd::tx_metadata_t metadata;
+ metadata.has_time_spec = true;
+ metadata.time_spec = uhd::time_spec_t(0.0);
+
+ // Allocate buffer and write data
+ const size_t spp = streamer->get_max_num_samps();
+ const size_t num_samps = spp * 4;
+ std::vector<std::complex<double>> buff(num_samps);
+ for (size_t i = 0; i < buff.size(); i++) {
+ buff[i] = std::complex<double>(i * 2, i * 2 + 1);
+ }
+
+ // Send packets and check data
+ size_t num_accum_samps = 0;
+
+ for (size_t i = 0; i < NUM_BUFFS_TO_TEST; i++) {
+ std::cout << "sending packet " << i << std::endl;
+
+ metadata.end_of_burst = true;
+ const size_t num_sent = streamer->send(&buff.front(), num_samps, metadata, 1.0);
+ BOOST_CHECK_EQUAL(num_sent, num_samps);
+ metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE);
+
+ size_t samps_checked = 0;
+
+ while (samps_checked < num_samps) {
+ mock_tx_data_xport::packet_info_t info;
+ std::complex<uint16_t>* data;
+ size_t packet_samps;
+ boost::shared_array<uint8_t> frame_buff;
+
+            std::tie(info, data, packet_samps, frame_buff) =
+                pop_send_packet(send_links[0]);
+
+ for (size_t j = 0; j < packet_samps; j++) {
+ const size_t n = j + samps_checked;
+ const std::complex<uint16_t> value(
+ (n * 2) * SCALE_FACTOR, (n * 2 + 1) * SCALE_FACTOR);
+ BOOST_CHECK_EQUAL(value, data[j]);
+ }
+
+ BOOST_CHECK_EQUAL(
+ packet_samps, info.payload_bytes / sizeof(std::complex<uint16_t>));
+ BOOST_CHECK(info.has_tsf);
+ BOOST_CHECK_EQUAL(
+ info.tsf, (num_accum_samps + samps_checked) * TICK_RATE / SAMP_RATE);
+ samps_checked += packet_samps;
+
+ BOOST_CHECK_EQUAL(info.eob, samps_checked == num_samps);
+ }
+
+ BOOST_CHECK_EQUAL(samps_checked, num_samps);
+ num_accum_samps += samps_checked;
+ }
+}
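+
+// The buffer above spans four maximum-size packets, so each send() call is
+// expected to be fragmented into spp-sized packets, with only the final
+// fragment of the burst carrying the EOB flag (the info.eob check inside the
+// while loop).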
+
+BOOST_AUTO_TEST_CASE(test_send_two_channel_one_packet)
+{
+ const size_t NUM_PKTS_TO_TEST = 30;
+ const std::string format("sc16");
+
+ auto send_links = make_links(2);
+ auto streamer = make_tx_streamer(send_links, format);
+
+ // Allocate metadata
+ uhd::tx_metadata_t metadata;
+ metadata.has_time_spec = true;
+ metadata.time_spec = uhd::time_spec_t(0.0);
+
+ // Allocate buffer and write data
+ std::vector<std::complex<uint16_t>> buff(20);
+ for (size_t i = 0; i < buff.size(); i++) {
+ buff[i] = std::complex<uint16_t>(i * 2, i * 2 + 1);
+ }
+ std::vector<void*> buffs;
+ for (size_t ch = 0; ch < 2; ch++) {
+ buffs.push_back(buff.data()); // same buffer for each channel
+ }
+
+ // Send packets and check data
+ size_t num_accum_samps = 0;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ std::cout << "sending packet " << i << std::endl;
+
+ // Vary num_samps for each packet
+ const size_t num_samps = 10 + i % 10;
+ metadata.end_of_burst = (i == NUM_PKTS_TO_TEST - 1);
+ const size_t num_sent = streamer->send(buffs, num_samps, metadata, 1.0);
+ BOOST_CHECK_EQUAL(num_sent, num_samps);
+ metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE);
+
+ for (size_t ch = 0; ch < 2; ch++) {
+ mock_tx_data_xport::packet_info_t info;
+ std::complex<uint16_t>* data;
+ size_t packet_samps;
+ boost::shared_array<uint8_t> frame_buff;
+
+            std::tie(info, data, packet_samps, frame_buff) =
+                pop_send_packet(send_links[ch]);
+ BOOST_CHECK_EQUAL(num_samps, packet_samps);
+
+ // Check data
+ for (size_t j = 0; j < num_samps; j++) {
+ const std::complex<uint16_t> value((j * 2), (j * 2 + 1));
+ BOOST_CHECK_EQUAL(value, data[j]);
+ }
+
+ BOOST_CHECK_EQUAL(
+ num_samps, info.payload_bytes / sizeof(std::complex<uint16_t>));
+ BOOST_CHECK(info.has_tsf);
+ BOOST_CHECK_EQUAL(info.tsf, num_accum_samps * TICK_RATE / SAMP_RATE);
+ BOOST_CHECK_EQUAL(info.eob, i == NUM_PKTS_TO_TEST - 1);
+ }
+ num_accum_samps += num_samps;
+ }
+}
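+
+// Note that, unlike the fc32/fc64 tests, the expected samples in the test
+// above are not multiplied by SCALE_FACTOR: with matching sc16 CPU and wire
+// formats the conversion is presumably a pass-through, so the scale factor
+// does not alter the data.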
+
+BOOST_AUTO_TEST_CASE(test_meta_data_cache)
+{
+ auto send_links = make_links(1);
+ auto streamer = make_tx_streamer(send_links, "fc32");
+
+ // Allocate metadata
+ uhd::tx_metadata_t metadata;
+ metadata.start_of_burst = true;
+ metadata.end_of_burst = true;
+ metadata.has_time_spec = true;
+ metadata.time_spec = uhd::time_spec_t(0.0);
+
+ // Allocate buffer and write data
+ std::vector<std::complex<float>> buff(20);
+
+ size_t num_sent = streamer->send(buff.data(), 0, metadata, 1.0);
+ BOOST_CHECK_EQUAL(send_links[0]->get_num_packets(), 0);
+ BOOST_CHECK_EQUAL(num_sent, 0);
+ uhd::tx_metadata_t metadata2;
+ num_sent = streamer->send(buff.data(), 10, metadata2, 1.0);
+
+ mock_tx_data_xport::packet_info_t info;
+ size_t packet_samps;
+ boost::shared_array<uint8_t> frame_buff;
+
+    std::tie(info, std::ignore, packet_samps, frame_buff) =
+        pop_send_packet(send_links[0]);
+ BOOST_CHECK_EQUAL(packet_samps, num_sent);
+ BOOST_CHECK(info.has_tsf);
+ BOOST_CHECK(info.eob);
+}
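+
+// The test above checks that a send() of zero samples produces no packet and
+// that its metadata (start/end of burst, time spec) is cached and applied to
+// the next non-empty send(), hence the has_tsf and eob checks on the packet
+// produced by the second call.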
+
+BOOST_AUTO_TEST_CASE(test_spp)
+{
+ // Test the spp calculation when it is limited by the stream args
+ {
+ auto send_links = make_links(1);
+ uhd::stream_args_t stream_args("fc64", "sc16");
+ stream_args.args["spp"] = std::to_string(10);
+        auto streamer =
+            boost::make_shared<mock_tx_streamer>(send_links.size(), stream_args);
+        mock_tx_data_xport::uptr xport(
+            std::make_unique<mock_tx_data_xport>(send_links[0]));
+ streamer->connect_channel(0, std::move(xport));
+ BOOST_CHECK_EQUAL(streamer->get_max_num_samps(), 10);
+ }
+
+ // Test the spp calculation when it is limited by the frame size
+ {
+ auto send_links = make_links(1);
+ uhd::stream_args_t stream_args("fc64", "sc16");
+ stream_args.args["spp"] = std::to_string(10000);
+        auto streamer =
+            boost::make_shared<mock_tx_streamer>(send_links.size(), stream_args);
+        mock_tx_data_xport::uptr xport(
+            std::make_unique<mock_tx_data_xport>(send_links[0]));
+        const size_t max_pyld = xport->get_max_payload_size();
+        streamer->connect_channel(0, std::move(xport));
+        BOOST_CHECK_EQUAL(streamer->get_max_num_samps(),
+            max_pyld / sizeof(std::complex<uint16_t>));
+ }
+}
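+
+// A rough sketch of the spp calculation exercised above, assuming spp is the
+// lesser of the requested "spp" stream arg and what fits in one frame:
+//
+//   spp = min(args["spp"], get_max_payload_size() / bytes_per_otw_sample)
+//
+// With the sc16 wire format (4 bytes per sample), requesting spp = 10 keeps
+// the stream-arg limit in effect, while spp = 10000 falls back to the payload
+// limit checked in the second block.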