diff options
-rw-r--r-- | host/lib/usrp/device3/device3_flow_ctrl.hpp | 302 | ||||
-rw-r--r-- | host/lib/usrp/device3/device3_io_impl.cpp | 295 |
2 files changed, 306 insertions, 291 deletions
diff --git a/host/lib/usrp/device3/device3_flow_ctrl.hpp b/host/lib/usrp/device3/device3_flow_ctrl.hpp new file mode 100644 index 000000000..4a7910f0f --- /dev/null +++ b/host/lib/usrp/device3/device3_flow_ctrl.hpp @@ -0,0 +1,302 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_DEVICE3_FLOW_CTRL_HPP +#define INCLUDED_DEVICE3_FLOW_CTRL_HPP + +#include "device3_impl.hpp" +#include <uhd/utils/log.hpp> +#include <uhd/types/sid.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <uhd/transport/vrt_if_packet.hpp> +#include <boost/shared_ptr.hpp> + +//! Stores the state of RX flow control +struct rx_fc_cache_t +{ + rx_fc_cache_t(): + interval(0), + last_byte_count(0), + total_bytes_consumed(0), + total_packets_consumed(0), + seq_num(0) {} + + //! Flow control interval in bytes + size_t interval; + //! Byte count at last flow control packet + uint32_t last_byte_count; + //! This will wrap around, but that's OK, because math. + uint32_t total_bytes_consumed; + //! This will wrap around, but that's OK, because math. + uint32_t total_packets_consumed; + //! Sequence number of next flow control packet + uint64_t seq_num; + uhd::sid_t sid; + uhd::transport::zero_copy_if::sptr xport; + std::function<uint32_t(uint32_t)> to_host; + std::function<uint32_t(uint32_t)> from_host; + std::function<void(const uint32_t *packet_buff, uhd::transport::vrt::if_packet_info_t &)> unpack; + std::function<void(uint32_t *packet_buff, uhd::transport::vrt::if_packet_info_t &)> pack; +}; + +/*! Send out RX flow control packets. +* +* This function handles updating the counters for the consumed +* bytes and packets, determines if a flow control message is +* is necessary, and sends one if it is. Passing a nullptr for +* the buff parameter will skip the counter update. +* +* \param fc_cache RX flow control state information +* \param buff Receive buffer. Setting to nullptr will +* skip the counter update. +*/ +inline bool rx_flow_ctrl( + boost::shared_ptr<rx_fc_cache_t> fc_cache, + uhd::transport::managed_buffer::sptr buff +) { + // If the caller supplied a buffer + if (buff) + { + // Unpack the header + uhd::transport::vrt::if_packet_info_t packet_info; + packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); + const uint32_t *pkt = buff->cast<const uint32_t *>(); + try { + fc_cache->unpack(pkt, packet_info); + } + catch(const std::exception &ex) + { + // Log and ignore + UHD_LOGGER_ERROR("RX FLOW CTRL") << "Error unpacking packet: " << ex.what() << std::endl; + return true; + } + + // Update counters assuming the buffer is a consumed packet + if (not packet_info.error) + { + fc_cache->total_bytes_consumed += buff->size(); + fc_cache->total_packets_consumed++; + } + } + + // Just return if there is no need to send a flow control packet + if (fc_cache->total_bytes_consumed - fc_cache->last_byte_count < fc_cache->interval) + { + return true; + } + + // Time to send a flow control packet + // Get a send buffer + uhd::transport::managed_send_buffer::sptr fc_buff = fc_cache->xport->get_send_buff(0.0); + if (not fc_buff) { + throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer"); + } + uint32_t *pkt = fc_buff->cast<uint32_t *>(); + + //load packet info + uhd::transport::vrt::if_packet_info_t packet_info; + packet_info.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_FC; + packet_info.num_payload_words32 = uhd::usrp::DEVICE3_FC_PACKET_LEN_IN_WORDS32; + packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); + packet_info.packet_count = fc_cache->seq_num++; + packet_info.sob = false; + packet_info.eob = false; + packet_info.error = false; + packet_info.fc_ack = false; + packet_info.sid = fc_cache->sid.get(); + packet_info.has_sid = true; + packet_info.has_cid = false; + packet_info.has_tsi = false; + packet_info.has_tsf = false; + packet_info.has_tlr = false; + + // Load Header: + fc_cache->pack(pkt, packet_info); + // Load Payload: Packet count, and byte count + pkt[packet_info.num_header_words32+uhd::usrp::DEVICE3_FC_PACKET_COUNT_OFFSET] = + fc_cache->from_host(fc_cache->total_packets_consumed); + pkt[packet_info.num_header_words32+uhd::usrp::DEVICE3_FC_BYTE_COUNT_OFFSET] = + fc_cache->from_host(fc_cache->total_bytes_consumed); + + //send the buffer over the interface + fc_buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32)); + + //update byte count + fc_cache->last_byte_count = fc_cache->total_bytes_consumed; + + return true; +} + +/*! Handle RX flow control ACK packets. +* +*/ +inline void handle_rx_flowctrl_ack( + boost::shared_ptr<rx_fc_cache_t> fc_cache, + const uint32_t *payload +) { + const uint32_t pkt_count = fc_cache->to_host(payload[0]); + const uint32_t byte_count = fc_cache->to_host(payload[1]); + if (fc_cache->total_bytes_consumed != byte_count) + { + UHD_LOGGER_DEBUG("device3") + << "oh noes: byte_count==" << byte_count + << " total_bytes_consumed==" << fc_cache->total_bytes_consumed + << std::hex << " sid==" << fc_cache->sid << std::dec + << std::endl + ; + } + fc_cache->total_bytes_consumed = byte_count; + fc_cache->total_packets_consumed = pkt_count; // guess we need a pkt offset too? + + // This will send a flow control packet if there is a significant discrepancy + rx_flow_ctrl(fc_cache, nullptr); +} + +//! Stores the state of TX flow control +struct tx_fc_cache_t +{ + tx_fc_cache_t(uint32_t capacity): + last_byte_ack(0), + last_seq_ack(0), + byte_count(0), + pkt_count(0), + window_size(capacity), + fc_ack_seqnum(0), + fc_received(false) {} + + uint32_t last_byte_ack; + uint32_t last_seq_ack; + uint32_t byte_count; + uint32_t pkt_count; + uint32_t window_size; + uint32_t fc_ack_seqnum; + bool fc_received; + std::function<uint32_t(uint32_t)> to_host; + std::function<uint32_t(uint32_t)> from_host; + std::function<void(const uint32_t *packet_buff, uhd::transport::vrt::if_packet_info_t &)> unpack; + std::function<void(uint32_t *packet_buff, uhd::transport::vrt::if_packet_info_t &)> pack; +}; + +inline bool tx_flow_ctrl( + boost::shared_ptr<tx_fc_cache_t> fc_cache, + uhd::transport::zero_copy_if::sptr xport, + uhd::transport::managed_buffer::sptr buff +) { + while (true) + { + // If there is space + if (fc_cache->window_size - (fc_cache->byte_count - fc_cache->last_byte_ack) >= buff->size()) + { + // All is good - packet will be sent + fc_cache->byte_count += buff->size(); + // Round up to nearest word + if (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE) + { + fc_cache->byte_count += uhd::usrp::DEVICE3_LINE_SIZE - (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE); + } + fc_cache->pkt_count++; + return true; + } + + // Look for a flow control message to update the space available in the buffer. + uhd::transport::managed_recv_buffer::sptr buff = xport->get_recv_buff(0.1); + if (buff) + { + uhd::transport::vrt::if_packet_info_t if_packet_info; + if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); + const uint32_t *packet_buff = buff->cast<const uint32_t *>(); + try { + fc_cache->unpack(packet_buff, if_packet_info); + } + catch(const std::exception &ex) + { + UHD_LOGGER_ERROR("TX FLOW CTRL") << "Error unpacking flow control packet: " << ex.what() << std::endl; + continue; + } + + if (if_packet_info.packet_type != uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_FC) + { + UHD_LOGGER_ERROR("TX FLOW CTRL") << "Unexpected packet received by flow control handler: " << if_packet_info.packet_type << std::endl; + continue; + } + + const uint32_t *payload = &packet_buff[if_packet_info.num_header_words32]; + const uint32_t pkt_count = fc_cache->to_host(payload[0]); + const uint32_t byte_count = fc_cache->to_host(payload[1]); + + // update the amount of space + fc_cache->last_byte_ack = byte_count; + fc_cache->last_seq_ack = pkt_count; + + fc_cache->fc_received = true; + } + } + return false; +} + +inline void tx_flow_ctrl_ack( + boost::shared_ptr<tx_fc_cache_t> fc_cache, + uhd::transport::zero_copy_if::sptr send_xport, + uhd::sid_t send_sid +) { + if (not fc_cache->fc_received) + { + return; + } + + // Time to send a flow control ACK packet + // Get a send buffer + uhd::transport::managed_send_buffer::sptr fc_buff = send_xport->get_send_buff(0.0); + if (not fc_buff) { + UHD_LOGGER_ERROR("tx_flow_ctrl_ack") << "timed out getting a send buffer"; + return; + } + uint32_t *pkt = fc_buff->cast<uint32_t *>(); + + // Load packet info + uhd::transport::vrt::if_packet_info_t packet_info; + packet_info.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_ACK; + packet_info.num_payload_words32 = uhd::usrp::DEVICE3_FC_PACKET_LEN_IN_WORDS32; + packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); + packet_info.packet_count = fc_cache->fc_ack_seqnum++; + packet_info.sob = false; + packet_info.eob = true; + packet_info.error = false; + packet_info.fc_ack = false; + packet_info.sid = send_sid.get(); + packet_info.has_sid = true; + packet_info.has_cid = false; + packet_info.has_tsi = false; + packet_info.has_tsf = false; + packet_info.has_tlr = false; + + // Load Header: + fc_cache->pack(pkt, packet_info); + + // Update counters to include this packet + size_t fc_ack_pkt_size = sizeof(uint32_t)*(packet_info.num_packet_words32); + fc_cache->byte_count += fc_ack_pkt_size; + // Round up to nearest word + if (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE) + { + fc_cache->byte_count += uhd::usrp::DEVICE3_LINE_SIZE - (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE); + } + fc_cache->pkt_count++; + + // Load Payload: Packet count, and byte count + pkt[packet_info.num_header_words32+uhd::usrp::DEVICE3_FC_PACKET_COUNT_OFFSET] = + fc_cache->from_host(fc_cache->pkt_count); + pkt[packet_info.num_header_words32+uhd::usrp::DEVICE3_FC_BYTE_COUNT_OFFSET] = + fc_cache->from_host(fc_cache->byte_count); + + // Send the buffer over the interface + fc_buff->commit(fc_ack_pkt_size); + + // Reset for next FC + fc_cache->fc_received = false; +} + +#endif /* INCLUDED_DEVICE3_FLOW_CTRL_HPP */ diff --git a/host/lib/usrp/device3/device3_io_impl.cpp b/host/lib/usrp/device3/device3_io_impl.cpp index d5b0d4f1d..7afa2ace0 100644 --- a/host/lib/usrp/device3/device3_io_impl.cpp +++ b/host/lib/usrp/device3/device3_io_impl.cpp @@ -8,6 +8,7 @@ // Provides streaming-related functions which are used by device3 objects. #include "device3_impl.hpp" +#include "device3_flow_ctrl.hpp" #include <uhd/rfnoc/constants.hpp> #include <uhd/rfnoc/source_block_ctrl_base.hpp> #include <uhd/rfnoc/sink_block_ctrl_base.hpp> @@ -136,34 +137,6 @@ void generate_channel_list( /*********************************************************************** * RX Flow Control Functions **********************************************************************/ -//! Stores the state of RX flow control -struct rx_fc_cache_t -{ - rx_fc_cache_t(): - interval(0), - last_byte_count(0), - total_bytes_consumed(0), - total_packets_consumed(0), - seq_num(0) {} - - //! Flow control interval in bytes - size_t interval; - //! Byte count at last flow control packet - uint32_t last_byte_count; - //! This will wrap around, but that's OK, because math. - uint32_t total_bytes_consumed; - //! This will wrap around, but that's OK, because math. - uint32_t total_packets_consumed; - //! Sequence number of next flow control packet - uint64_t seq_num; - sid_t sid; - zero_copy_if::sptr xport; - std::function<uint32_t(uint32_t)> to_host; - std::function<uint32_t(uint32_t)> from_host; - std::function<void(const uint32_t *packet_buff, vrt::if_packet_info_t &)> unpack; - std::function<void(uint32_t *packet_buff, vrt::if_packet_info_t &)> pack; -}; - /*! Determine the size of the flow control window in number of packets. * * This value depends on three things: @@ -209,271 +182,11 @@ static size_t get_rx_flow_control_window( } -/*! Send out RX flow control packets. - * - * This function handles updating the counters for the consumed - * bytes and packets, determines if a flow control message is - * is necessary, and sends one if it is. Passing a nullptr for - * the buff parameter will skip the counter update. - * - * \param fc_cache RX flow control state information - * \param buff Receive buffer. Setting to nullptr will - * skip the counter update. - */ -static bool rx_flow_ctrl( - boost::shared_ptr<rx_fc_cache_t> fc_cache, - managed_buffer::sptr buff -) { - // If the caller supplied a buffer - if (buff) - { - // Unpack the header - vrt::if_packet_info_t packet_info; - packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); - const uint32_t *pkt = buff->cast<const uint32_t *>(); - try { - fc_cache->unpack(pkt, packet_info); - } - catch(const std::exception &ex) - { - // Log and ignore - UHD_LOGGER_ERROR("RX FLOW CTRL") << "Error unpacking packet: " << ex.what() << std::endl; - return true; - } - - // Update counters assuming the buffer is a consumed packet - if (not packet_info.error) - { - fc_cache->total_bytes_consumed += buff->size(); - fc_cache->total_packets_consumed++; - } - } - - // Just return if there is no need to send a flow control packet - if (fc_cache->total_bytes_consumed - fc_cache->last_byte_count < fc_cache->interval) - { - return true; - } - - // Time to send a flow control packet - // Get a send buffer - managed_send_buffer::sptr fc_buff = fc_cache->xport->get_send_buff(0.0); - if (not fc_buff) { - throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer"); - } - uint32_t *pkt = fc_buff->cast<uint32_t *>(); - - //load packet info - vrt::if_packet_info_t packet_info; - packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_FC; - packet_info.num_payload_words32 = DEVICE3_FC_PACKET_LEN_IN_WORDS32; - packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); - packet_info.packet_count = fc_cache->seq_num++; - packet_info.sob = false; - packet_info.eob = false; - packet_info.error = false; - packet_info.fc_ack = false; - packet_info.sid = fc_cache->sid.get(); - packet_info.has_sid = true; - packet_info.has_cid = false; - packet_info.has_tsi = false; - packet_info.has_tsf = false; - packet_info.has_tlr = false; - - // Load Header: - fc_cache->pack(pkt, packet_info); - // Load Payload: Packet count, and byte count - pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = - fc_cache->from_host(fc_cache->total_packets_consumed); - pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = - fc_cache->from_host(fc_cache->total_bytes_consumed); - - //send the buffer over the interface - fc_buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32)); - - //update byte count - fc_cache->last_byte_count = fc_cache->total_bytes_consumed; - - return true; -} - -/*! Handle RX flow control ACK packets. - * - */ -static void handle_rx_flowctrl_ack( - boost::shared_ptr<rx_fc_cache_t> fc_cache, - const uint32_t *payload -) { - const uint32_t pkt_count = fc_cache->to_host(payload[0]); - const uint32_t byte_count = fc_cache->to_host(payload[1]); - if (fc_cache->total_bytes_consumed != byte_count) - { - UHD_LOGGER_DEBUG("device3") - << "oh noes: byte_count==" << byte_count - << " total_bytes_consumed==" << fc_cache->total_bytes_consumed - << std::hex << " sid==" << fc_cache->sid << std::dec - << std::endl - ; - } - fc_cache->total_bytes_consumed = byte_count; - fc_cache->total_packets_consumed = pkt_count; // guess we need a pkt offset too? - - // This will send a flow control packet if there is a significant discrepancy - rx_flow_ctrl(fc_cache, nullptr); -} - /*********************************************************************** - * TX Flow Control Functions + * TX Async Message Functions **********************************************************************/ #define DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL 0 -//! Stores the state of TX flow control -struct tx_fc_cache_t -{ - tx_fc_cache_t(uint32_t capacity): - last_byte_ack(0), - last_seq_ack(0), - byte_count(0), - pkt_count(0), - window_size(capacity), - fc_ack_seqnum(0), - fc_received(false) {} - - uint32_t last_byte_ack; - uint32_t last_seq_ack; - uint32_t byte_count; - uint32_t pkt_count; - uint32_t window_size; - uint32_t fc_ack_seqnum; - bool fc_received; - std::function<uint32_t(uint32_t)> to_host; - std::function<uint32_t(uint32_t)> from_host; - std::function<void(const uint32_t *packet_buff, vrt::if_packet_info_t &)> unpack; - std::function<void(uint32_t *packet_buff, vrt::if_packet_info_t &)> pack; -}; - -static bool tx_flow_ctrl( - boost::shared_ptr<tx_fc_cache_t> fc_cache, - zero_copy_if::sptr xport, - managed_buffer::sptr buff -) { - while (true) - { - // If there is space - if (fc_cache->window_size - (fc_cache->byte_count - fc_cache->last_byte_ack) >= buff->size()) - { - // All is good - packet will be sent - fc_cache->byte_count += buff->size(); - // Round up to nearest word - if (fc_cache->byte_count % DEVICE3_LINE_SIZE) - { - fc_cache->byte_count += DEVICE3_LINE_SIZE - (fc_cache->byte_count % DEVICE3_LINE_SIZE); - } - fc_cache->pkt_count++; - return true; - } - - // Look for a flow control message to update the space available in the buffer. - managed_recv_buffer::sptr buff = xport->get_recv_buff(0.1); - if (buff) - { - vrt::if_packet_info_t if_packet_info; - if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); - const uint32_t *packet_buff = buff->cast<const uint32_t *>(); - try { - fc_cache->unpack(packet_buff, if_packet_info); - } - catch(const std::exception &ex) - { - UHD_LOGGER_ERROR("TX FLOW CTRL") << "Error unpacking flow control packet: " << ex.what() << std::endl; - continue; - } - - if (if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_FC) - { - UHD_LOGGER_ERROR("TX FLOW CTRL") << "Unexpected packet received by flow control handler: " << if_packet_info.packet_type << std::endl; - continue; - } - - const uint32_t *payload = &packet_buff[if_packet_info.num_header_words32]; - const uint32_t pkt_count = fc_cache->to_host(payload[0]); - const uint32_t byte_count = fc_cache->to_host(payload[1]); - - // update the amount of space - fc_cache->last_byte_ack = byte_count; - fc_cache->last_seq_ack = pkt_count; - - fc_cache->fc_received = true; - } - } - return false; -} - -static void tx_flow_ctrl_ack( - boost::shared_ptr<tx_fc_cache_t> fc_cache, - zero_copy_if::sptr send_xport, - sid_t send_sid -) { - if (not fc_cache->fc_received) - { - return; - } - - // Time to send a flow control ACK packet - // Get a send buffer - managed_send_buffer::sptr fc_buff = send_xport->get_send_buff(0.0); - if (not fc_buff) { - UHD_LOGGER_ERROR("tx_flow_ctrl_ack") << "timed out getting a send buffer"; - return; - } - uint32_t *pkt = fc_buff->cast<uint32_t *>(); - - // Load packet info - vrt::if_packet_info_t packet_info; - packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_ACK; - packet_info.num_payload_words32 = DEVICE3_FC_PACKET_LEN_IN_WORDS32; - packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); - packet_info.packet_count = fc_cache->fc_ack_seqnum++; - packet_info.sob = false; - packet_info.eob = true; - packet_info.error = false; - packet_info.fc_ack = false; - packet_info.sid = send_sid.get(); - packet_info.has_sid = true; - packet_info.has_cid = false; - packet_info.has_tsi = false; - packet_info.has_tsf = false; - packet_info.has_tlr = false; - - // Load Header: - fc_cache->pack(pkt, packet_info); - - // Update counters to include this packet - size_t fc_ack_pkt_size = sizeof(uint32_t)*(packet_info.num_packet_words32); - fc_cache->byte_count += fc_ack_pkt_size; - // Round up to nearest word - if (fc_cache->byte_count % DEVICE3_LINE_SIZE) - { - fc_cache->byte_count += DEVICE3_LINE_SIZE - (fc_cache->byte_count % DEVICE3_LINE_SIZE); - } - fc_cache->pkt_count++; - - // Load Payload: Packet count, and byte count - pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = - fc_cache->from_host(fc_cache->pkt_count); - pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = - fc_cache->from_host(fc_cache->byte_count); - - // Send the buffer over the interface - fc_buff->commit(fc_ack_pkt_size); - - // Reset for next FC - fc_cache->fc_received = false; -} - -/*********************************************************************** - * TX Async Message Functions - **********************************************************************/ struct async_tx_info_t { size_t stream_channel; @@ -532,7 +245,7 @@ static void handle_tx_async_msgs( async_info->stream_channel ); - // Filter out any flow control messages and cache the rest + // Filter out any flow control messages and cache the rest if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) { UHD_LOGGER_ERROR("TX ASYNC MSG") << "Unexpected flow control message found in async message handling" << std::endl; @@ -1039,7 +752,7 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) xport.send, xport.send_sid ); - } + } ); } |