diff options
Diffstat (limited to 'host/lib/usrp/device3/device3_io_impl.cpp')
-rw-r--r-- | host/lib/usrp/device3/device3_io_impl.cpp | 295 |
1 files changed, 4 insertions, 291 deletions
diff --git a/host/lib/usrp/device3/device3_io_impl.cpp b/host/lib/usrp/device3/device3_io_impl.cpp index d5b0d4f1d..7afa2ace0 100644 --- a/host/lib/usrp/device3/device3_io_impl.cpp +++ b/host/lib/usrp/device3/device3_io_impl.cpp @@ -8,6 +8,7 @@ // Provides streaming-related functions which are used by device3 objects. #include "device3_impl.hpp" +#include "device3_flow_ctrl.hpp" #include <uhd/rfnoc/constants.hpp> #include <uhd/rfnoc/source_block_ctrl_base.hpp> #include <uhd/rfnoc/sink_block_ctrl_base.hpp> @@ -136,34 +137,6 @@ void generate_channel_list( /*********************************************************************** * RX Flow Control Functions **********************************************************************/ -//! Stores the state of RX flow control -struct rx_fc_cache_t -{ - rx_fc_cache_t(): - interval(0), - last_byte_count(0), - total_bytes_consumed(0), - total_packets_consumed(0), - seq_num(0) {} - - //! Flow control interval in bytes - size_t interval; - //! Byte count at last flow control packet - uint32_t last_byte_count; - //! This will wrap around, but that's OK, because math. - uint32_t total_bytes_consumed; - //! This will wrap around, but that's OK, because math. - uint32_t total_packets_consumed; - //! Sequence number of next flow control packet - uint64_t seq_num; - sid_t sid; - zero_copy_if::sptr xport; - std::function<uint32_t(uint32_t)> to_host; - std::function<uint32_t(uint32_t)> from_host; - std::function<void(const uint32_t *packet_buff, vrt::if_packet_info_t &)> unpack; - std::function<void(uint32_t *packet_buff, vrt::if_packet_info_t &)> pack; -}; - /*! Determine the size of the flow control window in number of packets. * * This value depends on three things: @@ -209,271 +182,11 @@ static size_t get_rx_flow_control_window( } -/*! Send out RX flow control packets. - * - * This function handles updating the counters for the consumed - * bytes and packets, determines if a flow control message is - * is necessary, and sends one if it is. Passing a nullptr for - * the buff parameter will skip the counter update. - * - * \param fc_cache RX flow control state information - * \param buff Receive buffer. Setting to nullptr will - * skip the counter update. - */ -static bool rx_flow_ctrl( - boost::shared_ptr<rx_fc_cache_t> fc_cache, - managed_buffer::sptr buff -) { - // If the caller supplied a buffer - if (buff) - { - // Unpack the header - vrt::if_packet_info_t packet_info; - packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); - const uint32_t *pkt = buff->cast<const uint32_t *>(); - try { - fc_cache->unpack(pkt, packet_info); - } - catch(const std::exception &ex) - { - // Log and ignore - UHD_LOGGER_ERROR("RX FLOW CTRL") << "Error unpacking packet: " << ex.what() << std::endl; - return true; - } - - // Update counters assuming the buffer is a consumed packet - if (not packet_info.error) - { - fc_cache->total_bytes_consumed += buff->size(); - fc_cache->total_packets_consumed++; - } - } - - // Just return if there is no need to send a flow control packet - if (fc_cache->total_bytes_consumed - fc_cache->last_byte_count < fc_cache->interval) - { - return true; - } - - // Time to send a flow control packet - // Get a send buffer - managed_send_buffer::sptr fc_buff = fc_cache->xport->get_send_buff(0.0); - if (not fc_buff) { - throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer"); - } - uint32_t *pkt = fc_buff->cast<uint32_t *>(); - - //load packet info - vrt::if_packet_info_t packet_info; - packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_FC; - packet_info.num_payload_words32 = DEVICE3_FC_PACKET_LEN_IN_WORDS32; - packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); - packet_info.packet_count = fc_cache->seq_num++; - packet_info.sob = false; - packet_info.eob = false; - packet_info.error = false; - packet_info.fc_ack = false; - packet_info.sid = fc_cache->sid.get(); - packet_info.has_sid = true; - packet_info.has_cid = false; - packet_info.has_tsi = false; - packet_info.has_tsf = false; - packet_info.has_tlr = false; - - // Load Header: - fc_cache->pack(pkt, packet_info); - // Load Payload: Packet count, and byte count - pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = - fc_cache->from_host(fc_cache->total_packets_consumed); - pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = - fc_cache->from_host(fc_cache->total_bytes_consumed); - - //send the buffer over the interface - fc_buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32)); - - //update byte count - fc_cache->last_byte_count = fc_cache->total_bytes_consumed; - - return true; -} - -/*! Handle RX flow control ACK packets. - * - */ -static void handle_rx_flowctrl_ack( - boost::shared_ptr<rx_fc_cache_t> fc_cache, - const uint32_t *payload -) { - const uint32_t pkt_count = fc_cache->to_host(payload[0]); - const uint32_t byte_count = fc_cache->to_host(payload[1]); - if (fc_cache->total_bytes_consumed != byte_count) - { - UHD_LOGGER_DEBUG("device3") - << "oh noes: byte_count==" << byte_count - << " total_bytes_consumed==" << fc_cache->total_bytes_consumed - << std::hex << " sid==" << fc_cache->sid << std::dec - << std::endl - ; - } - fc_cache->total_bytes_consumed = byte_count; - fc_cache->total_packets_consumed = pkt_count; // guess we need a pkt offset too? - - // This will send a flow control packet if there is a significant discrepancy - rx_flow_ctrl(fc_cache, nullptr); -} - /*********************************************************************** - * TX Flow Control Functions + * TX Async Message Functions **********************************************************************/ #define DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL 0 -//! Stores the state of TX flow control -struct tx_fc_cache_t -{ - tx_fc_cache_t(uint32_t capacity): - last_byte_ack(0), - last_seq_ack(0), - byte_count(0), - pkt_count(0), - window_size(capacity), - fc_ack_seqnum(0), - fc_received(false) {} - - uint32_t last_byte_ack; - uint32_t last_seq_ack; - uint32_t byte_count; - uint32_t pkt_count; - uint32_t window_size; - uint32_t fc_ack_seqnum; - bool fc_received; - std::function<uint32_t(uint32_t)> to_host; - std::function<uint32_t(uint32_t)> from_host; - std::function<void(const uint32_t *packet_buff, vrt::if_packet_info_t &)> unpack; - std::function<void(uint32_t *packet_buff, vrt::if_packet_info_t &)> pack; -}; - -static bool tx_flow_ctrl( - boost::shared_ptr<tx_fc_cache_t> fc_cache, - zero_copy_if::sptr xport, - managed_buffer::sptr buff -) { - while (true) - { - // If there is space - if (fc_cache->window_size - (fc_cache->byte_count - fc_cache->last_byte_ack) >= buff->size()) - { - // All is good - packet will be sent - fc_cache->byte_count += buff->size(); - // Round up to nearest word - if (fc_cache->byte_count % DEVICE3_LINE_SIZE) - { - fc_cache->byte_count += DEVICE3_LINE_SIZE - (fc_cache->byte_count % DEVICE3_LINE_SIZE); - } - fc_cache->pkt_count++; - return true; - } - - // Look for a flow control message to update the space available in the buffer. - managed_recv_buffer::sptr buff = xport->get_recv_buff(0.1); - if (buff) - { - vrt::if_packet_info_t if_packet_info; - if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); - const uint32_t *packet_buff = buff->cast<const uint32_t *>(); - try { - fc_cache->unpack(packet_buff, if_packet_info); - } - catch(const std::exception &ex) - { - UHD_LOGGER_ERROR("TX FLOW CTRL") << "Error unpacking flow control packet: " << ex.what() << std::endl; - continue; - } - - if (if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_FC) - { - UHD_LOGGER_ERROR("TX FLOW CTRL") << "Unexpected packet received by flow control handler: " << if_packet_info.packet_type << std::endl; - continue; - } - - const uint32_t *payload = &packet_buff[if_packet_info.num_header_words32]; - const uint32_t pkt_count = fc_cache->to_host(payload[0]); - const uint32_t byte_count = fc_cache->to_host(payload[1]); - - // update the amount of space - fc_cache->last_byte_ack = byte_count; - fc_cache->last_seq_ack = pkt_count; - - fc_cache->fc_received = true; - } - } - return false; -} - -static void tx_flow_ctrl_ack( - boost::shared_ptr<tx_fc_cache_t> fc_cache, - zero_copy_if::sptr send_xport, - sid_t send_sid -) { - if (not fc_cache->fc_received) - { - return; - } - - // Time to send a flow control ACK packet - // Get a send buffer - managed_send_buffer::sptr fc_buff = send_xport->get_send_buff(0.0); - if (not fc_buff) { - UHD_LOGGER_ERROR("tx_flow_ctrl_ack") << "timed out getting a send buffer"; - return; - } - uint32_t *pkt = fc_buff->cast<uint32_t *>(); - - // Load packet info - vrt::if_packet_info_t packet_info; - packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_ACK; - packet_info.num_payload_words32 = DEVICE3_FC_PACKET_LEN_IN_WORDS32; - packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); - packet_info.packet_count = fc_cache->fc_ack_seqnum++; - packet_info.sob = false; - packet_info.eob = true; - packet_info.error = false; - packet_info.fc_ack = false; - packet_info.sid = send_sid.get(); - packet_info.has_sid = true; - packet_info.has_cid = false; - packet_info.has_tsi = false; - packet_info.has_tsf = false; - packet_info.has_tlr = false; - - // Load Header: - fc_cache->pack(pkt, packet_info); - - // Update counters to include this packet - size_t fc_ack_pkt_size = sizeof(uint32_t)*(packet_info.num_packet_words32); - fc_cache->byte_count += fc_ack_pkt_size; - // Round up to nearest word - if (fc_cache->byte_count % DEVICE3_LINE_SIZE) - { - fc_cache->byte_count += DEVICE3_LINE_SIZE - (fc_cache->byte_count % DEVICE3_LINE_SIZE); - } - fc_cache->pkt_count++; - - // Load Payload: Packet count, and byte count - pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = - fc_cache->from_host(fc_cache->pkt_count); - pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = - fc_cache->from_host(fc_cache->byte_count); - - // Send the buffer over the interface - fc_buff->commit(fc_ack_pkt_size); - - // Reset for next FC - fc_cache->fc_received = false; -} - -/*********************************************************************** - * TX Async Message Functions - **********************************************************************/ struct async_tx_info_t { size_t stream_channel; @@ -532,7 +245,7 @@ static void handle_tx_async_msgs( async_info->stream_channel ); - // Filter out any flow control messages and cache the rest + // Filter out any flow control messages and cache the rest if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) { UHD_LOGGER_ERROR("TX ASYNC MSG") << "Unexpected flow control message found in async message handling" << std::endl; @@ -1039,7 +752,7 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) xport.send, xport.send_sid ); - } + } ); } |