diff options
31 files changed, 4540 insertions, 334 deletions
diff --git a/host/include/uhd/rfnoc_graph.hpp b/host/include/uhd/rfnoc_graph.hpp index c13939ac9..08d5fc095 100644 --- a/host/include/uhd/rfnoc_graph.hpp +++ b/host/include/uhd/rfnoc_graph.hpp @@ -159,16 +159,17 @@ public: /*! Connect a RFNOC block with block ID \p src_block to another with block ID \p * dst_block. * + * Note you need to also call this on statically connected blocks if you + * desire to use them. + * * \param src_blk The block ID of the source block to connect. * \param src_port The port of the source block to connect. * \param dst_blk The block ID of the destination block to connect to. * \param dst_port The port of the destination block to connect to. * \param skip_property_propagation Skip property propagation for this edge * - * \throws connect_disallowed_on_src - * if the source port is statically connected to a *different* block - * \throws connect_disallowed_on_dst - * if the destination port is statically connected to a *different* block + * \throws uhd::routing_error if the source or destination ports are + * statically connected to a *different* block */ virtual void connect(const block_id_t& src_blk, size_t src_port, @@ -186,7 +187,7 @@ public: * \throws connect_disallowed_on_dst * if the destination port is statically connected to a *different* block */ - virtual void connect(uhd::tx_streamer& streamer, + virtual void connect(uhd::tx_streamer::sptr streamer, size_t strm_port, const block_id_t& dst_blk, size_t dst_port) = 0; @@ -203,7 +204,7 @@ public: */ virtual void connect(const block_id_t& src_blk, size_t src_port, - uhd::rx_streamer& streamer, + uhd::rx_streamer::sptr streamer, size_t strm_port) = 0; /*! Enumerate all the possible static connections in the graph @@ -244,10 +245,12 @@ public: * start using it. If a different streamer is already connected * to the intended source then that call may fail. * + * \param num_ports Number of ports that will be connected to the streamer * \param args Arguments to aid the construction of the streamer * \return a shared pointer to a new streamer */ - //virtual rx_streamer::sptr create_rx_streamer(const stream_args_t& args) = 0; + virtual rx_streamer::sptr create_rx_streamer(const size_t num_ports, + const stream_args_t& args) = 0; /*! Create a new transmit streamer from the streamer arguments * The created streamer is still not connected to anything yet. @@ -255,10 +258,12 @@ public: * start using it. If a different streamer is already connected * to the intended sink then that call may fail. * + * \param num_ports Number of ports that will be connected to the streamer * \param args Arguments to aid the construction of the streamer * \return a shared pointer to a new streamer */ - //virtual tx_streamer::sptr create_tx_streamer(const stream_args_t& args) = 0; + virtual tx_streamer::sptr create_tx_streamer(const size_t num_ports, + const stream_args_t& args) = 0; /************************************************************************** * Hardware Control diff --git a/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp b/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp index 770c6cf6f..cc729de6c 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp @@ -114,6 +114,15 @@ public: */ virtual void* get_payload_ptr() = 0; + /*! Return the payload offset in bytes for a given type and num_mdata + * + * \param pkt_type The packet type for calculation + * \param num_mdata The number of metadata words for calculation + * \return The offset of the payload in a packet with the given params + */ + virtual size_t calculate_payload_offset(const packet_type_t pkt_type, + const uint8_t num_mdata = 0) const = 0; + //! Shortcut to return the const metadata pointer cast as a specific type template <typename data_t> inline const data_t* get_mdata_const_ptr_as() const diff --git a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp new file mode 100644 index 000000000..69dceebe1 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp @@ -0,0 +1,481 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP +#define INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP + +#include <uhd/config.hpp> +#include <uhdlib/rfnoc/chdr_packet.hpp> +#include <uhdlib/rfnoc/chdr_types.hpp> +#include <uhdlib/rfnoc/mgmt_portal.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> +#include <uhdlib/rfnoc/rx_flow_ctrl_state.hpp> +#include <uhdlib/transport/io_service.hpp> +#include <uhdlib/transport/link_if.hpp> +#include <memory> + +namespace uhd { namespace rfnoc { + +namespace detail { + +/*! + * Utility class to send rx flow control responses + */ +class rx_flow_ctrl_sender +{ +public: + //! Constructor + rx_flow_ctrl_sender( + const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) + : _dst_epid(sep_ids.first) + { + _fc_packet = pkt_factory.make_strs(); + _fc_strs_pyld.src_epid = sep_ids.second; + } + + /*! Configure buffer capacity + * \param recv_capacity The buffer capacity of the receive link + */ + void set_capacity(const stream_buff_params_t& recv_capacity) + { + _fc_strs_pyld.capacity_bytes = recv_capacity.bytes; + _fc_strs_pyld.capacity_pkts = recv_capacity.packets; + } + + /*! Send a flow control response packet + * + * \param send_link the link to use to send the packet + * \counts transfer counts for packet contents + */ + void send_strs(transport::send_link_if* send_link, const stream_buff_params_t& counts) + { + auto buff = send_link->get_send_buff(0); + if (!buff) { + throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer"); + } + + chdr::chdr_header header; + header.set_seq_num(_fc_seq_num++); + header.set_dst_epid(_dst_epid); + + chdr::strs_payload fc_payload(_fc_strs_pyld); + fc_payload.xfer_count_bytes = counts.bytes; + fc_payload.xfer_count_pkts = counts.packets; + + _fc_packet->refresh(buff->data(), header, fc_payload); + const size_t size = header.get_length(); + + buff->set_packet_size(size); + send_link->release_send_buff(std::move(buff)); + } + +private: + // Endpoint ID for recipient of flow control response + const sep_id_t _dst_epid; + + // Packet for writing flow control info + chdr::chdr_strs_packet::uptr _fc_packet; + + // Pre-configured strs payload to hold values that don't change + chdr::strs_payload _fc_strs_pyld; + + // Sequence number for flow control packets + uint16_t _fc_seq_num = 0; +}; +} // namespace detail + +/*! + * Flow-controlled transport for RX chdr data + * + * This transport provides the streamer an interface to read RX data packets. + * The transport implements flow control and sequence number checking. + * + * The transport uses I/O services to provide options for work scheduling. I/O + * services allow the I/O work to be offloaded to a worker thread or to be + * performed in the same thread as the streamer API calls. + * + * For an rx transport, the device sends data packets, and the host sends strs + * packets letting the device know that buffer space in the host has been freed. + * For lossy links, the device also sends strc packets to resynchronize the + * transfer counts between host and device, to correct for any dropped packets + * in the link. + */ +class chdr_rx_data_xport +{ +public: + using uptr = std::unique_ptr<chdr_rx_data_xport>; + using buff_t = transport::frame_buff; + + //! Values extracted from received RX data packets + struct packet_info_t + { + bool eob = false; + bool has_tsf = false; + uint64_t tsf = 0; + size_t payload_bytes = 0; + const void* payload = nullptr; + }; + + /*! Constructor + * + * \param io_srv The service that will schedule the xport I/O + * \param recv_link The recv link, already attached to the I/O service + * \param send_link The send link, already attached to the I/O service + * \param pkt_factory Factory to create packets with the desired chdr_w and endianness + * \param addrs Source and destination addresses + * \param epids Source and destination endpoint IDs + * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload + * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata + * \param num_recv_frames Num frames to reserve from the recv link + * \param recv_capacity Total capacity of the recv link + * \param fc_freq Frequency of flow control status messages + * \param fc_headroom Headroom for flow control status messages + * \param lossy_xport Whether the xport is lossy, for flow control configuration + */ + chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_addr_pair_t& addrs, + const uhd::rfnoc::sep_id_pair_t& epids, + const uhd::rfnoc::sw_buff_t pyld_buff_fmt, + const uhd::rfnoc::sw_buff_t mdata_buff_fmt, + const size_t num_recv_frames, + const stream_buff_params_t& recv_capacity, + const stream_buff_params_t& fc_freq, + const stream_buff_params_t& fc_headroom, + const bool lossy_xport) + : _fc_state(epids), _fc_sender(pkt_factory, epids), _epid(epids.second) + { + const sep_addr_t remote_sep_addr = addrs.first; + const sep_addr_t local_sep_addr = addrs.second; + const sep_id_t remote_epid = epids.first; + const sep_id_t local_epid = epids.second; + + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Creating rx xport with local epid=" << local_epid + << ", remote epid=" << remote_epid); + + _recv_packet = pkt_factory.make_generic(); + _fc_sender.set_capacity(recv_capacity); + + // Calculate max payload size + const size_t pyld_offset = + _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); + _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset; + + // Make data transport + auto recv_cb = [this](buff_t::uptr& buff, + transport::recv_link_if* recv_link, + transport::send_link_if* send_link) { + return this->_recv_callback(buff, recv_link, send_link); + }; + + auto fc_cb = [this](buff_t::uptr buff, + transport::recv_link_if* recv_link, + transport::send_link_if* send_link) { + this->_fc_callback(std::move(buff), recv_link, send_link); + }; + + // Needs just a single send frame for responses + _recv_io = io_srv->make_recv_client(recv_link, + num_recv_frames, + recv_cb, + send_link, + /* num_send_frames*/ 1, + fc_cb); + + // Create a control transport with the rx data links to send mgmt packets + // needed to setup the stream + // Piggyback on frames from the recv_io_if + auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv, + send_link, + recv_link, + local_epid, + 0, // num_send_frames + 0); // num_recv_frames + + // Create new temporary management portal with the transports used for this stream + // TODO: This is a bit excessive. Maybe we can pare down the functionality of the + // portal just for route setup purposes. Whatever we do, we *must* use xport in it + // though otherwise the transport will not behave correctly. + auto data_mgmt_portal = uhd::rfnoc::mgmt::mgmt_portal::make( + *ctrl_xport, pkt_factory, local_sep_addr, local_epid); + + // Setup a route to the EPID + // Note that this may be gratuitous--The endpoint may already have been set up + data_mgmt_portal->initialize_endpoint(*ctrl_xport, remote_sep_addr, remote_epid); + data_mgmt_portal->setup_local_route(*ctrl_xport, remote_epid); + + // Initialize flow control - management portal sends a stream command + // containing its requested flow control frequency, the rx transport + // responds with a stream status containing its buffer capacity. + data_mgmt_portal->config_local_rx_stream_start(*ctrl_xport, + remote_epid, + lossy_xport, + pyld_buff_fmt, + mdata_buff_fmt, + fc_freq, + fc_headroom); + + data_mgmt_portal->config_local_rx_stream_commit(*ctrl_xport, remote_epid); + + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Stream endpoint was configured with:" + << std::endl + << "capacity bytes=" << recv_capacity.bytes + << ", packets=" << recv_capacity.packets << std::endl + << "fc headroom bytes=" << fc_headroom.bytes + << ", packets=" << fc_headroom.packets << std::endl + << "fc frequency bytes=" << fc_freq.bytes + << ", packets=" << fc_freq.packets); + + // We no longer need the control xport and mgmt_portal, release them so + // the control xport is no longer connected to the I/O service. + data_mgmt_portal.reset(); + ctrl_xport.reset(); + } + + /*! Returns maximum number payload bytes + * + * \return maximum payload bytes per packet + */ + size_t get_max_payload_size() const + { + return _max_payload_size; + } + + /*! + * Gets an RX frame buffer containing a recv packet + * + * \param timeout_ms timeout in milliseconds + * \return returns a tuple containing: + * - a frame_buff, or null if timeout occurs + * - info struct corresponding to the packet + * - whether the packet was out of sequence + */ + std::tuple<typename buff_t::uptr, packet_info_t, bool> get_recv_buff( + const int32_t timeout_ms) + { + buff_t::uptr buff = _recv_io->get_recv_buff(timeout_ms); + + if (!buff) { + return std::make_tuple(typename buff_t::uptr(), packet_info_t(), false); + } + + auto info = _read_data_packet_info(buff); + bool seq_error = _is_out_of_sequence(std::get<1>(info)); + + return std::make_tuple(std::move(buff), std::get<0>(info), seq_error); + } + + /*! + * Releases an RX frame buffer + * + * \param buff the frame buffer to release + */ + void release_recv_buff(typename buff_t::uptr buff) + { + _recv_io->release_recv_buff(std::move(buff)); + } + +private: + /*! + * Recv callback for I/O service + * + * The I/O service invokes this callback when it reads a packet from the + * recv link. + * + * \param buff the frame buffer containing the packet data + * \param recv_link the recv link from which buff was read + * \param send_link the send link for flow control messages + */ + bool _recv_callback(buff_t::uptr& buff, + transport::recv_link_if* recv_link, + transport::send_link_if* send_link) + { + _recv_packet->refresh(buff->data()); + const auto header = _recv_packet->get_chdr_header(); + const auto type = header.get_pkt_type(); + const auto dst_epid = header.get_dst_epid(); + const auto packet_size = buff->packet_size(); + + if (dst_epid != _epid) { + return false; + } + + if (type == chdr::PKT_TYPE_STRC) { + chdr::strc_payload strc; + strc.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(), + _recv_packet->get_payload_size() / sizeof(uint64_t), + _recv_packet->conv_to_host<uint64_t>()); + + const stream_buff_params_t strc_counts = { + strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)}; + + if (strc.op_code == chdr::STRC_RESYNC) { + // Resynchronize before updating fc_state, the strc payload + // contains counts before the strc packet itself + _fc_state.resynchronize(strc_counts); + + // Update state that we received a packet + _fc_state.data_received(packet_size); + + recv_link->release_recv_buff(std::move(buff)); + buff = buff_t::uptr(); + _fc_state.xfer_done(packet_size); + _send_fc_response(send_link); + } else if (strc.op_code == chdr::STRC_INIT) { + _fc_state.initialize( + {strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)}); + + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Received strc init with fc freq" + << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts); + + // Make sure flow control was initialized + assert(_fc_state.get_fc_freq().bytes > 0); + assert(_fc_state.get_fc_freq().packets > 0); + + // Send a strs response to configure flow control on the sender + _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts()); + + // Reset counts, since mgmt_portal will do it to FPGA + _fc_state.reset_counts(); + + recv_link->release_recv_buff(std::move(buff)); + buff = buff_t::uptr(); + } else { + throw uhd::value_error("Unexpected opcode value in STRC packet."); + } + + // For stream commands, we return true (packet was destined to this + // client) but release the buffer. The I/O service won't queue this + // packet in the recv_io_if. + return true; + + } else if (type == chdr::PKT_TYPE_DATA_NO_TS + || type == chdr::PKT_TYPE_DATA_WITH_TS) { + // Update state that we received a packet + _fc_state.data_received(packet_size); + + // If this is a data packet, just claim it by returning true. The + // I/O service will queue this packet in the recv_io_if. + return true; + + } else { + return false; + } + } + + /*! + * Flow control callback for I/O service + * + * The I/O service invokes this callback when a packet needs to be released + * to the recv link. + * + * \param buff the frame buffer containing the packet data + * \param recv_link the recv link to which to release the buffer + * \param send_link the send link for flow control messages + */ + void _fc_callback(buff_t::uptr buff, + transport::recv_link_if* recv_link, + transport::send_link_if* send_link) + { + const size_t packet_size = buff->packet_size(); + recv_link->release_recv_buff(std::move(buff)); + _fc_state.xfer_done(packet_size); + _send_fc_response(send_link); + } + + /*! + * Sends a flow control response packet if necessary. + * + * \param send_link the send link for flow control messages + */ + void _send_fc_response(transport::send_link_if* send_link) + { + if (_fc_state.fc_resp_due()) { + _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts()); + _fc_state.fc_resp_sent(); + } + } + + /*! + * Checks if the sequence number is out of sequence, prints 'D' if it is + * and returns result of check. + * + * \return true if a sequence error occurred + */ + UHD_FORCE_INLINE bool _is_out_of_sequence(uint16_t seq_num) + { + const uint16_t expected_packet_count = _data_seq_num; + _data_seq_num = seq_num + 1; + + if (expected_packet_count != seq_num) { + UHD_LOG_FASTPATH("D"); + return true; + } + return false; + } + + /*! + * Reads packet header and returns information in a struct. + * + * \return a tuple containing the packet info and packet sequence number + */ + std::tuple<packet_info_t, uint16_t> _read_data_packet_info(buff_t::uptr& buff) + { + const void* data = buff->data(); + _recv_packet->refresh(data); + const auto header = _recv_packet->get_chdr_header(); + const auto optional_time = _recv_packet->get_timestamp(); + + packet_info_t info; + info.eob = header.get_eob(); + info.has_tsf = optional_time.is_initialized(); + info.tsf = optional_time ? *optional_time : 0; + info.payload_bytes = _recv_packet->get_payload_size(); + info.payload = _recv_packet->get_payload_const_ptr(); + + const uint8_t* pkt_end = + reinterpret_cast<uint8_t*>(buff->data()) + buff->packet_size(); + const size_t pyld_pkt_len = + pkt_end - reinterpret_cast<const uint8_t*>(info.payload); + + if (pyld_pkt_len < info.payload_bytes) { + _recv_io->release_recv_buff(std::move(buff)); + throw uhd::value_error("Bad CHDR header or invalid packet length."); + } + + return std::make_tuple(info, header.get_seq_num()); + } + + // Interface to the I/O service + transport::recv_io_if::sptr _recv_io; + + // Flow control state + rx_flow_ctrl_state _fc_state; + + // Maximum data payload in bytes + size_t _max_payload_size = 0; + + // Sequence number for data packets + uint16_t _data_seq_num = 0; + + // Packet for received data + chdr::chdr_packet::uptr _recv_packet; + + // Handles sending of strs flow control response packets + detail::rx_flow_ctrl_sender _fc_sender; + + // Local / Sink EPID + sep_id_t _epid; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp new file mode 100644 index 000000000..62b811bf5 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp @@ -0,0 +1,550 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP +#define INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP + +#include <uhdlib/rfnoc/chdr_packet.hpp> +#include <uhdlib/rfnoc/chdr_types.hpp> +#include <uhdlib/rfnoc/mgmt_portal.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> +#include <uhdlib/rfnoc/tx_flow_ctrl_state.hpp> +#include <uhdlib/transport/io_service.hpp> +#include <uhdlib/transport/link_if.hpp> +#include <memory> + +namespace uhd { namespace rfnoc { + +namespace detail { + +/*! + * Utility class to send tx flow control messages + */ +class tx_flow_ctrl_sender +{ +public: + //! Constructor + tx_flow_ctrl_sender( + const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) + : _dst_epid(sep_ids.second) + { + _fc_packet = pkt_factory.make_strc(); + _fc_strc_pyld.src_epid = sep_ids.first; + _fc_strc_pyld.op_code = chdr::STRC_RESYNC; + } + + /*! + * Sends a flow control resync packet + * + * Sends a strc packet with the resync opcode to make the device transfer + * counts match those of the host, to correct for dropped packets. + * + * \param send_link the link to use to send the packet + * \counts transfer counts for packet contents + */ + size_t send_strc_resync( + transport::send_link_if* send_link, const stream_buff_params_t& counts) + { + auto buff = send_link->get_send_buff(0); + if (!buff) { + throw uhd::runtime_error("tx_flowctrl timed out getting a send buffer"); + } + + chdr::chdr_header header; + header.set_seq_num(_fc_seq_num++); + header.set_dst_epid(_dst_epid); + + chdr::strc_payload fc_payload(_fc_strc_pyld); + fc_payload.num_bytes = counts.bytes; + fc_payload.num_pkts = counts.packets; + + _fc_packet->refresh(buff->data(), header, fc_payload); + const size_t size = header.get_length(); + + buff->set_packet_size(size); + send_link->release_send_buff(std::move(buff)); + return size; + } + +private: + // Endpoint ID for recipient of flow control response + const sep_id_t _dst_epid; + + // Packet for writing flow control info + chdr::chdr_strc_packet::uptr _fc_packet; + + // Pre-configured strc payload to hold values that don't change + chdr::strc_payload _fc_strc_pyld; + + // Sequence number for flow control packets + uint16_t _fc_seq_num = 0; +}; +} // namespace detail + +/*! + * Flow-controlled transport for TX chdr data + * + * This transport provides the streamer an interface to send TX data packets. + * The transport implements flow control and keeps track of sequence numbers. + * + * The transport uses I/O services to provide options for work scheduling. I/O + * services allow the I/O work to be offloaded to a worker thread or to be + * performed in the same thread as the streamer API calls. + * + * For a tx transport, the host sends data packets, and the device sends strs + * packets letting the host know that buffer space in the device stream endpoint + * has been freed. For lossy links, the host also sends strc packets to + * resynchronize the transfer counts between host and device, to correct for + * any dropped packets in the link. + */ +class chdr_tx_data_xport +{ +public: + using uptr = std::unique_ptr<chdr_tx_data_xport>; + using buff_t = transport::frame_buff; + + //! Information about data packet + struct packet_info_t + { + bool eob = false; + bool has_tsf = false; + uint64_t tsf = 0; + size_t payload_bytes = 0; + }; + + /*! Constructor + * + * \param io_srv The service that will schedule the xport I/O + * \param recv_link The recv link, already attached to the I/O service + * \param send_link The send link, already attached to the I/O service + * \param pkt_factory Factory to create packets with the desired chdr_w and endianness + * \param addrs Source and destination addresses + * \param epids Source and destination endpoint IDs + * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload + * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata + * \param num_send_frames Num frames to reserve from the send link + * \param fc_freq_ratio Ratio to use to configure the device fc frequency + * \param fc_headroom_ratio Ratio to use to configure the device fc headroom + */ + chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_addr_pair_t& addrs, + const uhd::rfnoc::sep_id_pair_t& epids, + const uhd::rfnoc::sw_buff_t pyld_buff_fmt, + const uhd::rfnoc::sw_buff_t mdata_buff_fmt, + const size_t num_send_frames, + const double fc_freq_ratio, + const double fc_headroom_ratio) + : _fc_sender(pkt_factory, epids), _epid(epids.first) + { + const sep_addr_t remote_sep_addr = addrs.second; + const sep_addr_t local_sep_addr = addrs.first; + const sep_id_t remote_epid = epids.second; + const sep_id_t local_epid = epids.first; + + UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", + "Creating tx xport with local epid=" << local_epid + << ", remote epid=" << remote_epid); + + _send_header.set_dst_epid(epids.second); + _send_packet = pkt_factory.make_generic(); + _recv_packet = pkt_factory.make_generic(); + + // Calculate max payload size + const size_t pyld_offset = + _send_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); + _max_payload_size = send_link->get_send_frame_size() - pyld_offset; + + _configure_sep(io_srv, + recv_link, + send_link, + pkt_factory, + local_sep_addr, + local_epid, + remote_sep_addr, + remote_epid, + pyld_buff_fmt, + mdata_buff_fmt); + + _initialize_flow_ctrl(io_srv, + recv_link, + send_link, + pkt_factory, + epids, + fc_freq_ratio, + fc_headroom_ratio); + + // Now create the send I/O we will use for data + auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) { + this->_send_callback(buff, send_link); + }; + + auto recv_cb = [this](buff_t::uptr& buff, + transport::recv_link_if* recv_link, + transport::send_link_if* send_link) { + return this->_recv_callback(buff, recv_link, send_link); + }; + + // Needs just a single recv frame for strs packets + _send_io = io_srv->make_send_client(send_link, + num_send_frames, + send_cb, + recv_link, + /* num_recv_frames */ 1, + recv_cb); + } + + /*! Returns maximum number of payload bytes + * + * \return maximum number of payload bytes + */ + size_t get_max_payload_size() const + { + return _max_payload_size; + } + + /*! + * Gets a TX frame buffer + * + * \param timeout_ms timeout in milliseconds + * \return the frame buffer, or nullptr if timeout occurs + */ + buff_t::uptr get_send_buff(const int32_t timeout_ms) + { + return _send_io->get_send_buff(timeout_ms); + } + + /*! + * Sends a TX data packet + * + * \param buff the frame buffer containing the packet to send + */ + void release_send_buff(buff_t::uptr buff) + { + _send_io->release_send_buff(std::move(buff)); + } + + /*! + * Writes header into frame buffer and returns payload pointer + * + * \param buff Frame buffer to write header into + * \param info Information to include in the header + * \return A pointer to the payload data area and the packet size in bytes + */ + std::pair<void*, size_t> write_packet_header(buff_t::uptr& buff, + const packet_info_t& info) + { + uint64_t tsf = 0; + + if (info.has_tsf) { + _send_header.set_pkt_type(chdr::PKT_TYPE_DATA_WITH_TS); + tsf = info.tsf; + } else { + _send_header.set_pkt_type(chdr::PKT_TYPE_DATA_NO_TS); + } + + _send_header.set_eob(info.eob); + _send_header.set_seq_num(_data_seq_num++); + + _send_packet->refresh(buff->data(), _send_header, tsf); + _send_packet->update_payload_size(info.payload_bytes); + + return std::make_pair( + _send_packet->get_payload_ptr(), + _send_packet->get_chdr_header().get_length()); + } + +private: + /*! + * Recv callback for I/O service + * + * The I/O service invokes this callback when it reads a packet from the + * recv link. + * + * \param buff the frame buffer containing the packet data + * \param recv_link the recv link from which buff was read + * \param send_link the send link for flow control messages + */ + bool _recv_callback(buff_t::uptr& buff, + transport::recv_link_if* recv_link, + transport::send_link_if* /*send_link*/) + { + _recv_packet->refresh(buff->data()); + const auto header = _recv_packet->get_chdr_header(); + const auto type = header.get_pkt_type(); + const auto dst_epid = header.get_dst_epid(); + + if (dst_epid != _epid) { + return false; + } + + if (type == chdr::PKT_TYPE_STRS) { + chdr::strs_payload strs; + strs.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(), + _recv_packet->get_payload_size() / sizeof(uint64_t), + _recv_packet->conv_to_host<uint64_t>()); + + _fc_state.update_dest_recv_count({strs.xfer_count_bytes, + static_cast<uint32_t>(strs.xfer_count_pkts)}); + + // TODO: check strs status here and push into async msg queue + + // Packet belongs to this transport, release buff and return true + recv_link->release_recv_buff(std::move(buff)); + buff = nullptr; + return true; + } else { + UHD_THROW_INVALID_CODE_PATH(); + } + } + + /*! + * Send callback for I/O service + * + * The I/O service invokes this callback when it is requested to release + * a send buffer to the send link. + * + * \param buff the frame buffer to release + * \param send_link the send link for flow control messages + */ + void _send_callback(buff_t::uptr& buff, transport::send_link_if* send_link) + { + const size_t packet_size = buff->packet_size(); + + if (_fc_state.dest_has_space(packet_size)) { + send_link->release_send_buff(std::move(buff)); + buff = nullptr; + + _fc_state.data_sent(packet_size); + + if (_fc_state.get_fc_resync_req_pending() + && _fc_state.dest_has_space(chdr::strc_payload::PACKET_SIZE)) { + const auto& xfer_counts = _fc_state.get_xfer_counts(); + const size_t strc_size = + _fc_sender.send_strc_resync(send_link, xfer_counts); + _fc_state.clear_fc_resync_req_pending(); + _fc_state.data_sent(strc_size); + } + } + } + + /*! + * Configures the stream endpoint using mgmt_portal + */ + void _configure_sep(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_addr_t& local_sep_addr, + const uhd::rfnoc::sep_id_t& local_epid, + const uhd::rfnoc::sep_addr_t& remote_sep_addr, + const uhd::rfnoc::sep_id_t& remote_epid, + const uhd::rfnoc::sw_buff_t pyld_buff_fmt, + const uhd::rfnoc::sw_buff_t mdata_buff_fmt) + { + // Create a control transport with the tx data links to send mgmt packets + // needed to setup the stream. Only need one frame for this. + auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv, + send_link, + recv_link, + local_epid, + 1, // num_send_frames + 1); // num_recv_frames + + // Create new temporary management portal with the transports used for this stream + // TODO: This is a bit excessive. Maybe we can pare down the functionality of the + // portal just for route setup purposes. Whatever we do, we *must* use xport in it + // though otherwise the transport will not behave correctly. + auto data_mgmt_portal = uhd::rfnoc::mgmt::mgmt_portal::make( + *ctrl_xport, pkt_factory, local_sep_addr, local_epid); + + // Setup a route to the EPID + data_mgmt_portal->initialize_endpoint(*ctrl_xport, remote_sep_addr, remote_epid); + data_mgmt_portal->setup_local_route(*ctrl_xport, remote_epid); + + data_mgmt_portal->config_local_tx_stream( + *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt); + + // We no longer need the control xport and mgmt_portal, release them so + // the control xport is no longer connected to the I/O service. + data_mgmt_portal.reset(); + ctrl_xport.reset(); + } + + /*! + * Initializes flow control + * + * To initialize flow control, we need to send an init strc packet, then + * receive a strs containing the stream endpoint ingress buffer size. We + * then repeat this (now that we know the buffer size) to configure the flow + * control frequency. To avoid having this logic within the data packet + * processing flow, we use temporary send and recv I/O instances with + * simple callbacks here. + */ + void _initialize_flow_ctrl(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const sep_id_pair_t sep_ids, + const double fc_freq_ratio, + const double fc_headroom_ratio) + { + // No flow control at initialization, just release all send buffs + auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) { + send_link->release_send_buff(std::move(buff)); + buff = nullptr; + }; + + // For recv, just queue strs packets for recv_io to read + auto recv_cb = [this](buff_t::uptr& buff, + transport::recv_link_if* /*recv_link*/, + transport::send_link_if* /*send_link*/) { + _recv_packet->refresh(buff->data()); + const auto header = _recv_packet->get_chdr_header(); + const auto type = header.get_pkt_type(); + const auto dst_epid = header.get_dst_epid(); + + return (dst_epid == _epid && type == chdr::PKT_TYPE_STRS); + }; + + // No flow control at initialization, just release all recv buffs + auto fc_cb = [this](buff_t::uptr buff, + transport::recv_link_if* recv_link, + transport::send_link_if* /*send_link*/) { + recv_link->release_recv_buff(std::move(buff)); + }; + + auto send_io = io_srv->make_send_client(send_link, + 1, // num_send_frames + send_cb, + nullptr, + 0, // num_recv_frames + nullptr); + + auto recv_io = io_srv->make_recv_client(recv_link, + 1, // num_recv_frames + recv_cb, + nullptr, + 0, // num_send_frames + fc_cb); + + chdr::chdr_strc_packet::uptr strc_packet = pkt_factory.make_strc(); + chdr::chdr_packet::uptr& recv_packet = _recv_packet; + + // Function to send a strc init + auto send_strc_init = [&send_io, sep_ids, &strc_packet]( + const stream_buff_params_t fc_freq = {0, 0}) { + transport::frame_buff::uptr buff = send_io->get_send_buff(0); + + if (!buff) { + throw uhd::runtime_error( + "tx xport timed out getting a send buffer for strc init"); + } + + chdr::chdr_header header; + header.set_seq_num(0); + header.set_dst_epid(sep_ids.second); + + chdr::strc_payload strc_pyld; + strc_pyld.src_epid = sep_ids.first; + strc_pyld.op_code = chdr::STRC_INIT; + strc_pyld.num_bytes = fc_freq.bytes; + strc_pyld.num_pkts = fc_freq.packets; + strc_packet->refresh(buff->data(), header, strc_pyld); + + const size_t size = header.get_length(); + buff->set_packet_size(size); + send_io->release_send_buff(std::move(buff)); + }; + + // Function to receive a strs, returns buffer capacity + auto recv_strs = [&recv_io, &recv_packet]() -> stream_buff_params_t { + transport::frame_buff::uptr buff = recv_io->get_recv_buff(200); + + if (!buff) { + throw uhd::runtime_error( + "tx xport timed out wating for a strs packet during initialization"); + } + + recv_packet->refresh(buff->data()); + UHD_ASSERT_THROW( + recv_packet->get_chdr_header().get_pkt_type() == chdr::PKT_TYPE_STRS); + chdr::strs_payload strs; + strs.deserialize(recv_packet->get_payload_const_ptr_as<uint64_t>(), + recv_packet->get_payload_size() / sizeof(uint64_t), + recv_packet->conv_to_host<uint64_t>()); + + recv_io->release_recv_buff(std::move(buff)); + + return {strs.capacity_bytes, + static_cast<uint32_t>(strs.capacity_pkts)}; + }; + + // Send a strc init to get the buffer size + send_strc_init(); + stream_buff_params_t capacity = recv_strs(); + _fc_state.set_dest_capacity(capacity); + + UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", + "Received strs initializing buffer capacity to " + << capacity.bytes << " bytes"); + + // Calculate the requested fc_freq parameters + uhd::rfnoc::stream_buff_params_t fc_freq = { + static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_freq_ratio)), + static_cast<uint32_t>( + std::ceil(double(capacity.packets) * fc_freq_ratio))}; + + const size_t headroom_bytes = + static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_headroom_ratio)); + const size_t headroom_packets = static_cast<uint32_t>( + std::ceil(double(capacity.packets) * fc_headroom_ratio)); + + fc_freq.bytes -= headroom_bytes; + fc_freq.packets -= headroom_packets; + + // Send a strc init to configure fc freq + send_strc_init(fc_freq); + recv_strs(); + + // Release temporary I/O service interfaces to disconnect from it + send_io.reset(); + recv_io.reset(); + } + + // Interface to the I/O service + transport::send_io_if::sptr _send_io; + + // Flow control state + tx_flow_ctrl_state _fc_state; + + // Maximum data payload in bytes + size_t _max_payload_size = 0; + + // Sequence number for data packets + uint16_t _data_seq_num = 0; + + // Header to write into send packets + chdr::chdr_header _send_header; + + // Packet for send data + chdr::chdr_packet::uptr _send_packet; + + // Packet to receive strs messages + chdr::chdr_packet::uptr _recv_packet; + + // Handles sending of strc flow control ack packets + detail::tx_flow_ctrl_sender _fc_sender; + + // Local / Source EPID + sep_id_t _epid; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/chdr_types.hpp b/host/lib/include/uhdlib/rfnoc/chdr_types.hpp index 62b24ab61..1f14ea7d0 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_types.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_types.hpp @@ -482,6 +482,8 @@ public: // Members uint64_t num_pkts = 0; //! Number of bytes to use for operation (64 bits) uint64_t num_bytes = 0; + //! Size of a strc packet (including header) + static constexpr size_t PACKET_SIZE = 24; public: // Functions strc_payload() = default; diff --git a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp index 120b0e0f8..28fa8ec7c 100644 --- a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp +++ b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp @@ -7,11 +7,14 @@ #ifndef INCLUDED_LIBUHD_RFNOC_GRAPH_STREAM_MANAGER_HPP #define INCLUDED_LIBUHD_RFNOC_GRAPH_STREAM_MANAGER_HPP +#include <uhd/stream.hpp> #include <uhdlib/rfnoc/chdr_packet.hpp> #include <uhdlib/rfnoc/client_zero.hpp> #include <uhdlib/rfnoc/ctrlport_endpoint.hpp> #include <uhdlib/rfnoc/epid_allocator.hpp> #include <uhdlib/rfnoc/mb_iface.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> +#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp> #include <functional> #include <memory> #include <set> @@ -84,6 +87,7 @@ public: virtual detail::client_zero::sptr get_client_zero( sep_addr_t dst_addr, device_id_t via_device = NULL_DEVICE_ID) const = 0; + /*! Configure a flow controlled data stream from the endpoint with ID src_epid to the * endpoint with ID dst_epid * @@ -102,7 +106,33 @@ public: const double fc_headroom_ratio, const bool reset = false) = 0; - // TODO: Implement functions to get graph-wide streamers + /*! \brief Create a data stream going from the device to the host + * + * \param dst_addr The address of the destination stream endpoint + * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload + * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata + * \param xport_args The transport arguments + * \return An transport instance + */ + virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream( + sep_addr_t dst_addr, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, + const device_addr_t& xport_args) = 0; + + /*! \brief Create a data stream going from the host to the device + * + * \param dst_addr The address of the destination stream endpoint + * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload + * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata + * \param xport_args The transport arguments + * \return An transport instance + */ + virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream( + sep_addr_t dst_addr, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, + const device_addr_t& xport_args) = 0; /*! * \brief Create a graph_stream_manager and return a unique_ptr to it diff --git a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp index 79121a498..72f1cf1c7 100644 --- a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp +++ b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp @@ -11,6 +11,7 @@ #include <uhdlib/rfnoc/ctrlport_endpoint.hpp> #include <uhdlib/rfnoc/epid_allocator.hpp> #include <uhdlib/rfnoc/mb_iface.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> #include <functional> #include <memory> #include <set> @@ -112,41 +113,29 @@ public: /*! \brief Create a data stream going from the host to the device * * \param dst_addr The address of the destination stream endpoint - * \param lossy_xport Is the transport lossy? * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata - * \param fc_freq_ratio Flow control response frequency as a ratio of the buff params - * \param fc_headroom_ratio Flow control headroom as a ratio of the buff params * \param xport_args The transport arguments * \return An transport instance */ virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream( const sep_addr_t dst_addr, - const bool lossy_xport, const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, - const double fc_freq_ratio, - const double fc_headroom_ratio, const device_addr_t& xport_args) = 0; /*! \brief Create a data stream going from the device to the host * * \param dst_addr The address of the destination stream endpoint - * \param lossy_xport Is the transport lossy? * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata - * \param fc_freq_ratio Flow control response frequency as a ratio of the buff params - * \param fc_headroom_ratio Flow control headroom as a ratio of the buff params * \param xport_args The transport arguments * \return An transport instance */ virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream( const sep_addr_t src_addr, - const bool lossy_xport, const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, - const double fc_freq_ratio, - const double fc_headroom_ratio, const device_addr_t& xport_args) = 0; static uptr make(const chdr::chdr_packet_factory& pkt_factory, diff --git a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp index 0a2790a34..33a0e3df0 100644 --- a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp +++ b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp @@ -8,21 +8,14 @@ #define INCLUDED_LIBUHD_MB_IFACE_HPP #include <uhdlib/rfnoc/chdr_ctrl_xport.hpp> -#include <uhdlib/rfnoc/rfnoc_common.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> +#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp> #include <uhdlib/rfnoc/clock_iface.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> #include <memory> namespace uhd { namespace rfnoc { -// FIXME: Update this -class chdr_rx_data_xport -{ -public: - using uptr = std::unique_ptr<chdr_rx_data_xport>; -}; - -using chdr_tx_data_xport = chdr_rx_data_xport; - /*! Motherboard (backchannel) interface * * In RFNoC devices, the RFNoC subystem needs a backchannel interface to talk to @@ -70,7 +63,8 @@ public: /*! Return a reference to a clock iface */ - virtual std::shared_ptr<clock_iface> get_clock_iface(const std::string& clock_name) = 0; + virtual std::shared_ptr<clock_iface> get_clock_iface( + const std::string& clock_name) = 0; /*! Create a control transport * @@ -89,30 +83,34 @@ public: * * This is typically called once per streaming channel. * - * \param local_device_id ID for the host transport adapter to use - * \param local_epid Host (sink) streaming endpoint ID - * \param remote_epid Remote device (source) streaming endpoint ID + * \param addrs Address of the device and host stream endpoints + * \param epids Endpoint IDs of the device and host stream endpoints + * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload + * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata * \param xport_args Transport configuration args * \return A CHDR RX data transport */ - virtual chdr_rx_data_xport::uptr make_rx_data_transport(device_id_t local_device_id, - const sep_id_t& local_epid, - const sep_id_t& remote_epid, + virtual chdr_rx_data_xport::uptr make_rx_data_transport(const sep_addr_pair_t& addrs, + const sep_id_pair_t& epids, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, const device_addr_t& xport_args) = 0; /*! Create an TX data transport * * This is typically called once per streaming channel. * - * \param local_device_id ID for the host transport adapter to use - * \param local_epid Host (source) streaming endpoint ID - * \param remote_epid Remote device (sink) streaming endpoint ID + * \param addrs Address of the host and device stream endpoints + * \param epids Endpoint IDs of the host and device stream endpoints + * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload + * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata * \param xport_args Transport configuration args * \return A CHDR TX data transport */ - virtual chdr_tx_data_xport::uptr make_tx_data_transport(device_id_t local_device_id, - const sep_id_t& local_epid, - const sep_id_t& remote_epid, + virtual chdr_tx_data_xport::uptr make_tx_data_transport(const sep_addr_pair_t& addrs, + const sep_id_pair_t& epids, + const uhd::rfnoc::sw_buff_t pyld_buff_fmt, + const uhd::rfnoc::sw_buff_t mdata_buff_fmt, const device_addr_t& xport_args) = 0; }; diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp index 7ec1b7bb2..bc56fd311 100644 --- a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp @@ -41,6 +41,8 @@ using device_id_t = uint16_t; using sep_inst_t = uint16_t; //! Stream Endpoint Physical Address Type using sep_addr_t = std::pair<device_id_t, sep_inst_t>; +//! Stream Endpoint Physical Address Type (first = source, second = destination) +using sep_addr_pair_t = std::pair<sep_addr_t, sep_addr_t>; //! Stream Endpoint ID Type using sep_id_t = uint16_t; //! Stream Endpoint pair Type (first = source, second = destination) @@ -65,6 +67,19 @@ struct stream_buff_params_t //! The data type of the buffer used to capture/generate data enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 }; +//! Conversion from number of bits to sw_buff +constexpr sw_buff_t bits_to_sw_buff(size_t bits) +{ + if (bits <= 8) { + return BUFF_U8; + } else if (bits <= 16) { + return BUFF_U16; + } else if (bits <= 32) { + return BUFF_U32; + } else { + return BUFF_U64; + } +} //---------------------------------------------- // Constants @@ -72,6 +87,12 @@ enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 }; constexpr uint16_t RFNOC_PROTO_VER = 0x0100; +constexpr uint64_t MAX_FC_CAPACITY_BYTES = (uint64_t(1) << 40) - 1; +constexpr uint32_t MAX_FC_CAPACITY_PKTS = (uint32_t(1) << 24) - 1; +constexpr uint64_t MAX_FC_FREQ_BYTES = (uint64_t(1) << 40) - 1; +constexpr uint32_t MAX_FC_FREQ_PKTS = (uint32_t(1) << 24) - 1; +constexpr uint64_t MAX_FC_HEADROOM_BYTES = (uint64_t(1) << 16) - 1; +constexpr uint32_t MAX_FC_HEADROOM_PKTS = (uint32_t(1) << 8) - 1; }} // namespace uhd::rfnoc diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp new file mode 100644 index 000000000..6ced60d19 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp @@ -0,0 +1,95 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP +#define INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP + +#include <uhd/rfnoc/node.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> +#include <uhdlib/transport/rx_streamer_impl.hpp> +#include <string> + +namespace uhd { namespace rfnoc { + +/*! + * Extends the streamer_impl to be an rfnoc node so it can connect to the + * graph. Configures the streamer conversion and rate parameters with values + * received through property propagation. + */ +class rfnoc_rx_streamer : public node_t, + public transport::rx_streamer_impl<chdr_rx_data_xport> +{ +public: + /*! Constructor + * + * \param num_ports The number of ports + * \param stream_args Arguments to aid the construction of the streamer + */ + rfnoc_rx_streamer(const size_t num_ports, const uhd::stream_args_t stream_args); + + /*! Returns a unique identifier string for this node. In every RFNoC graph, + * no two nodes cannot have the same ID. Returns a string in the form of + * "RxStreamer#0". + * + * \returns The unique ID as a string + */ + std::string get_unique_id() const; + + /*! Returns the number of input ports for this block. + * + * \return noc_id The number of ports + */ + size_t get_num_input_ports() const; + + /*! Returns the number of output ports for this block. + * + * Always returns 0 for this block. + * + * \return noc_id The number of ports + */ + size_t get_num_output_ports() const; + + /*! Implementation of rx_streamer API method + * + * \param stream_cmd the stream command to issue + */ + void issue_stream_cmd(const stream_cmd_t& stream_cmd); + + /*! Returns stream args provided at creation + * + * \return stream args provided when streamer is created + */ + const uhd::stream_args_t& get_stream_args() const; + + /*! Check that all streamer ports are connected to blocks + * + * Overrides node_t to ensure there are no unconnected ports. + * + * \param connected_inputs A list of input ports that are connected + * \param connected_outputs A list of output ports that are connected + * \returns true if the block can deal with this configuration + */ + bool check_topology(const std::vector<size_t>& connected_inputs, + const std::vector<size_t>& connected_outputs); +private: + void _register_props(const size_t chan, const std::string& otw_format); + + // Properties + std::vector<property_t<double>> _scaling_in; + std::vector<property_t<double>> _samp_rate_in; + std::vector<property_t<double>> _tick_rate_in; + std::vector<property_t<std::string>> _type_in; + + // Streamer unique ID + const std::string _unique_id; + + // Stream args provided at construction + const uhd::stream_args_t _stream_args; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp new file mode 100644 index 000000000..4acee45cc --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp @@ -0,0 +1,90 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP +#define INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP + +#include <uhd/rfnoc/node.hpp> +#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp> +#include <uhdlib/transport/tx_streamer_impl.hpp> +#include <string> + +namespace uhd { namespace rfnoc { + +/*! + * Extends the streamer_impl to be an rfnoc node so it can connect to the + * graph. Configures the streamer conversion and rate parameters with values + * received through property propagation. + */ +class rfnoc_tx_streamer : public node_t, + public transport::tx_streamer_impl<chdr_tx_data_xport> +{ +public: + /*! Constructor + * + * \param num_ports The number of ports + * \param stream_args Arguments to aid the construction of the streamer + */ + rfnoc_tx_streamer(const size_t num_chans, const uhd::stream_args_t stream_args); + + /*! Returns a unique identifier string for this node. In every RFNoC graph, + * no two nodes cannot have the same ID. Returns a string in the form of + * "TxStreamer#0". + * + * \returns The unique ID as a string + */ + std::string get_unique_id() const; + + /*! Returns the number of input ports for this block. + * + * Always returns 0 for this block. + * + * \return noc_id The number of ports + */ + size_t get_num_input_ports() const; + + /*! Returns the number of output ports for this block. + * + * \return noc_id The number of ports + */ + size_t get_num_output_ports() const; + + /*! Returns stream args provided at creation + * + * \return stream args provided when streamer is created + */ + const uhd::stream_args_t& get_stream_args() const; + + /*! Check that all streamer ports are connected to blocks + * + * Overrides node_t to ensure there are no unconnected ports. + * + * \param connected_inputs A list of input ports that are connected + * \param connected_outputs A list of output ports that are connected + * \returns true if the block can deal with this configuration + */ + bool check_topology(const std::vector<size_t>& connected_inputs, + const std::vector<size_t>& connected_outputs); + +private: + void _register_props(const size_t chan, const std::string& otw_format); + + // Properties + std::vector<property_t<double>> _scaling_out; + std::vector<property_t<double>> _samp_rate_out; + std::vector<property_t<double>> _tick_rate_out; + std::vector<property_t<std::string>> _type_out; + + // Streamer unique ID + const std::string _unique_id; + + // Stream args provided at construction + const uhd::stream_args_t _stream_args; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp new file mode 100644 index 000000000..937baf982 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp @@ -0,0 +1,130 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP +#define INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP + +#include <uhd/utils/log.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> + +namespace uhd { namespace rfnoc { + +//! Class to manage rx flow control state +class rx_flow_ctrl_state +{ +public: + //! Constructor + rx_flow_ctrl_state(const rfnoc::sep_id_pair_t epids) : _epids(epids) {} + + //! Initialize frequency parameters + void initialize(const stream_buff_params_t fc_freq) + { + _fc_freq = fc_freq; + } + + //! Resynchronize with transfer counts from the sender + void resynchronize(const stream_buff_params_t counts) + { + if (_recv_counts.bytes != counts.bytes + || _recv_counts.packets != counts.packets) { + // If there is a discrepancy between the amount of data sent by + // the device and received by the transport, adjust the counts + // of data received and transferred to include the dropped data. + auto bytes_dropped = counts.bytes - _recv_counts.bytes; + auto pkts_dropped = counts.packets - _recv_counts.packets; + _xfer_counts.bytes += bytes_dropped; + _xfer_counts.packets += pkts_dropped; + + UHD_LOGGER_DEBUG("rx_flow_ctrl_state") + << "oh noes: bytes_sent=" << counts.bytes + << " bytes_received=" << _recv_counts.bytes + << " pkts_sent=" << counts.packets + << " pkts_received=" << _recv_counts.packets + << " src_epid=" << _epids.first << " dst_epid=" << _epids.second + << std::endl; + + _recv_counts = counts; + } + } + + //! Reset the transfer counts (happens during init) + void reset_counts() + { + UHD_LOGGER_TRACE("rx_flow_ctrl_state") + << "Resetting transfer counts" << std::endl; + _recv_counts = {0, 0}; + _xfer_counts = {0, 0}; + } + + //! Update state when data is received + void data_received(const size_t bytes) + { + _recv_counts.bytes += bytes; + _recv_counts.packets++; + } + + //! Update state when transfer is complete (buffer space freed) + void xfer_done(const size_t bytes) + { + _xfer_counts.bytes += bytes; + _xfer_counts.packets++; + } + + //! Returns whether a flow control response is needed + bool fc_resp_due() const + { + stream_buff_params_t accum_counts = { + _xfer_counts.bytes - _last_fc_resp_counts.bytes, + _xfer_counts.packets - _last_fc_resp_counts.packets}; + + return accum_counts.bytes >= _fc_freq.bytes + || accum_counts.packets >= _fc_freq.packets; + } + + //! Update state after flow control response was sent + void fc_resp_sent() + { + _last_fc_resp_counts = _xfer_counts; + } + + //! Returns counts for completed transfers + stream_buff_params_t get_xfer_counts() const + { + return _xfer_counts; + } + + //! Returns counts for completed transfers + stream_buff_params_t get_recv_counts() const + { + return _recv_counts; + } + + //! Returns configured flow control frequency + stream_buff_params_t get_fc_freq() const + { + return _fc_freq; + } + +private: + // Counts for data received, including any data still in use + stream_buff_params_t _recv_counts{0, 0}; + + // Counts for data read and whose buffer space is ok to reuse + stream_buff_params_t _xfer_counts{0, 0}; + + // Counts sent in last flow control response + stream_buff_params_t _last_fc_resp_counts{0, 0}; + + // Frequency of flow control responses + stream_buff_params_t _fc_freq{0, 0}; + + // Endpoint ID for log messages + const sep_id_pair_t _epids; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp new file mode 100644 index 000000000..65fc1b093 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp @@ -0,0 +1,99 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP +#define INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP + +#include <uhdlib/rfnoc/rfnoc_common.hpp> + +namespace uhd { namespace rfnoc { + +//! Class to manage tx flow control state +class tx_flow_ctrl_state +{ +public: + //! Updates destination capacity + void set_dest_capacity(const stream_buff_params_t& capacity) + { + _dest_capacity = capacity; + } + + //! Updates destination received count + void update_dest_recv_count(const stream_buff_params_t& recv_count) + { + _recv_counts = recv_count; + } + + /*! Returns whether the destination has buffer space for the requested + * packet size + */ + bool dest_has_space(const size_t packet_size) + { + // The stream endpoint only cares about bytes, the packet count is not + // important to determine the space available. + const auto buffer_fullness = _xfer_counts.bytes - _recv_counts.bytes; + const auto space_available = _dest_capacity.bytes - buffer_fullness; + return space_available >= packet_size; + } + + //! Increments transfer count with amount of data sent + void data_sent(const size_t packet_size) + { + _xfer_counts.bytes += packet_size; + _xfer_counts.packets++; + + // Request an fc resync after we have transferred a number of bytes >= + // to the destination capacity. There is no strict requirement on how + // often we need to send this, as it is only needed to correct for + // dropped packets. One buffer's worth of bytes is probably a good + // cadence. + if (_xfer_counts.bytes - _last_fc_resync_bytes >= _dest_capacity.bytes) { + _fc_resync_req = true; + } + } + + /*! Returns whether an fc resync request is pending. The policy we use + * here is to send an fc resync every time we send a number of bytes + * equal to the destination buffer capacity. + */ + bool get_fc_resync_req_pending() const + { + return _fc_resync_req; + } + + //! Clears fc resync request pending status + void clear_fc_resync_req_pending() + { + _fc_resync_req = false; + _last_fc_resync_bytes = _xfer_counts.bytes; + } + + //! Returns counts for packets sent + stream_buff_params_t get_xfer_counts() const + { + return _xfer_counts; + } + +private: + // Counts for data sent + stream_buff_params_t _xfer_counts{0, 0}; + + // Counts for data received by the destination + stream_buff_params_t _recv_counts{0, 0}; + + // Buffer size at the destination + stream_buff_params_t _dest_capacity{0, 0}; + + // Counts sent in last flow control resync + size_t _last_fc_resync_bytes = 0; + + // Track when to send ack packets + bool _fc_resync_req = false; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP */ diff --git a/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp new file mode 100644 index 000000000..662be6d2d --- /dev/null +++ b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp @@ -0,0 +1,175 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP +#define INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP + +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <boost/dynamic_bitset.hpp> +#include <boost/format.hpp> + +namespace uhd { namespace transport { + +// Number of iterations that get_aligned_buffs will attempt to time align +// packets before returning an alignment failure. get_aligned_buffs increments +// the iteration count when it finds a timestamp that is larger than the +// timestamps on channels it has already aligned and thus has to restart +// aligning timestamps on all channels to the new timestamp. +constexpr size_t ALIGNMENT_FAILURE_THRESHOLD = 1000; + +/*! + * Implementation of rx time alignment. This method reads packets from the + * transports for each channel and discards any packets whose tsf does not + * match those of other channels due to dropped packets. Packets that do not + * have a tsf are not checked for alignment and never dropped. + */ +template <typename transport_t> +class get_aligned_buffs +{ +public: + enum alignment_result_t { + SUCCESS, + TIMEOUT, + SEQUENCE_ERROR, + ALIGNMENT_FAILURE, + BAD_PACKET + }; + + get_aligned_buffs(std::vector<typename transport_t::uptr>& xports, + std::vector<typename transport_t::buff_t::uptr>& frame_buffs, + std::vector<typename transport_t::packet_info_t>& infos) + : _xports(xports) + , _frame_buffs(frame_buffs) + , _infos(infos) + , _prev_tsf(_xports.size(), 0) + , _channels_to_align(_xports.size()) + { + } + + alignment_result_t operator()(const int32_t timeout_ms) + { + // Clear state + _channels_to_align.set(); + bool time_valid = false; + uint64_t tsf = 0; + size_t iterations = 0; + + while (_channels_to_align.any()) { + const size_t chan = _channels_to_align.find_first(); + auto& xport = _xports[chan]; + auto& info = _infos[chan]; + auto& frame_buff = _frame_buffs[chan]; + bool seq_error = false; + + // Receive a data packet for the channel if we don't have one. A + // packet may already be there if the previous call was interrupted + // by an error. + if (!frame_buff) { + try { + std::tie(frame_buff, info, seq_error) = + xport->get_recv_buff(timeout_ms); + } catch (const uhd::value_error& e) { + // Bad packet + UHD_LOGGER_ERROR("STREAMER") + << boost::format( + "The receive transport caught a value exception.\n%s") + % e.what(); + return BAD_PACKET; + } + } + + if (!frame_buff) { + return TIMEOUT; + } + + if (info.has_tsf) { + const bool time_out_of_order = _prev_tsf[chan] > info.tsf; + _prev_tsf[chan] = info.tsf; + + // If the user changes the device time while streaming, we can + // receive a packet that comes before the previous packet in + // time. This would cause the alignment logic to discard future + // received packets. Therefore, when this occurs, we reset the + // info to restart the alignment. + if (time_out_of_order) { + time_valid = false; + } + + // Check if the time is larger than packets received for other + // channels, and if so, use this time to align all channels + if (!time_valid || info.tsf > tsf) { + // If we haven't found a set of aligned packets after many + // iterations, return an alignment failure + if (iterations++ > ALIGNMENT_FAILURE_THRESHOLD) { + UHD_LOGGER_ERROR("STREAMER") + << "The rx streamer failed to time-align packets."; + return ALIGNMENT_FAILURE; + } + + // Release buffers for channels aligned previously. Keep + // buffers that don't have a tsf since we don't need to + // align those. + for (size_t i = 0; i < _xports.size(); i++) { + if (!_channels_to_align.test(i) && _infos[i].has_tsf) { + _xports[i]->release_recv_buff(std::move(_frame_buffs[i])); + _frame_buffs[i] = nullptr; + } + } + + // Mark only this channel as aligned and save its tsf + _channels_to_align.set(); + _channels_to_align.reset(chan); + time_valid = true; + tsf = info.tsf; + } + + // Check if the time matches that of other aligned channels + else if (info.tsf == tsf) { + _channels_to_align.reset(chan); + } + + // Otherwise, time is smaller than other channels, release the buffer + else { + _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan])); + _frame_buffs[chan] = nullptr; + } + } else { + // Packet doesn't have a tsf, just mark it as aligned + _channels_to_align.reset(chan); + } + + // If this packet had a sequence error, stop to return the error. + // Keep the packet for the next call to get_aligned_buffs. + if (seq_error) { + return SEQUENCE_ERROR; + } + } + + // All channels aligned + return SUCCESS; + } + +private: + // Transports for each channel + std::vector<typename transport_t::uptr>& _xports; + + // Storage for buffers resulting from alignment + std::vector<typename transport_t::buff_t::uptr>& _frame_buffs; + + // Packet info corresponding to aligned buffers + std::vector<typename transport_t::packet_info_t>& _infos; + + // Time of previous packet for each channel + std::vector<uint64_t> _prev_tsf; + + // Keeps track of channels that are aligned + boost::dynamic_bitset<> _channels_to_align; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP */ diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp new file mode 100644 index 000000000..d66e867bc --- /dev/null +++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp @@ -0,0 +1,341 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP +#define INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP + +#include <uhd/config.hpp> +#include <uhd/convert.hpp> +#include <uhd/exception.hpp> +#include <uhd/stream.hpp> +#include <uhd/types/endianness.hpp> +#include <uhd/utils/log.hpp> +#include <uhdlib/transport/rx_streamer_zero_copy.hpp> +#include <limits> +#include <vector> + +namespace uhd { namespace transport { + +namespace detail { + +/*! + * Cache of metadata for error handling + * + * If a recv call reads data from multiple packets, and an error occurs in the + * second or later packets, recv stops short of the num samps requested and + * returns no error. The error is cached for the next call to recv. + * + * Timeout errors are an exception. Timeouts that occur in the second or later + * packets of a recv call stop the recv method but the error is not returned in + * the next call. The user can check for this condition since fewer samples are + * returned than the number requested. + */ +class rx_metadata_cache +{ +public: + //! Stores metadata in the cache, ignoring timeout errors + UHD_FORCE_INLINE void store(const rx_metadata_t& metadata) + { + if (metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) { + _metadata_cache = metadata; + _cached_metadata = true; + } + } + + //! Checks for cached metadata + UHD_FORCE_INLINE bool check(rx_metadata_t& metadata) + { + if (_cached_metadata) { + metadata = _metadata_cache; + _cached_metadata = false; + return true; + } + return false; + } + +private: + // Whether there is a cached metadata object + bool _cached_metadata = false; + + // Cached metadata value + uhd::rx_metadata_t _metadata_cache; +}; + +} // namespace detail + +/*! + * Implementation of rx streamer API + */ +template <typename transport_t> +class rx_streamer_impl : public rx_streamer +{ +public: + //! Constructor + rx_streamer_impl(const size_t num_ports, const uhd::stream_args_t stream_args) + : _zero_copy_streamer(num_ports) + , _in_buffs(num_ports) + { + if (stream_args.cpu_format.empty()) { + throw uhd::value_error("[rx_stream] Must provide a cpu_format!"); + } + if (stream_args.otw_format.empty()) { + throw uhd::value_error("[rx_stream] Must provide a otw_format!"); + } + _setup_converters(num_ports, stream_args); + _zero_copy_streamer.set_samp_rate(_samp_rate); + _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item); + } + + //! Connect a new channel to the streamer + void connect_channel(const size_t channel, typename transport_t::uptr xport) + { + const size_t max_pyld_size = xport->get_max_payload_size(); + _zero_copy_streamer.connect_channel(channel, std::move(xport)); + + // Set spp based on the transport frame size + const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item; + _spp = std::min(_spp, xport_spp); + } + + //! Implementation of rx_streamer API method + size_t get_num_channels() const + { + return _zero_copy_streamer.get_num_channels(); + } + + //! Implementation of rx_streamer API method + size_t get_max_num_samps() const + { + return _spp; + } + + /*! Get width of each over-the-wire item component. For complex items, + * returns the width of one component only (real or imaginary). + */ + size_t get_otw_item_comp_bit_width() const + { + return _convert_info.otw_item_bit_width; + } + + //! Implementation of rx_streamer API method + UHD_INLINE size_t recv(const uhd::rx_streamer::buffs_type& buffs, + const size_t nsamps_per_buff, + uhd::rx_metadata_t& metadata, + const double timeout, + const bool one_packet) + { + if (_error_metadata_cache.check(metadata)) { + return 0; + } + + const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); + + size_t total_samps_recv = + _recv_one_packet(buffs, nsamps_per_buff, metadata, timeout_ms); + + if (one_packet or metadata.end_of_burst) { + return total_samps_recv; + } + + // First set of packets recv had an error, return immediately + if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) { + return total_samps_recv; + } + + // Loop until buffer is filled or error code. This method returns the + // metadata from the first packet received, with the exception of + // end-of-burst. + uhd::rx_metadata_t loop_metadata; + + while (total_samps_recv < nsamps_per_buff) { + size_t num_samps = _recv_one_packet(buffs, + nsamps_per_buff - total_samps_recv, + loop_metadata, + timeout_ms, + total_samps_recv * _convert_info.bytes_per_cpu_item); + + // If metadata had an error code set, store for next call and return + if (loop_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) { + _error_metadata_cache.store(loop_metadata); + break; + } + + total_samps_recv += num_samps; + + // Return immediately if end of burst + if (loop_metadata.end_of_burst) { + metadata.end_of_burst = true; + break; + } + } + + return total_samps_recv; + } + +protected: + //! Configures scaling factor for conversion + void set_scale_factor(const size_t chan, const double scale_factor) + { + _converters[chan]->set_scalar(scale_factor); + } + + //! Configures sample rate for conversion of timestamp + void set_samp_rate(const double rate) + { + _samp_rate = rate; + _zero_copy_streamer.set_samp_rate(rate); + } + + //! Configures tick rate for conversion of timestamp + void set_tick_rate(const double rate) + { + _zero_copy_streamer.set_tick_rate(rate); + } + +private: + //! Converter and associated item sizes + struct convert_info + { + size_t bytes_per_otw_item; + size_t bytes_per_cpu_item; + size_t otw_item_bit_width; + }; + + //! Receive a single packet + UHD_FORCE_INLINE size_t _recv_one_packet(const uhd::rx_streamer::buffs_type& buffs, + const size_t nsamps_per_buff, + uhd::rx_metadata_t& metadata, + const int32_t timeout_ms, + const size_t buffer_offset_bytes = 0) + { + if (_buff_samps_remaining == 0) { + // Current set of buffers has expired, get the next one + _buff_samps_remaining = + _zero_copy_streamer.get_recv_buffs(_in_buffs, metadata, timeout_ms); + _fragment_offset_in_samps = 0; + } else { + // There are samples still left in the current set of buffers + metadata = _last_fragment_metadata; + metadata.time_spec += time_spec_t::from_ticks( + _fragment_offset_in_samps - metadata.fragment_offset, _samp_rate); + } + + if (_buff_samps_remaining != 0) { + const size_t num_samps = std::min(nsamps_per_buff, _buff_samps_remaining); + + // Convert samples to the streamer's output format + for (size_t i = 0; i < get_num_channels(); i++) { + char* b = reinterpret_cast<char*>(buffs[i]); + const uhd::rx_streamer::buffs_type out_buffs(b + buffer_offset_bytes); + _convert_to_out_buff(out_buffs, i, num_samps); + } + + _buff_samps_remaining -= num_samps; + + // Write the fragment flags and offset + metadata.more_fragments = _buff_samps_remaining != 0; + metadata.fragment_offset = _fragment_offset_in_samps; + + if (metadata.more_fragments) { + _fragment_offset_in_samps += num_samps; + _last_fragment_metadata = metadata; + } + + return num_samps; + } else { + return 0; + } + } + + //! Convert samples for one channel into its buffer + UHD_FORCE_INLINE void _convert_to_out_buff( + const uhd::rx_streamer::buffs_type& out_buffs, + const size_t chan, + const size_t num_samps) + { + const char* buffer_ptr = reinterpret_cast<const char*>(_in_buffs[chan]); + + _converters[chan]->conv(buffer_ptr, out_buffs, num_samps); + + // Advance the pointer for the source buffer + _in_buffs[chan] = + buffer_ptr + num_samps * _convert_info.bytes_per_otw_item; + + if (_buff_samps_remaining == num_samps) { + _zero_copy_streamer.release_recv_buff(chan); + } + } + + //! Create converters and initialize _convert_info + void _setup_converters(const size_t num_ports, + const uhd::stream_args_t stream_args) + { + // Note to code archaeologists: In the past, we had to also specify the + // endianness here, but that is no longer necessary because we can make + // the wire endianness match the host endianness. + convert::id_type id; + id.input_format = stream_args.otw_format + "_chdr"; + id.num_inputs = 1; + id.output_format = stream_args.cpu_format; + id.num_outputs = 1; + + auto starts_with = [](const std::string& s, const std::string v) { + return s.find(v) == 0; + }; + + const bool otw_is_complex = starts_with(stream_args.otw_format, "fc") + || starts_with(stream_args.otw_format, "sc"); + + convert_info info; + info.bytes_per_otw_item = convert::get_bytes_per_item(id.input_format); + info.bytes_per_cpu_item = convert::get_bytes_per_item(id.output_format); + + if (otw_is_complex) { + info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2; + } else { + info.otw_item_bit_width = info.bytes_per_otw_item * 8; + } + + _convert_info = info; + + for (size_t i = 0; i < num_ports; i++) { + _converters.push_back(convert::get_converter(id)()); + _converters.back()->set_scalar(1 / 32767.0); + } + } + + // Converter and item sizes + convert_info _convert_info; + + // Converters + std::vector<uhd::convert::converter::sptr> _converters; + + // Implementation of frame buffer management and packet info + rx_streamer_zero_copy<transport_t> _zero_copy_streamer; + + // Container for buffer pointers used in recv method + std::vector<const void*> _in_buffs; + + // Sample rate used to calculate metadata time_spec_t + double _samp_rate = 1.0; + + // Maximum number of samples per packet + size_t _spp = std::numeric_limits<std::size_t>::max(); + + // Num samps remaining in buffer currently held by zero copy streamer + size_t _buff_samps_remaining = 0; + + // Metadata cache for error handling + detail::rx_metadata_cache _error_metadata_cache; + + // Fragment (partially read packet) information + size_t _fragment_offset_in_samps = 0; + rx_metadata_t _last_fragment_metadata; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP */ diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp new file mode 100644 index 000000000..36f568f2d --- /dev/null +++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp @@ -0,0 +1,207 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP +#define INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/utils/log.hpp> +#include <uhdlib/transport/get_aligned_buffs.hpp> +#include <boost/format.hpp> +#include <vector> + +namespace uhd { namespace transport { + +/*! + * Implementation of rx streamer manipulation of frame buffers and packet info. + * This class is part of rx_streamer_impl, split into a separate unit as it is + * a mostly self-contained portion of the streamer logic. + */ +template <typename transport_t> +class rx_streamer_zero_copy +{ +public: + //! Constructor + rx_streamer_zero_copy(const size_t num_ports) + : _xports(num_ports) + , _frame_buffs(num_ports) + , _infos(num_ports) + , _get_aligned_buffs(_xports, _frame_buffs, _infos) + { + } + + ~rx_streamer_zero_copy() + { + for (size_t i = 0; i < _frame_buffs.size(); i++) { + if (_frame_buffs[i]) { + _xports[i]->release_recv_buff(std::move(_frame_buffs[i])); + } + } + } + + //! Connect a new channel to the streamer + void connect_channel(const size_t port, typename transport_t::uptr xport) + { + if (port >= get_num_channels()) { + throw uhd::index_error( + "Port number indexes beyond the number of streamer ports"); + } + + if (_xports[port]) { + throw uhd::runtime_error( + "Streamer port number is already connected to a port"); + } + + _xports[port] = std::move(xport); + } + + //! Returns number of channels handled by this streamer + size_t get_num_channels() const + { + return _xports.size(); + } + + //! Configures tick rate for conversion of timestamp + void set_tick_rate(const double rate) + { + _tick_rate = rate; + } + + //! Configures sample rate for conversion of timestamp + void set_samp_rate(const double rate) + { + _samp_rate = rate; + } + + //! Configures the size of each sample + void set_bytes_per_item(const size_t bpi) + { + _bytes_per_item = bpi; + } + + /*! + * Gets a set of time-aligned buffers, one per channel. + * + * \param buffs returns a pointer to the buffer data + * \param metadata returns the metadata corresponding to the buffer + * \param timeout_ms timeout in milliseconds + * \return the size in samples of each packet, or 0 if timeout + */ + size_t get_recv_buffs(std::vector<const void*>& buffs, + rx_metadata_t& metadata, + const int32_t timeout_ms) + { + metadata.reset(); + + switch (_get_aligned_buffs(timeout_ms)) { + case get_aligned_buffs_t::SUCCESS: + break; + + case get_aligned_buffs_t::BAD_PACKET: + metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; + return 0; + + case get_aligned_buffs_t::TIMEOUT: + metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; + return 0; + + case get_aligned_buffs_t::ALIGNMENT_FAILURE: + metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; + return 0; + + case get_aligned_buffs_t::SEQUENCE_ERROR: + metadata.has_time_spec = _last_read_time_info.has_time_spec; + metadata.time_spec = + _last_read_time_info.time_spec + + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate); + metadata.out_of_sequence = true; + metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + return 0; + + default: + UHD_THROW_INVALID_CODE_PATH(); + } + + // Get payload pointers for each buffer and aggregate eob. We set eob to + // true if any channel has it set, since no more data will be received for + // that channel. In most cases, all channels should have the same value. + bool eob = false; + for (size_t i = 0; i < buffs.size(); i++) { + buffs[i] = _infos[i].payload; + eob |= _infos[i].eob; + } + + // Set the metadata from the buffer information at index zero + const auto& info_0 = _infos[0]; + + metadata.has_time_spec = info_0.has_tsf; + metadata.time_spec = time_spec_t::from_ticks(info_0.tsf, _tick_rate); + metadata.start_of_burst = false; + metadata.end_of_burst = eob; + metadata.error_code = rx_metadata_t::ERROR_CODE_NONE; + + // Done with these packets, save timestamp info for next call + _last_read_time_info.has_time_spec = metadata.has_time_spec; + _last_read_time_info.time_spec = metadata.time_spec; + _last_read_time_info.num_samps = info_0.payload_bytes / _bytes_per_item; + + return _last_read_time_info.num_samps; + } + + /*! + * Release the packet for the specified channel + * + * \param channel the channel for which to release the packet + */ + void release_recv_buff(const size_t channel) + { + _xports[channel]->release_recv_buff(std::move(_frame_buffs[channel])); + _frame_buffs[channel] = typename transport_t::buff_t::uptr(); + } + +private: + using get_aligned_buffs_t = get_aligned_buffs<transport_t>; + + // Information recorded by streamer about the last data packet processed, + // used to create the metadata when there is a sequence error. + struct last_read_time_info_t + { + size_t num_samps = 0; + bool has_time_spec = false; + time_spec_t time_spec; + }; + + // Transports for each channel + std::vector<typename transport_t::uptr> _xports; + + // Storage for buffers for each channel while they are in flight (between + // calls to get_recv_buff and release_recv_buff). + std::vector<typename transport_t::buff_t::uptr> _frame_buffs; + + // Packet info corresponding to the packets in flight + std::vector<typename transport_t::packet_info_t> _infos; + + // Rate used in conversion of timestamp to time_spec_t + double _tick_rate = 1.0; + + // Rate used in conversion of timestamp to time_spec_t + double _samp_rate = 1.0; + + // Size of a sample on the device + size_t _bytes_per_item = 0; + + // Implementation of packet time alignment + get_aligned_buffs_t _get_aligned_buffs; + + // Information about the last data packet processed + last_read_time_info_t _last_read_time_info; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP */ diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp new file mode 100644 index 000000000..60881dad2 --- /dev/null +++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp @@ -0,0 +1,307 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP +#define INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP + +#include <uhd/config.hpp> +#include <uhd/convert.hpp> +#include <uhd/stream.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/utils/tasks.hpp> +#include <uhdlib/transport/tx_streamer_zero_copy.hpp> +#include <limits> +#include <vector> + +namespace uhd { namespace transport { + +namespace detail { + +/*! + * Cache of metadata for send calls with zero samples + * + * Metadata is cached when we get a send requesting a start of burst with no + * samples. It is applied here on the next call to send() that actually has + * samples to send. + */ +class tx_metadata_cache +{ +public: + //! Stores metadata in the cache + UHD_FORCE_INLINE void store(const tx_metadata_t& metadata) + { + _metadata_cache = metadata; + _cached_metadata = true; + } + + //! Checks for cached metadata + UHD_FORCE_INLINE void check(tx_metadata_t& metadata) + { + if (_cached_metadata) { + // Only use cached time_spec if metadata does not have one + if (!metadata.has_time_spec) { + metadata.has_time_spec = _metadata_cache.has_time_spec; + metadata.time_spec = _metadata_cache.time_spec; + } + metadata.start_of_burst = _metadata_cache.start_of_burst; + metadata.end_of_burst = _metadata_cache.end_of_burst; + _cached_metadata = false; + } + } + +private: + // Whether there is a cached metadata object + bool _cached_metadata = false; + + // Cached metadata value + uhd::tx_metadata_t _metadata_cache; +}; + +} // namespace detail + +/*! + * Implementation of tx streamer API + */ +template <typename transport_t> +class tx_streamer_impl : public tx_streamer +{ +public: + tx_streamer_impl(const size_t num_chans, const uhd::stream_args_t stream_args) + : _zero_copy_streamer(num_chans) + , _zero_buffs(num_chans, &_zero) + , _out_buffs(num_chans) + { + _setup_converters(num_chans, stream_args); + _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item); + _spp = stream_args.args.cast<size_t>("spp", _spp); + } + + void connect_channel(const size_t channel, typename transport_t::uptr xport) + { + const size_t max_pyld_size = xport->get_max_payload_size(); + _zero_copy_streamer.connect_channel(channel, std::move(xport)); + + // Set spp based on the transport frame size + const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item; + _spp = std::min(_spp, xport_spp); + } + + size_t get_num_channels() const + { + return _zero_copy_streamer.get_num_channels(); + } + + size_t get_max_num_samps() const + { + return _spp; + } + + + /*! Get width of each over-the-wire item component. For complex items, + * returns the width of one component only (real or imaginary). + */ + size_t get_otw_item_comp_bit_width() const + { + return _convert_info.otw_item_bit_width; + } + + size_t send(const uhd::tx_streamer::buffs_type& buffs, + const size_t nsamps_per_buff, + const uhd::tx_metadata_t& metadata_, + const double timeout) + { + uhd::tx_metadata_t metadata(metadata_); + + if (nsamps_per_buff == 0 && metadata.start_of_burst) { + _metadata_cache.store(metadata); + return 0; + } + + _metadata_cache.check(metadata); + + const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); + + if (nsamps_per_buff == 0) { + // Send requests with no samples are handled here, such as end of + // burst. Send packets need to have at least one sample based on the + // chdr specification, so we use _zero_buffs here. + _send_one_packet(_zero_buffs.data(), + 0, // buffer offset + 1, // num samples + metadata, + timeout_ms); + + return 0; + } else if (nsamps_per_buff <= _spp) { + return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms); + + } else { + size_t total_num_samps_sent = 0; + const bool eob = metadata.end_of_burst; + metadata.end_of_burst = false; + + const size_t num_fragments = (nsamps_per_buff - 1) / _spp; + const size_t final_length = ((nsamps_per_buff - 1) % _spp) + 1; + + for (size_t i = 0; i < num_fragments; i++) { + const size_t num_samps_sent = _send_one_packet( + buffs, total_num_samps_sent, _spp, metadata, timeout_ms); + + total_num_samps_sent += num_samps_sent; + + if (num_samps_sent == 0) { + return total_num_samps_sent; + } + + // Setup timespec for the next fragment + if (metadata.has_time_spec) { + metadata.time_spec = + metadata.time_spec + + time_spec_t::from_ticks(num_samps_sent, _samp_rate); + } + + metadata.start_of_burst = false; + } + + // Send the final fragment + metadata.end_of_burst = eob; + + size_t nsamps_sent = + total_num_samps_sent + + _send_one_packet( + buffs, total_num_samps_sent, final_length, metadata, timeout); + + return nsamps_sent; + } + } + + //! Implementation of rx_streamer API method + bool recv_async_msg( + uhd::async_metadata_t& /*async_metadata*/, double /*timeout = 0.1*/) + { + // TODO: implement me + return false; + } + +protected: + //! Configures scaling factor for conversion + void set_scale_factor(const size_t chan, const double scale_factor) + { + _converters[chan]->set_scalar(scale_factor); + } + + //! Configures sample rate for conversion of timestamp + void set_samp_rate(const double rate) + { + _samp_rate = rate; + } + + //! Configures tick rate for conversion of timestamp + void set_tick_rate(const double rate) + { + _zero_copy_streamer.set_tick_rate(rate); + } + +private: + //! Converter and associated item sizes + struct convert_info + { + size_t bytes_per_otw_item; + size_t bytes_per_cpu_item; + size_t otw_item_bit_width; + }; + + //! Convert samples for one channel and sends a packet + size_t _send_one_packet(const uhd::tx_streamer::buffs_type& buffs, + const size_t buffer_offset_in_samps, + const size_t num_samples, + const tx_metadata_t& metadata, + const int32_t timeout_ms) + { + if (!_zero_copy_streamer.get_send_buffs( + _out_buffs, num_samples, metadata, timeout_ms)) { + return 0; + } + + size_t byte_offset = buffer_offset_in_samps * _convert_info.bytes_per_cpu_item; + + for (size_t i = 0; i < get_num_channels(); i++) { + const void* input_ptr = static_cast<const uint8_t*>(buffs[i]) + byte_offset; + _converters[i]->conv(input_ptr, _out_buffs[i], num_samples); + + _zero_copy_streamer.release_send_buff(i); + } + + return num_samples; + } + + //! Create converters and initialize _bytes_per_cpu_item + void _setup_converters(const size_t num_chans, const uhd::stream_args_t stream_args) + { + // Note to code archaeologists: In the past, we had to also specify the + // endianness here, but that is no longer necessary because we can make + // the wire endianness match the host endianness. + convert::id_type id; + id.input_format = stream_args.cpu_format; + id.num_inputs = 1; + id.output_format = stream_args.otw_format + "_chdr"; + id.num_outputs = 1; + + auto starts_with = [](const std::string& s, const std::string v) { + return s.find(v) == 0; + }; + + const bool otw_is_complex = starts_with(stream_args.otw_format, "fc") + || starts_with(stream_args.otw_format, "sc"); + + convert_info info; + info.bytes_per_otw_item = convert::get_bytes_per_item(id.output_format); + info.bytes_per_cpu_item = convert::get_bytes_per_item(id.input_format); + + if (otw_is_complex) { + info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2; + } else { + info.otw_item_bit_width = info.bytes_per_otw_item * 8; + } + + _convert_info = info; + + for (size_t i = 0; i < num_chans; i++) { + _converters.push_back(convert::get_converter(id)()); + _converters.back()->set_scalar(32767.0); + } + } + + // Converter item sizes + convert_info _convert_info; + + // Converters + std::vector<uhd::convert::converter::sptr> _converters; + + // Manages frame buffers and packet info + tx_streamer_zero_copy<transport_t> _zero_copy_streamer; + + // Buffer used to handle send calls with no data + std::vector<const void*> _zero_buffs; + + const uint64_t _zero = 0; + + // Container for buffer pointers used in send method + std::vector<void*> _out_buffs; + + // Sample rate used to calculate metadata time_spec_t + double _samp_rate = 1.0; + + // Maximum number of samples per packet + size_t _spp = std::numeric_limits<std::size_t>::max(); + + // Metadata cache for send calls with no data + detail::tx_metadata_cache _metadata_cache; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_TRANSPORT_TX_STREAMER_IMPL_HPP */ diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp new file mode 100644 index 000000000..1b6f55238 --- /dev/null +++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp @@ -0,0 +1,147 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP +#define INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP + +#include <uhd/config.hpp> +#include <uhd/stream.hpp> +#include <uhd/types/metadata.hpp> +#include <vector> + +namespace uhd { namespace transport { + +/*! + * Implementation of rx streamer manipulation of frame buffers and packet info. + * This class is part of tx_streamer_impl, split into a separate unit as it is + * a mostly self-contained portion of the streamer logic. + */ +template <typename transport_t> +class tx_streamer_zero_copy +{ +public: + //! Constructor + tx_streamer_zero_copy(const size_t num_chans) + : _xports(num_chans), _frame_buffs(num_chans) + { + } + + //! Connect a new channel to the streamer + void connect_channel(const size_t port, typename transport_t::uptr xport) + { + if (port >= get_num_channels()) { + throw uhd::index_error( + "Port number indexes beyond the number of streamer ports"); + } + + if (_xports[port]) { + throw uhd::runtime_error( + "Streamer port number is already connected to a port"); + } + + _xports[port] = std::move(xport); + } + + //! Returns number of channels handled by this streamer + size_t get_num_channels() const + { + return _xports.size(); + } + + //! Configures tick rate for conversion of timestamp + void set_tick_rate(const double rate) + { + _tick_rate = rate; + } + + //! Configures the size of each sample + void set_bytes_per_item(const size_t bpi) + { + _bytes_per_item = bpi; + } + + /*! + * Gets a set of frame buffers, one per channel. + * + * \param buffs returns a pointer to the buffer data + * \param nsamps_per_buff the number of samples that will be written to each buffer + * \param metadata the metadata to write to the packet header + * \param timeout_ms timeout in milliseconds + * \return true if the operation was sucessful, false if timeout occurs + */ + UHD_FORCE_INLINE bool get_send_buffs(std::vector<void*>& buffs, + const size_t nsamps_per_buff, + const tx_metadata_t& metadata, + const int32_t timeout_ms) + { + // Try to get a buffer per channel + for (; _next_buff_to_get < _xports.size(); _next_buff_to_get++) { + _frame_buffs[_next_buff_to_get].first = + _xports[_next_buff_to_get]->get_send_buff(timeout_ms); + + if (!_frame_buffs[_next_buff_to_get].first) { + return false; + } + } + + // Got all the buffers, start from index 0 next call + _next_buff_to_get = 0; + + // Store portions of metadata we care about + typename transport_t::packet_info_t info; + info.has_tsf = metadata.has_time_spec; + + if (metadata.has_time_spec) { + info.tsf = metadata.time_spec.to_ticks(_tick_rate); + } + + info.payload_bytes = nsamps_per_buff * _bytes_per_item; + info.eob = metadata.end_of_burst; + + // Write packet header + for (size_t i = 0; i < buffs.size(); i++) { + std::tie(buffs[i], _frame_buffs[i].second) = + _xports[i]->write_packet_header(_frame_buffs[i].first, info); + } + + return true; + } + + /*! + * Send the packet for the specified channel + * + * \param channel the channel for which to release the packet + */ + UHD_FORCE_INLINE void release_send_buff(const size_t channel) + { + _frame_buffs[channel].first->set_packet_size(_frame_buffs[channel].second); + _xports[channel]->release_send_buff(std::move(_frame_buffs[channel].first)); + + _frame_buffs[channel].first = nullptr; + _frame_buffs[channel].second = 0; + } + +private: + // Transports for each channel + std::vector<typename transport_t::uptr> _xports; + + // Container to hold frame buffers for each channel and their packet sizes + std::vector<std::pair<typename transport_t::buff_t::uptr, size_t>> _frame_buffs; + + // Rate used in conversion of timestamp to time_spec_t + double _tick_rate = 1.0; + + // Size of a sample on the device + size_t _bytes_per_item = 0; + + // Next channel from which to get a buffer, stored as a member to + // allow the streamer to continue where it stopped due to timeouts. + size_t _next_buff_to_get = 0; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP */ diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt index dfef4f90f..963458fe6 100644 --- a/host/lib/rfnoc/CMakeLists.txt +++ b/host/lib/rfnoc/CMakeLists.txt @@ -53,6 +53,8 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/tick_node_ctrl.cpp ${CMAKE_CURRENT_SOURCE_DIR}/tx_stream_terminator.cpp ${CMAKE_CURRENT_SOURCE_DIR}/wb_iface_adapter.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_rx_streamer.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_tx_streamer.cpp # Default block control classes: ${CMAKE_CURRENT_SOURCE_DIR}/block_control.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ddc_block_control.cpp diff --git a/host/lib/rfnoc/chdr_packet.cpp b/host/lib/rfnoc/chdr_packet.cpp index 653181c04..d4b7494e2 100644 --- a/host/lib/rfnoc/chdr_packet.cpp +++ b/host/lib/rfnoc/chdr_packet.cpp @@ -31,7 +31,7 @@ public: { assert(pkt_buff); _pkt_buff = const_cast<uint64_t*>(reinterpret_cast<const uint64_t*>(pkt_buff)); - _compute_mdata_offset(); + _mdata_offset = _compute_mdata_offset(get_chdr_header()); } virtual void refresh(void* pkt_buff, chdr_header& header, uint64_t timestamp = 0) @@ -42,7 +42,7 @@ public: if (_has_timestamp(header)) { _pkt_buff[1] = u64_from_host(timestamp); } - _compute_mdata_offset(); + _mdata_offset = _compute_mdata_offset(get_chdr_header()); } virtual void update_payload_size(size_t payload_size_bytes) @@ -115,19 +115,27 @@ public: + (chdr_w_stride * (_mdata_offset + get_chdr_header().get_num_mdata()))); } + virtual size_t calculate_payload_offset(const packet_type_t pkt_type, + const uint8_t num_mdata = 0) const + { + chdr_header header; + header.set_pkt_type(pkt_type); + return (_compute_mdata_offset(header) + num_mdata) * chdr_w_bytes; + } + private: inline bool _has_timestamp(const chdr_header& header) const { return (header.get_pkt_type() == PKT_TYPE_DATA_WITH_TS); } - inline void _compute_mdata_offset() const + inline size_t _compute_mdata_offset(const chdr_header& header) const { // The metadata offset depends on the chdr_w and whether we have a timestamp if (chdr_w == 64) { - _mdata_offset = _has_timestamp(get_chdr_header()) ? 2 : 1; + return _has_timestamp(header) ? 2 : 1; } else { - _mdata_offset = 1; + return 1; } } diff --git a/host/lib/rfnoc/graph_stream_manager.cpp b/host/lib/rfnoc/graph_stream_manager.cpp index f2024786a..2db68db04 100644 --- a/host/lib/rfnoc/graph_stream_manager.cpp +++ b/host/lib/rfnoc/graph_stream_manager.cpp @@ -8,7 +8,9 @@ #include <uhd/utils/log.hpp> #include <uhdlib/rfnoc/graph_stream_manager.hpp> #include <uhdlib/rfnoc/link_stream_manager.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> #include <boost/format.hpp> +#include <boost/make_shared.hpp> #include <map> #include <set> @@ -175,6 +177,38 @@ public: "specified source endpoint"); } + chdr_rx_data_xport::uptr create_device_to_host_data_stream( + const sep_addr_t src_addr, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, + const device_addr_t& xport_args) + { + // TODO: choose a route + const device_id_t via_device = NULL_DEVICE_ID; + + return _link_mgrs.at(_check_dst_and_find_src(src_addr, via_device)) + ->create_device_to_host_data_stream(src_addr, + pyld_buff_fmt, + mdata_buff_fmt, + xport_args); + } + + virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream( + sep_addr_t dst_addr, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, + const device_addr_t& xport_args) + { + // TODO: choose a route + const device_id_t via_device = NULL_DEVICE_ID; + + return _link_mgrs.at(_check_dst_and_find_src(dst_addr, via_device)) + ->create_host_to_device_data_stream(dst_addr, + pyld_buff_fmt, + mdata_buff_fmt, + xport_args); + } + private: device_id_t _check_dst_and_find_src(sep_addr_t dst_addr, device_id_t via_device) const { diff --git a/host/lib/rfnoc/link_stream_manager.cpp b/host/lib/rfnoc/link_stream_manager.cpp index 4fe183529..6855162de 100644 --- a/host/lib/rfnoc/link_stream_manager.cpp +++ b/host/lib/rfnoc/link_stream_manager.cpp @@ -203,86 +203,71 @@ public: false, STREAM_SETUP_TIMEOUT); - // Compute FC frequency and headroom based on buff parameters - stream_buff_params_t fc_freq{ - static_cast<uint64_t>(std::ceil(double(buff_params.bytes) * fc_freq_ratio)), - static_cast<uint32_t>( - std::ceil(double(buff_params.packets) * fc_freq_ratio))}; - stream_buff_params_t fc_headroom{ - static_cast<uint64_t>( - std::ceil(double(buff_params.bytes) * fc_headroom_ratio)), - static_cast<uint32_t>( - std::ceil(double(buff_params.packets) * fc_headroom_ratio))}; - // Reconfigure flow control using the new frequency and headroom return _mgmt_portal->config_remote_stream(*_ctrl_xport, dst_epid, src_epid, lossy_xport, - fc_freq, - fc_headroom, + _get_buff_params_ratio(buff_params, fc_freq_ratio), + _get_buff_params_ratio(buff_params, fc_headroom_ratio), reset, STREAM_SETUP_TIMEOUT); } virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream( const sep_addr_t dst_addr, - const bool lossy_xport, const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, - const double fc_freq_ratio, - const double fc_headroom_ratio, const device_addr_t& xport_args) { - // Create a new source endpoint and EPID - sep_addr_t sw_epid_addr(_my_device_id, SEP_INST_DATA_BASE + (_data_ep_inst++)); - sep_id_t src_epid = _epid_alloc->allocate_epid(sw_epid_addr); - _allocated_epids.insert(src_epid); _ensure_ep_is_reachable(dst_addr); - // Generate a new destination EPID instance + // Generate a new destination (device) EPID instance sep_id_t dst_epid = _epid_alloc->allocate_epid(dst_addr); + _mgmt_portal->initialize_endpoint(*_ctrl_xport, dst_addr, dst_epid); - // Create the data transport that we will return to the client - chdr_rx_data_xport::uptr xport = _mb_iface.make_rx_data_transport( - _my_device_id, src_epid, dst_epid, xport_args); - - chdr_ctrl_xport::sptr mgmt_xport = - _mb_iface.make_ctrl_transport(_my_device_id, src_epid); - - // Create new temporary management portal with the transports used for this stream - // TODO: This is a bit excessive. Maybe we can pair down the functionality of the - // portal just for route setup purposes. Whatever we do, we *must* use xport in it - // though otherwise the transport will not behave correctly. - mgmt_portal::uptr data_mgmt_portal = - mgmt_portal::make(*mgmt_xport, _pkt_factory, sw_epid_addr, src_epid); - - // Setup a route to the EPID - data_mgmt_portal->initialize_endpoint(*mgmt_xport, dst_addr, dst_epid); - data_mgmt_portal->setup_local_route(*mgmt_xport, dst_epid); if (!_mgmt_portal->get_endpoint_info(dst_epid).has_data) { throw uhd::rfnoc_error("Downstream endpoint does not support data traffic"); } - // TODO: Implement data transport setup logic here - + // Create a new destination (host) endpoint and EPID + sep_addr_t sw_epid_addr(_my_device_id, SEP_INST_DATA_BASE + (_data_ep_inst++)); + sep_id_t src_epid = _epid_alloc->allocate_epid(sw_epid_addr); + _allocated_epids.insert(src_epid); - // Delete the portal when done - data_mgmt_portal.reset(); - return xport; + return _mb_iface.make_tx_data_transport({sw_epid_addr, dst_addr}, + {src_epid, dst_epid}, + pyld_buff_fmt, + mdata_buff_fmt, + xport_args); } - virtual chdr_tx_data_xport::uptr create_device_to_host_data_stream( - const sep_addr_t src_addr, - const bool lossy_xport, + virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream( + sep_addr_t src_addr, const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, - const double fc_freq_ratio, - const double fc_headroom_ratio, const device_addr_t& xport_args) { - // TODO: Implement me - return chdr_tx_data_xport::uptr(); + _ensure_ep_is_reachable(src_addr); + + // Generate a new source (device) EPID instance + sep_id_t src_epid = _epid_alloc->allocate_epid(src_addr); + _mgmt_portal->initialize_endpoint(*_ctrl_xport, src_addr, src_epid); + + if (!_mgmt_portal->get_endpoint_info(src_epid).has_data) { + throw uhd::rfnoc_error("Downstream endpoint does not support data traffic"); + } + + // Create a new destination (host) endpoint and EPID + sep_addr_t sw_epid_addr(_my_device_id, SEP_INST_DATA_BASE + (_data_ep_inst++)); + sep_id_t dst_epid = _epid_alloc->allocate_epid(sw_epid_addr); + _allocated_epids.insert(dst_epid); + + return _mb_iface.make_rx_data_transport({src_addr, sw_epid_addr}, + {src_epid, dst_epid}, + pyld_buff_fmt, + mdata_buff_fmt, + xport_args); } private: @@ -295,7 +280,14 @@ private: throw uhd::routing_error("Specified endpoint is not reachable"); } - // A reference to the packet factor + stream_buff_params_t _get_buff_params_ratio( + const stream_buff_params_t& buff_params, const double ratio) + { + return {static_cast<uint64_t>(std::ceil(double(buff_params.bytes) * ratio)), + static_cast<uint32_t>(std::ceil(double(buff_params.packets) * ratio))}; + } + + // A reference to the packet factory const chdr::chdr_packet_factory& _pkt_factory; // The device address of this software endpoint const device_id_t _my_device_id; diff --git a/host/lib/rfnoc/mgmt_portal.cpp b/host/lib/rfnoc/mgmt_portal.cpp index b490e0baf..a8b72cbdf 100644 --- a/host/lib/rfnoc/mgmt_portal.cpp +++ b/host/lib/rfnoc/mgmt_portal.cpp @@ -794,12 +794,11 @@ private: // Functions mgmt_hop_t& hop) { // Validate flow control parameters - if (fc_freq.bytes >= (uint64_t(1) << 40) - || fc_freq.packets >= (uint64_t(1) << 24)) { + if (fc_freq.bytes > MAX_FC_FREQ_BYTES || fc_freq.packets > MAX_FC_FREQ_PKTS) { throw uhd::value_error("Flow control frequency parameters out of bounds"); } - if (fc_headroom.bytes >= (uint64_t(1) << 16) - || fc_headroom.packets >= (uint64_t(1) << 8)) { + if (fc_headroom.bytes > MAX_FC_HEADROOM_BYTES + || fc_headroom.packets > MAX_FC_HEADROOM_PKTS) { throw uhd::value_error("Flow control headroom parameters out of bounds"); } @@ -992,7 +991,8 @@ private: // Functions auto send_buff = xport.get_send_buff(timeout * 1000); if (not send_buff) { - UHD_LOG_ERROR("RFNOC::MGMT", "Timed out getting send buff for management transaction"); + UHD_LOG_ERROR( + "RFNOC::MGMT", "Timed out getting send buff for management transaction"); throw uhd::io_error("Timed out getting send buff for management transaction"); } _send_pkt->refresh(send_buff->data(), header, payload); diff --git a/host/lib/rfnoc/rfnoc_graph.cpp b/host/lib/rfnoc/rfnoc_graph.cpp index dd3dd7b90..4bf35cff1 100644 --- a/host/lib/rfnoc/rfnoc_graph.cpp +++ b/host/lib/rfnoc/rfnoc_graph.cpp @@ -15,12 +15,18 @@ #include <uhdlib/rfnoc/graph.hpp> #include <uhdlib/rfnoc/graph_stream_manager.hpp> #include <uhdlib/rfnoc/rfnoc_device.hpp> +#include <uhdlib/rfnoc/rfnoc_rx_streamer.hpp> +#include <uhdlib/rfnoc/rfnoc_tx_streamer.hpp> #include <uhdlib/utils/narrow.hpp> +#include <boost/make_shared.hpp> #include <boost/shared_ptr.hpp> // FIXME remove when rfnoc_device is ready #include <memory> using namespace uhd::rfnoc; +namespace { +const std::string LOG_ID("RFNOC::GRAPH"); +} class rfnoc_graph_impl : public rfnoc_graph { @@ -33,6 +39,7 @@ public: , _graph(std::make_unique<uhd::rfnoc::detail::graph_t>()) { setup_graph(dev_addr); + _init_sep_map(); _init_static_connections(); } @@ -76,31 +83,136 @@ public: } if (!has_block(dst_blk)) { throw uhd::lookup_error( - std::string("Cannot connect blocks, source block not found: ") - + src_blk.to_string()); + std::string("Cannot connect blocks, destination block not found: ") + + dst_blk.to_string()); } + auto edge_type = _physical_connect(src_blk, src_port, dst_blk, dst_port); _connect(get_block(src_blk), src_port, get_block(dst_blk), dst_port, + edge_type, skip_property_propagation); - _physical_connect(src_blk, src_port, dst_blk, dst_port); } - void connect(uhd::tx_streamer& /*streamer*/, - size_t /*strm_port*/, - const block_id_t& /*dst_blk*/, - size_t /*dst_port*/) + void connect(uhd::tx_streamer::sptr streamer, + size_t strm_port, + const block_id_t& dst_blk, + size_t dst_port) { - throw uhd::not_implemented_error(""); + // Verify the streamer was created by us + auto rfnoc_streamer = boost::dynamic_pointer_cast<rfnoc_tx_streamer>(streamer); + if (!rfnoc_streamer) { + throw uhd::type_error("Streamer is not rfnoc capable"); + } + + // Verify src_blk even exists in this graph + if (!has_block(dst_blk)) { + throw uhd::lookup_error( + std::string("Cannot connect block to streamer, source block not found: ") + + dst_blk.to_string()); + } + + // Verify src_blk has an SEP upstream + graph_edge_t dst_static_edge = _assert_edge( + _get_static_edge( + [dst_blk_id = dst_blk.to_string(), dst_port](const graph_edge_t& edge) { + return edge.dst_blockid == dst_blk_id && edge.dst_port == dst_port; + }), + dst_blk.to_string()); + if (block_id_t(dst_static_edge.src_blockid).get_block_name() != NODE_ID_SEP) { + const std::string err_msg = + dst_blk.to_string() + ":" + std::to_string(dst_port) + + " is not connected to an SEP! Routing impossible."; + UHD_LOG_ERROR(LOG_ID, err_msg); + throw uhd::routing_error(err_msg); + } + + // Now get the name and address of the SEP + const std::string sep_block_id = dst_static_edge.src_blockid; + const sep_addr_t sep_addr = _sep_map.at(sep_block_id); + + const sw_buff_t pyld_fmt = + bits_to_sw_buff(rfnoc_streamer->get_otw_item_comp_bit_width()); + const sw_buff_t mdata_fmt = BUFF_U64; + + auto xport = _gsm->create_host_to_device_data_stream(sep_addr, + pyld_fmt, + mdata_fmt, + rfnoc_streamer->get_stream_args().args); + + rfnoc_streamer->connect_channel(strm_port, std::move(xport)); + + //// If this worked, then also connect the streamer in the BGL graph + auto dst = get_block(dst_blk); + graph_edge_t edge_info(strm_port, dst_port, graph_edge_t::TX_STREAM, true); + _graph->connect(rfnoc_streamer.get(), dst.get(), edge_info); } - void connect(const block_id_t& /*src_blk*/, - size_t /*src_port*/, - uhd::rx_streamer& /*streamer*/, - size_t /*strm_port*/) + void connect(const block_id_t& src_blk, + size_t src_port, + uhd::rx_streamer::sptr streamer, + size_t strm_port) { - throw uhd::not_implemented_error(""); + // Verify the streamer was created by us + auto rfnoc_streamer = boost::dynamic_pointer_cast<rfnoc_rx_streamer>(streamer); + if (!rfnoc_streamer) { + throw uhd::type_error("Streamer is not rfnoc capable"); + } + + // Verify src_blk even exists in this graph + if (!has_block(src_blk)) { + throw uhd::lookup_error( + std::string("Cannot connect block to streamer, source block not found: ") + + src_blk.to_string()); + } + + // Verify src_blk has an SEP downstream + graph_edge_t src_static_edge = _assert_edge( + _get_static_edge( + [src_blk_id = src_blk.to_string(), src_port](const graph_edge_t& edge) { + return edge.src_blockid == src_blk_id && edge.src_port == src_port; + }), + src_blk.to_string()); + if (block_id_t(src_static_edge.dst_blockid).get_block_name() != NODE_ID_SEP) { + const std::string err_msg = + src_blk.to_string() + ":" + std::to_string(src_port) + + " is not connected to an SEP! Routing impossible."; + UHD_LOG_ERROR(LOG_ID, err_msg); + throw uhd::routing_error(err_msg); + } + + // Now get the name and address of the SEP + const std::string sep_block_id = src_static_edge.dst_blockid; + const sep_addr_t sep_addr = _sep_map.at(sep_block_id); + + const sw_buff_t pyld_fmt = + bits_to_sw_buff(rfnoc_streamer->get_otw_item_comp_bit_width()); + const sw_buff_t mdata_fmt = BUFF_U64; + + auto xport = _gsm->create_device_to_host_data_stream(sep_addr, + pyld_fmt, + mdata_fmt, + rfnoc_streamer->get_stream_args().args); + + rfnoc_streamer->connect_channel(strm_port, std::move(xport)); + + // If this worked, then also connect the streamer in the BGL graph + auto src = get_block(src_blk); + graph_edge_t edge_info(src_port, strm_port, graph_edge_t::RX_STREAM, true); + _graph->connect(src.get(), rfnoc_streamer.get(), edge_info); + } + + uhd::rx_streamer::sptr create_rx_streamer( + const size_t num_chans, const uhd::stream_args_t& args) + { + return boost::make_shared<rfnoc_rx_streamer>(num_chans, args); + } + + uhd::tx_streamer::sptr create_tx_streamer( + const size_t num_chans, const uhd::stream_args_t& args) + { + return boost::make_shared<rfnoc_tx_streamer>(num_chans, args); } std::shared_ptr<mb_controller> get_mb_controller(const size_t mb_index = 0) @@ -152,7 +264,7 @@ private: throw uhd::key_error(std::string("Found no RFNoC devices for ----->\n") + dev_addr.to_pp_string()); } - _tree = _device->get_tree(); + _tree = _device->get_tree(); _num_mboards = _tree->list("/mboards").size(); for (size_t i = 0; i < _num_mboards; ++i) { _mb_controllers.emplace(i, _device->get_mb_controller(i)); @@ -170,7 +282,8 @@ private: try { _gsm = graph_stream_manager::make(pkt_factory, epid_alloc, links); } catch (uhd::io_error& ex) { - UHD_LOG_ERROR("RFNOC::GRAPH", "IO Error during GSM initialization. " << ex.what()); + UHD_LOG_ERROR( + "RFNOC::GRAPH", "IO Error during GSM initialization. " << ex.what()); throw; } @@ -187,6 +300,9 @@ private: _gsm->connect_host_to_device(ctrl_sep_addr); // Grab and stash the Client Zero for this mboard detail::client_zero::sptr mb_cz = _gsm->get_client_zero(ctrl_sep_addr); + // Client zero port numbers are based on the control xbar numbers, + // which have the client 0 interface first, followed by stream + // endpoints, and then the blocks. _client_zeros.emplace(mb_idx, mb_cz); const size_t num_blocks = mb_cz->get_num_blocks(); @@ -204,7 +320,7 @@ private: // Iterate through and register each of the blocks in this mboard for (size_t portno = 0; portno < num_blocks; ++portno) { - auto noc_id = mb_cz->get_noc_id(portno + first_block_port); + auto noc_id = mb_cz->get_noc_id(portno + first_block_port); auto block_factory_info = factory::get_block_factory(noc_id); auto block_info = mb_cz->get_block_info(portno + first_block_port); block_id_t block_id(mb_idx, @@ -222,24 +338,25 @@ private: // iface object through the mb_iface auto ctrlport_clk_iface = mb.get_clock_iface(block_factory_info.ctrlport_clk); - auto tb_clk_iface = (block_factory_info.timebase_clk == CLOCK_KEY_GRAPH) ? - std::make_shared<clock_iface>(CLOCK_KEY_GRAPH) : - mb.get_clock_iface(block_factory_info.timebase_clk); + auto tb_clk_iface = + (block_factory_info.timebase_clk == CLOCK_KEY_GRAPH) + ? std::make_shared<clock_iface>(CLOCK_KEY_GRAPH) + : mb.get_clock_iface(block_factory_info.timebase_clk); // A "graph" clock is always "running" if (block_factory_info.timebase_clk == CLOCK_KEY_GRAPH) { tb_clk_iface->set_running(true); } - auto block_reg_iface = _gsm->get_block_register_iface(ctrl_sep_addr, + auto block_reg_iface = _gsm->get_block_register_iface(ctrl_sep_addr, portno, *ctrlport_clk_iface.get(), *tb_clk_iface.get()); - auto make_args_uptr = std::make_unique<noc_block_base::make_args_t>(); + auto make_args_uptr = std::make_unique<noc_block_base::make_args_t>(); make_args_uptr->noc_id = noc_id; - make_args_uptr->block_id = block_id; - make_args_uptr->num_input_ports = block_info.num_inputs; - make_args_uptr->num_output_ports = block_info.num_outputs; - make_args_uptr->reg_iface = block_reg_iface; - make_args_uptr->tb_clk_iface = tb_clk_iface; + make_args_uptr->block_id = block_id; + make_args_uptr->num_input_ports = block_info.num_inputs; + make_args_uptr->num_output_ports = block_info.num_outputs; + make_args_uptr->reg_iface = block_reg_iface; + make_args_uptr->tb_clk_iface = tb_clk_iface; make_args_uptr->ctrlport_clk_iface = ctrlport_clk_iface; make_args_uptr->mb_control = (factory::has_requested_mb_access(noc_id) ? _mb_controllers.at(mb_idx) @@ -262,40 +379,43 @@ private: _block_registry->init_props(); } + void _init_sep_map() + { + for (size_t mb_idx = 0; mb_idx < get_num_mboards(); ++mb_idx) { + auto remote_device_id = _device->get_mb_iface(mb_idx).get_remote_device_id(); + auto& cz = _client_zeros.at(mb_idx); + for (size_t sep_idx = 0; sep_idx < cz->get_num_stream_endpoints(); + ++sep_idx) { + // Register ID in _port_block_map + block_id_t id(mb_idx, NODE_ID_SEP, sep_idx); + _port_block_map.insert({{mb_idx, sep_idx + 1}, id}); + _sep_map.insert({id.to_string(), sep_addr_t(remote_device_id, sep_idx)}); + } + } + } + void _init_static_connections() { UHD_LOG_TRACE("RFNOC::GRAPH", "Identifying static connections..."); for (auto& kv_cz : _client_zeros) { - auto& adjacency_list = kv_cz.second->get_adjacency_list(); - const size_t first_block_port = 1 + kv_cz.second->get_num_stream_endpoints(); - + auto& adjacency_list = kv_cz.second->get_adjacency_list(); for (auto& edge : adjacency_list) { // Assemble edge auto graph_edge = graph_edge_t(); - if (edge.src_blk_index < first_block_port) { - block_id_t id(kv_cz.first, NODE_ID_SEP, edge.src_blk_index - 1); - _port_block_map.insert({{kv_cz.first, edge.src_blk_index}, id}); - graph_edge.src_blockid = id.to_string(); - } else { - graph_edge.src_blockid = - _port_block_map.at({kv_cz.first, edge.src_blk_index}); - } - if (edge.dst_blk_index < first_block_port) { - block_id_t id(kv_cz.first, NODE_ID_SEP, edge.dst_blk_index - 1); - _port_block_map.insert({{kv_cz.first, edge.dst_blk_index}, id}); - graph_edge.dst_blockid = id.to_string(); - } else { - graph_edge.dst_blockid = - _port_block_map.at({kv_cz.first, edge.dst_blk_index}); - } + UHD_ASSERT_THROW( + _port_block_map.count({kv_cz.first, edge.src_blk_index})); + graph_edge.src_blockid = + _port_block_map.at({kv_cz.first, edge.src_blk_index}); + UHD_ASSERT_THROW( + _port_block_map.count({kv_cz.first, edge.dst_blk_index})); + graph_edge.dst_blockid = + _port_block_map.at({kv_cz.first, edge.dst_blk_index}); graph_edge.src_port = edge.src_blk_port; graph_edge.dst_port = edge.dst_blk_port; graph_edge.edge = graph_edge_t::edge_t::STATIC; _static_edges.push_back(graph_edge); - UHD_LOG_TRACE("RFNOC::GRAPH", - "Static connection: " - << graph_edge.src_blockid << ":" << graph_edge.src_port << " -> " - << graph_edge.dst_blockid << ":" << graph_edge.dst_port); + UHD_LOG_TRACE( + "RFNOC::GRAPH", "Static connection: " << graph_edge.to_string()); } } } @@ -312,214 +432,98 @@ private: size_t src_port, std::shared_ptr<node_t> dst_blk, size_t dst_port, + graph_edge_t::edge_t edge_type, bool skip_property_propagation) { graph_edge_t edge_info( - src_port, dst_port, graph_edge_t::DYNAMIC, not skip_property_propagation); + src_port, dst_port, edge_type, not skip_property_propagation); edge_info.src_blockid = src_blk->get_unique_id(); edge_info.dst_blockid = dst_blk->get_unique_id(); _graph->connect(src_blk.get(), dst_blk.get(), edge_info); } - /*! Helper method to find a stream endpoint connected to a block - * - * \param blk_id the block connected to the stream endpoint - * \param port the port connected to the stream endpoint - * \param blk_is_src true if the block is a data source, false if it is a - * destination - * \return the address of the stream endpoint, or boost::none if it is not - * directly connected to a stream endpoint - */ - boost::optional<sep_addr_t> _get_adjacent_sep( - const block_id_t& blk_id, const size_t port, const bool blk_is_src) const - { - const std::string block_id_str = get_block(blk_id)->get_block_id().to_string(); - UHD_LOG_TRACE("RFNOC::GRAPH", - "Finding SEP for " << (blk_is_src ? "source" : "dst") << " block " - << block_id_str << ":" << port); - // TODO: This is an attempt to simplify the algo, but it turns out to be - // as many lines as before: - //auto edge_predicate = [blk_is_src, block_id_str](const graph_edge_t edge) { - //if (blk_is_src) { - //return edge.src_blockid == block_id_str; - //} - //return edge.dst_blockid == block_id_str; - //}; - - //auto blk_edge_it = - //std::find_if(_static_edges.cbegin(), _static_edges.cend(), edge_predicate); - //if (blk_edge_it == _static_edges.cend()) { - //return boost::none; - //} - - //const std::string sep_block_id = blk_is_src ? - //blk_edge_it->dst_blockid : blk_edge_it->src_blockid; - //UHD_LOG_TRACE("RFNOC::GRAPH", - //"Found SEP: " << sep_block_id); - - //auto port_map_result = std::find_if(_port_block_map.cbegin(), - //_port_block_map.cend, - //[sep_block_id](std::pair<std::pair<size_t, size_t>, block_id_t> port_block) { - //return port_block.second == sep_block_id; - //}); - //if (port_map_result == _port_block_map.cend()) { - //throw uhd::lookup_error( - //std::string("SEP `") + sep_block_id + "' not found in port/block map!"); - //} - //const auto dev = _device->get_mb_iface(mb_idx).get_remote_device_id(); - //const sep_inst_t sep_inst = blk_is_src ? - //edge.dst_blk_index - 1 : edge.src_blk_index - 1; - //return sep_addr_t(dev, sep_inst); - - const auto& info = _xbar_block_config.at(block_id_str); - const auto mb_idx = blk_id.get_device_no(); - const auto cz = _client_zeros.at(mb_idx); - - const size_t first_block_port = 1 + cz->get_num_stream_endpoints(); - - for (const auto& edge : cz->get_adjacency_list()) { - const auto edge_blk_idx = blk_is_src ? edge.src_blk_index - : edge.dst_blk_index; - const auto edge_blk_port = blk_is_src ? edge.src_blk_port : edge.dst_blk_port; - - if ((edge_blk_idx == info.xbar_port + first_block_port) - and (edge_blk_port == port)) { - UHD_LOGGER_DEBUG("RFNOC::GRAPH") - << boost::format("Block found in adjacency list. %d:%d->%d:%d") - % edge.src_blk_index - % static_cast<unsigned int>(edge.src_blk_port) - % edge.dst_blk_index - % static_cast<unsigned int>(edge.dst_blk_port); - - // Check that the block is connected to a stream endpoint. The - // minus one here is because index zero is client 0. - const sep_inst_t sep_inst = blk_is_src ? - edge.dst_blk_index - 1 : edge.src_blk_index - 1; - - if (sep_inst < cz->get_num_stream_endpoints()) { - const auto dev = _device->get_mb_iface(mb_idx).get_remote_device_id(); - return sep_addr_t(dev, sep_inst); - } else { - // Block is connected to another block - return boost::none; - } - } - } - return boost::none; - } - /*! Internal physical connection helper * * Make the connections in the physical device * - * \throws connect_disallowed_on_src - * if the source port is statically connected to a *different* block - * \throws connect_disallowed_on_dst - * if the destination port is statically connected to a *different* block + * \throws uhd::routing_error + * if the blocks are statically connected to something else */ - void _physical_connect(const block_id_t& src_blk, + graph_edge_t::edge_t _physical_connect(const block_id_t& src_blk, size_t src_port, const block_id_t& dst_blk, size_t dst_port) { - auto src_blk_ctrl = get_block(src_blk); - auto dst_blk_ctrl = get_block(dst_blk); - - /* - * Start by determining if the connection can be made - * Get the adjacency list and check if the connection is in it already - */ - // Read the adjacency list for the source and destination blocks - auto src_mb_idx = src_blk.get_device_no(); - auto src_cz = _gsm->get_client_zero( - sep_addr_t(_device->get_mb_iface(src_mb_idx).get_remote_device_id(), 0)); - std::vector<detail::client_zero::edge_def_t>& adj_list = - src_cz->get_adjacency_list(); - // Check the src_blk - auto src_blk_xbar_info = - _xbar_block_config.at(src_blk_ctrl->get_block_id().to_string()); - // This "xbar_port" starts at the first block, so we need to add the client zero - // and stream endpoint ports - const auto src_xbar_port = - src_blk_xbar_info.xbar_port + src_cz->get_num_stream_endpoints() + 1; - // We can also find out which stream endpoint the src block is connected to - sep_inst_t src_sep; - for (detail::client_zero::edge_def_t edge : adj_list) { - if ((edge.src_blk_index == src_xbar_port) - and (edge.src_blk_port == src_port)) { - UHD_LOGGER_DEBUG("RFNOC::GRAPH") - << boost::format("Block found in adjacency list. %d:%d->%d:%d") - % edge.src_blk_index - % static_cast<unsigned int>(edge.src_blk_port) - % edge.dst_blk_index - % static_cast<unsigned int>(edge.dst_blk_port); - if (edge.dst_blk_index <= src_cz->get_num_stream_endpoints()) { - src_sep = - edge.dst_blk_index - 1 /* minus 1 because port 0 is client zero*/; - } else { - // TODO connect_disallowed_on_src? - // TODO put more info in exception - throw uhd::routing_error( - "Unable to connect to statically connected source port"); - } - } + const std::string src_blk_info = + src_blk.to_string() + ":" + std::to_string(src_port); + const std::string dst_blk_info = + dst_blk.to_string() + ":" + std::to_string(dst_port); + + // Find the static edge for src_blk:src_port + graph_edge_t src_static_edge = _assert_edge( + _get_static_edge( + [src_blk_id = src_blk.to_string(), src_port](const graph_edge_t& edge) { + return edge.src_blockid == src_blk_id && edge.src_port == src_port; + }), + src_blk_info); + + // Now see if it's already connected to the destination + if (src_static_edge.dst_blockid == dst_blk.to_string() + && src_static_edge.dst_port == dst_port) { + UHD_LOG_TRACE(LOG_ID, + "Blocks " << src_blk_info << " and " << dst_blk_info + << " are already statically connected, no physical connection " + "required."); + return graph_edge_t::STATIC; } - // Read the dst adjacency list if its different - // TODO they may be on the same mboard, which would make this redundant - auto dst_mb_idx = dst_blk.get_device_no(); - auto dst_cz = _gsm->get_client_zero( - sep_addr_t(_device->get_mb_iface(dst_mb_idx).get_remote_device_id(), 0)); - adj_list = dst_cz->get_adjacency_list(); - // Check the dst blk - auto dst_blk_xbar_info = - _xbar_block_config.at(dst_blk_ctrl->get_block_id().to_string()); - // This "xbar_port" starts at the first block, so we need to add the client zero - // and stream endpoint ports - const auto dst_xbar_port = - dst_blk_xbar_info.xbar_port + dst_cz->get_num_stream_endpoints() + 1; - // We can also find out which stream endpoint the dst block is connected to - sep_inst_t dst_sep; - for (detail::client_zero::edge_def_t edge : adj_list) { - if ((edge.dst_blk_index == dst_xbar_port) - and (edge.dst_blk_port == dst_port)) { - UHD_LOGGER_DEBUG("RFNOC::GRAPH") - << boost::format("Block found in adjacency list. %d:%d->%d:%d") - % edge.src_blk_index - % static_cast<unsigned int>(edge.src_blk_port) - % edge.dst_blk_index - % static_cast<unsigned int>(edge.dst_blk_port); - if (edge.src_blk_index <= dst_cz->get_num_stream_endpoints()) { - dst_sep = - edge.src_blk_index - 1 /* minus 1 because port 0 is client zero*/; - } else { - // TODO connect_disallowed_on_dst? - // TODO put more info in exception - throw uhd::routing_error( - "Unable to connect to statically connected destination port"); - } - } + // If they're not statically connected, the source *must* be connected + // to an SEP, or this route is impossible + if (block_id_t(src_static_edge.dst_blockid).get_block_name() != NODE_ID_SEP) { + const std::string err_msg = + src_blk_info + " is neither statically connected to " + dst_blk_info + + " nor to an SEP! Routing impossible."; + UHD_LOG_ERROR(LOG_ID, err_msg); + throw uhd::routing_error(err_msg); } - /* TODO: we checked if either port is used in a static connection (and its not if - * we've made it this far). We also need to check something else, but I can't - * remember what... - */ - - // At this point, we know the attempted connection is valid, so let's go ahead and - // make it - sep_addr_t src_sep_addr( - _device->get_mb_iface(src_mb_idx).get_remote_device_id(), src_sep); - sep_addr_t dst_sep_addr( - _device->get_mb_iface(dst_mb_idx).get_remote_device_id(), dst_sep); + // OK, now we know which source SEP we have + const std::string src_sep_info = src_static_edge.dst_blockid; + const sep_addr_t src_sep_addr = _sep_map.at(src_sep_info); + + // Now find the static edge for the destination SEP + auto dst_static_edge = _assert_edge( + _get_static_edge( + [dst_blk_id = dst_blk.to_string(), dst_port](const graph_edge_t& edge) { + return edge.dst_blockid == dst_blk_id && edge.dst_port == dst_port; + }), + dst_blk_info); + + // If they're not statically connected, the source *must* be connected + // to an SEP, or this route is impossible + if (block_id_t(dst_static_edge.src_blockid).get_block_name() != NODE_ID_SEP) { + const std::string err_msg = + dst_blk_info + " is neither statically connected to " + src_blk_info + + " nor to an SEP! Routing impossible."; + UHD_LOG_ERROR(LOG_ID, err_msg); + throw uhd::routing_error(err_msg); + } + + // OK, now we know which destination SEP we have + const std::string dst_sep_info = dst_static_edge.src_blockid; + const sep_addr_t dst_sep_addr = _sep_map.at(dst_sep_info); + + // Now all we need to do is dynamically connect those SEPs auto strm_info = _gsm->create_device_to_device_data_stream( dst_sep_addr, src_sep_addr, false, 0.1, 0.0, false); - UHD_LOGGER_INFO("RFNOC::GRAPH") + UHD_LOGGER_DEBUG(LOG_ID) << boost::format("Data stream between EPID %d and EPID %d established " "where downstream buffer can hold %lu bytes and %u packets") % std::get<0>(strm_info).first % std::get<0>(strm_info).second % std::get<1>(strm_info).bytes % std::get<1>(strm_info).packets; + + return graph_edge_t::DYNAMIC; } //! Flush and reset each connected port on the mboard @@ -541,6 +545,35 @@ private: mb_cz->reset_ctrl(block_portno); } } + + /*! Find the static edge that matches \p pred + * + * \throws uhd::assertion_error if the edge can't be found. So be careful! + */ + template <typename UnaryPredicate> + boost::optional<graph_edge_t> _get_static_edge(UnaryPredicate&& pred) + { + auto edge_it = std::find_if(_static_edges.cbegin(), _static_edges.cend(), pred); + if (edge_it == _static_edges.cend()) { + return boost::none; + } + return *edge_it; + } + + /*! Make sure an optional edge info is valid, or throw. + */ + graph_edge_t _assert_edge( + boost::optional<graph_edge_t> edge_o, const std::string& blk_info) + { + if (!bool(edge_o)) { + const std::string err_msg = std::string("Cannot connect block ") + blk_info + + ", port is unconnected in the FPGA!"; + UHD_LOG_ERROR("RFNOC::GRAPH", err_msg); + throw uhd::routing_error(err_msg); + } + return edge_o.get(); + } + /************************************************************************** * Attributes *************************************************************************/ @@ -580,6 +613,9 @@ private: // or SEP std::map<std::pair<size_t, size_t>, block_id_t> _port_block_map; + //! Map SEP block ID (e.g. 0/SEP#0) onto a sep_addr_t + std::unordered_map<std::string, sep_addr_t> _sep_map; + //! List of statically connected edges. Includes SEPs too! std::vector<graph_edge_t> _static_edges; diff --git a/host/lib/rfnoc/rfnoc_rx_streamer.cpp b/host/lib/rfnoc/rfnoc_rx_streamer.cpp new file mode 100644 index 000000000..4340faff0 --- /dev/null +++ b/host/lib/rfnoc/rfnoc_rx_streamer.cpp @@ -0,0 +1,141 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/rfnoc/defaults.hpp> +#include <uhdlib/rfnoc/node_accessor.hpp> +#include <uhdlib/rfnoc/rfnoc_rx_streamer.hpp> +#include <atomic> + +using namespace uhd; +using namespace uhd::rfnoc; + +const std::string STREAMER_ID = "RxStreamer"; +static std::atomic<uint64_t> streamer_inst_ctr; + +rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans, + const uhd::stream_args_t stream_args) + : rx_streamer_impl<chdr_rx_data_xport>(num_chans, stream_args) + , _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++)) + , _stream_args(stream_args) +{ + // No block to which to forward properties or actions + set_prop_forwarding_policy(forwarding_policy_t::DROP); + set_action_forwarding_policy(forwarding_policy_t::DROP); + + // Initialize properties + _scaling_in.reserve(num_chans); + _samp_rate_in.reserve(num_chans); + _tick_rate_in.reserve(num_chans); + _type_in.reserve(num_chans); + + for (size_t i = 0; i < num_chans; i++) { + _register_props(i, stream_args.otw_format); + } + node_accessor_t node_accessor{}; + node_accessor.init_props(this); +} + +std::string rfnoc_rx_streamer::get_unique_id() const +{ + return _unique_id; +} + +size_t rfnoc_rx_streamer::get_num_input_ports() const +{ + return get_num_channels(); +} + +size_t rfnoc_rx_streamer::get_num_output_ports() const +{ + return 0; +} + +void rfnoc_rx_streamer::issue_stream_cmd(const stream_cmd_t& stream_cmd) +{ + if (get_num_channels() > 1 and stream_cmd.stream_now + and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS) { + throw uhd::runtime_error( + "Invalid recv stream command - stream now on multiple channels in a " + "single streamer will fail to time align."); + } + + auto cmd = stream_cmd_action_info::make(stream_cmd.stream_mode); + cmd->stream_cmd = stream_cmd; + + for (size_t i = 0; i < get_num_channels(); i++) { + const res_source_info info(res_source_info::INPUT_EDGE, i); + post_action(info, cmd); + } +} + +const uhd::stream_args_t& rfnoc_rx_streamer::get_stream_args() const +{ + return _stream_args; +} + +bool rfnoc_rx_streamer::check_topology( + const std::vector<size_t>& connected_inputs, + const std::vector<size_t>& connected_outputs) +{ + // Check that all channels are connected + if (connected_inputs.size() != get_num_input_ports()) { + return false; + } + + // Call base class to check that connections are valid + return node_t::check_topology(connected_inputs, connected_outputs); +} + +void rfnoc_rx_streamer::_register_props(const size_t chan, + const std::string& otw_format) +{ + // Create actual properties and store them + _scaling_in.push_back(property_t<double>( + PROP_KEY_SCALING, {res_source_info::INPUT_EDGE, chan})); + _samp_rate_in.push_back( + property_t<double>(PROP_KEY_SAMP_RATE, {res_source_info::INPUT_EDGE, chan})); + _tick_rate_in.push_back(property_t<double>( + PROP_KEY_TICK_RATE, {res_source_info::INPUT_EDGE, chan})); + _type_in.emplace_back(property_t<std::string>( + PROP_KEY_TYPE, otw_format, {res_source_info::INPUT_EDGE, chan})); + + // Give us some shorthands for the rest of this function + property_t<double>* scaling_in = &_scaling_in.back(); + property_t<double>* samp_rate_in = &_samp_rate_in.back(); + property_t<double>* tick_rate_in = &_tick_rate_in.back(); + property_t<std::string>* type_in = &_type_in.back(); + + // Register them + register_property(scaling_in); + register_property(samp_rate_in); + register_property(tick_rate_in); + register_property(type_in); + + // Add resolvers + add_property_resolver({scaling_in}, {}, + [&scaling_in = *scaling_in, chan, this]() { + RFNOC_LOG_TRACE("Calling resolver for `scaling_in'@" << chan); + if (scaling_in.is_valid()) { + this->set_scale_factor(chan, scaling_in.get() / 32767.0); + } + }); + + add_property_resolver({samp_rate_in}, {}, + [&samp_rate_in = *samp_rate_in, chan, this]() { + RFNOC_LOG_TRACE("Calling resolver for `samp_rate_in'@" << chan); + if (samp_rate_in.is_valid()) { + this->set_samp_rate(samp_rate_in.get()); + } + }); + + add_property_resolver({tick_rate_in}, {}, + [&tick_rate_in = *tick_rate_in, chan, this]() { + RFNOC_LOG_TRACE("Calling resolver for `tick_rate_in'@" << chan); + if (tick_rate_in.is_valid()) { + this->set_tick_rate(tick_rate_in.get()); + } + }); +} diff --git a/host/lib/rfnoc/rfnoc_tx_streamer.cpp b/host/lib/rfnoc/rfnoc_tx_streamer.cpp new file mode 100644 index 000000000..82feeaf1f --- /dev/null +++ b/host/lib/rfnoc/rfnoc_tx_streamer.cpp @@ -0,0 +1,124 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/rfnoc/defaults.hpp> +#include <uhdlib/rfnoc/node_accessor.hpp> +#include <uhdlib/rfnoc/rfnoc_tx_streamer.hpp> +#include <atomic> + +using namespace uhd; +using namespace uhd::rfnoc; + +const std::string STREAMER_ID = "TxStreamer"; +static std::atomic<uint64_t> streamer_inst_ctr; + +rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans, + const uhd::stream_args_t stream_args) + : tx_streamer_impl<chdr_tx_data_xport>(num_chans, stream_args) + , _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++)) + , _stream_args(stream_args) +{ + // No block to which to forward properties or actions + set_prop_forwarding_policy(forwarding_policy_t::DROP); + set_action_forwarding_policy(forwarding_policy_t::DROP); + + // Initialize properties + _scaling_out.reserve(num_chans); + _samp_rate_out.reserve(num_chans); + _tick_rate_out.reserve(num_chans); + _type_out.reserve(num_chans); + + for (size_t i = 0; i < num_chans; i++) { + _register_props(i, stream_args.otw_format); + } + + node_accessor_t node_accessor; + node_accessor.init_props(this); +} + +std::string rfnoc_tx_streamer::get_unique_id() const +{ + return _unique_id; +} + +size_t rfnoc_tx_streamer::get_num_input_ports() const +{ + return 0; +} + +size_t rfnoc_tx_streamer::get_num_output_ports() const +{ + return get_num_channels(); +} + +const uhd::stream_args_t& rfnoc_tx_streamer::get_stream_args() const +{ + return _stream_args; +} + +bool rfnoc_tx_streamer::check_topology( + const std::vector<size_t>& connected_inputs, + const std::vector<size_t>& connected_outputs) +{ + // Check that all channels are connected + if (connected_outputs.size() != get_num_output_ports()) { + return false; + } + + // Call base class to check that connections are valid + return node_t::check_topology(connected_inputs, connected_outputs); +} + +void rfnoc_tx_streamer::_register_props(const size_t chan, + const std::string& otw_format) +{ + // Create actual properties and store them + _scaling_out.push_back(property_t<double>( + PROP_KEY_SCALING, {res_source_info::OUTPUT_EDGE, chan})); + _samp_rate_out.push_back(property_t<double>( + PROP_KEY_SAMP_RATE, {res_source_info::OUTPUT_EDGE, chan})); + _tick_rate_out.push_back(property_t<double>( + PROP_KEY_TICK_RATE, {res_source_info::OUTPUT_EDGE, chan})); + _type_out.emplace_back(property_t<std::string>( + PROP_KEY_TYPE, otw_format, {res_source_info::OUTPUT_EDGE, chan})); + + // Give us some shorthands for the rest of this function + property_t<double>* scaling_out = &_scaling_out.back(); + property_t<double>* samp_rate_out = &_samp_rate_out.back(); + property_t<double>* tick_rate_out = &_tick_rate_out.back(); + property_t<std::string>* type_out = &_type_out.back(); + + // Register them + register_property(scaling_out); + register_property(samp_rate_out); + register_property(tick_rate_out); + register_property(type_out); + + // Add resolvers + add_property_resolver({scaling_out}, {}, + [&scaling_out = *scaling_out, chan, this]() { + RFNOC_LOG_TRACE("Calling resolver for `scaling_out'@" << chan); + if (scaling_out.is_valid()) { + this->set_scale_factor(chan, 32767.0 / scaling_out.get()); + } + }); + + add_property_resolver({samp_rate_out}, {}, + [&samp_rate_out = *samp_rate_out, chan, this]() { + RFNOC_LOG_TRACE("Calling resolver for `samp_rate_out'@" << chan); + if (samp_rate_out.is_valid()) { + this->set_samp_rate(samp_rate_out.get()); + } + }); + + add_property_resolver({tick_rate_out}, {}, + [&tick_rate_out = *tick_rate_out, chan, this]() { + RFNOC_LOG_TRACE("Calling resolver for `tick_rate_out'@" << chan); + if (tick_rate_out.is_valid()) { + this->set_tick_rate(tick_rate_out.get()); + } + }); +} diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt index d6ad6d777..1cdb42b96 100644 --- a/host/tests/CMakeLists.txt +++ b/host/tests/CMakeLists.txt @@ -55,6 +55,8 @@ set(test_sources fe_conn_test.cpp rfnoc_node_test.cpp link_test.cpp + rx_streamer_test.cpp + tx_streamer_test.cpp ) set(benchmark_sources diff --git a/host/tests/common/mock_link.hpp b/host/tests/common/mock_link.hpp index 34ea15540..73a65916c 100644 --- a/host/tests/common/mock_link.hpp +++ b/host/tests/common/mock_link.hpp @@ -94,6 +94,14 @@ public: } /*! + * Return the number of packets stored in the mock link. + */ + size_t get_num_packets() const + { + return _tx_mems.size(); + } + + /*! * Retrieve the contents of a packet sent by the link. The link * stores packets in a queue in the order they were sent. */ diff --git a/host/tests/rfnoc_chdr_test.cpp b/host/tests/rfnoc_chdr_test.cpp index 417ed2f96..1c63d5976 100644 --- a/host/tests/rfnoc_chdr_test.cpp +++ b/host/tests/rfnoc_chdr_test.cpp @@ -222,3 +222,49 @@ BOOST_AUTO_TEST_CASE(chdr_strc_packet_no_swap_64) std::cout << pyld.to_string(); } } + +BOOST_AUTO_TEST_CASE(chdr_generic_packet_calculate_pyld_offset_64) +{ + // Check calculation without timestamp + auto test_pyld_offset = [](chdr_packet::uptr& pkt, + const packet_type_t pkt_type, + const size_t num_mdata) + { + uint64_t buff[MAX_BUF_SIZE_WORDS]; + chdr_header header; + header.set_pkt_type(pkt_type); + header.set_num_mdata(num_mdata); + + pkt->refresh(reinterpret_cast<void*>(buff), header, 0); + + const size_t pyld_offset = pkt->calculate_payload_offset( + pkt_type, num_mdata); + + void* pyld_ptr = pkt->get_payload_ptr(); + + const size_t non_pyld_bytes = static_cast<size_t>( + reinterpret_cast<uint8_t*>(pyld_ptr) - + reinterpret_cast<uint8_t*>(buff)); + + BOOST_CHECK(pyld_offset == non_pyld_bytes); + }; + + { + chdr_packet::uptr pkt = chdr64_be_factory.make_generic(); + test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 0); + test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 1); + test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 2); + test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 0); + test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 1); + test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 2); + } + { + chdr_packet::uptr pkt = chdr256_be_factory.make_generic(); + test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 0); + test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 1); + test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 2); + test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 0); + test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 1); + test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 2); + } +} diff --git a/host/tests/rx_streamer_test.cpp b/host/tests/rx_streamer_test.cpp new file mode 100644 index 000000000..cd4daf569 --- /dev/null +++ b/host/tests/rx_streamer_test.cpp @@ -0,0 +1,744 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include "../common/mock_link.hpp" +#include <uhdlib/transport/rx_streamer_impl.hpp> +#include <boost/make_shared.hpp> +#include <boost/test/unit_test.hpp> +#include <iostream> + +namespace uhd { namespace transport { + +/*! + * Contents of mock packet header + */ +struct mock_header_t +{ + bool eob = false; + bool has_tsf = false; + uint64_t tsf = 0; + size_t payload_bytes = 0; + bool ignore_seq = true; + size_t seq_num = 0; +}; + +/*! + * Mock rx data xport which doesn't use I/O service, and just interacts with + * the link directly. + */ +class mock_rx_data_xport +{ +public: + using uptr = std::unique_ptr<mock_rx_data_xport>; + using buff_t = uhd::transport::frame_buff; + + //! Values extracted from received RX data packets + struct packet_info_t + { + bool eob = false; + bool has_tsf = false; + uint64_t tsf = 0; + size_t payload_bytes = 0; + const void* payload = nullptr; + }; + + mock_rx_data_xport(mock_recv_link::sptr recv_link) : _recv_link(recv_link) {} + + std::tuple<frame_buff::uptr, packet_info_t, bool> get_recv_buff( + const int32_t timeout_ms) + { + frame_buff::uptr buff = _recv_link->get_recv_buff(timeout_ms); + mock_header_t header = *(reinterpret_cast<mock_header_t*>(buff->data())); + + packet_info_t info; + info.eob = header.eob; + info.has_tsf = header.has_tsf; + info.tsf = header.tsf; + info.payload_bytes = header.payload_bytes; + info.payload = reinterpret_cast<uint8_t*>(buff->data()) + sizeof(mock_header_t); + + const uint8_t* pkt_end = + reinterpret_cast<uint8_t*>(buff->data()) + buff->packet_size(); + const size_t pyld_pkt_len = + pkt_end - reinterpret_cast<const uint8_t*>(info.payload); + + if (pyld_pkt_len < info.payload_bytes) { + _recv_link->release_recv_buff(std::move(buff)); + throw uhd::value_error("Bad header or invalid packet length."); + } + + const bool seq_match = header.seq_num == _seq_num; + const bool seq_error = !header.ignore_seq && !seq_match; + _seq_num = header.seq_num + 1; + + return std::make_tuple(std::move(buff), info, seq_error); + } + + void release_recv_buff(frame_buff::uptr buff) + { + _recv_link->release_recv_buff(std::move(buff)); + } + + size_t get_max_payload_size() const + { + return _recv_link->get_recv_frame_size() - sizeof(packet_info_t); + } + +private: + mock_recv_link::sptr _recv_link; + size_t _seq_num = 0; +}; + +/*! + * Mock rx streamer for testing + */ +class mock_rx_streamer : public rx_streamer_impl<mock_rx_data_xport> +{ +public: + mock_rx_streamer(const size_t num_chans, const uhd::stream_args_t& stream_args) + : rx_streamer_impl(num_chans, stream_args) + { + } + + void issue_stream_cmd(const stream_cmd_t&) {} + + void set_tick_rate(double rate) + { + rx_streamer_impl::set_tick_rate(rate); + } + + void set_samp_rate(double rate) + { + rx_streamer_impl::set_samp_rate(rate); + } + + void set_scale_factor(const size_t chan, const double scale_factor) + { + rx_streamer_impl::set_scale_factor(chan, scale_factor); + } +}; + +}} // namespace uhd::transport + +using namespace uhd::transport; + +using rx_streamer = rx_streamer_impl<mock_rx_data_xport>; + +static const double TICK_RATE = 100e6; +static const double SAMP_RATE = 10e6; +static const size_t FRAME_SIZE = 1000; +static const double SCALE_FACTOR = 2; + +/*! + * Helper functions + */ +static std::vector<mock_recv_link::sptr> make_links(const size_t num) +{ + const mock_recv_link::link_params params = {FRAME_SIZE, 1}; + + std::vector<mock_recv_link::sptr> links; + + for (size_t i = 0; i < num; i++) { + links.push_back(std::make_shared<mock_recv_link>(params)); + } + + return links; +} + +static boost::shared_ptr<mock_rx_streamer> make_rx_streamer( + std::vector<mock_recv_link::sptr> recv_links, + const std::string& host_format, + const std::string& otw_format = "sc16") +{ + const uhd::stream_args_t stream_args(host_format, otw_format); + auto streamer = boost::make_shared<mock_rx_streamer>(recv_links.size(), stream_args); + streamer->set_tick_rate(TICK_RATE); + streamer->set_samp_rate(SAMP_RATE); + + for (size_t i = 0; i < recv_links.size(); i++) { + mock_rx_data_xport::uptr xport( + std::make_unique<mock_rx_data_xport>(recv_links[i])); + + streamer->set_scale_factor(i, SCALE_FACTOR); + streamer->connect_channel(i, std::move(xport)); + } + + return streamer; +} + +static void push_back_recv_packet(mock_recv_link::sptr recv_link, + mock_header_t header, + size_t num_samps, + uint16_t start_data = 0) +{ + // Allocate buffer + const size_t pyld_bytes = num_samps * sizeof(std::complex<uint16_t>); + const size_t buff_len = sizeof(header) + pyld_bytes; + boost::shared_array<uint8_t> data(new uint8_t[buff_len]); + + // Write header to buffer + header.payload_bytes = pyld_bytes; + *(reinterpret_cast<mock_header_t*>(data.get())) = header; + + // Write data to buffer + auto data_ptr = + reinterpret_cast<std::complex<uint16_t>*>(data.get() + sizeof(header)); + + for (size_t i = 0; i < num_samps; i++) { + uint16_t val = (start_data + i) * 2; + data_ptr[i] = std::complex<uint16_t>(val, val + 1); + } + + // Push back buffer for link to recv + recv_link->push_back_recv_packet(data, buff_len); +} + +/*! + * Tests + */ +BOOST_AUTO_TEST_CASE(test_recv_one_channel_one_packet) +{ + const size_t NUM_PKTS_TO_TEST = 5; + const std::string format("fc32"); + + auto recv_links = make_links(1); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t num_samps = 20; + std::vector<std::complex<float>> buff(num_samps); + uhd::rx_metadata_t metadata; + + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { + const bool even_iteration = (i % 2 == 0); + const bool odd_iteration = (i % 2 != 0); + mock_header_t header; + header.eob = even_iteration; + header.has_tsf = odd_iteration; + header.tsf = i; + push_back_recv_packet(recv_links[0], header, num_samps); + + std::cout << "receiving packet " << i << std::endl; + + size_t num_samps_ret = + streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.end_of_burst, even_iteration); + BOOST_CHECK_EQUAL(metadata.has_time_spec, odd_iteration); + BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i); + + for (size_t j = 0; j < num_samps; j++) { + const auto value = + std::complex<float>((j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR); + BOOST_CHECK_EQUAL(value, buff[j]); + } + } +} + +BOOST_AUTO_TEST_CASE(test_recv_one_channel_multi_packet) +{ + const size_t NUM_BUFFS_TO_TEST = 5; + const std::string format("fc64"); + + auto recv_links = make_links(1); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t spp = streamer->get_max_num_samps(); + const size_t num_samps = spp * 4; + std::vector<std::complex<double>> buff(num_samps); + uhd::rx_metadata_t metadata; + + for (size_t i = 0; i < NUM_BUFFS_TO_TEST; i++) { + mock_header_t header; + header.eob = false; + header.has_tsf = true; + header.tsf = i; + + size_t samps_written = 0; + while (samps_written < num_samps) { + size_t samps_to_write = std::min(num_samps - samps_written, spp); + push_back_recv_packet(recv_links[0], header, samps_to_write, samps_written); + samps_written += samps_to_write; + } + + std::cout << "receiving packet " << i << std::endl; + + size_t num_samps_ret = + streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.end_of_burst, false); + BOOST_CHECK_EQUAL(metadata.has_time_spec, true); + BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i); + + for (size_t j = 0; j < num_samps; j++) { + const auto value = + std::complex<double>((j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR); + BOOST_CHECK_EQUAL(value, buff[j]); + } + } +} + +BOOST_AUTO_TEST_CASE(test_recv_one_channel_multi_packet_with_eob) +{ + // EOB should terminate a multi-packet recv, test that it does + const std::string format("sc16"); + + auto recv_links = make_links(1); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t num_packets = 4; + const size_t spp = streamer->get_max_num_samps(); + const size_t num_samps = spp * num_packets; + std::vector<std::complex<double>> buff(num_samps); + uhd::rx_metadata_t metadata; + + // Queue 4 packets, with eob set in every other packet + for (size_t i = 0; i < num_packets; i++) { + mock_header_t header; + header.has_tsf = false; + header.eob = (i % 2) != 0; + push_back_recv_packet(recv_links[0], header, spp); + } + + // Now call recv and check that eob terminates a recv call + for (size_t i = 0; i < num_packets / 2; i++) { + std::cout << "receiving packet " << i << std::endl; + + size_t num_samps_ret = + streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + + BOOST_CHECK_EQUAL(num_samps_ret, spp * 2); + BOOST_CHECK_EQUAL(metadata.end_of_burst, true); + BOOST_CHECK_EQUAL(metadata.has_time_spec, false); + } +} + +BOOST_AUTO_TEST_CASE(test_recv_two_channel_one_packet) +{ + const size_t NUM_PKTS_TO_TEST = 5; + const std::string format("sc16"); + + const size_t num_chans = 2; + + auto recv_links = make_links(num_chans); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t num_samps = 20; + + std::vector<std::vector<std::complex<uint16_t>>> buffer(num_chans); + std::vector<void*> buffers; + for (size_t i = 0; i < num_chans; i++) { + buffer[i].resize(num_samps); + buffers.push_back(&buffer[i].front()); + } + + uhd::rx_metadata_t metadata; + + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { + const bool even_iteration = (i % 2 == 0); + const bool odd_iteration = (i % 2 != 0); + mock_header_t header; + header.eob = even_iteration; + header.has_tsf = odd_iteration; + header.tsf = i; + + size_t samps_pushed = 0; + for (size_t ch = 0; ch < num_chans; ch++) { + push_back_recv_packet(recv_links[ch], header, num_samps, samps_pushed); + samps_pushed += num_samps; + } + + std::cout << "receiving packet " << i << std::endl; + + size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.end_of_burst, even_iteration); + BOOST_CHECK_EQUAL(metadata.has_time_spec, odd_iteration); + BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i); + + size_t samps_checked = 0; + for (size_t ch = 0; ch < num_chans; ch++) { + for (size_t samp = 0; samp < num_samps; samp++) { + const size_t n = samps_checked + samp; + const auto value = std::complex<uint16_t>((n * 2), (n * 2 + 1)); + BOOST_CHECK_EQUAL(value, buffer[ch][samp]); + } + samps_checked += num_samps; + } + } +} + +BOOST_AUTO_TEST_CASE(test_recv_one_channel_packet_fragment) +{ + const size_t NUM_PKTS_TO_TEST = 5; + const std::string format("fc32"); + + auto recv_links = make_links(1); + auto streamer = make_rx_streamer(recv_links, format); + + // Push back five packets, then read them 1/4 of a packet at a time + const size_t spp = streamer->get_max_num_samps(); + const size_t reads_per_packet = 4; + const size_t num_samps = spp / reads_per_packet; + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { + mock_header_t header; + header.eob = true; + header.has_tsf = true; + header.tsf = 0; + push_back_recv_packet(recv_links[0], header, num_samps * reads_per_packet); + } + + std::vector<std::complex<float>> buff(num_samps); + uhd::rx_metadata_t metadata; + + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { + std::cout << "receiving packet " << i << std::endl; + + size_t total_samps_read = 0; + for (size_t j = 0; j < reads_per_packet; j++) { + size_t num_samps_ret = + streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.has_time_spec, true); + BOOST_CHECK_EQUAL(metadata.end_of_burst, true); + BOOST_CHECK_EQUAL(metadata.more_fragments, j != reads_per_packet - 1); + BOOST_CHECK_EQUAL(metadata.fragment_offset, total_samps_read); + + const size_t ticks_per_sample = static_cast<size_t>(TICK_RATE / SAMP_RATE); + const size_t expected_ticks = ticks_per_sample * total_samps_read; + BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), expected_ticks); + + for (size_t samp = 0; samp < num_samps; samp++) { + const size_t pkt_idx = samp + total_samps_read; + const auto value = std::complex<float>( + (pkt_idx * 2) * SCALE_FACTOR, (pkt_idx * 2 + 1) * SCALE_FACTOR); + BOOST_CHECK_EQUAL(value, buff[samp]); + } + + total_samps_read += num_samps_ret; + } + } +} + +BOOST_AUTO_TEST_CASE(test_recv_seq_error) +{ + // Test that when we get a sequence error the error is returned in the + // metadata with a time spec that corresponds to the time spec of the + // last sample in the previous packet plus one sample clock. Test that + // the packet that causes the sequence error is not discarded. + const size_t NUM_PKTS_TO_TEST = 2; + const std::string format("fc32"); + + auto recv_links = make_links(1); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t num_samps = 20; + std::vector<std::complex<float>> buff(num_samps); + uhd::rx_metadata_t metadata; + size_t seq_num = 0; + size_t tsf = 0; + + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { + mock_header_t header; + header.eob = false; + header.has_tsf = true; + header.ignore_seq = false; + + // Push back three packets but skip a seq_num after the second + header.seq_num = seq_num++; + header.tsf = tsf; + push_back_recv_packet(recv_links[0], header, num_samps); + + tsf += num_samps; + header.seq_num = seq_num++; + header.tsf = tsf; + push_back_recv_packet(recv_links[0], header, num_samps); + + seq_num++; // dropped packet + tsf += num_samps; + + header.seq_num = seq_num++; + header.tsf = tsf; + push_back_recv_packet(recv_links[0], header, num_samps); + + // First two reads should succeed + size_t num_samps_ret = + streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + + num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + size_t prev_tsf = metadata.time_spec.to_ticks(TICK_RATE); + size_t expected_tsf = prev_tsf + num_samps * (TICK_RATE / SAMP_RATE); + + // Third read should be a sequence error + num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, 0); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); + BOOST_CHECK_EQUAL(metadata.out_of_sequence, true); + size_t metadata_tsf = metadata.time_spec.to_ticks(TICK_RATE); + BOOST_CHECK_EQUAL(metadata_tsf, expected_tsf); + + // Next read should succeed + num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); + BOOST_CHECK_EQUAL(metadata.out_of_sequence, false); + } +} + +BOOST_AUTO_TEST_CASE(test_recv_bad_packet) +{ + // Test that when we receive a packet with invalid chdr header or length + // the streamer returns the correct error in meatadata. + auto push_back_bad_packet = [](mock_recv_link::sptr recv_link) { + mock_header_t header; + header.payload_bytes = 1000; + + // Allocate a buffer that is too small for the payload + const size_t buff_len = 100; + boost::shared_array<uint8_t> data(new uint8_t[buff_len]); + + // Write header to buffer + *(reinterpret_cast<mock_header_t*>(data.get())) = header; + + // Push back buffer for link to recv + recv_link->push_back_recv_packet(data, buff_len); + }; + + const std::string format("fc32"); + + auto recv_links = make_links(1); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t num_samps = 20; + std::vector<std::complex<float>> buff(num_samps); + uhd::rx_metadata_t metadata; + + mock_header_t header; + + // Push back a regular packet + push_back_recv_packet(recv_links[0], header, num_samps); + + // Push back a bad packet + push_back_bad_packet(recv_links[0]); + + // Push back another regular packet + push_back_recv_packet(recv_links[0], header, num_samps); + + // First read should succeed + size_t num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + + // Second read should be an error + num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, 0); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_BAD_PACKET); + + // Third read should succeed + num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); +} + +BOOST_AUTO_TEST_CASE(test_recv_multi_channel_no_tsf) +{ + // Test that we can receive packets without tsf. Start by pushing + // a packet with a tsf followed by a few packets without. + const size_t NUM_PKTS_TO_TEST = 6; + const std::string format("fc64"); + + const size_t num_chans = 10; + + auto recv_links = make_links(num_chans); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t num_samps = 21; + + std::vector<std::vector<std::complex<double>>> buffer(num_chans); + std::vector<void*> buffers; + for (size_t i = 0; i < num_chans; i++) { + buffer[i].resize(num_samps); + buffers.push_back(&buffer[i].front()); + } + + uhd::rx_metadata_t metadata; + + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { + mock_header_t header; + header.eob = (i == NUM_PKTS_TO_TEST - 1); + header.has_tsf = (i == 0); + header.tsf = 500; + + for (size_t ch = 0; ch < num_chans; ch++) { + push_back_recv_packet(recv_links[ch], header, num_samps); + } + + size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.end_of_burst, i == NUM_PKTS_TO_TEST - 1); + BOOST_CHECK_EQUAL(metadata.has_time_spec, i == 0); + } +} + +BOOST_AUTO_TEST_CASE(test_recv_multi_channel_seq_error) +{ + // Test that the streamer handles dropped packets correctly by injecting + // a sequence error in one channel. The streamer should discard + // corresponding packets from all other channels. + const std::string format("fc64"); + + const size_t num_chans = 100; + + auto recv_links = make_links(num_chans); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t num_samps = 99; + + std::vector<std::vector<std::complex<double>>> buffer(num_chans); + std::vector<void*> buffers; + for (size_t i = 0; i < num_chans; i++) { + buffer[i].resize(num_samps); + buffers.push_back(&buffer[i].front()); + } + + for (size_t ch = 0; ch < num_chans; ch++) { + mock_header_t header; + header.eob = false; + header.has_tsf = true; + header.tsf = 0; + header.ignore_seq = false; + header.seq_num = 0; + + // Drop a packet from an arbitrary channel right at the start + if (ch != num_chans / 2) { + push_back_recv_packet(recv_links[ch], header, num_samps); + } + + // Add a regular packet to check the streamer drops the first + header.seq_num++; + header.tsf++; + push_back_recv_packet(recv_links[ch], header, num_samps); + + // Drop a packet from the first channel + header.seq_num++; + header.tsf++; + if (ch != 0) { + push_back_recv_packet(recv_links[ch], header, num_samps); + } + + // Add a regular packet + header.seq_num++; + header.tsf++; + push_back_recv_packet(recv_links[ch], header, num_samps); + + // Drop a few packets from the last channel + for (size_t j = 0; j < 10; j++) { + header.seq_num++; + header.tsf++; + if (ch != num_chans - 1) { + push_back_recv_packet(recv_links[ch], header, num_samps); + } + } + + // Add a regular packet + header.seq_num++; + header.tsf++; + push_back_recv_packet(recv_links[ch], header, num_samps); + } + + uhd::rx_metadata_t metadata; + + // First recv should result in error + size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, 0); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); + BOOST_CHECK_EQUAL(metadata.out_of_sequence, true); + + // Packet with tsf == 1 should be returned next + num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 1); + + // Next recv should result in error + num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, 0); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); + BOOST_CHECK_EQUAL(metadata.out_of_sequence, true); + + // Packet with tsf == 3 should be returned next + num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 3); + + // Next recv should result in error + num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, 0); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); + BOOST_CHECK_EQUAL(metadata.out_of_sequence, true); + + // Packet with tsf == 14 should be returned next + num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 14); +} + +BOOST_AUTO_TEST_CASE(test_recv_alignment_error) +{ + // Test that the alignment procedure returns an alignment error if it can't + // time align packets. + const std::string format("fc64"); + + const size_t num_chans = 4; + + auto recv_links = make_links(num_chans); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t num_samps = 2; + + std::vector<std::vector<std::complex<double>>> buffer(num_chans); + std::vector<void*> buffers; + for (size_t i = 0; i < num_chans; i++) { + buffer[i].resize(num_samps); + buffers.push_back(&buffer[i].front()); + } + + uhd::rx_metadata_t metadata; + + mock_header_t header; + header.eob = true; + header.has_tsf = true; + header.tsf = 500; + + for (size_t ch = 0; ch < num_chans; ch++) { + push_back_recv_packet(recv_links[ch], header, num_samps); + } + + size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.end_of_burst, true); + BOOST_CHECK_EQUAL(metadata.has_time_spec, true); + + for (size_t pkt = 0; pkt < uhd::transport::ALIGNMENT_FAILURE_THRESHOLD; pkt++) { + header.tsf = header.tsf + num_samps; + for (size_t ch = 0; ch < num_chans; ch++) { + if (ch == num_chans - 1) { + // Misalign this time stamp + header.tsf += 1; + } + push_back_recv_packet(recv_links[ch], header, num_samps); + } + } + + num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, 0); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_ALIGNMENT); +} diff --git a/host/tests/tx_streamer_test.cpp b/host/tests/tx_streamer_test.cpp new file mode 100644 index 000000000..cb07cffad --- /dev/null +++ b/host/tests/tx_streamer_test.cpp @@ -0,0 +1,393 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include "../common/mock_link.hpp" +#include <uhdlib/transport/tx_streamer_impl.hpp> +#include <boost/make_shared.hpp> +#include <boost/test/unit_test.hpp> +#include <iostream> + +namespace uhd { namespace transport { + +/*! + * Mock tx data xport which doesn't use I/O service, and just interacts with + * the link directly. Transport copies packet info directly into the frame + * buffer. + */ +class mock_tx_data_xport +{ +public: + using uptr = std::unique_ptr<mock_tx_data_xport>; + using buff_t = uhd::transport::frame_buff; + + struct packet_info_t + { + bool eob = false; + bool has_tsf = false; + uint64_t tsf = 0; + size_t payload_bytes = 0; + }; + + mock_tx_data_xport(mock_send_link::sptr send_link) : _send_link(send_link) {} + + buff_t::uptr get_send_buff(const int32_t timeout_ms) + { + return _send_link->get_send_buff(timeout_ms); + } + + std::pair<void*, size_t> write_packet_header( + buff_t::uptr& buff, const packet_info_t& info) + { + uint8_t* data = static_cast<uint8_t*>(buff->data()); + *(reinterpret_cast<packet_info_t*>(data)) = info; + return std::make_pair(data + sizeof(info), sizeof(info) + info.payload_bytes); + } + + void release_send_buff(buff_t::uptr buff) + { + _send_link->release_send_buff(std::move(buff)); + } + + size_t get_max_payload_size() const + { + return _send_link->get_send_frame_size() - sizeof(packet_info_t); + ; + } + +private: + mock_send_link::sptr _send_link; +}; + +/*! + * Mock tx streamer for testing + */ +class mock_tx_streamer : public tx_streamer_impl<mock_tx_data_xport> +{ +public: + mock_tx_streamer(const size_t num_chans, const uhd::stream_args_t& stream_args) + : tx_streamer_impl(num_chans, stream_args) + { + } + + void set_tick_rate(double rate) + { + tx_streamer_impl::set_tick_rate(rate); + } + + void set_samp_rate(double rate) + { + tx_streamer_impl::set_samp_rate(rate); + } + + void set_scale_factor(const size_t chan, const double scale_factor) + { + tx_streamer_impl::set_scale_factor(chan, scale_factor); + } +}; + +}} // namespace uhd::transport + +using namespace uhd::transport; + +using tx_streamer = tx_streamer_impl<mock_tx_data_xport>; + +static const double TICK_RATE = 100e6; +static const double SAMP_RATE = 10e6; +static const size_t FRAME_SIZE = 1000; +static const double SCALE_FACTOR = 2; + +/*! + * Helper functions + */ +static std::vector<mock_send_link::sptr> make_links(const size_t num) +{ + const mock_send_link::link_params params = {FRAME_SIZE, 1}; + + std::vector<mock_send_link::sptr> links; + + for (size_t i = 0; i < num; i++) { + links.push_back(std::make_shared<mock_send_link>(params)); + } + + return links; +} + +static boost::shared_ptr<mock_tx_streamer> make_tx_streamer( + std::vector<mock_send_link::sptr> send_links, const std::string& format) +{ + const uhd::stream_args_t stream_args(format, "sc16"); + auto streamer = boost::make_shared<mock_tx_streamer>(send_links.size(), stream_args); + streamer->set_tick_rate(TICK_RATE); + streamer->set_samp_rate(SAMP_RATE); + + for (size_t i = 0; i < send_links.size(); i++) { + mock_tx_data_xport::uptr xport( + std::make_unique<mock_tx_data_xport>(send_links[i])); + + streamer->set_scale_factor(i, SCALE_FACTOR); + streamer->connect_channel(i, std::move(xport)); + } + + return streamer; +} + +std::tuple<mock_tx_data_xport::packet_info_t, std::complex<uint16_t>*, size_t, boost::shared_array<uint8_t>> +pop_send_packet(mock_send_link::sptr send_link) +{ + auto packet = send_link->pop_send_packet(); + + const size_t packet_samps = + (packet.second - sizeof(mock_tx_data_xport::packet_info_t)) + / sizeof(std::complex<uint16_t>); + + uint8_t* buff_ptr = packet.first.get(); + auto info = *(reinterpret_cast<mock_tx_data_xport::packet_info_t*>(buff_ptr)); + + std::complex<uint16_t>* data = reinterpret_cast<std::complex<uint16_t>*>( + buff_ptr + sizeof(mock_tx_data_xport::packet_info_t)); + + return std::make_tuple(info, data, packet_samps, packet.first); +} + +/*! + * Tests + */ +BOOST_AUTO_TEST_CASE(test_send_one_channel_one_packet) +{ + const size_t NUM_PKTS_TO_TEST = 30; + const std::string format("fc32"); + + auto send_links = make_links(1); + auto streamer = make_tx_streamer(send_links, format); + + // Allocate metadata + uhd::tx_metadata_t metadata; + metadata.has_time_spec = true; + metadata.time_spec = uhd::time_spec_t(0.0); + + // Allocate buffer and write data + std::vector<std::complex<float>> buff(20); + for (size_t i = 0; i < buff.size(); i++) { + buff[i] = std::complex<float>(i * 2, i * 2 + 1); + } + + // Send packets and check data + size_t num_accum_samps = 0; + + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { + std::cout << "sending packet " << i << std::endl; + + // Vary num_samps for each packet + const size_t num_samps = 10 + i % 10; + metadata.end_of_burst = (i == NUM_PKTS_TO_TEST - 1); + const size_t num_sent = streamer->send(&buff.front(), num_samps, metadata, 1.0); + BOOST_CHECK_EQUAL(num_sent, num_samps); + metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE); + + mock_tx_data_xport::packet_info_t info; + std::complex<uint16_t>* data; + size_t packet_samps; + boost::shared_array<uint8_t> frame_buff; + + std::tie(info, data, packet_samps, frame_buff) = pop_send_packet(send_links[0]); + BOOST_CHECK_EQUAL(num_samps, packet_samps); + + // Check data + for (size_t j = 0; j < num_samps; j++) { + const std::complex<uint16_t> value( + (j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR); + BOOST_CHECK_EQUAL(value, data[j]); + } + + BOOST_CHECK_EQUAL(num_samps, info.payload_bytes / sizeof(std::complex<uint16_t>)); + BOOST_CHECK(info.has_tsf); + BOOST_CHECK_EQUAL(info.tsf, num_accum_samps * TICK_RATE / SAMP_RATE); + BOOST_CHECK_EQUAL(info.eob, i == NUM_PKTS_TO_TEST - 1); + num_accum_samps += num_samps; + } +} + +BOOST_AUTO_TEST_CASE(test_send_one_channel_multi_packet) +{ + const size_t NUM_BUFFS_TO_TEST = 5; + const std::string format("fc64"); + + auto send_links = make_links(1); + auto streamer = make_tx_streamer(send_links, format); + + // Allocate metadata + uhd::tx_metadata_t metadata; + metadata.has_time_spec = true; + metadata.time_spec = uhd::time_spec_t(0.0); + + // Allocate buffer and write data + const size_t spp = streamer->get_max_num_samps(); + const size_t num_samps = spp * 4; + std::vector<std::complex<double>> buff(num_samps); + for (size_t i = 0; i < buff.size(); i++) { + buff[i] = std::complex<double>(i * 2, i * 2 + 1); + } + + // Send packets and check data + size_t num_accum_samps = 0; + + for (size_t i = 0; i < NUM_BUFFS_TO_TEST; i++) { + std::cout << "sending packet " << i << std::endl; + + metadata.end_of_burst = true; + const size_t num_sent = streamer->send(&buff.front(), num_samps, metadata, 1.0); + BOOST_CHECK_EQUAL(num_sent, num_samps); + metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE); + + size_t samps_checked = 0; + + while (samps_checked < num_samps) { + mock_tx_data_xport::packet_info_t info; + std::complex<uint16_t>* data; + size_t packet_samps; + boost::shared_array<uint8_t> frame_buff; + + std::tie(info, data, packet_samps, frame_buff) = pop_send_packet(send_links[0]); + + for (size_t j = 0; j < packet_samps; j++) { + const size_t n = j + samps_checked; + const std::complex<uint16_t> value( + (n * 2) * SCALE_FACTOR, (n * 2 + 1) * SCALE_FACTOR); + BOOST_CHECK_EQUAL(value, data[j]); + } + + BOOST_CHECK_EQUAL( + packet_samps, info.payload_bytes / sizeof(std::complex<uint16_t>)); + BOOST_CHECK(info.has_tsf); + BOOST_CHECK_EQUAL( + info.tsf, (num_accum_samps + samps_checked) * TICK_RATE / SAMP_RATE); + samps_checked += packet_samps; + + BOOST_CHECK_EQUAL(info.eob, samps_checked == num_samps); + } + + BOOST_CHECK_EQUAL(samps_checked, num_samps); + num_accum_samps += samps_checked; + } +} + +BOOST_AUTO_TEST_CASE(test_send_two_channel_one_packet) +{ + const size_t NUM_PKTS_TO_TEST = 30; + const std::string format("sc16"); + + auto send_links = make_links(2); + auto streamer = make_tx_streamer(send_links, format); + + // Allocate metadata + uhd::tx_metadata_t metadata; + metadata.has_time_spec = true; + metadata.time_spec = uhd::time_spec_t(0.0); + + // Allocate buffer and write data + std::vector<std::complex<uint16_t>> buff(20); + for (size_t i = 0; i < buff.size(); i++) { + buff[i] = std::complex<uint16_t>(i * 2, i * 2 + 1); + } + std::vector<void*> buffs; + for (size_t ch = 0; ch < 2; ch++) { + buffs.push_back(buff.data()); // same buffer for each channel + } + + // Send packets and check data + size_t num_accum_samps = 0; + + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { + std::cout << "sending packet " << i << std::endl; + + // Vary num_samps for each packet + const size_t num_samps = 10 + i % 10; + metadata.end_of_burst = (i == NUM_PKTS_TO_TEST - 1); + const size_t num_sent = streamer->send(buffs, num_samps, metadata, 1.0); + BOOST_CHECK_EQUAL(num_sent, num_samps); + metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE); + + for (size_t ch = 0; ch < 2; ch++) { + mock_tx_data_xport::packet_info_t info; + std::complex<uint16_t>* data; + size_t packet_samps; + boost::shared_array<uint8_t> frame_buff; + + std::tie(info, data, packet_samps, frame_buff) = pop_send_packet(send_links[ch]); + BOOST_CHECK_EQUAL(num_samps, packet_samps); + + // Check data + for (size_t j = 0; j < num_samps; j++) { + const std::complex<uint16_t> value((j * 2), (j * 2 + 1)); + BOOST_CHECK_EQUAL(value, data[j]); + } + + BOOST_CHECK_EQUAL( + num_samps, info.payload_bytes / sizeof(std::complex<uint16_t>)); + BOOST_CHECK(info.has_tsf); + BOOST_CHECK_EQUAL(info.tsf, num_accum_samps * TICK_RATE / SAMP_RATE); + BOOST_CHECK_EQUAL(info.eob, i == NUM_PKTS_TO_TEST - 1); + } + num_accum_samps += num_samps; + } +} + +BOOST_AUTO_TEST_CASE(test_meta_data_cache) +{ + auto send_links = make_links(1); + auto streamer = make_tx_streamer(send_links, "fc32"); + + // Allocate metadata + uhd::tx_metadata_t metadata; + metadata.start_of_burst = true; + metadata.end_of_burst = true; + metadata.has_time_spec = true; + metadata.time_spec = uhd::time_spec_t(0.0); + + // Allocate buffer and write data + std::vector<std::complex<float>> buff(20); + + size_t num_sent = streamer->send(buff.data(), 0, metadata, 1.0); + BOOST_CHECK_EQUAL(send_links[0]->get_num_packets(), 0); + BOOST_CHECK_EQUAL(num_sent, 0); + uhd::tx_metadata_t metadata2; + num_sent = streamer->send(buff.data(), 10, metadata2, 1.0); + + mock_tx_data_xport::packet_info_t info; + size_t packet_samps; + boost::shared_array<uint8_t> frame_buff; + + std::tie(info, std::ignore, packet_samps, frame_buff) = pop_send_packet(send_links[0]); + BOOST_CHECK_EQUAL(packet_samps, num_sent); + BOOST_CHECK(info.has_tsf); + BOOST_CHECK(info.eob); +} + +BOOST_AUTO_TEST_CASE(test_spp) +{ + // Test the spp calculation when it is limited by the stream args + { + auto send_links = make_links(1); + uhd::stream_args_t stream_args("fc64", "sc16"); + stream_args.args["spp"] = std::to_string(10); + auto streamer = boost::make_shared<mock_tx_streamer>(send_links.size(), stream_args); + mock_tx_data_xport::uptr xport(std::make_unique<mock_tx_data_xport>(send_links[0])); + streamer->connect_channel(0, std::move(xport)); + BOOST_CHECK_EQUAL(streamer->get_max_num_samps(), 10); + } + + // Test the spp calculation when it is limited by the frame size + { + auto send_links = make_links(1); + uhd::stream_args_t stream_args("fc64", "sc16"); + stream_args.args["spp"] = std::to_string(10000); + auto streamer = boost::make_shared<mock_tx_streamer>(send_links.size(), stream_args); + mock_tx_data_xport::uptr xport(std::make_unique<mock_tx_data_xport>(send_links[0])); + const size_t max_pyld = xport->get_max_payload_size(); + streamer->connect_channel(0, std::move(xport)); + BOOST_CHECK_EQUAL(streamer->get_max_num_samps(), max_pyld / sizeof(std::complex<uint16_t>)); + } +} |