diff options
Diffstat (limited to 'host/lib/usrp/device3')
-rw-r--r-- | host/lib/usrp/device3/CMakeLists.txt | 15 | ||||
-rw-r--r-- | host/lib/usrp/device3/device3_flow_ctrl.hpp | 306 | ||||
-rw-r--r-- | host/lib/usrp/device3/device3_impl.cpp | 206 | ||||
-rw-r--r-- | host/lib/usrp/device3/device3_impl.hpp | 264 | ||||
-rw-r--r-- | host/lib/usrp/device3/device3_io_impl.cpp | 827 |
5 files changed, 0 insertions, 1618 deletions
diff --git a/host/lib/usrp/device3/CMakeLists.txt b/host/lib/usrp/device3/CMakeLists.txt deleted file mode 100644 index 979225c2c..000000000 --- a/host/lib/usrp/device3/CMakeLists.txt +++ /dev/null @@ -1,15 +0,0 @@ -# -# Copyright 2014 Ettus Research LLC -# Copyright 2018 Ettus Research, a National Instruments Company -# -# SPDX-License-Identifier: GPL-3.0-or-later -# - -######################################################################## -# This file included, use CMake directory variables -######################################################################## - -LIBUHD_APPEND_SOURCES( - ${CMAKE_CURRENT_SOURCE_DIR}/device3_impl.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/device3_io_impl.cpp -) diff --git a/host/lib/usrp/device3/device3_flow_ctrl.hpp b/host/lib/usrp/device3/device3_flow_ctrl.hpp deleted file mode 100644 index fd445effd..000000000 --- a/host/lib/usrp/device3/device3_flow_ctrl.hpp +++ /dev/null @@ -1,306 +0,0 @@ -// -// Copyright 2018 Ettus Research, a National Instruments Company -// -// SPDX-License-Identifier: GPL-3.0-or-later -// - -#ifndef INCLUDED_DEVICE3_FLOW_CTRL_HPP -#define INCLUDED_DEVICE3_FLOW_CTRL_HPP - -#include "device3_impl.hpp" -#include <uhd/transport/vrt_if_packet.hpp> -#include <uhd/transport/zero_copy.hpp> -#include <uhd/types/sid.hpp> -#include <uhd/utils/log.hpp> -#include <boost/shared_ptr.hpp> - -namespace uhd { namespace usrp { - -//! Stores the state of RX flow control -struct rx_fc_cache_t -{ - rx_fc_cache_t() - : interval(0) - , last_byte_count(0) - , total_bytes_consumed(0) - , total_packets_consumed(0) - , seq_num(0) - { - } - - //! Flow control interval in bytes - size_t interval; - //! Byte count at last flow control packet - uint32_t last_byte_count; - //! This will wrap around, but that's OK, because math. - uint32_t total_bytes_consumed; - //! This will wrap around, but that's OK, because math. - uint32_t total_packets_consumed; - //! Sequence number of next flow control packet - uint64_t seq_num; - uhd::sid_t sid; - uhd::transport::zero_copy_if::sptr xport; - std::function<uint32_t(uint32_t)> to_host; - std::function<uint32_t(uint32_t)> from_host; - std::function<void( - const uint32_t* packet_buff, uhd::transport::vrt::if_packet_info_t&)> - unpack; - std::function<void(uint32_t* packet_buff, uhd::transport::vrt::if_packet_info_t&)> - pack; -}; - -/*! Send out RX flow control packets. - * - * This function handles updating the counters for the consumed - * bytes and packets, determines if a flow control message is - * is necessary, and sends one if it is. Passing a nullptr for - * the buff parameter will skip the counter update. - * - * \param fc_cache RX flow control state information - * \param buff Receive buffer. Setting to nullptr will - * skip the counter update. - */ -inline bool rx_flow_ctrl( - boost::shared_ptr<rx_fc_cache_t> fc_cache, uhd::transport::managed_buffer::sptr buff) -{ - // If the caller supplied a buffer - if (buff) { - // Unpack the header - uhd::transport::vrt::if_packet_info_t packet_info; - packet_info.num_packet_words32 = buff->size() / sizeof(uint32_t); - const uint32_t* pkt = buff->cast<const uint32_t*>(); - try { - fc_cache->unpack(pkt, packet_info); - } catch (const std::exception& ex) { - // Log and ignore - UHD_LOGGER_ERROR("RX FLOW CTRL") - << "Error unpacking packet: " << ex.what() << std::endl; - return true; - } - - // Update counters assuming the buffer is a consumed packet - if (not packet_info.error) { - const size_t bytes = 4 * (packet_info.num_header_words32 + packet_info.num_payload_words32); - fc_cache->total_bytes_consumed += bytes; - fc_cache->total_packets_consumed++; - } - } - - // Just return if there is no need to send a flow control packet - if (fc_cache->total_bytes_consumed - fc_cache->last_byte_count < fc_cache->interval) { - return true; - } - - // Time to send a flow control packet - // Get a send buffer - uhd::transport::managed_send_buffer::sptr fc_buff = - fc_cache->xport->get_send_buff(0.0); - if (not fc_buff) { - throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer"); - } - uint32_t* pkt = fc_buff->cast<uint32_t*>(); - - // load packet info - uhd::transport::vrt::if_packet_info_t packet_info; - packet_info.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_FC; - packet_info.num_payload_words32 = uhd::usrp::DEVICE3_FC_PACKET_LEN_IN_WORDS32; - packet_info.num_payload_bytes = packet_info.num_payload_words32 * sizeof(uint32_t); - packet_info.packet_count = fc_cache->seq_num++; - packet_info.sob = false; - packet_info.eob = false; - packet_info.error = false; - packet_info.fc_ack = false; - packet_info.sid = fc_cache->sid.get(); - packet_info.has_sid = true; - packet_info.has_cid = false; - packet_info.has_tsi = false; - packet_info.has_tsf = false; - packet_info.has_tlr = false; - - // Load Header: - fc_cache->pack(pkt, packet_info); - // Load Payload: Packet count, and byte count - pkt[packet_info.num_header_words32 + uhd::usrp::DEVICE3_FC_PACKET_COUNT_OFFSET] = - fc_cache->from_host(fc_cache->total_packets_consumed); - pkt[packet_info.num_header_words32 + uhd::usrp::DEVICE3_FC_BYTE_COUNT_OFFSET] = - fc_cache->from_host(fc_cache->total_bytes_consumed); - - // send the buffer over the interface - fc_buff->commit(sizeof(uint32_t) * (packet_info.num_packet_words32)); - - // update byte count - fc_cache->last_byte_count = fc_cache->total_bytes_consumed; - - return true; -} - -/*! Handle RX flow control ACK packets. - * - */ -inline void handle_rx_flowctrl_ack( - boost::shared_ptr<rx_fc_cache_t> fc_cache, const uint32_t* payload) -{ - const uint32_t pkt_count = fc_cache->to_host(payload[0]); - const uint32_t byte_count = fc_cache->to_host(payload[1]); - if (fc_cache->total_bytes_consumed != byte_count) { - UHD_LOGGER_DEBUG("device3") - << "oh noes: byte_count==" << byte_count - << " total_bytes_consumed==" << fc_cache->total_bytes_consumed << std::hex - << " sid==" << fc_cache->sid << std::dec << std::endl; - } - fc_cache->total_bytes_consumed = byte_count; - fc_cache->total_packets_consumed = pkt_count; // guess we need a pkt offset too? - - // This will send a flow control packet if there is a significant discrepancy - rx_flow_ctrl(fc_cache, nullptr); -} - -//! Stores the state of TX flow control -struct tx_fc_cache_t -{ - tx_fc_cache_t(uint32_t capacity) - : last_byte_ack(0) - , last_seq_ack(0) - , byte_count(0) - , pkt_count(0) - , window_size(capacity) - , fc_ack_seqnum(0) - , fc_received(false) - { - } - - uint32_t last_byte_ack; - uint32_t last_seq_ack; - uint32_t byte_count; - uint32_t pkt_count; - uint32_t window_size; - uint32_t fc_ack_seqnum; - bool fc_received; - std::function<uint32_t(uint32_t)> to_host; - std::function<uint32_t(uint32_t)> from_host; - std::function<void( - const uint32_t* packet_buff, uhd::transport::vrt::if_packet_info_t&)> - unpack; - std::function<void(uint32_t* packet_buff, uhd::transport::vrt::if_packet_info_t&)> - pack; -}; - -inline bool tx_flow_ctrl(boost::shared_ptr<tx_fc_cache_t> fc_cache, - uhd::transport::zero_copy_if::sptr xport, - uhd::transport::managed_buffer::sptr buff) -{ - while (true) { - // If there is space - if (fc_cache->window_size - (fc_cache->byte_count - fc_cache->last_byte_ack) - >= buff->size()) { - // All is good - packet will be sent - fc_cache->byte_count += buff->size(); - // Round up to nearest word - if (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE) { - fc_cache->byte_count += - uhd::usrp::DEVICE3_LINE_SIZE - - (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE); - } - fc_cache->pkt_count++; - return true; - } - - // Look for a flow control message to update the space available in the buffer. - uhd::transport::managed_recv_buffer::sptr buff = xport->get_recv_buff(0.1); - if (buff) { - uhd::transport::vrt::if_packet_info_t if_packet_info; - if_packet_info.num_packet_words32 = buff->size() / sizeof(uint32_t); - const uint32_t* packet_buff = buff->cast<const uint32_t*>(); - try { - fc_cache->unpack(packet_buff, if_packet_info); - } catch (const std::exception& ex) { - UHD_LOGGER_ERROR("TX FLOW CTRL") - << "Error unpacking flow control packet: " << ex.what() << std::endl; - continue; - } - - if (if_packet_info.packet_type - != uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_FC) { - UHD_LOGGER_ERROR("TX FLOW CTRL") - << "Unexpected packet received by flow control handler: " - << if_packet_info.packet_type << std::endl; - continue; - } - - const uint32_t* payload = &packet_buff[if_packet_info.num_header_words32]; - const uint32_t pkt_count = fc_cache->to_host(payload[0]); - const uint32_t byte_count = fc_cache->to_host(payload[1]); - - // update the amount of space - fc_cache->last_byte_ack = byte_count; - fc_cache->last_seq_ack = pkt_count; - - fc_cache->fc_received = true; - } - } - return false; -} - -inline void tx_flow_ctrl_ack(boost::shared_ptr<tx_fc_cache_t> fc_cache, - uhd::transport::zero_copy_if::sptr send_xport, - uhd::sid_t send_sid) -{ - if (not fc_cache->fc_received) { - return; - } - - // Time to send a flow control ACK packet - // Get a send buffer - uhd::transport::managed_send_buffer::sptr fc_buff = send_xport->get_send_buff(0.0); - if (not fc_buff) { - UHD_LOGGER_ERROR("tx_flow_ctrl_ack") << "timed out getting a send buffer"; - return; - } - uint32_t* pkt = fc_buff->cast<uint32_t*>(); - - // Load packet info - uhd::transport::vrt::if_packet_info_t packet_info; - packet_info.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_ACK; - packet_info.num_payload_words32 = uhd::usrp::DEVICE3_FC_PACKET_LEN_IN_WORDS32; - packet_info.num_payload_bytes = packet_info.num_payload_words32 * sizeof(uint32_t); - packet_info.packet_count = fc_cache->fc_ack_seqnum++; - packet_info.sob = false; - packet_info.eob = true; - packet_info.error = false; - packet_info.fc_ack = false; - packet_info.sid = send_sid.get(); - packet_info.has_sid = true; - packet_info.has_cid = false; - packet_info.has_tsi = false; - packet_info.has_tsf = false; - packet_info.has_tlr = false; - - // Load Header: - fc_cache->pack(pkt, packet_info); - - // Update counters to include this packet - size_t fc_ack_pkt_size = sizeof(uint32_t) * (packet_info.num_packet_words32); - fc_cache->byte_count += fc_ack_pkt_size; - // Round up to nearest word - if (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE) { - fc_cache->byte_count += uhd::usrp::DEVICE3_LINE_SIZE - - (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE); - } - fc_cache->pkt_count++; - - // Load Payload: Packet count, and byte count - pkt[packet_info.num_header_words32 + uhd::usrp::DEVICE3_FC_PACKET_COUNT_OFFSET] = - fc_cache->from_host(fc_cache->pkt_count); - pkt[packet_info.num_header_words32 + uhd::usrp::DEVICE3_FC_BYTE_COUNT_OFFSET] = - fc_cache->from_host(fc_cache->byte_count); - - // Send the buffer over the interface - fc_buff->commit(fc_ack_pkt_size); - - // Reset for next FC - fc_cache->fc_received = false; -} - -}}; // namespace uhd::usrp - -#endif /* INCLUDED_DEVICE3_FLOW_CTRL_HPP */ diff --git a/host/lib/usrp/device3/device3_impl.cpp b/host/lib/usrp/device3/device3_impl.cpp deleted file mode 100644 index bc1cf9002..000000000 --- a/host/lib/usrp/device3/device3_impl.cpp +++ /dev/null @@ -1,206 +0,0 @@ -// -// Copyright 2014-2016 Ettus Research LLC -// Copyright 2018 Ettus Research, a National Instruments Company -// -// SPDX-License-Identifier: GPL-3.0-or-later -// - -#include "device3_impl.hpp" -#include <uhd/rfnoc/block_ctrl_base.hpp> -#include <uhd/utils/log.hpp> -#include <uhdlib/rfnoc/ctrl_iface.hpp> -#include <uhdlib/rfnoc/graph_impl.hpp> -#include <boost/make_shared.hpp> -#include <algorithm> - -using namespace uhd::usrp; - -device3_impl::device3_impl() -{ - _type = uhd::device::USRP; - _async_md.reset(new async_md_type(1000 /*messages deep*/)); - _tree = uhd::property_tree::make(); -}; - -//! Returns true if the integer value stored in lhs is smaller than that in rhs -bool _compare_string_indexes(const std::string& lhs, const std::string& rhs) -{ - return boost::lexical_cast<size_t>(lhs) < boost::lexical_cast<size_t>(rhs); -} - -void device3_impl::merge_channel_defs(const std::vector<uhd::rfnoc::block_id_t>& chan_ids, - const std::vector<uhd::device_addr_t>& chan_args, - const uhd::direction_t dir) -{ - UHD_ASSERT_THROW(chan_ids.size() == chan_args.size()); - if (dir == uhd::DX_DIRECTION) { - merge_channel_defs(chan_ids, chan_args, RX_DIRECTION); - merge_channel_defs(chan_ids, chan_args, TX_DIRECTION); - return; - } - - uhd::fs_path chans_root = - uhd::fs_path("/channels/") / (dir == RX_DIRECTION ? "rx" : "tx"); - // Store the new positions of the channels: - std::vector<size_t> chan_idxs; - - // 1. Get sorted list of currently defined channels - std::vector<std::string> curr_channels; - if (_tree->exists(chans_root)) { - curr_channels = _tree->list(chans_root); - std::sort(curr_channels.begin(), curr_channels.end(), _compare_string_indexes); - } - - // 2. Cycle through existing channels to find out where to merge - // the new channels. Rules are: - // - The order of chan_ids must be preserved - // - All block indices that are in chan_ids may be overwritten in the channel - // definition - // - If the channels in chan_ids are not yet in the property tree channel list, - // they are appended. - for (const std::string& chan_idx : curr_channels) { - if (_tree->exists(chans_root / chan_idx)) { - rfnoc::block_id_t chan_block_id = - _tree->access<rfnoc::block_id_t>(chans_root / chan_idx).get(); - if (std::find(chan_ids.begin(), chan_ids.end(), chan_block_id) - != chan_ids.end()) { - chan_idxs.push_back(boost::lexical_cast<size_t>(chan_idx)); - } - } - } - size_t last_chan_idx = curr_channels.empty() - ? 0 - : (boost::lexical_cast<size_t>(curr_channels.back()) + 1); - while (chan_idxs.size() < chan_ids.size()) { - chan_idxs.push_back(last_chan_idx); - last_chan_idx++; - } - - // 3. Write the new channels - for (size_t i = 0; i < chan_ids.size(); i++) { - if (not _tree->exists(chans_root / chan_idxs[i])) { - _tree->create<rfnoc::block_id_t>(chans_root / chan_idxs[i]); - } - _tree->access<rfnoc::block_id_t>(chans_root / chan_idxs[i]).set(chan_ids[i]); - if (not _tree->exists(chans_root / chan_idxs[i] / "args")) { - _tree->create<uhd::device_addr_t>(chans_root / chan_idxs[i] / "args"); - } - _tree->access<uhd::device_addr_t>(chans_root / chan_idxs[i] / "args") - .set(chan_args[i]); - } -} - -/*********************************************************************** - * RFNoC-Specific - **********************************************************************/ -void device3_impl::enumerate_rfnoc_blocks(size_t device_index, - size_t n_blocks, - size_t base_port, - const uhd::sid_t& base_sid, - uhd::device_addr_t transport_args) -{ - // entries that are already connected to this block - uhd::sid_t ctrl_sid = base_sid; - uhd::property_tree::sptr subtree = - _tree->subtree(uhd::fs_path("/mboards") / device_index); - // 1) Clean property tree entries - // TODO put this back once radios are actual rfnoc blocks!!!!!! - // if (subtree->exists("xbar")) { - // subtree->remove("xbar"); - //} - // 2) Destroy existing block controllers - // TODO: Clear out all the old block control classes - // 3) Create new block controllers - for (size_t i = 0; i < n_blocks; i++) { - // First, make a transport for port number zero, because we always need that: - ctrl_sid.set_dst_xbarport(base_port + i); - ctrl_sid.set_dst_blockport(0); - both_xports_t xport = this->make_transport(ctrl_sid, CTRL, transport_args); - UHD_LOG_TRACE("DEVICE3", - str(boost::format("Setting up NoC-Shell Control for port #0 (SID: %s)...") - % xport.send_sid.to_pp_string_hex())); - uhd::rfnoc::ctrl_iface::sptr ctrl = uhd::rfnoc::ctrl_iface::make(xport, - str(boost::format("CE_%02d_Port_%02X") % i % ctrl_sid.get_dst_endpoint())); - uint64_t noc_id = ctrl->send_cmd_pkt( - uhd::rfnoc::SR_READBACK, uhd::rfnoc::SR_READBACK_REG_ID, true); - UHD_LOG_DEBUG("DEVICE3", - str(boost::format("Port 0x%02X: Found NoC-Block with ID %016X.") - % int(ctrl_sid.get_dst_endpoint()) % noc_id)); - uhd::rfnoc::make_args_t make_args; - uhd::rfnoc::blockdef::sptr block_def = - uhd::rfnoc::blockdef::make_from_noc_id(noc_id); - if (not block_def) { - UHD_LOG_WARNING("DEVICE3", - "No block definition found, using default block configuration " - "for block with NOC ID: " - + str(boost::format("0x%08X") % noc_id)); - block_def = - uhd::rfnoc::blockdef::make_from_noc_id(uhd::rfnoc::DEFAULT_NOC_ID_64); - } - UHD_ASSERT_THROW(block_def); - make_args.ctrl_ifaces[0] = ctrl; - for (const size_t port_number : block_def->get_all_port_numbers()) { - if (port_number == 0) { // We've already set this up - continue; - } - ctrl_sid.set_dst_blockport(port_number); - both_xports_t xport1 = this->make_transport(ctrl_sid, CTRL, transport_args); - UHD_LOG_TRACE("DEVICE3", - str(boost::format("Setting up NoC-Shell Control for port #%d " - "(SID: %s)...") - % port_number % xport1.send_sid.to_pp_string_hex())); - uhd::rfnoc::ctrl_iface::sptr ctrl1 = uhd::rfnoc::ctrl_iface::make(xport1, - str(boost::format("CE_%02d_Port_%02X") % i - % ctrl_sid.get_dst_endpoint())); - make_args.ctrl_ifaces[port_number] = ctrl1; - } - UHD_LOG_TRACE("DEVICE3", - "All control transports successfully created for block with ID " - << str(boost::format("0x%08X") % noc_id)); - - make_args.base_address = xport.send_sid.get_dst(); - make_args.device_index = device_index; - make_args.tree = subtree; - { // Critical section for block_ctrl vector access - boost::lock_guard<boost::mutex> lock(_block_ctrl_mutex); - _rfnoc_block_ctrl.push_back( - uhd::rfnoc::block_ctrl_base::make(make_args, noc_id)); - _rfnoc_block_ctrl.back()->set_graph_update_cb([this]() { - update_rx_streamers(); - update_tx_streamers(); - }); - } - } -} - - -uhd::rfnoc::graph::sptr device3_impl::create_graph(const std::string& name) -{ - // Create an async message handler - UHD_LOGGER_TRACE("DEVICE3") - << "Creating async message handler for graph `" << name << "'..."; - // FIXME: right now this only can only handle source sid of 0 and xbar local addr - // of 2. This is ok for now because that most of our device has xbard local addr - // hardcode to 2. - sid_t async_sid(0); - async_sid.set_dst_addr(2); - both_xports_t async_xports = make_transport(async_sid, - ASYNC_MSG, - // FIXME: only get rx_hints from mb index of 0 - get_rx_hints(0)); - UHD_LOGGER_TRACE("DEVICE3") << " Async transport ready." << std::endl; - uhd::rfnoc::async_msg_handler::sptr async_msg_handler = - uhd::rfnoc::async_msg_handler::make(async_xports.recv, - async_xports.send, - async_xports.send_sid, - async_xports.endianness); - UHD_LOGGER_TRACE("DEVICE3") - << "Async message has address " << async_xports.send_sid << std::endl; - - // Create the graph - UHD_LOGGER_TRACE("DEVICE3") << "Creating graph `" << name << "'..." << std::endl; - uhd::rfnoc::graph::sptr graph = boost::make_shared<uhd::rfnoc::graph_impl>( - name, shared_from_this(), async_msg_handler); - - return graph; -} diff --git a/host/lib/usrp/device3/device3_impl.hpp b/host/lib/usrp/device3/device3_impl.hpp deleted file mode 100644 index 17f6a3f6f..000000000 --- a/host/lib/usrp/device3/device3_impl.hpp +++ /dev/null @@ -1,264 +0,0 @@ -// -// Copyright 2014-2015 Ettus Research LLC -// Copyright 2018 Ettus Research, a National Instruments Company -// -// SPDX-License-Identifier: GPL-3.0-or-later -// - -// Declares the device3_impl class which is a layer between device3 and -// the different 3-rd gen device impls (e.g. x300_impl) - -#ifndef INCLUDED_DEVICE3_IMPL_HPP -#define INCLUDED_DEVICE3_IMPL_HPP - -#include "../../transport/super_recv_packet_handler.hpp" -#include "../../transport/super_send_packet_handler.hpp" -#include <uhd/device3.hpp> -#include <uhd/transport/bounded_buffer.hpp> -#include <uhd/transport/chdr.hpp> -#include <uhd/transport/vrt_if_packet.hpp> -#include <uhd/transport/zero_copy.hpp> -#include <uhd/types/direction.hpp> -#include <uhd/types/endianness.hpp> -#include <uhd/types/metadata.hpp> -#include <uhd/types/sid.hpp> -#include <uhd/utils/tasks.hpp> -#include <uhdlib/rfnoc/rx_stream_terminator.hpp> -#include <uhdlib/rfnoc/tx_stream_terminator.hpp> -#include <uhdlib/rfnoc/xports.hpp> - -namespace uhd { namespace usrp { - -/*********************************************************************** - * Default settings (any device3 may override these) - **********************************************************************/ -static const size_t DEVICE3_RX_FC_REQUEST_FREQ = 32; // per flow-control window -static const size_t DEVICE3_TX_FC_RESPONSE_FREQ = 8; -static const size_t DEVICE3_FC_PACKET_LEN_IN_WORDS32 = 2; -static const size_t DEVICE3_FC_PACKET_COUNT_OFFSET = 0; -static const size_t DEVICE3_FC_BYTE_COUNT_OFFSET = 1; -static const size_t DEVICE3_LINE_SIZE = 8; - -static const size_t DEVICE3_TX_MAX_HDR_LEN = - uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t); // Bytes -static const size_t DEVICE3_RX_MAX_HDR_LEN = - uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t); // Bytes - -// This class manages the lifetime of the TX async message handler task, transports, and -// terminator -class device3_send_packet_streamer : public uhd::transport::sph::send_packet_streamer -{ -public: - device3_send_packet_streamer(const size_t max_num_samps, - const uhd::rfnoc::tx_stream_terminator::sptr terminator, - const both_xports_t data_xport, - const both_xports_t async_msg_xport) - : uhd::transport::sph::send_packet_streamer(max_num_samps) - , _terminator(terminator) - , _data_xport(data_xport) - , _async_msg_xport(async_msg_xport) - { - } - - ~device3_send_packet_streamer() - { - // Make sure the async task is destroyed before the transports - _tx_async_msg_tasks.clear(); - } - - uhd::rfnoc::tx_stream_terminator::sptr get_terminator() - { - return _terminator; - } - - void add_async_msg_task(task::sptr task) - { - _tx_async_msg_tasks.push_back(task); - } - -private: - uhd::rfnoc::tx_stream_terminator::sptr _terminator; - both_xports_t _data_xport; - both_xports_t _async_msg_xport; - std::vector<task::sptr> _tx_async_msg_tasks; -}; - -// This class manages the lifetime of the RX transports and terminator and provides access -// to both -class device3_recv_packet_streamer : public uhd::transport::sph::recv_packet_streamer -{ -public: - device3_recv_packet_streamer(const size_t max_num_samps, - const uhd::rfnoc::rx_stream_terminator::sptr terminator, - const both_xports_t xport) - : uhd::transport::sph::recv_packet_streamer(max_num_samps) - , _terminator(terminator) - , _xport(xport) - { - } - - ~device3_recv_packet_streamer() {} - - both_xports_t get_xport() - { - return _xport; - } - - uhd::rfnoc::rx_stream_terminator::sptr get_terminator() - { - return _terminator; - } - -private: - uhd::rfnoc::rx_stream_terminator::sptr _terminator; - both_xports_t _xport; -}; - -class device3_impl : public uhd::device3, - public boost::enable_shared_from_this<device3_impl> -{ -public: - /*********************************************************************** - * device3-specific Types - **********************************************************************/ - typedef uhd::transport::bounded_buffer<uhd::async_metadata_t> async_md_type; - - //! The purpose of a transport - enum xport_type_t { CTRL = 0, ASYNC_MSG, TX_DATA, RX_DATA }; - - enum xport_t { AXI, ETH, PCIE }; - - //! Stores all streaming-related options - struct stream_options_t - { - //! Max size of the header in bytes for TX - size_t tx_max_len_hdr; - //! Max size of the header in bytes for RX - size_t rx_max_len_hdr; - //! How often we send ACKs to the upstream block per one full FC window - size_t rx_fc_request_freq; - //! How often the downstream block should send ACKs per one full FC window - size_t tx_fc_response_freq; - stream_options_t(void) - : tx_max_len_hdr(DEVICE3_TX_MAX_HDR_LEN) - , rx_max_len_hdr(DEVICE3_RX_MAX_HDR_LEN) - , rx_fc_request_freq(DEVICE3_RX_FC_REQUEST_FREQ) - , tx_fc_response_freq(DEVICE3_TX_FC_RESPONSE_FREQ) - { - } - }; - - /*********************************************************************** - * I/O Interface - **********************************************************************/ - uhd::tx_streamer::sptr get_tx_stream(const uhd::stream_args_t&); - uhd::rx_streamer::sptr get_rx_stream(const uhd::stream_args_t&); - bool recv_async_msg(uhd::async_metadata_t& async_metadata, double timeout); - - /*********************************************************************** - * Other public APIs - **********************************************************************/ - rfnoc::graph::sptr create_graph(const std::string& name = ""); - -protected: - /*********************************************************************** - * Structors - **********************************************************************/ - device3_impl(); - virtual ~device3_impl() {} - - /*********************************************************************** - * Streaming-related - **********************************************************************/ -public: // TODO make these protected again - /*! Update tick rate, samp rate, and scaling on the streamers by querying - * the graph. - */ - void update_rx_streamers(); - /*! Update tick rate, samp rate, and scaling on the streamers by querying - * the graph. - */ - void update_tx_streamers(); - -protected: - /*********************************************************************** - * Transport-related - **********************************************************************/ - stream_options_t stream_options; - - /*! \brief Create a transport to a given endpoint. - * - * \param address The endpoint address of the block we're creating a transport to. - * The source address in this value is not considered, only the - * destination address. - * \param xport_type Specify which kind of transport this is. - * \param args Additional arguments for the transport generation. See \ref - * page_transport for valid arguments. - */ - virtual uhd::both_xports_t make_transport(const uhd::sid_t& address, - const xport_type_t xport_type, - const uhd::device_addr_t& args) = 0; - - virtual uhd::device_addr_t get_tx_hints(size_t) - { - return uhd::device_addr_t(); - } - virtual uhd::device_addr_t get_rx_hints(size_t) - { - return uhd::device_addr_t(); - } - - //! Is called after a streamer is generated - virtual void post_streamer_hooks(uhd::direction_t) {} - - //! get mtu - virtual size_t get_mtu(const size_t, const uhd::direction_t) = 0; - - /*********************************************************************** - * Channel-related - **********************************************************************/ - /*! Merge a list of channels into the existing channel definition. - * - * Intelligently merge the channels described in \p chan_ids - * into the current channel definition. If none of the channels in - * \p chan_ids is in the current definition, they simply get appended. - * Otherwise, they get overwritten in the order of \p chan_ids. - * - * \param chan_ids List of block IDs for the channels. - * \param chan_args New channel args. Must have same length as chan_ids. - * - */ - void merge_channel_defs(const std::vector<rfnoc::block_id_t>& chan_ids, - const std::vector<uhd::device_addr_t>& chan_args, - const uhd::direction_t dir); - - /*********************************************************************** - * RFNoC-Specific - **********************************************************************/ - void enumerate_rfnoc_blocks(size_t device_index, - size_t n_blocks, - size_t base_port, - const uhd::sid_t& base_sid, - uhd::device_addr_t transport_args); - - /*********************************************************************** - * Members - **********************************************************************/ - // TODO: Maybe move these to private - uhd::dict<std::string, boost::weak_ptr<uhd::rx_streamer>> _rx_streamers; - uhd::dict<std::string, boost::weak_ptr<uhd::tx_streamer>> _tx_streamers; - -private: - /*********************************************************************** - * Private Members - **********************************************************************/ - //! Buffer for async metadata - boost::shared_ptr<async_md_type> _async_md; - - //! This mutex locks the get_xx_stream() functions. - boost::mutex _transport_setup_mutex; -}; - -}} /* namespace uhd::usrp */ - -#endif /* INCLUDED_DEVICE3_IMPL_HPP */ diff --git a/host/lib/usrp/device3/device3_io_impl.cpp b/host/lib/usrp/device3/device3_io_impl.cpp deleted file mode 100644 index 800d2f5a8..000000000 --- a/host/lib/usrp/device3/device3_io_impl.cpp +++ /dev/null @@ -1,827 +0,0 @@ -// -// Copyright 2014-2016 Ettus Research LLC -// Copyright 2018 Ettus Research, a National Instruments Company -// -// SPDX-License-Identifier: GPL-3.0-or-later -// - -// Provides streaming-related functions which are used by device3 objects. - -#include "device3_flow_ctrl.hpp" -#include "device3_impl.hpp" -#include <uhd/rfnoc/constants.hpp> -#include <uhd/rfnoc/radio_ctrl.hpp> -#include <uhd/rfnoc/rate_node_ctrl.hpp> -#include <uhd/rfnoc/sink_block_ctrl_base.hpp> -#include <uhd/rfnoc/source_block_ctrl_base.hpp> -#include <uhd/transport/zero_copy_flow_ctrl.hpp> -#include <uhd/utils/byteswap.hpp> -#include <uhd/utils/log.hpp> -#include <uhdlib/rfnoc/rx_stream_terminator.hpp> -#include <uhdlib/rfnoc/tx_stream_terminator.hpp> -#include <uhdlib/usrp/common/async_packet_handler.hpp> -#include <boost/atomic.hpp> - -#define UHD_TX_STREAMER_LOG() UHD_LOGGER_TRACE("STREAMER") -#define UHD_RX_STREAMER_LOG() UHD_LOGGER_TRACE("STREAMER") - -using namespace uhd; -using namespace uhd::usrp; -using namespace uhd::transport; - -/*********************************************************************** - * Helper functions for get_?x_stream() - **********************************************************************/ -static uhd::stream_args_t sanitize_stream_args(const uhd::stream_args_t& args_) -{ - uhd::stream_args_t args = args_; - if (args.channels.empty()) { - args.channels = std::vector<size_t>(1, 0); - } - - return args; -} - -static void check_stream_sig_compatible( - const rfnoc::stream_sig_t& stream_sig, stream_args_t& args, const std::string& tx_rx) -{ - if (args.otw_format.empty()) { - if (stream_sig.item_type.empty()) { - throw uhd::runtime_error( - str(boost::format("[%s Streamer] No otw_format defined!") % tx_rx)); - } else { - args.otw_format = stream_sig.item_type; - } - } else if (not stream_sig.item_type.empty() - and stream_sig.item_type != args.otw_format) { - throw uhd::runtime_error( - str(boost::format("[%s Streamer] Conflicting OTW types defined: " - "args.otw_format = '%s' <=> stream_sig.item_type = '%s'") - % tx_rx % args.otw_format % stream_sig.item_type)); - } - const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item - if (stream_sig.packet_size) { - if (args.args.has_key("spp")) { - size_t args_spp = args.args.cast<size_t>("spp", 0); - if (args_spp * bpi != stream_sig.packet_size) { - throw uhd::runtime_error( - str(boost::format( - "[%s Streamer] Conflicting packet sizes defined: args yields " - "%d bytes but stream_sig.packet_size is %d bytes") - % tx_rx % (args_spp * bpi) % stream_sig.packet_size)); - } - } else { - args.args["spp"] = str(boost::format("%d") % (stream_sig.packet_size / bpi)); - } - } -} - -/*! \brief Returns a list of rx or tx channels for a streamer. - * - * If the given stream args contain instructions to set up channels, - * those are used. Otherwise, the current device's channel definition - * is consulted. - * - * \param args_ Stream args. - * \param[out] chan_list The list of channels in the correct order. - * \param[out] chan_args Channel args for every channel. `chan_args.size() == - * chan_list.size()` - */ -void generate_channel_list(const uhd::stream_args_t& args_, - std::vector<uhd::rfnoc::block_id_t>& chan_list, - std::vector<device_addr_t>& chan_args) -{ - uhd::stream_args_t args = args_; - std::vector<uhd::rfnoc::block_id_t> chan_list_(args.channels.size()); - std::vector<device_addr_t> chan_args_(args.channels.size()); - - for (size_t i = 0; i < args.channels.size(); i++) { - // Extract block ID - size_t chan_idx = args.channels[i]; - std::string key = str(boost::format("block_id%d") % chan_idx); - if (args.args.has_key(key)) { - chan_list_[i] = args.args.pop(key); - } else if (args.args.has_key("block_id")) { - chan_list_[i] = args.args["block_id"]; - } else { - throw uhd::runtime_error( - str(boost::format( - "Cannot create streamers: No block_id specified for channel %d.") - % chan_idx)); - } - - // Split off known channel specific args - key = str(boost::format("block_port%d") % chan_idx); - if (args.args.has_key(key)) { - chan_args_[i]["block_port"] = args.args.pop(key); - } - key = str(boost::format("radio_id%d") % chan_idx); - if (args.args.has_key(key)) { - chan_args_[i]["radio_id"] = args.args.pop(key); - } - key = str(boost::format("radio_port%d") % chan_idx); - if (args.args.has_key(key)) { - chan_args_[i]["radio_port"] = args.args.pop(key); - } - } - - // Add all remaining args to all channel args - for (device_addr_t& chan_arg : chan_args_) { - chan_arg = chan_arg.to_string() + "," + args.args.to_string(); - } - - chan_list = chan_list_; - chan_args = chan_args_; -} - - -/*********************************************************************** - * RX Flow Control Functions - **********************************************************************/ -/*! Determine the size of the flow control window in number of packets. - * - * This value depends on three things: - * - The packet size (in bytes), P - * - The size of the software buffer (in bytes), B - * - The desired buffer fullness, F - * - * The FC window size is thus X = floor(B*F/P). - * - * \param pkt_size The maximum packet size in bytes - * \param sw_buff_size Software buffer size in bytes - * \param rx_args If this has a key 'recv_buff_fullness', this value will - * be used for said fullness. Must be between 0.01 and 1. - * - * \returns The size of the flow control window in number of packets - */ -static size_t get_rx_flow_control_window( - size_t pkt_size, size_t sw_buff_size, const device_addr_t& rx_args) -{ - double fullness_factor = rx_args.cast<double>( - "recv_buff_fullness", uhd::rfnoc::DEFAULT_FC_RX_SW_BUFF_FULL_FACTOR); - - if (fullness_factor < 0.01 || fullness_factor > 1) { - throw uhd::value_error( - "recv_buff_fullness must be in [0.01, 1] inclusive (1% to 100%)"); - } - - size_t window_in_bytes = (static_cast<size_t>(fullness_factor * sw_buff_size)); - if (rx_args.has_key("max_recv_window")) { - window_in_bytes = std::min( - window_in_bytes, - pkt_size * rx_args.cast<size_t>("max_recv_window", 1) - ); - } - if (window_in_bytes < pkt_size) { - throw uhd::value_error("recv_buff_size must be larger than the recv_frame_size."); - } - UHD_ASSERT_THROW(size_t(sw_buff_size * fullness_factor) >= window_in_bytes); - return window_in_bytes; -} - - -/*********************************************************************** - * TX Async Message Functions - **********************************************************************/ -#define DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL 0 - -struct async_tx_info_t -{ - size_t stream_channel; - size_t device_channel; - boost::shared_ptr<device3_impl::async_md_type> async_queue; - boost::shared_ptr<device3_impl::async_md_type> old_async_queue; -}; - -/*! Handle incoming messages. - * Send them to the async message queue for the user to poll. - * - * This is run inside a uhd::task as long as this streamer lives. - */ -static void handle_tx_async_msgs(boost::shared_ptr<async_tx_info_t> async_info, - zero_copy_if::sptr xport, - uint32_t (*to_host)(uint32_t), - void (*unpack)(const uint32_t* packet_buff, vrt::if_packet_info_t&), - boost::function<double(void)> get_tick_rate) -{ - managed_recv_buffer::sptr buff = xport->get_recv_buff(); - if (not buff) { - return; - } - - // extract packet info - vrt::if_packet_info_t if_packet_info; - if_packet_info.num_packet_words32 = buff->size() / sizeof(uint32_t); - const uint32_t* packet_buff = buff->cast<const uint32_t*>(); - - // unpacking can fail - try { - unpack(packet_buff, if_packet_info); - } catch (const std::exception& ex) { - UHD_LOGGER_ERROR("STREAMER") - << "Error parsing async message packet: " << ex.what(); - return; - } - - double tick_rate = get_tick_rate(); - if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) { - tick_rate = 1; - } - - // fill in the async metadata - async_metadata_t metadata; - load_metadata_from_buff(to_host, - metadata, - if_packet_info, - packet_buff, - tick_rate, - async_info->stream_channel); - - // Filter out any flow control messages and cache the rest - if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) { - UHD_LOGGER_ERROR("TX ASYNC MSG") - << "Unexpected flow control message found in async message handling" - << std::endl; - } else { - async_info->async_queue->push_with_pop_on_full(metadata); - metadata.channel = async_info->device_channel; - async_info->old_async_queue->push_with_pop_on_full(metadata); - standard_async_msg_prints(metadata); - } -} - -bool device3_impl::recv_async_msg(async_metadata_t& async_metadata, double timeout) -{ - return _async_md->pop_with_timed_wait(async_metadata, timeout); -} - -/*********************************************************************** - * Receive streamer - **********************************************************************/ -void device3_impl::update_rx_streamers() -{ - for (const std::string& block_id : _rx_streamers.keys()) { - UHD_RX_STREAMER_LOG() << "updating RX streamer to " << block_id; - boost::shared_ptr<device3_recv_packet_streamer> my_streamer = - boost::dynamic_pointer_cast<device3_recv_packet_streamer>( - _rx_streamers[block_id].lock()); - if (my_streamer) { - double tick_rate = my_streamer->get_terminator()->get_tick_rate(); - if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) { - tick_rate = 1.0; - } - double samp_rate = my_streamer->get_terminator()->get_output_samp_rate(); - if (samp_rate == rfnoc::rate_node_ctrl::RATE_UNDEFINED) { - samp_rate = 1.0; - } - double scaling = my_streamer->get_terminator()->get_output_scale_factor(); - if (scaling == rfnoc::scalar_node_ctrl::SCALE_UNDEFINED) { - scaling = 1 / 32767.; - } - UHD_RX_STREAMER_LOG() - << " New tick_rate == " << tick_rate - << " New samp_rate == " << samp_rate << " New scaling == " << scaling; - - my_streamer->set_tick_rate(tick_rate); - my_streamer->set_samp_rate(samp_rate); - my_streamer->set_scale_factor(scaling); - } - } -} - -rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t& args_) -{ - boost::mutex::scoped_lock lock(_transport_setup_mutex); - stream_args_t args = sanitize_stream_args(args_); - - // I. Generate the channel list - std::vector<uhd::rfnoc::block_id_t> chan_list; - std::vector<device_addr_t> chan_args; - generate_channel_list(args, chan_list, chan_args); - // Note: All 'args.args' are merged into chan_args now. - - // II. Iterate over all channels - boost::shared_ptr<device3_recv_packet_streamer> my_streamer; - // The terminator's lifetime is coupled to the streamer. - // There is only one terminator. If the streamer has multiple channels, - // it will be connected to each upstream block. - rfnoc::rx_stream_terminator::sptr recv_terminator = - rfnoc::rx_stream_terminator::make(); - for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) { - // First, configure blocks and create transport - - // Get block ID and mb index - uhd::rfnoc::block_id_t block_id = chan_list[stream_i]; - UHD_RX_STREAMER_LOG() << "chan " << stream_i << " connecting to " << block_id; - // Update args so args.args is always valid for this particular channel: - args.args = chan_args[stream_i]; - size_t mb_index = block_id.get_device_no(); - size_t suggested_block_port = - args.args.cast<size_t>("block_port", rfnoc::ANY_PORT); - - // Access to this channel's block control - uhd::rfnoc::source_block_ctrl_base::sptr blk_ctrl = - boost::dynamic_pointer_cast<uhd::rfnoc::source_block_ctrl_base>( - get_block_ctrl(block_id)); - - // Connect the terminator with this channel's block. - size_t block_port = blk_ctrl->connect_downstream( - recv_terminator, suggested_block_port, args.args); - const size_t terminator_port = recv_terminator->connect_upstream(blk_ctrl); - blk_ctrl->set_downstream_port(block_port, terminator_port); - recv_terminator->set_upstream_port(terminator_port, block_port); - - // Check if the block connection is compatible (spp and item type) - check_stream_sig_compatible( - blk_ctrl->get_output_signature(block_port), args, "RX"); - - // Setup the DSP transport hints - device_addr_t rx_hints = get_rx_hints(mb_index); - - // Search the device and all nodes for lowest MTU - size_t mtu = std::min( - get_mtu(mb_index, uhd::direction_t::RX_DIRECTION), - blk_ctrl->get_mtu(block_port)); - UHD_RX_STREAMER_LOG() << "Maximum MTU supported by " - << blk_ctrl->unique_id() - << ": " << blk_ctrl->get_mtu(block_port); - std::vector<boost::shared_ptr<uhd::rfnoc::source_block_ctrl_base>> - upstream_source_nodes = - blk_ctrl->find_upstream_node<uhd::rfnoc::source_block_ctrl_base>(); - for (const boost::shared_ptr<uhd::rfnoc::source_block_ctrl_base>& node : - upstream_source_nodes) { - // Get MTU from Port 0 of the upstream nodes. This is okay for now as - // currently we use port 0 of a block in case of channel 1. - UHD_RX_STREAMER_LOG() << "Maximum MTU supported by " << node->unique_id() - << ": " << node->get_mtu(0); - mtu = std::min(mtu, node->get_mtu(0)); - } - rx_hints["mtu"] = std::to_string(mtu); - - // Make sure user supplied recv_frame_size is less than the MTU - if (rx_hints.cast<size_t>("recv_frame_size", mtu) > mtu) { - UHD_LOGGER_WARNING("STREAMER") - << "Requested recv_frame_size of " - << rx_hints["recv_frame_size"] - << " exceeds the maximum possible on this stream. Using " - << mtu; - rx_hints["recv_frame_size"] = std::to_string(mtu); - } - - // allocate sid and create transport - uhd::sid_t stream_address = blk_ctrl->get_address(block_port); - UHD_RX_STREAMER_LOG() << "creating rx stream " << rx_hints.to_string(); - both_xports_t xport = make_transport(stream_address, RX_DATA, rx_hints); - UHD_RX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec - << " actual recv_buff_size = " << xport.recv_buff_size; - - // Configure the block - // Flow control setup - const size_t pkt_size = xport.recv->get_recv_frame_size(); - // Leave one pkt_size space for overrun packets - TODO make this obsolete - const size_t fc_window = - get_rx_flow_control_window(pkt_size, xport.recv_buff_size, rx_hints) - - pkt_size; - const size_t fc_handle_window = - std::max<size_t>(pkt_size, fc_window / stream_options.rx_fc_request_freq); - UHD_RX_STREAMER_LOG() << "Flow Control Window = " << (fc_window) - << ", Flow Control Handler Window = " << fc_handle_window; - blk_ctrl->configure_flow_control_out(true, - xport.lossless, - fc_window, - rx_hints.cast<size_t>("recv_pkt_limit", - 0), // On rfnoc-devel, update e300_impl::get_rx_hints() to set this to 32 - block_port); - - // Add flow control transport - boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t()); - fc_cache->sid = xport.send_sid; - fc_cache->xport = xport.send; - fc_cache->interval = fc_handle_window; - if (xport.endianness == ENDIANNESS_BIG) { - fc_cache->to_host = uhd::ntohx<uint32_t>; - fc_cache->from_host = uhd::htonx<uint32_t>; - fc_cache->pack = vrt::chdr::if_hdr_pack_be; - fc_cache->unpack = vrt::chdr::if_hdr_unpack_be; - } else { - fc_cache->to_host = uhd::wtohx<uint32_t>; - fc_cache->from_host = uhd::htowx<uint32_t>; - fc_cache->pack = vrt::chdr::if_hdr_pack_le; - fc_cache->unpack = vrt::chdr::if_hdr_unpack_le; - } - xport.recv = zero_copy_flow_ctrl::make( - xport.recv, 0, [fc_cache](managed_buffer::sptr buff) { - return rx_flow_ctrl(fc_cache, buff); - }); - - // Configure the block - // Note: We need to set_destination() after writing to SR_CLEAR_TX_FC. - // See noc_shell.v, in the section called Stream Source for details. - // Setting SR_CLEAR_TX_FC will actually also clear the destination and - // other settings. - blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_TX_FC, 0x1, block_port); - blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_TX_FC, 0x0, block_port); - // Configure routing for data - blk_ctrl->set_destination(xport.send_sid.get_src(), block_port); - - // Configure routing for responses - blk_ctrl->sr_write( - uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port); - UHD_RX_STREAMER_LOG() << "resp_out_dst_sid == " << xport.send_sid.get_src(); - - // Find all upstream radio nodes and set their response in SID to the host - std::vector<boost::shared_ptr<uhd::rfnoc::radio_ctrl>> upstream_radio_nodes = - blk_ctrl->find_upstream_node<uhd::rfnoc::radio_ctrl>(); - UHD_RX_STREAMER_LOG() << "Number of upstream radio nodes: " - << upstream_radio_nodes.size(); - for (const boost::shared_ptr<uhd::rfnoc::radio_ctrl>& node : - upstream_radio_nodes) { - node->sr_write( - uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port); - } - - // Second, configure the streamer - - // make the new streamer given the samples per packet - if (not my_streamer) { - // To calculate the max number of samples per packet, we assume the maximum - // header length to avoid fragmentation should the entire header be used. - const size_t bpp = - pkt_size - stream_options.rx_max_len_hdr; // bytes per packet - const size_t bpi = - convert::get_bytes_per_item(args.otw_format); // bytes per item - const size_t spp = std::min(args.args.cast<size_t>("spp", bpp / bpi), - bpp / bpi); // samples per packet - UHD_RX_STREAMER_LOG() << "bpp == " << bpp << ", bpi == " << bpi << ", spp == " << spp; - - my_streamer = boost::make_shared<device3_recv_packet_streamer>( - spp, recv_terminator, xport); - my_streamer->resize(chan_list.size()); - } - - // init some streamer stuff - std::string conv_endianness; - if (xport.endianness == ENDIANNESS_BIG) { - my_streamer->set_vrt_unpacker(&vrt::chdr::if_hdr_unpack_be); - conv_endianness = "be"; - } else { - my_streamer->set_vrt_unpacker(&vrt::chdr::if_hdr_unpack_le); - conv_endianness = "le"; - } - - // set the converter - uhd::convert::id_type id; - id.input_format = args.otw_format + "_item32_" + conv_endianness; - id.num_inputs = 1; - id.output_format = args.cpu_format; - id.num_outputs = 1; - my_streamer->set_converter(id); - - // Give the streamer a functor to handle flow control ACK messages - my_streamer->set_xport_handle_flowctrl_ack( - stream_i, [fc_cache](const uint32_t* payload) { - handle_rx_flowctrl_ack(fc_cache, payload); - }); - - // Give the streamer a functor to get the recv_buffer - my_streamer->set_xport_chan_get_buff(stream_i, - [xport](double timeout) { return xport.recv->get_recv_buff(timeout); }, - true /*flush*/ - ); - - // Give the streamer a functor to handle overruns - // bind requires a weak_ptr to break the a streamer->streamer circular dependency - // Using "this" is OK because we know that this device3_impl will outlive the - // streamer - boost::weak_ptr<uhd::rx_streamer> weak_ptr(my_streamer); - my_streamer->set_overflow_handler( - stream_i, [recv_terminator, weak_ptr, stream_i]() { - recv_terminator->handle_overrun(weak_ptr, stream_i); - }); - - // Give the streamer a functor issue stream cmd - my_streamer->set_issue_stream_cmd( - stream_i, [blk_ctrl, block_port](const stream_cmd_t& stream_cmd) { - blk_ctrl->issue_stream_cmd(stream_cmd, block_port); - }); - } - - // Notify all blocks in this chain that they are connected to an active streamer - recv_terminator->set_rx_streamer(true, 0); - - // Store a weak pointer to prevent a streamer->device3_impl->streamer circular - // dependency. Note that we store the streamer only once, and use its terminator's ID - // to do so. - _rx_streamers[recv_terminator->unique_id()] = - boost::weak_ptr<uhd::rx_streamer>(my_streamer); - - // Sets tick rate, samp rate and scaling on this streamer. - // A registered terminator is required to do this. - update_rx_streamers(); - - post_streamer_hooks(RX_DIRECTION); - return my_streamer; -} - -/*********************************************************************** - * Transmit streamer - **********************************************************************/ -void device3_impl::update_tx_streamers() -{ - for (const std::string& block_id : _tx_streamers.keys()) { - UHD_TX_STREAMER_LOG() << "updating TX streamer: " << block_id; - boost::shared_ptr<device3_send_packet_streamer> my_streamer = - boost::dynamic_pointer_cast<device3_send_packet_streamer>( - _tx_streamers[block_id].lock()); - if (my_streamer) { - double tick_rate = my_streamer->get_terminator()->get_tick_rate(); - if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) { - tick_rate = 1.0; - } - double samp_rate = my_streamer->get_terminator()->get_input_samp_rate(); - if (samp_rate == rfnoc::rate_node_ctrl::RATE_UNDEFINED) { - samp_rate = 1.0; - } - double scaling = my_streamer->get_terminator()->get_input_scale_factor(); - if (scaling == rfnoc::scalar_node_ctrl::SCALE_UNDEFINED) { - scaling = 32767.; - } - UHD_TX_STREAMER_LOG() - << "New tick_rate == " << tick_rate << " New samp_rate == " << samp_rate - << " New scaling == " << scaling; - my_streamer->set_tick_rate(tick_rate); - my_streamer->set_samp_rate(samp_rate); - my_streamer->set_scale_factor(scaling); - } - } -} - -tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t& args_) -{ - boost::mutex::scoped_lock lock(_transport_setup_mutex); - stream_args_t args = sanitize_stream_args(args_); - - // I. Generate the channel list - std::vector<uhd::rfnoc::block_id_t> chan_list; - std::vector<device_addr_t> chan_args; - generate_channel_list(args, chan_list, chan_args); - // Note: All 'args.args' are merged into chan_args now. - - // shared async queue for all channels in streamer - boost::shared_ptr<async_md_type> async_md(new async_md_type(1000 /*messages deep*/)); - - // II. Iterate over all channels - boost::shared_ptr<device3_send_packet_streamer> my_streamer; - // The terminator's lifetime is coupled to the streamer. - // There is only one terminator. If the streamer has multiple channels, - // it will be connected to each downstream block. - rfnoc::tx_stream_terminator::sptr send_terminator = - rfnoc::tx_stream_terminator::make(); - for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) { - // First, configure the downstream blocks and create the transports - - // Get block ID and mb index - uhd::rfnoc::block_id_t block_id = chan_list[stream_i]; - // Update args so args.args is always valid for this particular channel: - args.args = chan_args[stream_i]; - size_t mb_index = block_id.get_device_no(); - size_t suggested_block_port = - args.args.cast<size_t>("block_port", rfnoc::ANY_PORT); - - // Access to this channel's block control - uhd::rfnoc::sink_block_ctrl_base::sptr blk_ctrl = - boost::dynamic_pointer_cast<uhd::rfnoc::sink_block_ctrl_base>( - get_block_ctrl(block_id)); - - // Connect the terminator with this channel's block. - // This will throw if the connection is not possible. - size_t block_port = - blk_ctrl->connect_upstream(send_terminator, suggested_block_port, args.args); - const size_t terminator_port = send_terminator->connect_downstream(blk_ctrl); - blk_ctrl->set_upstream_port(block_port, terminator_port); - send_terminator->set_downstream_port(terminator_port, block_port); - - // Check if the block connection is compatible (spp and item type) - check_stream_sig_compatible( - blk_ctrl->get_input_signature(block_port), args, "TX"); - - // Setup the dsp transport hints - device_addr_t tx_hints = get_tx_hints(mb_index); - - // Search the device and all nodes for lowest MTU - size_t mtu = std::min( - get_mtu(mb_index, uhd::direction_t::TX_DIRECTION), - blk_ctrl->get_mtu(block_port)); - UHD_TX_STREAMER_LOG() << "Maximum MTU supported by " - << blk_ctrl->unique_id() << ": " - << blk_ctrl->get_mtu(block_port); - std::vector<boost::shared_ptr<uhd::rfnoc::sink_block_ctrl_base>> - downstream_sink_nodes = - blk_ctrl->find_downstream_node<uhd::rfnoc::sink_block_ctrl_base>(); - for (const boost::shared_ptr<uhd::rfnoc::sink_block_ctrl_base>& node : - downstream_sink_nodes) { - // Get MTU from Port 0 of the downstream nodes. This is okay for now as - // currently we use port 0 of a block in case of channel 1. - UHD_TX_STREAMER_LOG() << "Maximum MTU supported by " - << node->unique_id() << ": " - << node->get_mtu(0); - mtu = std::min(mtu, node->get_mtu(0)); - } - tx_hints["mtu"] = std::to_string(mtu); - - // Make sure user supplied send_frame_size is less than the MTU - if (tx_hints.cast<size_t>("send_frame_size", mtu) > mtu) { - UHD_LOGGER_WARNING("STREAMER") - << "Requested send_frame_size of " - << tx_hints["send_frame_size"] - << " exceeds the maximum possible on this stream. Using " - << mtu; - tx_hints["send_frame_size"] = std::to_string(mtu); - } - - const size_t fifo_size = blk_ctrl->get_fifo_size(block_port); - // Allocate sid and create transport - uhd::sid_t stream_address = blk_ctrl->get_address(block_port); - UHD_TX_STREAMER_LOG() << "creating tx stream " << tx_hints.to_string(); - both_xports_t xport = make_transport(stream_address, TX_DATA, tx_hints); - both_xports_t async_xport = - make_transport(stream_address, ASYNC_MSG, device_addr_t("")); - UHD_TX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec; - - // Configure flow control - // This disables the FC module's output, do this before configuring flow control - blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x1, block_port); - blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x0, block_port); - // Configure flow control on downstream block - const size_t pkt_size = xport.send->get_send_frame_size(); - const size_t fc_window = - std::min(tx_hints.cast<size_t>("send_buff_size", fifo_size), fifo_size); - const size_t fc_handle_window = - std::max<size_t>(pkt_size, fc_window / stream_options.tx_fc_response_freq); - UHD_TX_STREAMER_LOG() << "Flow Control Window = " << fc_window - << ", Flow Control Handler Window = " << fc_handle_window - << ", FIFO size = " << fifo_size; - blk_ctrl->configure_flow_control_in(fc_handle_window, /*bytes*/ - block_port); - // Add flow control transport - boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window)); - if (xport.endianness == ENDIANNESS_BIG) { - fc_cache->to_host = uhd::ntohx<uint32_t>; - fc_cache->from_host = uhd::htonx<uint32_t>; - fc_cache->pack = vrt::chdr::if_hdr_pack_be; - fc_cache->unpack = vrt::chdr::if_hdr_unpack_be; - } else { - fc_cache->to_host = uhd::wtohx<uint32_t>; - fc_cache->from_host = uhd::htowx<uint32_t>; - fc_cache->pack = vrt::chdr::if_hdr_pack_le; - fc_cache->unpack = vrt::chdr::if_hdr_unpack_le; - } - xport.send = zero_copy_flow_ctrl::make(xport.send, - [fc_cache, xport](managed_buffer::sptr buff) { - return tx_flow_ctrl(fc_cache, xport.recv, buff); - }, - 0); - - // Configure return path for async messages - blk_ctrl->sr_write( - uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), block_port); - UHD_TX_STREAMER_LOG() << "resp_in_dst_sid == " - << boost::format("0x%04X") % xport.recv_sid.get_dst(); - - // FIXME: Once there is a better way to map the radio block and port - // to the channel or another way to receive asynchronous messages that - // is not in-band, this should be removed. - if (args.args.has_key("radio_id") and args.args.has_key("radio_port")) { - // Find downstream radio node and set the response SID to the host - uhd::rfnoc::block_id_t radio_id(args.args["radio_id"]); - size_t radio_port = args.args.cast<size_t>("radio_port", 0); - std::vector<boost::shared_ptr<uhd::rfnoc::radio_ctrl>> - downstream_radio_nodes = - blk_ctrl->find_downstream_node<uhd::rfnoc::radio_ctrl>(); - UHD_TX_STREAMER_LOG() - << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); - for (const boost::shared_ptr<uhd::rfnoc::radio_ctrl>& node : - downstream_radio_nodes) { - if (node->get_block_id() == radio_id) { - node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, - async_xport.recv_sid.get_dst(), - radio_port); - } - } - } else { - // FIXME: This block is preserved for legacy behavior where the - // radio_id and radio_port are not provided. It fails if more - // than one radio is visible downstream or the port on the radio - // is not the same as the block_port. It should be removed as - // soon as possible. - // Find all downstream radio nodes and set their response SID to the host - std::vector<boost::shared_ptr<uhd::rfnoc::radio_ctrl>> - downstream_radio_nodes = - blk_ctrl->find_downstream_node<uhd::rfnoc::radio_ctrl>(); - UHD_TX_STREAMER_LOG() - << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); - for (const boost::shared_ptr<uhd::rfnoc::radio_ctrl>& node : - downstream_radio_nodes) { - node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, - async_xport.recv_sid.get_dst(), - block_port); - } - } - - // Second, configure the streamer now that the blocks and transports are - // configured - - // make the new streamer given the samples per packet - if (not my_streamer) { - // To calculate the max number of samples per packet, we assume the maximum - // header length to avoid fragmentation should the entire header be used. - const size_t bpp = - tx_hints.cast<size_t>("bpp", pkt_size) - - stream_options.tx_max_len_hdr; - const size_t bpi = - convert::get_bytes_per_item(args.otw_format); // bytes per item - const size_t spp = std::min(args.args.cast<size_t>("spp", bpp / bpi), - bpp / bpi); // samples per packet - UHD_TX_STREAMER_LOG() - << "bpp == " << bpp - << ", bpi == " << bpi - << ", spp == " << spp; - - my_streamer = boost::make_shared<device3_send_packet_streamer>( - spp, send_terminator, xport, async_xport); - my_streamer->resize(chan_list.size()); - } - - // init some streamer stuff - std::string conv_endianness; - if (xport.endianness == ENDIANNESS_BIG) { - my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_be); - conv_endianness = "be"; - } else { - my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_le); - conv_endianness = "le"; - } - - // set the converter - uhd::convert::id_type id; - id.input_format = args.cpu_format; - id.num_inputs = 1; - id.output_format = args.otw_format + "_item32_" + conv_endianness; - id.num_outputs = 1; - my_streamer->set_converter(id); - - boost::shared_ptr<async_tx_info_t> async_tx_info(new async_tx_info_t()); - async_tx_info->stream_channel = args.channels[stream_i]; - async_tx_info->device_channel = mb_index; - async_tx_info->async_queue = async_md; - async_tx_info->old_async_queue = _async_md; - - task::sptr async_task = - task::make([async_tx_info, async_xport, xport, send_terminator]() { - handle_tx_async_msgs(async_tx_info, - async_xport.recv, - xport.endianness == ENDIANNESS_BIG ? uhd::ntohx<uint32_t> - : uhd::wtohx<uint32_t>, - xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be - : vrt::chdr::if_hdr_unpack_le, - [send_terminator]() { return send_terminator->get_tick_rate(); }); - }); - my_streamer->add_async_msg_task(async_task); - - // Give the streamer a functor to get the send buffer - my_streamer->set_xport_chan_get_buff(stream_i, - [xport](const double timeout) { return xport.send->get_send_buff(timeout); }); - // Give the streamer a functor handled received async messages - my_streamer->set_async_receiver( - [async_md](uhd::async_metadata_t& md, const double timeout) { - return async_md->pop_with_timed_wait(md, timeout); - }); - my_streamer->set_xport_chan_sid(stream_i, true, xport.send_sid); - // CHDR does not support trailers - my_streamer->set_enable_trailer(false); - - // Avoid sending FC ACKs if the transport is lossless or the user - // has explictly requested not to send them - if (not(xport.lossless or tx_hints.has_key("send_no_fc_acks"))) { - my_streamer->set_xport_chan_post_send_cb(stream_i, [fc_cache, xport]() { - tx_flow_ctrl_ack(fc_cache, xport.send, xport.send_sid); - }); - } - } - - // Notify all blocks in this chain that they are connected to an active streamer - send_terminator->set_tx_streamer(true, 0); - - // Store a weak pointer to prevent a streamer->device3_impl->streamer circular - // dependency. Note that we store the streamer only once, and use its terminator's ID - // to do so. - _tx_streamers[send_terminator->unique_id()] = - boost::weak_ptr<uhd::tx_streamer>(my_streamer); - - // Sets tick rate, samp rate and scaling on this streamer - // A registered terminator is required to do this. - update_tx_streamers(); - - post_streamer_hooks(TX_DIRECTION); - return my_streamer; -} |