diff options
Diffstat (limited to 'host/lib/include')
17 files changed, 2707 insertions, 36 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp b/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp index 770c6cf6f..cc729de6c 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_packet.hpp @@ -114,6 +114,15 @@ public: */ virtual void* get_payload_ptr() = 0; + /*! Return the payload offset in bytes for a given type and num_mdata + * + * \param pkt_type The packet type for calculation + * \param num_mdata The number of metadata words for calculation + * \return The offset of the payload in a packet with the given params + */ + virtual size_t calculate_payload_offset(const packet_type_t pkt_type, + const uint8_t num_mdata = 0) const = 0; + //! Shortcut to return the const metadata pointer cast as a specific type template <typename data_t> inline const data_t* get_mdata_const_ptr_as() const diff --git a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp new file mode 100644 index 000000000..69dceebe1 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp @@ -0,0 +1,481 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP +#define INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP + +#include <uhd/config.hpp> +#include <uhdlib/rfnoc/chdr_packet.hpp> +#include <uhdlib/rfnoc/chdr_types.hpp> +#include <uhdlib/rfnoc/mgmt_portal.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> +#include <uhdlib/rfnoc/rx_flow_ctrl_state.hpp> +#include <uhdlib/transport/io_service.hpp> +#include <uhdlib/transport/link_if.hpp> +#include <memory> + +namespace uhd { namespace rfnoc { + +namespace detail { + +/*! + * Utility class to send rx flow control responses + */ +class rx_flow_ctrl_sender +{ +public: + //! Constructor + rx_flow_ctrl_sender( + const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) + : _dst_epid(sep_ids.first) + { + _fc_packet = pkt_factory.make_strs(); + _fc_strs_pyld.src_epid = sep_ids.second; + } + + /*! Configure buffer capacity + * \param recv_capacity The buffer capacity of the receive link + */ + void set_capacity(const stream_buff_params_t& recv_capacity) + { + _fc_strs_pyld.capacity_bytes = recv_capacity.bytes; + _fc_strs_pyld.capacity_pkts = recv_capacity.packets; + } + + /*! Send a flow control response packet + * + * \param send_link the link to use to send the packet + * \counts transfer counts for packet contents + */ + void send_strs(transport::send_link_if* send_link, const stream_buff_params_t& counts) + { + auto buff = send_link->get_send_buff(0); + if (!buff) { + throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer"); + } + + chdr::chdr_header header; + header.set_seq_num(_fc_seq_num++); + header.set_dst_epid(_dst_epid); + + chdr::strs_payload fc_payload(_fc_strs_pyld); + fc_payload.xfer_count_bytes = counts.bytes; + fc_payload.xfer_count_pkts = counts.packets; + + _fc_packet->refresh(buff->data(), header, fc_payload); + const size_t size = header.get_length(); + + buff->set_packet_size(size); + send_link->release_send_buff(std::move(buff)); + } + +private: + // Endpoint ID for recipient of flow control response + const sep_id_t _dst_epid; + + // Packet for writing flow control info + chdr::chdr_strs_packet::uptr _fc_packet; + + // Pre-configured strs payload to hold values that don't change + chdr::strs_payload _fc_strs_pyld; + + // Sequence number for flow control packets + uint16_t _fc_seq_num = 0; +}; +} // namespace detail + +/*! + * Flow-controlled transport for RX chdr data + * + * This transport provides the streamer an interface to read RX data packets. + * The transport implements flow control and sequence number checking. + * + * The transport uses I/O services to provide options for work scheduling. I/O + * services allow the I/O work to be offloaded to a worker thread or to be + * performed in the same thread as the streamer API calls. + * + * For an rx transport, the device sends data packets, and the host sends strs + * packets letting the device know that buffer space in the host has been freed. + * For lossy links, the device also sends strc packets to resynchronize the + * transfer counts between host and device, to correct for any dropped packets + * in the link. + */ +class chdr_rx_data_xport +{ +public: + using uptr = std::unique_ptr<chdr_rx_data_xport>; + using buff_t = transport::frame_buff; + + //! Values extracted from received RX data packets + struct packet_info_t + { + bool eob = false; + bool has_tsf = false; + uint64_t tsf = 0; + size_t payload_bytes = 0; + const void* payload = nullptr; + }; + + /*! Constructor + * + * \param io_srv The service that will schedule the xport I/O + * \param recv_link The recv link, already attached to the I/O service + * \param send_link The send link, already attached to the I/O service + * \param pkt_factory Factory to create packets with the desired chdr_w and endianness + * \param addrs Source and destination addresses + * \param epids Source and destination endpoint IDs + * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload + * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata + * \param num_recv_frames Num frames to reserve from the recv link + * \param recv_capacity Total capacity of the recv link + * \param fc_freq Frequency of flow control status messages + * \param fc_headroom Headroom for flow control status messages + * \param lossy_xport Whether the xport is lossy, for flow control configuration + */ + chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_addr_pair_t& addrs, + const uhd::rfnoc::sep_id_pair_t& epids, + const uhd::rfnoc::sw_buff_t pyld_buff_fmt, + const uhd::rfnoc::sw_buff_t mdata_buff_fmt, + const size_t num_recv_frames, + const stream_buff_params_t& recv_capacity, + const stream_buff_params_t& fc_freq, + const stream_buff_params_t& fc_headroom, + const bool lossy_xport) + : _fc_state(epids), _fc_sender(pkt_factory, epids), _epid(epids.second) + { + const sep_addr_t remote_sep_addr = addrs.first; + const sep_addr_t local_sep_addr = addrs.second; + const sep_id_t remote_epid = epids.first; + const sep_id_t local_epid = epids.second; + + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Creating rx xport with local epid=" << local_epid + << ", remote epid=" << remote_epid); + + _recv_packet = pkt_factory.make_generic(); + _fc_sender.set_capacity(recv_capacity); + + // Calculate max payload size + const size_t pyld_offset = + _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); + _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset; + + // Make data transport + auto recv_cb = [this](buff_t::uptr& buff, + transport::recv_link_if* recv_link, + transport::send_link_if* send_link) { + return this->_recv_callback(buff, recv_link, send_link); + }; + + auto fc_cb = [this](buff_t::uptr buff, + transport::recv_link_if* recv_link, + transport::send_link_if* send_link) { + this->_fc_callback(std::move(buff), recv_link, send_link); + }; + + // Needs just a single send frame for responses + _recv_io = io_srv->make_recv_client(recv_link, + num_recv_frames, + recv_cb, + send_link, + /* num_send_frames*/ 1, + fc_cb); + + // Create a control transport with the rx data links to send mgmt packets + // needed to setup the stream + // Piggyback on frames from the recv_io_if + auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv, + send_link, + recv_link, + local_epid, + 0, // num_send_frames + 0); // num_recv_frames + + // Create new temporary management portal with the transports used for this stream + // TODO: This is a bit excessive. Maybe we can pare down the functionality of the + // portal just for route setup purposes. Whatever we do, we *must* use xport in it + // though otherwise the transport will not behave correctly. + auto data_mgmt_portal = uhd::rfnoc::mgmt::mgmt_portal::make( + *ctrl_xport, pkt_factory, local_sep_addr, local_epid); + + // Setup a route to the EPID + // Note that this may be gratuitous--The endpoint may already have been set up + data_mgmt_portal->initialize_endpoint(*ctrl_xport, remote_sep_addr, remote_epid); + data_mgmt_portal->setup_local_route(*ctrl_xport, remote_epid); + + // Initialize flow control - management portal sends a stream command + // containing its requested flow control frequency, the rx transport + // responds with a stream status containing its buffer capacity. + data_mgmt_portal->config_local_rx_stream_start(*ctrl_xport, + remote_epid, + lossy_xport, + pyld_buff_fmt, + mdata_buff_fmt, + fc_freq, + fc_headroom); + + data_mgmt_portal->config_local_rx_stream_commit(*ctrl_xport, remote_epid); + + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Stream endpoint was configured with:" + << std::endl + << "capacity bytes=" << recv_capacity.bytes + << ", packets=" << recv_capacity.packets << std::endl + << "fc headroom bytes=" << fc_headroom.bytes + << ", packets=" << fc_headroom.packets << std::endl + << "fc frequency bytes=" << fc_freq.bytes + << ", packets=" << fc_freq.packets); + + // We no longer need the control xport and mgmt_portal, release them so + // the control xport is no longer connected to the I/O service. + data_mgmt_portal.reset(); + ctrl_xport.reset(); + } + + /*! Returns maximum number payload bytes + * + * \return maximum payload bytes per packet + */ + size_t get_max_payload_size() const + { + return _max_payload_size; + } + + /*! + * Gets an RX frame buffer containing a recv packet + * + * \param timeout_ms timeout in milliseconds + * \return returns a tuple containing: + * - a frame_buff, or null if timeout occurs + * - info struct corresponding to the packet + * - whether the packet was out of sequence + */ + std::tuple<typename buff_t::uptr, packet_info_t, bool> get_recv_buff( + const int32_t timeout_ms) + { + buff_t::uptr buff = _recv_io->get_recv_buff(timeout_ms); + + if (!buff) { + return std::make_tuple(typename buff_t::uptr(), packet_info_t(), false); + } + + auto info = _read_data_packet_info(buff); + bool seq_error = _is_out_of_sequence(std::get<1>(info)); + + return std::make_tuple(std::move(buff), std::get<0>(info), seq_error); + } + + /*! + * Releases an RX frame buffer + * + * \param buff the frame buffer to release + */ + void release_recv_buff(typename buff_t::uptr buff) + { + _recv_io->release_recv_buff(std::move(buff)); + } + +private: + /*! + * Recv callback for I/O service + * + * The I/O service invokes this callback when it reads a packet from the + * recv link. + * + * \param buff the frame buffer containing the packet data + * \param recv_link the recv link from which buff was read + * \param send_link the send link for flow control messages + */ + bool _recv_callback(buff_t::uptr& buff, + transport::recv_link_if* recv_link, + transport::send_link_if* send_link) + { + _recv_packet->refresh(buff->data()); + const auto header = _recv_packet->get_chdr_header(); + const auto type = header.get_pkt_type(); + const auto dst_epid = header.get_dst_epid(); + const auto packet_size = buff->packet_size(); + + if (dst_epid != _epid) { + return false; + } + + if (type == chdr::PKT_TYPE_STRC) { + chdr::strc_payload strc; + strc.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(), + _recv_packet->get_payload_size() / sizeof(uint64_t), + _recv_packet->conv_to_host<uint64_t>()); + + const stream_buff_params_t strc_counts = { + strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)}; + + if (strc.op_code == chdr::STRC_RESYNC) { + // Resynchronize before updating fc_state, the strc payload + // contains counts before the strc packet itself + _fc_state.resynchronize(strc_counts); + + // Update state that we received a packet + _fc_state.data_received(packet_size); + + recv_link->release_recv_buff(std::move(buff)); + buff = buff_t::uptr(); + _fc_state.xfer_done(packet_size); + _send_fc_response(send_link); + } else if (strc.op_code == chdr::STRC_INIT) { + _fc_state.initialize( + {strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)}); + + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Received strc init with fc freq" + << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts); + + // Make sure flow control was initialized + assert(_fc_state.get_fc_freq().bytes > 0); + assert(_fc_state.get_fc_freq().packets > 0); + + // Send a strs response to configure flow control on the sender + _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts()); + + // Reset counts, since mgmt_portal will do it to FPGA + _fc_state.reset_counts(); + + recv_link->release_recv_buff(std::move(buff)); + buff = buff_t::uptr(); + } else { + throw uhd::value_error("Unexpected opcode value in STRC packet."); + } + + // For stream commands, we return true (packet was destined to this + // client) but release the buffer. The I/O service won't queue this + // packet in the recv_io_if. + return true; + + } else if (type == chdr::PKT_TYPE_DATA_NO_TS + || type == chdr::PKT_TYPE_DATA_WITH_TS) { + // Update state that we received a packet + _fc_state.data_received(packet_size); + + // If this is a data packet, just claim it by returning true. The + // I/O service will queue this packet in the recv_io_if. + return true; + + } else { + return false; + } + } + + /*! + * Flow control callback for I/O service + * + * The I/O service invokes this callback when a packet needs to be released + * to the recv link. + * + * \param buff the frame buffer containing the packet data + * \param recv_link the recv link to which to release the buffer + * \param send_link the send link for flow control messages + */ + void _fc_callback(buff_t::uptr buff, + transport::recv_link_if* recv_link, + transport::send_link_if* send_link) + { + const size_t packet_size = buff->packet_size(); + recv_link->release_recv_buff(std::move(buff)); + _fc_state.xfer_done(packet_size); + _send_fc_response(send_link); + } + + /*! + * Sends a flow control response packet if necessary. + * + * \param send_link the send link for flow control messages + */ + void _send_fc_response(transport::send_link_if* send_link) + { + if (_fc_state.fc_resp_due()) { + _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts()); + _fc_state.fc_resp_sent(); + } + } + + /*! + * Checks if the sequence number is out of sequence, prints 'D' if it is + * and returns result of check. + * + * \return true if a sequence error occurred + */ + UHD_FORCE_INLINE bool _is_out_of_sequence(uint16_t seq_num) + { + const uint16_t expected_packet_count = _data_seq_num; + _data_seq_num = seq_num + 1; + + if (expected_packet_count != seq_num) { + UHD_LOG_FASTPATH("D"); + return true; + } + return false; + } + + /*! + * Reads packet header and returns information in a struct. + * + * \return a tuple containing the packet info and packet sequence number + */ + std::tuple<packet_info_t, uint16_t> _read_data_packet_info(buff_t::uptr& buff) + { + const void* data = buff->data(); + _recv_packet->refresh(data); + const auto header = _recv_packet->get_chdr_header(); + const auto optional_time = _recv_packet->get_timestamp(); + + packet_info_t info; + info.eob = header.get_eob(); + info.has_tsf = optional_time.is_initialized(); + info.tsf = optional_time ? *optional_time : 0; + info.payload_bytes = _recv_packet->get_payload_size(); + info.payload = _recv_packet->get_payload_const_ptr(); + + const uint8_t* pkt_end = + reinterpret_cast<uint8_t*>(buff->data()) + buff->packet_size(); + const size_t pyld_pkt_len = + pkt_end - reinterpret_cast<const uint8_t*>(info.payload); + + if (pyld_pkt_len < info.payload_bytes) { + _recv_io->release_recv_buff(std::move(buff)); + throw uhd::value_error("Bad CHDR header or invalid packet length."); + } + + return std::make_tuple(info, header.get_seq_num()); + } + + // Interface to the I/O service + transport::recv_io_if::sptr _recv_io; + + // Flow control state + rx_flow_ctrl_state _fc_state; + + // Maximum data payload in bytes + size_t _max_payload_size = 0; + + // Sequence number for data packets + uint16_t _data_seq_num = 0; + + // Packet for received data + chdr::chdr_packet::uptr _recv_packet; + + // Handles sending of strs flow control response packets + detail::rx_flow_ctrl_sender _fc_sender; + + // Local / Sink EPID + sep_id_t _epid; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp new file mode 100644 index 000000000..62b811bf5 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp @@ -0,0 +1,550 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP +#define INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP + +#include <uhdlib/rfnoc/chdr_packet.hpp> +#include <uhdlib/rfnoc/chdr_types.hpp> +#include <uhdlib/rfnoc/mgmt_portal.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> +#include <uhdlib/rfnoc/tx_flow_ctrl_state.hpp> +#include <uhdlib/transport/io_service.hpp> +#include <uhdlib/transport/link_if.hpp> +#include <memory> + +namespace uhd { namespace rfnoc { + +namespace detail { + +/*! + * Utility class to send tx flow control messages + */ +class tx_flow_ctrl_sender +{ +public: + //! Constructor + tx_flow_ctrl_sender( + const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) + : _dst_epid(sep_ids.second) + { + _fc_packet = pkt_factory.make_strc(); + _fc_strc_pyld.src_epid = sep_ids.first; + _fc_strc_pyld.op_code = chdr::STRC_RESYNC; + } + + /*! + * Sends a flow control resync packet + * + * Sends a strc packet with the resync opcode to make the device transfer + * counts match those of the host, to correct for dropped packets. + * + * \param send_link the link to use to send the packet + * \counts transfer counts for packet contents + */ + size_t send_strc_resync( + transport::send_link_if* send_link, const stream_buff_params_t& counts) + { + auto buff = send_link->get_send_buff(0); + if (!buff) { + throw uhd::runtime_error("tx_flowctrl timed out getting a send buffer"); + } + + chdr::chdr_header header; + header.set_seq_num(_fc_seq_num++); + header.set_dst_epid(_dst_epid); + + chdr::strc_payload fc_payload(_fc_strc_pyld); + fc_payload.num_bytes = counts.bytes; + fc_payload.num_pkts = counts.packets; + + _fc_packet->refresh(buff->data(), header, fc_payload); + const size_t size = header.get_length(); + + buff->set_packet_size(size); + send_link->release_send_buff(std::move(buff)); + return size; + } + +private: + // Endpoint ID for recipient of flow control response + const sep_id_t _dst_epid; + + // Packet for writing flow control info + chdr::chdr_strc_packet::uptr _fc_packet; + + // Pre-configured strc payload to hold values that don't change + chdr::strc_payload _fc_strc_pyld; + + // Sequence number for flow control packets + uint16_t _fc_seq_num = 0; +}; +} // namespace detail + +/*! + * Flow-controlled transport for TX chdr data + * + * This transport provides the streamer an interface to send TX data packets. + * The transport implements flow control and keeps track of sequence numbers. + * + * The transport uses I/O services to provide options for work scheduling. I/O + * services allow the I/O work to be offloaded to a worker thread or to be + * performed in the same thread as the streamer API calls. + * + * For a tx transport, the host sends data packets, and the device sends strs + * packets letting the host know that buffer space in the device stream endpoint + * has been freed. For lossy links, the host also sends strc packets to + * resynchronize the transfer counts between host and device, to correct for + * any dropped packets in the link. + */ +class chdr_tx_data_xport +{ +public: + using uptr = std::unique_ptr<chdr_tx_data_xport>; + using buff_t = transport::frame_buff; + + //! Information about data packet + struct packet_info_t + { + bool eob = false; + bool has_tsf = false; + uint64_t tsf = 0; + size_t payload_bytes = 0; + }; + + /*! Constructor + * + * \param io_srv The service that will schedule the xport I/O + * \param recv_link The recv link, already attached to the I/O service + * \param send_link The send link, already attached to the I/O service + * \param pkt_factory Factory to create packets with the desired chdr_w and endianness + * \param addrs Source and destination addresses + * \param epids Source and destination endpoint IDs + * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload + * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata + * \param num_send_frames Num frames to reserve from the send link + * \param fc_freq_ratio Ratio to use to configure the device fc frequency + * \param fc_headroom_ratio Ratio to use to configure the device fc headroom + */ + chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_addr_pair_t& addrs, + const uhd::rfnoc::sep_id_pair_t& epids, + const uhd::rfnoc::sw_buff_t pyld_buff_fmt, + const uhd::rfnoc::sw_buff_t mdata_buff_fmt, + const size_t num_send_frames, + const double fc_freq_ratio, + const double fc_headroom_ratio) + : _fc_sender(pkt_factory, epids), _epid(epids.first) + { + const sep_addr_t remote_sep_addr = addrs.second; + const sep_addr_t local_sep_addr = addrs.first; + const sep_id_t remote_epid = epids.second; + const sep_id_t local_epid = epids.first; + + UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", + "Creating tx xport with local epid=" << local_epid + << ", remote epid=" << remote_epid); + + _send_header.set_dst_epid(epids.second); + _send_packet = pkt_factory.make_generic(); + _recv_packet = pkt_factory.make_generic(); + + // Calculate max payload size + const size_t pyld_offset = + _send_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); + _max_payload_size = send_link->get_send_frame_size() - pyld_offset; + + _configure_sep(io_srv, + recv_link, + send_link, + pkt_factory, + local_sep_addr, + local_epid, + remote_sep_addr, + remote_epid, + pyld_buff_fmt, + mdata_buff_fmt); + + _initialize_flow_ctrl(io_srv, + recv_link, + send_link, + pkt_factory, + epids, + fc_freq_ratio, + fc_headroom_ratio); + + // Now create the send I/O we will use for data + auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) { + this->_send_callback(buff, send_link); + }; + + auto recv_cb = [this](buff_t::uptr& buff, + transport::recv_link_if* recv_link, + transport::send_link_if* send_link) { + return this->_recv_callback(buff, recv_link, send_link); + }; + + // Needs just a single recv frame for strs packets + _send_io = io_srv->make_send_client(send_link, + num_send_frames, + send_cb, + recv_link, + /* num_recv_frames */ 1, + recv_cb); + } + + /*! Returns maximum number of payload bytes + * + * \return maximum number of payload bytes + */ + size_t get_max_payload_size() const + { + return _max_payload_size; + } + + /*! + * Gets a TX frame buffer + * + * \param timeout_ms timeout in milliseconds + * \return the frame buffer, or nullptr if timeout occurs + */ + buff_t::uptr get_send_buff(const int32_t timeout_ms) + { + return _send_io->get_send_buff(timeout_ms); + } + + /*! + * Sends a TX data packet + * + * \param buff the frame buffer containing the packet to send + */ + void release_send_buff(buff_t::uptr buff) + { + _send_io->release_send_buff(std::move(buff)); + } + + /*! + * Writes header into frame buffer and returns payload pointer + * + * \param buff Frame buffer to write header into + * \param info Information to include in the header + * \return A pointer to the payload data area and the packet size in bytes + */ + std::pair<void*, size_t> write_packet_header(buff_t::uptr& buff, + const packet_info_t& info) + { + uint64_t tsf = 0; + + if (info.has_tsf) { + _send_header.set_pkt_type(chdr::PKT_TYPE_DATA_WITH_TS); + tsf = info.tsf; + } else { + _send_header.set_pkt_type(chdr::PKT_TYPE_DATA_NO_TS); + } + + _send_header.set_eob(info.eob); + _send_header.set_seq_num(_data_seq_num++); + + _send_packet->refresh(buff->data(), _send_header, tsf); + _send_packet->update_payload_size(info.payload_bytes); + + return std::make_pair( + _send_packet->get_payload_ptr(), + _send_packet->get_chdr_header().get_length()); + } + +private: + /*! + * Recv callback for I/O service + * + * The I/O service invokes this callback when it reads a packet from the + * recv link. + * + * \param buff the frame buffer containing the packet data + * \param recv_link the recv link from which buff was read + * \param send_link the send link for flow control messages + */ + bool _recv_callback(buff_t::uptr& buff, + transport::recv_link_if* recv_link, + transport::send_link_if* /*send_link*/) + { + _recv_packet->refresh(buff->data()); + const auto header = _recv_packet->get_chdr_header(); + const auto type = header.get_pkt_type(); + const auto dst_epid = header.get_dst_epid(); + + if (dst_epid != _epid) { + return false; + } + + if (type == chdr::PKT_TYPE_STRS) { + chdr::strs_payload strs; + strs.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(), + _recv_packet->get_payload_size() / sizeof(uint64_t), + _recv_packet->conv_to_host<uint64_t>()); + + _fc_state.update_dest_recv_count({strs.xfer_count_bytes, + static_cast<uint32_t>(strs.xfer_count_pkts)}); + + // TODO: check strs status here and push into async msg queue + + // Packet belongs to this transport, release buff and return true + recv_link->release_recv_buff(std::move(buff)); + buff = nullptr; + return true; + } else { + UHD_THROW_INVALID_CODE_PATH(); + } + } + + /*! + * Send callback for I/O service + * + * The I/O service invokes this callback when it is requested to release + * a send buffer to the send link. + * + * \param buff the frame buffer to release + * \param send_link the send link for flow control messages + */ + void _send_callback(buff_t::uptr& buff, transport::send_link_if* send_link) + { + const size_t packet_size = buff->packet_size(); + + if (_fc_state.dest_has_space(packet_size)) { + send_link->release_send_buff(std::move(buff)); + buff = nullptr; + + _fc_state.data_sent(packet_size); + + if (_fc_state.get_fc_resync_req_pending() + && _fc_state.dest_has_space(chdr::strc_payload::PACKET_SIZE)) { + const auto& xfer_counts = _fc_state.get_xfer_counts(); + const size_t strc_size = + _fc_sender.send_strc_resync(send_link, xfer_counts); + _fc_state.clear_fc_resync_req_pending(); + _fc_state.data_sent(strc_size); + } + } + } + + /*! + * Configures the stream endpoint using mgmt_portal + */ + void _configure_sep(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_addr_t& local_sep_addr, + const uhd::rfnoc::sep_id_t& local_epid, + const uhd::rfnoc::sep_addr_t& remote_sep_addr, + const uhd::rfnoc::sep_id_t& remote_epid, + const uhd::rfnoc::sw_buff_t pyld_buff_fmt, + const uhd::rfnoc::sw_buff_t mdata_buff_fmt) + { + // Create a control transport with the tx data links to send mgmt packets + // needed to setup the stream. Only need one frame for this. + auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv, + send_link, + recv_link, + local_epid, + 1, // num_send_frames + 1); // num_recv_frames + + // Create new temporary management portal with the transports used for this stream + // TODO: This is a bit excessive. Maybe we can pare down the functionality of the + // portal just for route setup purposes. Whatever we do, we *must* use xport in it + // though otherwise the transport will not behave correctly. + auto data_mgmt_portal = uhd::rfnoc::mgmt::mgmt_portal::make( + *ctrl_xport, pkt_factory, local_sep_addr, local_epid); + + // Setup a route to the EPID + data_mgmt_portal->initialize_endpoint(*ctrl_xport, remote_sep_addr, remote_epid); + data_mgmt_portal->setup_local_route(*ctrl_xport, remote_epid); + + data_mgmt_portal->config_local_tx_stream( + *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt); + + // We no longer need the control xport and mgmt_portal, release them so + // the control xport is no longer connected to the I/O service. + data_mgmt_portal.reset(); + ctrl_xport.reset(); + } + + /*! + * Initializes flow control + * + * To initialize flow control, we need to send an init strc packet, then + * receive a strs containing the stream endpoint ingress buffer size. We + * then repeat this (now that we know the buffer size) to configure the flow + * control frequency. To avoid having this logic within the data packet + * processing flow, we use temporary send and recv I/O instances with + * simple callbacks here. + */ + void _initialize_flow_ctrl(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const sep_id_pair_t sep_ids, + const double fc_freq_ratio, + const double fc_headroom_ratio) + { + // No flow control at initialization, just release all send buffs + auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) { + send_link->release_send_buff(std::move(buff)); + buff = nullptr; + }; + + // For recv, just queue strs packets for recv_io to read + auto recv_cb = [this](buff_t::uptr& buff, + transport::recv_link_if* /*recv_link*/, + transport::send_link_if* /*send_link*/) { + _recv_packet->refresh(buff->data()); + const auto header = _recv_packet->get_chdr_header(); + const auto type = header.get_pkt_type(); + const auto dst_epid = header.get_dst_epid(); + + return (dst_epid == _epid && type == chdr::PKT_TYPE_STRS); + }; + + // No flow control at initialization, just release all recv buffs + auto fc_cb = [this](buff_t::uptr buff, + transport::recv_link_if* recv_link, + transport::send_link_if* /*send_link*/) { + recv_link->release_recv_buff(std::move(buff)); + }; + + auto send_io = io_srv->make_send_client(send_link, + 1, // num_send_frames + send_cb, + nullptr, + 0, // num_recv_frames + nullptr); + + auto recv_io = io_srv->make_recv_client(recv_link, + 1, // num_recv_frames + recv_cb, + nullptr, + 0, // num_send_frames + fc_cb); + + chdr::chdr_strc_packet::uptr strc_packet = pkt_factory.make_strc(); + chdr::chdr_packet::uptr& recv_packet = _recv_packet; + + // Function to send a strc init + auto send_strc_init = [&send_io, sep_ids, &strc_packet]( + const stream_buff_params_t fc_freq = {0, 0}) { + transport::frame_buff::uptr buff = send_io->get_send_buff(0); + + if (!buff) { + throw uhd::runtime_error( + "tx xport timed out getting a send buffer for strc init"); + } + + chdr::chdr_header header; + header.set_seq_num(0); + header.set_dst_epid(sep_ids.second); + + chdr::strc_payload strc_pyld; + strc_pyld.src_epid = sep_ids.first; + strc_pyld.op_code = chdr::STRC_INIT; + strc_pyld.num_bytes = fc_freq.bytes; + strc_pyld.num_pkts = fc_freq.packets; + strc_packet->refresh(buff->data(), header, strc_pyld); + + const size_t size = header.get_length(); + buff->set_packet_size(size); + send_io->release_send_buff(std::move(buff)); + }; + + // Function to receive a strs, returns buffer capacity + auto recv_strs = [&recv_io, &recv_packet]() -> stream_buff_params_t { + transport::frame_buff::uptr buff = recv_io->get_recv_buff(200); + + if (!buff) { + throw uhd::runtime_error( + "tx xport timed out wating for a strs packet during initialization"); + } + + recv_packet->refresh(buff->data()); + UHD_ASSERT_THROW( + recv_packet->get_chdr_header().get_pkt_type() == chdr::PKT_TYPE_STRS); + chdr::strs_payload strs; + strs.deserialize(recv_packet->get_payload_const_ptr_as<uint64_t>(), + recv_packet->get_payload_size() / sizeof(uint64_t), + recv_packet->conv_to_host<uint64_t>()); + + recv_io->release_recv_buff(std::move(buff)); + + return {strs.capacity_bytes, + static_cast<uint32_t>(strs.capacity_pkts)}; + }; + + // Send a strc init to get the buffer size + send_strc_init(); + stream_buff_params_t capacity = recv_strs(); + _fc_state.set_dest_capacity(capacity); + + UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", + "Received strs initializing buffer capacity to " + << capacity.bytes << " bytes"); + + // Calculate the requested fc_freq parameters + uhd::rfnoc::stream_buff_params_t fc_freq = { + static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_freq_ratio)), + static_cast<uint32_t>( + std::ceil(double(capacity.packets) * fc_freq_ratio))}; + + const size_t headroom_bytes = + static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_headroom_ratio)); + const size_t headroom_packets = static_cast<uint32_t>( + std::ceil(double(capacity.packets) * fc_headroom_ratio)); + + fc_freq.bytes -= headroom_bytes; + fc_freq.packets -= headroom_packets; + + // Send a strc init to configure fc freq + send_strc_init(fc_freq); + recv_strs(); + + // Release temporary I/O service interfaces to disconnect from it + send_io.reset(); + recv_io.reset(); + } + + // Interface to the I/O service + transport::send_io_if::sptr _send_io; + + // Flow control state + tx_flow_ctrl_state _fc_state; + + // Maximum data payload in bytes + size_t _max_payload_size = 0; + + // Sequence number for data packets + uint16_t _data_seq_num = 0; + + // Header to write into send packets + chdr::chdr_header _send_header; + + // Packet for send data + chdr::chdr_packet::uptr _send_packet; + + // Packet to receive strs messages + chdr::chdr_packet::uptr _recv_packet; + + // Handles sending of strc flow control ack packets + detail::tx_flow_ctrl_sender _fc_sender; + + // Local / Source EPID + sep_id_t _epid; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/chdr_types.hpp b/host/lib/include/uhdlib/rfnoc/chdr_types.hpp index 62b24ab61..1f14ea7d0 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_types.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_types.hpp @@ -482,6 +482,8 @@ public: // Members uint64_t num_pkts = 0; //! Number of bytes to use for operation (64 bits) uint64_t num_bytes = 0; + //! Size of a strc packet (including header) + static constexpr size_t PACKET_SIZE = 24; public: // Functions strc_payload() = default; diff --git a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp index 120b0e0f8..28fa8ec7c 100644 --- a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp +++ b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp @@ -7,11 +7,14 @@ #ifndef INCLUDED_LIBUHD_RFNOC_GRAPH_STREAM_MANAGER_HPP #define INCLUDED_LIBUHD_RFNOC_GRAPH_STREAM_MANAGER_HPP +#include <uhd/stream.hpp> #include <uhdlib/rfnoc/chdr_packet.hpp> #include <uhdlib/rfnoc/client_zero.hpp> #include <uhdlib/rfnoc/ctrlport_endpoint.hpp> #include <uhdlib/rfnoc/epid_allocator.hpp> #include <uhdlib/rfnoc/mb_iface.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> +#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp> #include <functional> #include <memory> #include <set> @@ -84,6 +87,7 @@ public: virtual detail::client_zero::sptr get_client_zero( sep_addr_t dst_addr, device_id_t via_device = NULL_DEVICE_ID) const = 0; + /*! Configure a flow controlled data stream from the endpoint with ID src_epid to the * endpoint with ID dst_epid * @@ -102,7 +106,33 @@ public: const double fc_headroom_ratio, const bool reset = false) = 0; - // TODO: Implement functions to get graph-wide streamers + /*! \brief Create a data stream going from the device to the host + * + * \param dst_addr The address of the destination stream endpoint + * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload + * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata + * \param xport_args The transport arguments + * \return An transport instance + */ + virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream( + sep_addr_t dst_addr, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, + const device_addr_t& xport_args) = 0; + + /*! \brief Create a data stream going from the host to the device + * + * \param dst_addr The address of the destination stream endpoint + * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload + * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata + * \param xport_args The transport arguments + * \return An transport instance + */ + virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream( + sep_addr_t dst_addr, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, + const device_addr_t& xport_args) = 0; /*! * \brief Create a graph_stream_manager and return a unique_ptr to it diff --git a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp index 79121a498..72f1cf1c7 100644 --- a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp +++ b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp @@ -11,6 +11,7 @@ #include <uhdlib/rfnoc/ctrlport_endpoint.hpp> #include <uhdlib/rfnoc/epid_allocator.hpp> #include <uhdlib/rfnoc/mb_iface.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> #include <functional> #include <memory> #include <set> @@ -112,41 +113,29 @@ public: /*! \brief Create a data stream going from the host to the device * * \param dst_addr The address of the destination stream endpoint - * \param lossy_xport Is the transport lossy? * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata - * \param fc_freq_ratio Flow control response frequency as a ratio of the buff params - * \param fc_headroom_ratio Flow control headroom as a ratio of the buff params * \param xport_args The transport arguments * \return An transport instance */ virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream( const sep_addr_t dst_addr, - const bool lossy_xport, const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, - const double fc_freq_ratio, - const double fc_headroom_ratio, const device_addr_t& xport_args) = 0; /*! \brief Create a data stream going from the device to the host * * \param dst_addr The address of the destination stream endpoint - * \param lossy_xport Is the transport lossy? * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata - * \param fc_freq_ratio Flow control response frequency as a ratio of the buff params - * \param fc_headroom_ratio Flow control headroom as a ratio of the buff params * \param xport_args The transport arguments * \return An transport instance */ virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream( const sep_addr_t src_addr, - const bool lossy_xport, const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, - const double fc_freq_ratio, - const double fc_headroom_ratio, const device_addr_t& xport_args) = 0; static uptr make(const chdr::chdr_packet_factory& pkt_factory, diff --git a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp index 0a2790a34..33a0e3df0 100644 --- a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp +++ b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp @@ -8,21 +8,14 @@ #define INCLUDED_LIBUHD_MB_IFACE_HPP #include <uhdlib/rfnoc/chdr_ctrl_xport.hpp> -#include <uhdlib/rfnoc/rfnoc_common.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> +#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp> #include <uhdlib/rfnoc/clock_iface.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> #include <memory> namespace uhd { namespace rfnoc { -// FIXME: Update this -class chdr_rx_data_xport -{ -public: - using uptr = std::unique_ptr<chdr_rx_data_xport>; -}; - -using chdr_tx_data_xport = chdr_rx_data_xport; - /*! Motherboard (backchannel) interface * * In RFNoC devices, the RFNoC subystem needs a backchannel interface to talk to @@ -70,7 +63,8 @@ public: /*! Return a reference to a clock iface */ - virtual std::shared_ptr<clock_iface> get_clock_iface(const std::string& clock_name) = 0; + virtual std::shared_ptr<clock_iface> get_clock_iface( + const std::string& clock_name) = 0; /*! Create a control transport * @@ -89,30 +83,34 @@ public: * * This is typically called once per streaming channel. * - * \param local_device_id ID for the host transport adapter to use - * \param local_epid Host (sink) streaming endpoint ID - * \param remote_epid Remote device (source) streaming endpoint ID + * \param addrs Address of the device and host stream endpoints + * \param epids Endpoint IDs of the device and host stream endpoints + * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload + * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata * \param xport_args Transport configuration args * \return A CHDR RX data transport */ - virtual chdr_rx_data_xport::uptr make_rx_data_transport(device_id_t local_device_id, - const sep_id_t& local_epid, - const sep_id_t& remote_epid, + virtual chdr_rx_data_xport::uptr make_rx_data_transport(const sep_addr_pair_t& addrs, + const sep_id_pair_t& epids, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, const device_addr_t& xport_args) = 0; /*! Create an TX data transport * * This is typically called once per streaming channel. * - * \param local_device_id ID for the host transport adapter to use - * \param local_epid Host (source) streaming endpoint ID - * \param remote_epid Remote device (sink) streaming endpoint ID + * \param addrs Address of the host and device stream endpoints + * \param epids Endpoint IDs of the host and device stream endpoints + * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload + * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata * \param xport_args Transport configuration args * \return A CHDR TX data transport */ - virtual chdr_tx_data_xport::uptr make_tx_data_transport(device_id_t local_device_id, - const sep_id_t& local_epid, - const sep_id_t& remote_epid, + virtual chdr_tx_data_xport::uptr make_tx_data_transport(const sep_addr_pair_t& addrs, + const sep_id_pair_t& epids, + const uhd::rfnoc::sw_buff_t pyld_buff_fmt, + const uhd::rfnoc::sw_buff_t mdata_buff_fmt, const device_addr_t& xport_args) = 0; }; diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp index 7ec1b7bb2..bc56fd311 100644 --- a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp @@ -41,6 +41,8 @@ using device_id_t = uint16_t; using sep_inst_t = uint16_t; //! Stream Endpoint Physical Address Type using sep_addr_t = std::pair<device_id_t, sep_inst_t>; +//! Stream Endpoint Physical Address Type (first = source, second = destination) +using sep_addr_pair_t = std::pair<sep_addr_t, sep_addr_t>; //! Stream Endpoint ID Type using sep_id_t = uint16_t; //! Stream Endpoint pair Type (first = source, second = destination) @@ -65,6 +67,19 @@ struct stream_buff_params_t //! The data type of the buffer used to capture/generate data enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 }; +//! Conversion from number of bits to sw_buff +constexpr sw_buff_t bits_to_sw_buff(size_t bits) +{ + if (bits <= 8) { + return BUFF_U8; + } else if (bits <= 16) { + return BUFF_U16; + } else if (bits <= 32) { + return BUFF_U32; + } else { + return BUFF_U64; + } +} //---------------------------------------------- // Constants @@ -72,6 +87,12 @@ enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 }; constexpr uint16_t RFNOC_PROTO_VER = 0x0100; +constexpr uint64_t MAX_FC_CAPACITY_BYTES = (uint64_t(1) << 40) - 1; +constexpr uint32_t MAX_FC_CAPACITY_PKTS = (uint32_t(1) << 24) - 1; +constexpr uint64_t MAX_FC_FREQ_BYTES = (uint64_t(1) << 40) - 1; +constexpr uint32_t MAX_FC_FREQ_PKTS = (uint32_t(1) << 24) - 1; +constexpr uint64_t MAX_FC_HEADROOM_BYTES = (uint64_t(1) << 16) - 1; +constexpr uint32_t MAX_FC_HEADROOM_PKTS = (uint32_t(1) << 8) - 1; }} // namespace uhd::rfnoc diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp new file mode 100644 index 000000000..6ced60d19 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp @@ -0,0 +1,95 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP +#define INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP + +#include <uhd/rfnoc/node.hpp> +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> +#include <uhdlib/transport/rx_streamer_impl.hpp> +#include <string> + +namespace uhd { namespace rfnoc { + +/*! + * Extends the streamer_impl to be an rfnoc node so it can connect to the + * graph. Configures the streamer conversion and rate parameters with values + * received through property propagation. + */ +class rfnoc_rx_streamer : public node_t, + public transport::rx_streamer_impl<chdr_rx_data_xport> +{ +public: + /*! Constructor + * + * \param num_ports The number of ports + * \param stream_args Arguments to aid the construction of the streamer + */ + rfnoc_rx_streamer(const size_t num_ports, const uhd::stream_args_t stream_args); + + /*! Returns a unique identifier string for this node. In every RFNoC graph, + * no two nodes cannot have the same ID. Returns a string in the form of + * "RxStreamer#0". + * + * \returns The unique ID as a string + */ + std::string get_unique_id() const; + + /*! Returns the number of input ports for this block. + * + * \return noc_id The number of ports + */ + size_t get_num_input_ports() const; + + /*! Returns the number of output ports for this block. + * + * Always returns 0 for this block. + * + * \return noc_id The number of ports + */ + size_t get_num_output_ports() const; + + /*! Implementation of rx_streamer API method + * + * \param stream_cmd the stream command to issue + */ + void issue_stream_cmd(const stream_cmd_t& stream_cmd); + + /*! Returns stream args provided at creation + * + * \return stream args provided when streamer is created + */ + const uhd::stream_args_t& get_stream_args() const; + + /*! Check that all streamer ports are connected to blocks + * + * Overrides node_t to ensure there are no unconnected ports. + * + * \param connected_inputs A list of input ports that are connected + * \param connected_outputs A list of output ports that are connected + * \returns true if the block can deal with this configuration + */ + bool check_topology(const std::vector<size_t>& connected_inputs, + const std::vector<size_t>& connected_outputs); +private: + void _register_props(const size_t chan, const std::string& otw_format); + + // Properties + std::vector<property_t<double>> _scaling_in; + std::vector<property_t<double>> _samp_rate_in; + std::vector<property_t<double>> _tick_rate_in; + std::vector<property_t<std::string>> _type_in; + + // Streamer unique ID + const std::string _unique_id; + + // Stream args provided at construction + const uhd::stream_args_t _stream_args; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_RFNOC_RX_STREAMER_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp new file mode 100644 index 000000000..4acee45cc --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp @@ -0,0 +1,90 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP +#define INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP + +#include <uhd/rfnoc/node.hpp> +#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp> +#include <uhdlib/transport/tx_streamer_impl.hpp> +#include <string> + +namespace uhd { namespace rfnoc { + +/*! + * Extends the streamer_impl to be an rfnoc node so it can connect to the + * graph. Configures the streamer conversion and rate parameters with values + * received through property propagation. + */ +class rfnoc_tx_streamer : public node_t, + public transport::tx_streamer_impl<chdr_tx_data_xport> +{ +public: + /*! Constructor + * + * \param num_ports The number of ports + * \param stream_args Arguments to aid the construction of the streamer + */ + rfnoc_tx_streamer(const size_t num_chans, const uhd::stream_args_t stream_args); + + /*! Returns a unique identifier string for this node. In every RFNoC graph, + * no two nodes cannot have the same ID. Returns a string in the form of + * "TxStreamer#0". + * + * \returns The unique ID as a string + */ + std::string get_unique_id() const; + + /*! Returns the number of input ports for this block. + * + * Always returns 0 for this block. + * + * \return noc_id The number of ports + */ + size_t get_num_input_ports() const; + + /*! Returns the number of output ports for this block. + * + * \return noc_id The number of ports + */ + size_t get_num_output_ports() const; + + /*! Returns stream args provided at creation + * + * \return stream args provided when streamer is created + */ + const uhd::stream_args_t& get_stream_args() const; + + /*! Check that all streamer ports are connected to blocks + * + * Overrides node_t to ensure there are no unconnected ports. + * + * \param connected_inputs A list of input ports that are connected + * \param connected_outputs A list of output ports that are connected + * \returns true if the block can deal with this configuration + */ + bool check_topology(const std::vector<size_t>& connected_inputs, + const std::vector<size_t>& connected_outputs); + +private: + void _register_props(const size_t chan, const std::string& otw_format); + + // Properties + std::vector<property_t<double>> _scaling_out; + std::vector<property_t<double>> _samp_rate_out; + std::vector<property_t<double>> _tick_rate_out; + std::vector<property_t<std::string>> _type_out; + + // Streamer unique ID + const std::string _unique_id; + + // Stream args provided at construction + const uhd::stream_args_t _stream_args; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_RFNOC_TX_STREAMER_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp new file mode 100644 index 000000000..937baf982 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp @@ -0,0 +1,130 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP +#define INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP + +#include <uhd/utils/log.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> + +namespace uhd { namespace rfnoc { + +//! Class to manage rx flow control state +class rx_flow_ctrl_state +{ +public: + //! Constructor + rx_flow_ctrl_state(const rfnoc::sep_id_pair_t epids) : _epids(epids) {} + + //! Initialize frequency parameters + void initialize(const stream_buff_params_t fc_freq) + { + _fc_freq = fc_freq; + } + + //! Resynchronize with transfer counts from the sender + void resynchronize(const stream_buff_params_t counts) + { + if (_recv_counts.bytes != counts.bytes + || _recv_counts.packets != counts.packets) { + // If there is a discrepancy between the amount of data sent by + // the device and received by the transport, adjust the counts + // of data received and transferred to include the dropped data. + auto bytes_dropped = counts.bytes - _recv_counts.bytes; + auto pkts_dropped = counts.packets - _recv_counts.packets; + _xfer_counts.bytes += bytes_dropped; + _xfer_counts.packets += pkts_dropped; + + UHD_LOGGER_DEBUG("rx_flow_ctrl_state") + << "oh noes: bytes_sent=" << counts.bytes + << " bytes_received=" << _recv_counts.bytes + << " pkts_sent=" << counts.packets + << " pkts_received=" << _recv_counts.packets + << " src_epid=" << _epids.first << " dst_epid=" << _epids.second + << std::endl; + + _recv_counts = counts; + } + } + + //! Reset the transfer counts (happens during init) + void reset_counts() + { + UHD_LOGGER_TRACE("rx_flow_ctrl_state") + << "Resetting transfer counts" << std::endl; + _recv_counts = {0, 0}; + _xfer_counts = {0, 0}; + } + + //! Update state when data is received + void data_received(const size_t bytes) + { + _recv_counts.bytes += bytes; + _recv_counts.packets++; + } + + //! Update state when transfer is complete (buffer space freed) + void xfer_done(const size_t bytes) + { + _xfer_counts.bytes += bytes; + _xfer_counts.packets++; + } + + //! Returns whether a flow control response is needed + bool fc_resp_due() const + { + stream_buff_params_t accum_counts = { + _xfer_counts.bytes - _last_fc_resp_counts.bytes, + _xfer_counts.packets - _last_fc_resp_counts.packets}; + + return accum_counts.bytes >= _fc_freq.bytes + || accum_counts.packets >= _fc_freq.packets; + } + + //! Update state after flow control response was sent + void fc_resp_sent() + { + _last_fc_resp_counts = _xfer_counts; + } + + //! Returns counts for completed transfers + stream_buff_params_t get_xfer_counts() const + { + return _xfer_counts; + } + + //! Returns counts for completed transfers + stream_buff_params_t get_recv_counts() const + { + return _recv_counts; + } + + //! Returns configured flow control frequency + stream_buff_params_t get_fc_freq() const + { + return _fc_freq; + } + +private: + // Counts for data received, including any data still in use + stream_buff_params_t _recv_counts{0, 0}; + + // Counts for data read and whose buffer space is ok to reuse + stream_buff_params_t _xfer_counts{0, 0}; + + // Counts sent in last flow control response + stream_buff_params_t _last_fc_resp_counts{0, 0}; + + // Frequency of flow control responses + stream_buff_params_t _fc_freq{0, 0}; + + // Endpoint ID for log messages + const sep_id_pair_t _epids; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_RFNOC_RX_FLOW_CTRL_STATE_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp new file mode 100644 index 000000000..65fc1b093 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp @@ -0,0 +1,99 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP +#define INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP + +#include <uhdlib/rfnoc/rfnoc_common.hpp> + +namespace uhd { namespace rfnoc { + +//! Class to manage tx flow control state +class tx_flow_ctrl_state +{ +public: + //! Updates destination capacity + void set_dest_capacity(const stream_buff_params_t& capacity) + { + _dest_capacity = capacity; + } + + //! Updates destination received count + void update_dest_recv_count(const stream_buff_params_t& recv_count) + { + _recv_counts = recv_count; + } + + /*! Returns whether the destination has buffer space for the requested + * packet size + */ + bool dest_has_space(const size_t packet_size) + { + // The stream endpoint only cares about bytes, the packet count is not + // important to determine the space available. + const auto buffer_fullness = _xfer_counts.bytes - _recv_counts.bytes; + const auto space_available = _dest_capacity.bytes - buffer_fullness; + return space_available >= packet_size; + } + + //! Increments transfer count with amount of data sent + void data_sent(const size_t packet_size) + { + _xfer_counts.bytes += packet_size; + _xfer_counts.packets++; + + // Request an fc resync after we have transferred a number of bytes >= + // to the destination capacity. There is no strict requirement on how + // often we need to send this, as it is only needed to correct for + // dropped packets. One buffer's worth of bytes is probably a good + // cadence. + if (_xfer_counts.bytes - _last_fc_resync_bytes >= _dest_capacity.bytes) { + _fc_resync_req = true; + } + } + + /*! Returns whether an fc resync request is pending. The policy we use + * here is to send an fc resync every time we send a number of bytes + * equal to the destination buffer capacity. + */ + bool get_fc_resync_req_pending() const + { + return _fc_resync_req; + } + + //! Clears fc resync request pending status + void clear_fc_resync_req_pending() + { + _fc_resync_req = false; + _last_fc_resync_bytes = _xfer_counts.bytes; + } + + //! Returns counts for packets sent + stream_buff_params_t get_xfer_counts() const + { + return _xfer_counts; + } + +private: + // Counts for data sent + stream_buff_params_t _xfer_counts{0, 0}; + + // Counts for data received by the destination + stream_buff_params_t _recv_counts{0, 0}; + + // Buffer size at the destination + stream_buff_params_t _dest_capacity{0, 0}; + + // Counts sent in last flow control resync + size_t _last_fc_resync_bytes = 0; + + // Track when to send ack packets + bool _fc_resync_req = false; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_RFNOC_TX_FLOW_CTRL_STATE_HPP */ diff --git a/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp new file mode 100644 index 000000000..662be6d2d --- /dev/null +++ b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp @@ -0,0 +1,175 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP +#define INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP + +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <boost/dynamic_bitset.hpp> +#include <boost/format.hpp> + +namespace uhd { namespace transport { + +// Number of iterations that get_aligned_buffs will attempt to time align +// packets before returning an alignment failure. get_aligned_buffs increments +// the iteration count when it finds a timestamp that is larger than the +// timestamps on channels it has already aligned and thus has to restart +// aligning timestamps on all channels to the new timestamp. +constexpr size_t ALIGNMENT_FAILURE_THRESHOLD = 1000; + +/*! + * Implementation of rx time alignment. This method reads packets from the + * transports for each channel and discards any packets whose tsf does not + * match those of other channels due to dropped packets. Packets that do not + * have a tsf are not checked for alignment and never dropped. + */ +template <typename transport_t> +class get_aligned_buffs +{ +public: + enum alignment_result_t { + SUCCESS, + TIMEOUT, + SEQUENCE_ERROR, + ALIGNMENT_FAILURE, + BAD_PACKET + }; + + get_aligned_buffs(std::vector<typename transport_t::uptr>& xports, + std::vector<typename transport_t::buff_t::uptr>& frame_buffs, + std::vector<typename transport_t::packet_info_t>& infos) + : _xports(xports) + , _frame_buffs(frame_buffs) + , _infos(infos) + , _prev_tsf(_xports.size(), 0) + , _channels_to_align(_xports.size()) + { + } + + alignment_result_t operator()(const int32_t timeout_ms) + { + // Clear state + _channels_to_align.set(); + bool time_valid = false; + uint64_t tsf = 0; + size_t iterations = 0; + + while (_channels_to_align.any()) { + const size_t chan = _channels_to_align.find_first(); + auto& xport = _xports[chan]; + auto& info = _infos[chan]; + auto& frame_buff = _frame_buffs[chan]; + bool seq_error = false; + + // Receive a data packet for the channel if we don't have one. A + // packet may already be there if the previous call was interrupted + // by an error. + if (!frame_buff) { + try { + std::tie(frame_buff, info, seq_error) = + xport->get_recv_buff(timeout_ms); + } catch (const uhd::value_error& e) { + // Bad packet + UHD_LOGGER_ERROR("STREAMER") + << boost::format( + "The receive transport caught a value exception.\n%s") + % e.what(); + return BAD_PACKET; + } + } + + if (!frame_buff) { + return TIMEOUT; + } + + if (info.has_tsf) { + const bool time_out_of_order = _prev_tsf[chan] > info.tsf; + _prev_tsf[chan] = info.tsf; + + // If the user changes the device time while streaming, we can + // receive a packet that comes before the previous packet in + // time. This would cause the alignment logic to discard future + // received packets. Therefore, when this occurs, we reset the + // info to restart the alignment. + if (time_out_of_order) { + time_valid = false; + } + + // Check if the time is larger than packets received for other + // channels, and if so, use this time to align all channels + if (!time_valid || info.tsf > tsf) { + // If we haven't found a set of aligned packets after many + // iterations, return an alignment failure + if (iterations++ > ALIGNMENT_FAILURE_THRESHOLD) { + UHD_LOGGER_ERROR("STREAMER") + << "The rx streamer failed to time-align packets."; + return ALIGNMENT_FAILURE; + } + + // Release buffers for channels aligned previously. Keep + // buffers that don't have a tsf since we don't need to + // align those. + for (size_t i = 0; i < _xports.size(); i++) { + if (!_channels_to_align.test(i) && _infos[i].has_tsf) { + _xports[i]->release_recv_buff(std::move(_frame_buffs[i])); + _frame_buffs[i] = nullptr; + } + } + + // Mark only this channel as aligned and save its tsf + _channels_to_align.set(); + _channels_to_align.reset(chan); + time_valid = true; + tsf = info.tsf; + } + + // Check if the time matches that of other aligned channels + else if (info.tsf == tsf) { + _channels_to_align.reset(chan); + } + + // Otherwise, time is smaller than other channels, release the buffer + else { + _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan])); + _frame_buffs[chan] = nullptr; + } + } else { + // Packet doesn't have a tsf, just mark it as aligned + _channels_to_align.reset(chan); + } + + // If this packet had a sequence error, stop to return the error. + // Keep the packet for the next call to get_aligned_buffs. + if (seq_error) { + return SEQUENCE_ERROR; + } + } + + // All channels aligned + return SUCCESS; + } + +private: + // Transports for each channel + std::vector<typename transport_t::uptr>& _xports; + + // Storage for buffers resulting from alignment + std::vector<typename transport_t::buff_t::uptr>& _frame_buffs; + + // Packet info corresponding to aligned buffers + std::vector<typename transport_t::packet_info_t>& _infos; + + // Time of previous packet for each channel + std::vector<uint64_t> _prev_tsf; + + // Keeps track of channels that are aligned + boost::dynamic_bitset<> _channels_to_align; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP */ diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp new file mode 100644 index 000000000..d66e867bc --- /dev/null +++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp @@ -0,0 +1,341 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP +#define INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP + +#include <uhd/config.hpp> +#include <uhd/convert.hpp> +#include <uhd/exception.hpp> +#include <uhd/stream.hpp> +#include <uhd/types/endianness.hpp> +#include <uhd/utils/log.hpp> +#include <uhdlib/transport/rx_streamer_zero_copy.hpp> +#include <limits> +#include <vector> + +namespace uhd { namespace transport { + +namespace detail { + +/*! + * Cache of metadata for error handling + * + * If a recv call reads data from multiple packets, and an error occurs in the + * second or later packets, recv stops short of the num samps requested and + * returns no error. The error is cached for the next call to recv. + * + * Timeout errors are an exception. Timeouts that occur in the second or later + * packets of a recv call stop the recv method but the error is not returned in + * the next call. The user can check for this condition since fewer samples are + * returned than the number requested. + */ +class rx_metadata_cache +{ +public: + //! Stores metadata in the cache, ignoring timeout errors + UHD_FORCE_INLINE void store(const rx_metadata_t& metadata) + { + if (metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) { + _metadata_cache = metadata; + _cached_metadata = true; + } + } + + //! Checks for cached metadata + UHD_FORCE_INLINE bool check(rx_metadata_t& metadata) + { + if (_cached_metadata) { + metadata = _metadata_cache; + _cached_metadata = false; + return true; + } + return false; + } + +private: + // Whether there is a cached metadata object + bool _cached_metadata = false; + + // Cached metadata value + uhd::rx_metadata_t _metadata_cache; +}; + +} // namespace detail + +/*! + * Implementation of rx streamer API + */ +template <typename transport_t> +class rx_streamer_impl : public rx_streamer +{ +public: + //! Constructor + rx_streamer_impl(const size_t num_ports, const uhd::stream_args_t stream_args) + : _zero_copy_streamer(num_ports) + , _in_buffs(num_ports) + { + if (stream_args.cpu_format.empty()) { + throw uhd::value_error("[rx_stream] Must provide a cpu_format!"); + } + if (stream_args.otw_format.empty()) { + throw uhd::value_error("[rx_stream] Must provide a otw_format!"); + } + _setup_converters(num_ports, stream_args); + _zero_copy_streamer.set_samp_rate(_samp_rate); + _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item); + } + + //! Connect a new channel to the streamer + void connect_channel(const size_t channel, typename transport_t::uptr xport) + { + const size_t max_pyld_size = xport->get_max_payload_size(); + _zero_copy_streamer.connect_channel(channel, std::move(xport)); + + // Set spp based on the transport frame size + const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item; + _spp = std::min(_spp, xport_spp); + } + + //! Implementation of rx_streamer API method + size_t get_num_channels() const + { + return _zero_copy_streamer.get_num_channels(); + } + + //! Implementation of rx_streamer API method + size_t get_max_num_samps() const + { + return _spp; + } + + /*! Get width of each over-the-wire item component. For complex items, + * returns the width of one component only (real or imaginary). + */ + size_t get_otw_item_comp_bit_width() const + { + return _convert_info.otw_item_bit_width; + } + + //! Implementation of rx_streamer API method + UHD_INLINE size_t recv(const uhd::rx_streamer::buffs_type& buffs, + const size_t nsamps_per_buff, + uhd::rx_metadata_t& metadata, + const double timeout, + const bool one_packet) + { + if (_error_metadata_cache.check(metadata)) { + return 0; + } + + const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); + + size_t total_samps_recv = + _recv_one_packet(buffs, nsamps_per_buff, metadata, timeout_ms); + + if (one_packet or metadata.end_of_burst) { + return total_samps_recv; + } + + // First set of packets recv had an error, return immediately + if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) { + return total_samps_recv; + } + + // Loop until buffer is filled or error code. This method returns the + // metadata from the first packet received, with the exception of + // end-of-burst. + uhd::rx_metadata_t loop_metadata; + + while (total_samps_recv < nsamps_per_buff) { + size_t num_samps = _recv_one_packet(buffs, + nsamps_per_buff - total_samps_recv, + loop_metadata, + timeout_ms, + total_samps_recv * _convert_info.bytes_per_cpu_item); + + // If metadata had an error code set, store for next call and return + if (loop_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) { + _error_metadata_cache.store(loop_metadata); + break; + } + + total_samps_recv += num_samps; + + // Return immediately if end of burst + if (loop_metadata.end_of_burst) { + metadata.end_of_burst = true; + break; + } + } + + return total_samps_recv; + } + +protected: + //! Configures scaling factor for conversion + void set_scale_factor(const size_t chan, const double scale_factor) + { + _converters[chan]->set_scalar(scale_factor); + } + + //! Configures sample rate for conversion of timestamp + void set_samp_rate(const double rate) + { + _samp_rate = rate; + _zero_copy_streamer.set_samp_rate(rate); + } + + //! Configures tick rate for conversion of timestamp + void set_tick_rate(const double rate) + { + _zero_copy_streamer.set_tick_rate(rate); + } + +private: + //! Converter and associated item sizes + struct convert_info + { + size_t bytes_per_otw_item; + size_t bytes_per_cpu_item; + size_t otw_item_bit_width; + }; + + //! Receive a single packet + UHD_FORCE_INLINE size_t _recv_one_packet(const uhd::rx_streamer::buffs_type& buffs, + const size_t nsamps_per_buff, + uhd::rx_metadata_t& metadata, + const int32_t timeout_ms, + const size_t buffer_offset_bytes = 0) + { + if (_buff_samps_remaining == 0) { + // Current set of buffers has expired, get the next one + _buff_samps_remaining = + _zero_copy_streamer.get_recv_buffs(_in_buffs, metadata, timeout_ms); + _fragment_offset_in_samps = 0; + } else { + // There are samples still left in the current set of buffers + metadata = _last_fragment_metadata; + metadata.time_spec += time_spec_t::from_ticks( + _fragment_offset_in_samps - metadata.fragment_offset, _samp_rate); + } + + if (_buff_samps_remaining != 0) { + const size_t num_samps = std::min(nsamps_per_buff, _buff_samps_remaining); + + // Convert samples to the streamer's output format + for (size_t i = 0; i < get_num_channels(); i++) { + char* b = reinterpret_cast<char*>(buffs[i]); + const uhd::rx_streamer::buffs_type out_buffs(b + buffer_offset_bytes); + _convert_to_out_buff(out_buffs, i, num_samps); + } + + _buff_samps_remaining -= num_samps; + + // Write the fragment flags and offset + metadata.more_fragments = _buff_samps_remaining != 0; + metadata.fragment_offset = _fragment_offset_in_samps; + + if (metadata.more_fragments) { + _fragment_offset_in_samps += num_samps; + _last_fragment_metadata = metadata; + } + + return num_samps; + } else { + return 0; + } + } + + //! Convert samples for one channel into its buffer + UHD_FORCE_INLINE void _convert_to_out_buff( + const uhd::rx_streamer::buffs_type& out_buffs, + const size_t chan, + const size_t num_samps) + { + const char* buffer_ptr = reinterpret_cast<const char*>(_in_buffs[chan]); + + _converters[chan]->conv(buffer_ptr, out_buffs, num_samps); + + // Advance the pointer for the source buffer + _in_buffs[chan] = + buffer_ptr + num_samps * _convert_info.bytes_per_otw_item; + + if (_buff_samps_remaining == num_samps) { + _zero_copy_streamer.release_recv_buff(chan); + } + } + + //! Create converters and initialize _convert_info + void _setup_converters(const size_t num_ports, + const uhd::stream_args_t stream_args) + { + // Note to code archaeologists: In the past, we had to also specify the + // endianness here, but that is no longer necessary because we can make + // the wire endianness match the host endianness. + convert::id_type id; + id.input_format = stream_args.otw_format + "_chdr"; + id.num_inputs = 1; + id.output_format = stream_args.cpu_format; + id.num_outputs = 1; + + auto starts_with = [](const std::string& s, const std::string v) { + return s.find(v) == 0; + }; + + const bool otw_is_complex = starts_with(stream_args.otw_format, "fc") + || starts_with(stream_args.otw_format, "sc"); + + convert_info info; + info.bytes_per_otw_item = convert::get_bytes_per_item(id.input_format); + info.bytes_per_cpu_item = convert::get_bytes_per_item(id.output_format); + + if (otw_is_complex) { + info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2; + } else { + info.otw_item_bit_width = info.bytes_per_otw_item * 8; + } + + _convert_info = info; + + for (size_t i = 0; i < num_ports; i++) { + _converters.push_back(convert::get_converter(id)()); + _converters.back()->set_scalar(1 / 32767.0); + } + } + + // Converter and item sizes + convert_info _convert_info; + + // Converters + std::vector<uhd::convert::converter::sptr> _converters; + + // Implementation of frame buffer management and packet info + rx_streamer_zero_copy<transport_t> _zero_copy_streamer; + + // Container for buffer pointers used in recv method + std::vector<const void*> _in_buffs; + + // Sample rate used to calculate metadata time_spec_t + double _samp_rate = 1.0; + + // Maximum number of samples per packet + size_t _spp = std::numeric_limits<std::size_t>::max(); + + // Num samps remaining in buffer currently held by zero copy streamer + size_t _buff_samps_remaining = 0; + + // Metadata cache for error handling + detail::rx_metadata_cache _error_metadata_cache; + + // Fragment (partially read packet) information + size_t _fragment_offset_in_samps = 0; + rx_metadata_t _last_fragment_metadata; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP */ diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp new file mode 100644 index 000000000..36f568f2d --- /dev/null +++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp @@ -0,0 +1,207 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP +#define INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/utils/log.hpp> +#include <uhdlib/transport/get_aligned_buffs.hpp> +#include <boost/format.hpp> +#include <vector> + +namespace uhd { namespace transport { + +/*! + * Implementation of rx streamer manipulation of frame buffers and packet info. + * This class is part of rx_streamer_impl, split into a separate unit as it is + * a mostly self-contained portion of the streamer logic. + */ +template <typename transport_t> +class rx_streamer_zero_copy +{ +public: + //! Constructor + rx_streamer_zero_copy(const size_t num_ports) + : _xports(num_ports) + , _frame_buffs(num_ports) + , _infos(num_ports) + , _get_aligned_buffs(_xports, _frame_buffs, _infos) + { + } + + ~rx_streamer_zero_copy() + { + for (size_t i = 0; i < _frame_buffs.size(); i++) { + if (_frame_buffs[i]) { + _xports[i]->release_recv_buff(std::move(_frame_buffs[i])); + } + } + } + + //! Connect a new channel to the streamer + void connect_channel(const size_t port, typename transport_t::uptr xport) + { + if (port >= get_num_channels()) { + throw uhd::index_error( + "Port number indexes beyond the number of streamer ports"); + } + + if (_xports[port]) { + throw uhd::runtime_error( + "Streamer port number is already connected to a port"); + } + + _xports[port] = std::move(xport); + } + + //! Returns number of channels handled by this streamer + size_t get_num_channels() const + { + return _xports.size(); + } + + //! Configures tick rate for conversion of timestamp + void set_tick_rate(const double rate) + { + _tick_rate = rate; + } + + //! Configures sample rate for conversion of timestamp + void set_samp_rate(const double rate) + { + _samp_rate = rate; + } + + //! Configures the size of each sample + void set_bytes_per_item(const size_t bpi) + { + _bytes_per_item = bpi; + } + + /*! + * Gets a set of time-aligned buffers, one per channel. + * + * \param buffs returns a pointer to the buffer data + * \param metadata returns the metadata corresponding to the buffer + * \param timeout_ms timeout in milliseconds + * \return the size in samples of each packet, or 0 if timeout + */ + size_t get_recv_buffs(std::vector<const void*>& buffs, + rx_metadata_t& metadata, + const int32_t timeout_ms) + { + metadata.reset(); + + switch (_get_aligned_buffs(timeout_ms)) { + case get_aligned_buffs_t::SUCCESS: + break; + + case get_aligned_buffs_t::BAD_PACKET: + metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; + return 0; + + case get_aligned_buffs_t::TIMEOUT: + metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; + return 0; + + case get_aligned_buffs_t::ALIGNMENT_FAILURE: + metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; + return 0; + + case get_aligned_buffs_t::SEQUENCE_ERROR: + metadata.has_time_spec = _last_read_time_info.has_time_spec; + metadata.time_spec = + _last_read_time_info.time_spec + + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate); + metadata.out_of_sequence = true; + metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + return 0; + + default: + UHD_THROW_INVALID_CODE_PATH(); + } + + // Get payload pointers for each buffer and aggregate eob. We set eob to + // true if any channel has it set, since no more data will be received for + // that channel. In most cases, all channels should have the same value. + bool eob = false; + for (size_t i = 0; i < buffs.size(); i++) { + buffs[i] = _infos[i].payload; + eob |= _infos[i].eob; + } + + // Set the metadata from the buffer information at index zero + const auto& info_0 = _infos[0]; + + metadata.has_time_spec = info_0.has_tsf; + metadata.time_spec = time_spec_t::from_ticks(info_0.tsf, _tick_rate); + metadata.start_of_burst = false; + metadata.end_of_burst = eob; + metadata.error_code = rx_metadata_t::ERROR_CODE_NONE; + + // Done with these packets, save timestamp info for next call + _last_read_time_info.has_time_spec = metadata.has_time_spec; + _last_read_time_info.time_spec = metadata.time_spec; + _last_read_time_info.num_samps = info_0.payload_bytes / _bytes_per_item; + + return _last_read_time_info.num_samps; + } + + /*! + * Release the packet for the specified channel + * + * \param channel the channel for which to release the packet + */ + void release_recv_buff(const size_t channel) + { + _xports[channel]->release_recv_buff(std::move(_frame_buffs[channel])); + _frame_buffs[channel] = typename transport_t::buff_t::uptr(); + } + +private: + using get_aligned_buffs_t = get_aligned_buffs<transport_t>; + + // Information recorded by streamer about the last data packet processed, + // used to create the metadata when there is a sequence error. + struct last_read_time_info_t + { + size_t num_samps = 0; + bool has_time_spec = false; + time_spec_t time_spec; + }; + + // Transports for each channel + std::vector<typename transport_t::uptr> _xports; + + // Storage for buffers for each channel while they are in flight (between + // calls to get_recv_buff and release_recv_buff). + std::vector<typename transport_t::buff_t::uptr> _frame_buffs; + + // Packet info corresponding to the packets in flight + std::vector<typename transport_t::packet_info_t> _infos; + + // Rate used in conversion of timestamp to time_spec_t + double _tick_rate = 1.0; + + // Rate used in conversion of timestamp to time_spec_t + double _samp_rate = 1.0; + + // Size of a sample on the device + size_t _bytes_per_item = 0; + + // Implementation of packet time alignment + get_aligned_buffs_t _get_aligned_buffs; + + // Information about the last data packet processed + last_read_time_info_t _last_read_time_info; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP */ diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp new file mode 100644 index 000000000..60881dad2 --- /dev/null +++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp @@ -0,0 +1,307 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP +#define INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP + +#include <uhd/config.hpp> +#include <uhd/convert.hpp> +#include <uhd/stream.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/utils/tasks.hpp> +#include <uhdlib/transport/tx_streamer_zero_copy.hpp> +#include <limits> +#include <vector> + +namespace uhd { namespace transport { + +namespace detail { + +/*! + * Cache of metadata for send calls with zero samples + * + * Metadata is cached when we get a send requesting a start of burst with no + * samples. It is applied here on the next call to send() that actually has + * samples to send. + */ +class tx_metadata_cache +{ +public: + //! Stores metadata in the cache + UHD_FORCE_INLINE void store(const tx_metadata_t& metadata) + { + _metadata_cache = metadata; + _cached_metadata = true; + } + + //! Checks for cached metadata + UHD_FORCE_INLINE void check(tx_metadata_t& metadata) + { + if (_cached_metadata) { + // Only use cached time_spec if metadata does not have one + if (!metadata.has_time_spec) { + metadata.has_time_spec = _metadata_cache.has_time_spec; + metadata.time_spec = _metadata_cache.time_spec; + } + metadata.start_of_burst = _metadata_cache.start_of_burst; + metadata.end_of_burst = _metadata_cache.end_of_burst; + _cached_metadata = false; + } + } + +private: + // Whether there is a cached metadata object + bool _cached_metadata = false; + + // Cached metadata value + uhd::tx_metadata_t _metadata_cache; +}; + +} // namespace detail + +/*! + * Implementation of tx streamer API + */ +template <typename transport_t> +class tx_streamer_impl : public tx_streamer +{ +public: + tx_streamer_impl(const size_t num_chans, const uhd::stream_args_t stream_args) + : _zero_copy_streamer(num_chans) + , _zero_buffs(num_chans, &_zero) + , _out_buffs(num_chans) + { + _setup_converters(num_chans, stream_args); + _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item); + _spp = stream_args.args.cast<size_t>("spp", _spp); + } + + void connect_channel(const size_t channel, typename transport_t::uptr xport) + { + const size_t max_pyld_size = xport->get_max_payload_size(); + _zero_copy_streamer.connect_channel(channel, std::move(xport)); + + // Set spp based on the transport frame size + const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item; + _spp = std::min(_spp, xport_spp); + } + + size_t get_num_channels() const + { + return _zero_copy_streamer.get_num_channels(); + } + + size_t get_max_num_samps() const + { + return _spp; + } + + + /*! Get width of each over-the-wire item component. For complex items, + * returns the width of one component only (real or imaginary). + */ + size_t get_otw_item_comp_bit_width() const + { + return _convert_info.otw_item_bit_width; + } + + size_t send(const uhd::tx_streamer::buffs_type& buffs, + const size_t nsamps_per_buff, + const uhd::tx_metadata_t& metadata_, + const double timeout) + { + uhd::tx_metadata_t metadata(metadata_); + + if (nsamps_per_buff == 0 && metadata.start_of_burst) { + _metadata_cache.store(metadata); + return 0; + } + + _metadata_cache.check(metadata); + + const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); + + if (nsamps_per_buff == 0) { + // Send requests with no samples are handled here, such as end of + // burst. Send packets need to have at least one sample based on the + // chdr specification, so we use _zero_buffs here. + _send_one_packet(_zero_buffs.data(), + 0, // buffer offset + 1, // num samples + metadata, + timeout_ms); + + return 0; + } else if (nsamps_per_buff <= _spp) { + return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms); + + } else { + size_t total_num_samps_sent = 0; + const bool eob = metadata.end_of_burst; + metadata.end_of_burst = false; + + const size_t num_fragments = (nsamps_per_buff - 1) / _spp; + const size_t final_length = ((nsamps_per_buff - 1) % _spp) + 1; + + for (size_t i = 0; i < num_fragments; i++) { + const size_t num_samps_sent = _send_one_packet( + buffs, total_num_samps_sent, _spp, metadata, timeout_ms); + + total_num_samps_sent += num_samps_sent; + + if (num_samps_sent == 0) { + return total_num_samps_sent; + } + + // Setup timespec for the next fragment + if (metadata.has_time_spec) { + metadata.time_spec = + metadata.time_spec + + time_spec_t::from_ticks(num_samps_sent, _samp_rate); + } + + metadata.start_of_burst = false; + } + + // Send the final fragment + metadata.end_of_burst = eob; + + size_t nsamps_sent = + total_num_samps_sent + + _send_one_packet( + buffs, total_num_samps_sent, final_length, metadata, timeout); + + return nsamps_sent; + } + } + + //! Implementation of rx_streamer API method + bool recv_async_msg( + uhd::async_metadata_t& /*async_metadata*/, double /*timeout = 0.1*/) + { + // TODO: implement me + return false; + } + +protected: + //! Configures scaling factor for conversion + void set_scale_factor(const size_t chan, const double scale_factor) + { + _converters[chan]->set_scalar(scale_factor); + } + + //! Configures sample rate for conversion of timestamp + void set_samp_rate(const double rate) + { + _samp_rate = rate; + } + + //! Configures tick rate for conversion of timestamp + void set_tick_rate(const double rate) + { + _zero_copy_streamer.set_tick_rate(rate); + } + +private: + //! Converter and associated item sizes + struct convert_info + { + size_t bytes_per_otw_item; + size_t bytes_per_cpu_item; + size_t otw_item_bit_width; + }; + + //! Convert samples for one channel and sends a packet + size_t _send_one_packet(const uhd::tx_streamer::buffs_type& buffs, + const size_t buffer_offset_in_samps, + const size_t num_samples, + const tx_metadata_t& metadata, + const int32_t timeout_ms) + { + if (!_zero_copy_streamer.get_send_buffs( + _out_buffs, num_samples, metadata, timeout_ms)) { + return 0; + } + + size_t byte_offset = buffer_offset_in_samps * _convert_info.bytes_per_cpu_item; + + for (size_t i = 0; i < get_num_channels(); i++) { + const void* input_ptr = static_cast<const uint8_t*>(buffs[i]) + byte_offset; + _converters[i]->conv(input_ptr, _out_buffs[i], num_samples); + + _zero_copy_streamer.release_send_buff(i); + } + + return num_samples; + } + + //! Create converters and initialize _bytes_per_cpu_item + void _setup_converters(const size_t num_chans, const uhd::stream_args_t stream_args) + { + // Note to code archaeologists: In the past, we had to also specify the + // endianness here, but that is no longer necessary because we can make + // the wire endianness match the host endianness. + convert::id_type id; + id.input_format = stream_args.cpu_format; + id.num_inputs = 1; + id.output_format = stream_args.otw_format + "_chdr"; + id.num_outputs = 1; + + auto starts_with = [](const std::string& s, const std::string v) { + return s.find(v) == 0; + }; + + const bool otw_is_complex = starts_with(stream_args.otw_format, "fc") + || starts_with(stream_args.otw_format, "sc"); + + convert_info info; + info.bytes_per_otw_item = convert::get_bytes_per_item(id.output_format); + info.bytes_per_cpu_item = convert::get_bytes_per_item(id.input_format); + + if (otw_is_complex) { + info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2; + } else { + info.otw_item_bit_width = info.bytes_per_otw_item * 8; + } + + _convert_info = info; + + for (size_t i = 0; i < num_chans; i++) { + _converters.push_back(convert::get_converter(id)()); + _converters.back()->set_scalar(32767.0); + } + } + + // Converter item sizes + convert_info _convert_info; + + // Converters + std::vector<uhd::convert::converter::sptr> _converters; + + // Manages frame buffers and packet info + tx_streamer_zero_copy<transport_t> _zero_copy_streamer; + + // Buffer used to handle send calls with no data + std::vector<const void*> _zero_buffs; + + const uint64_t _zero = 0; + + // Container for buffer pointers used in send method + std::vector<void*> _out_buffs; + + // Sample rate used to calculate metadata time_spec_t + double _samp_rate = 1.0; + + // Maximum number of samples per packet + size_t _spp = std::numeric_limits<std::size_t>::max(); + + // Metadata cache for send calls with no data + detail::tx_metadata_cache _metadata_cache; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_TRANSPORT_TX_STREAMER_IMPL_HPP */ diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp new file mode 100644 index 000000000..1b6f55238 --- /dev/null +++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp @@ -0,0 +1,147 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP +#define INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP + +#include <uhd/config.hpp> +#include <uhd/stream.hpp> +#include <uhd/types/metadata.hpp> +#include <vector> + +namespace uhd { namespace transport { + +/*! + * Implementation of rx streamer manipulation of frame buffers and packet info. + * This class is part of tx_streamer_impl, split into a separate unit as it is + * a mostly self-contained portion of the streamer logic. + */ +template <typename transport_t> +class tx_streamer_zero_copy +{ +public: + //! Constructor + tx_streamer_zero_copy(const size_t num_chans) + : _xports(num_chans), _frame_buffs(num_chans) + { + } + + //! Connect a new channel to the streamer + void connect_channel(const size_t port, typename transport_t::uptr xport) + { + if (port >= get_num_channels()) { + throw uhd::index_error( + "Port number indexes beyond the number of streamer ports"); + } + + if (_xports[port]) { + throw uhd::runtime_error( + "Streamer port number is already connected to a port"); + } + + _xports[port] = std::move(xport); + } + + //! Returns number of channels handled by this streamer + size_t get_num_channels() const + { + return _xports.size(); + } + + //! Configures tick rate for conversion of timestamp + void set_tick_rate(const double rate) + { + _tick_rate = rate; + } + + //! Configures the size of each sample + void set_bytes_per_item(const size_t bpi) + { + _bytes_per_item = bpi; + } + + /*! + * Gets a set of frame buffers, one per channel. + * + * \param buffs returns a pointer to the buffer data + * \param nsamps_per_buff the number of samples that will be written to each buffer + * \param metadata the metadata to write to the packet header + * \param timeout_ms timeout in milliseconds + * \return true if the operation was sucessful, false if timeout occurs + */ + UHD_FORCE_INLINE bool get_send_buffs(std::vector<void*>& buffs, + const size_t nsamps_per_buff, + const tx_metadata_t& metadata, + const int32_t timeout_ms) + { + // Try to get a buffer per channel + for (; _next_buff_to_get < _xports.size(); _next_buff_to_get++) { + _frame_buffs[_next_buff_to_get].first = + _xports[_next_buff_to_get]->get_send_buff(timeout_ms); + + if (!_frame_buffs[_next_buff_to_get].first) { + return false; + } + } + + // Got all the buffers, start from index 0 next call + _next_buff_to_get = 0; + + // Store portions of metadata we care about + typename transport_t::packet_info_t info; + info.has_tsf = metadata.has_time_spec; + + if (metadata.has_time_spec) { + info.tsf = metadata.time_spec.to_ticks(_tick_rate); + } + + info.payload_bytes = nsamps_per_buff * _bytes_per_item; + info.eob = metadata.end_of_burst; + + // Write packet header + for (size_t i = 0; i < buffs.size(); i++) { + std::tie(buffs[i], _frame_buffs[i].second) = + _xports[i]->write_packet_header(_frame_buffs[i].first, info); + } + + return true; + } + + /*! + * Send the packet for the specified channel + * + * \param channel the channel for which to release the packet + */ + UHD_FORCE_INLINE void release_send_buff(const size_t channel) + { + _frame_buffs[channel].first->set_packet_size(_frame_buffs[channel].second); + _xports[channel]->release_send_buff(std::move(_frame_buffs[channel].first)); + + _frame_buffs[channel].first = nullptr; + _frame_buffs[channel].second = 0; + } + +private: + // Transports for each channel + std::vector<typename transport_t::uptr> _xports; + + // Container to hold frame buffers for each channel and their packet sizes + std::vector<std::pair<typename transport_t::buff_t::uptr, size_t>> _frame_buffs; + + // Rate used in conversion of timestamp to time_spec_t + double _tick_rate = 1.0; + + // Size of a sample on the device + size_t _bytes_per_item = 0; + + // Next channel from which to get a buffer, stored as a member to + // allow the streamer to continue where it stopped due to timeouts. + size_t _next_buff_to_get = 0; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP */ |