aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/usrp/device3/device3_flow_ctrl.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/usrp/device3/device3_flow_ctrl.hpp')
-rw-r--r--host/lib/usrp/device3/device3_flow_ctrl.hpp302
1 files changed, 302 insertions, 0 deletions
diff --git a/host/lib/usrp/device3/device3_flow_ctrl.hpp b/host/lib/usrp/device3/device3_flow_ctrl.hpp
new file mode 100644
index 000000000..4a7910f0f
--- /dev/null
+++ b/host/lib/usrp/device3/device3_flow_ctrl.hpp
@@ -0,0 +1,302 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_DEVICE3_FLOW_CTRL_HPP
+#define INCLUDED_DEVICE3_FLOW_CTRL_HPP
+
+#include "device3_impl.hpp"
+#include <uhd/utils/log.hpp>
+#include <uhd/types/sid.hpp>
+#include <uhd/transport/zero_copy.hpp>
+#include <uhd/transport/vrt_if_packet.hpp>
+#include <boost/shared_ptr.hpp>
+
+//! Stores the state of RX flow control
+struct rx_fc_cache_t
+{
+ rx_fc_cache_t():
+ interval(0),
+ last_byte_count(0),
+ total_bytes_consumed(0),
+ total_packets_consumed(0),
+ seq_num(0) {}
+
+ //! Flow control interval in bytes
+ size_t interval;
+ //! Byte count at last flow control packet
+ uint32_t last_byte_count;
+ //! This will wrap around, but that's OK, because math.
+ uint32_t total_bytes_consumed;
+ //! This will wrap around, but that's OK, because math.
+ uint32_t total_packets_consumed;
+ //! Sequence number of next flow control packet
+ uint64_t seq_num;
+ uhd::sid_t sid;
+ uhd::transport::zero_copy_if::sptr xport;
+ std::function<uint32_t(uint32_t)> to_host;
+ std::function<uint32_t(uint32_t)> from_host;
+ std::function<void(const uint32_t *packet_buff, uhd::transport::vrt::if_packet_info_t &)> unpack;
+ std::function<void(uint32_t *packet_buff, uhd::transport::vrt::if_packet_info_t &)> pack;
+};
+
+/*! Send out RX flow control packets.
+*
+* This function handles updating the counters for the consumed
+* bytes and packets, determines if a flow control message is
+* is necessary, and sends one if it is. Passing a nullptr for
+* the buff parameter will skip the counter update.
+*
+* \param fc_cache RX flow control state information
+* \param buff Receive buffer. Setting to nullptr will
+* skip the counter update.
+*/
+inline bool rx_flow_ctrl(
+ boost::shared_ptr<rx_fc_cache_t> fc_cache,
+ uhd::transport::managed_buffer::sptr buff
+) {
+ // If the caller supplied a buffer
+ if (buff)
+ {
+ // Unpack the header
+ uhd::transport::vrt::if_packet_info_t packet_info;
+ packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t);
+ const uint32_t *pkt = buff->cast<const uint32_t *>();
+ try {
+ fc_cache->unpack(pkt, packet_info);
+ }
+ catch(const std::exception &ex)
+ {
+ // Log and ignore
+ UHD_LOGGER_ERROR("RX FLOW CTRL") << "Error unpacking packet: " << ex.what() << std::endl;
+ return true;
+ }
+
+ // Update counters assuming the buffer is a consumed packet
+ if (not packet_info.error)
+ {
+ fc_cache->total_bytes_consumed += buff->size();
+ fc_cache->total_packets_consumed++;
+ }
+ }
+
+ // Just return if there is no need to send a flow control packet
+ if (fc_cache->total_bytes_consumed - fc_cache->last_byte_count < fc_cache->interval)
+ {
+ return true;
+ }
+
+ // Time to send a flow control packet
+ // Get a send buffer
+ uhd::transport::managed_send_buffer::sptr fc_buff = fc_cache->xport->get_send_buff(0.0);
+ if (not fc_buff) {
+ throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer");
+ }
+ uint32_t *pkt = fc_buff->cast<uint32_t *>();
+
+ //load packet info
+ uhd::transport::vrt::if_packet_info_t packet_info;
+ packet_info.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_FC;
+ packet_info.num_payload_words32 = uhd::usrp::DEVICE3_FC_PACKET_LEN_IN_WORDS32;
+ packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t);
+ packet_info.packet_count = fc_cache->seq_num++;
+ packet_info.sob = false;
+ packet_info.eob = false;
+ packet_info.error = false;
+ packet_info.fc_ack = false;
+ packet_info.sid = fc_cache->sid.get();
+ packet_info.has_sid = true;
+ packet_info.has_cid = false;
+ packet_info.has_tsi = false;
+ packet_info.has_tsf = false;
+ packet_info.has_tlr = false;
+
+ // Load Header:
+ fc_cache->pack(pkt, packet_info);
+ // Load Payload: Packet count, and byte count
+ pkt[packet_info.num_header_words32+uhd::usrp::DEVICE3_FC_PACKET_COUNT_OFFSET] =
+ fc_cache->from_host(fc_cache->total_packets_consumed);
+ pkt[packet_info.num_header_words32+uhd::usrp::DEVICE3_FC_BYTE_COUNT_OFFSET] =
+ fc_cache->from_host(fc_cache->total_bytes_consumed);
+
+ //send the buffer over the interface
+ fc_buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32));
+
+ //update byte count
+ fc_cache->last_byte_count = fc_cache->total_bytes_consumed;
+
+ return true;
+}
+
+/*! Handle RX flow control ACK packets.
+*
+*/
+inline void handle_rx_flowctrl_ack(
+ boost::shared_ptr<rx_fc_cache_t> fc_cache,
+ const uint32_t *payload
+) {
+ const uint32_t pkt_count = fc_cache->to_host(payload[0]);
+ const uint32_t byte_count = fc_cache->to_host(payload[1]);
+ if (fc_cache->total_bytes_consumed != byte_count)
+ {
+ UHD_LOGGER_DEBUG("device3")
+ << "oh noes: byte_count==" << byte_count
+ << " total_bytes_consumed==" << fc_cache->total_bytes_consumed
+ << std::hex << " sid==" << fc_cache->sid << std::dec
+ << std::endl
+ ;
+ }
+ fc_cache->total_bytes_consumed = byte_count;
+ fc_cache->total_packets_consumed = pkt_count; // guess we need a pkt offset too?
+
+ // This will send a flow control packet if there is a significant discrepancy
+ rx_flow_ctrl(fc_cache, nullptr);
+}
+
+//! Stores the state of TX flow control
+struct tx_fc_cache_t
+{
+ tx_fc_cache_t(uint32_t capacity):
+ last_byte_ack(0),
+ last_seq_ack(0),
+ byte_count(0),
+ pkt_count(0),
+ window_size(capacity),
+ fc_ack_seqnum(0),
+ fc_received(false) {}
+
+ uint32_t last_byte_ack;
+ uint32_t last_seq_ack;
+ uint32_t byte_count;
+ uint32_t pkt_count;
+ uint32_t window_size;
+ uint32_t fc_ack_seqnum;
+ bool fc_received;
+ std::function<uint32_t(uint32_t)> to_host;
+ std::function<uint32_t(uint32_t)> from_host;
+ std::function<void(const uint32_t *packet_buff, uhd::transport::vrt::if_packet_info_t &)> unpack;
+ std::function<void(uint32_t *packet_buff, uhd::transport::vrt::if_packet_info_t &)> pack;
+};
+
+inline bool tx_flow_ctrl(
+ boost::shared_ptr<tx_fc_cache_t> fc_cache,
+ uhd::transport::zero_copy_if::sptr xport,
+ uhd::transport::managed_buffer::sptr buff
+) {
+ while (true)
+ {
+ // If there is space
+ if (fc_cache->window_size - (fc_cache->byte_count - fc_cache->last_byte_ack) >= buff->size())
+ {
+ // All is good - packet will be sent
+ fc_cache->byte_count += buff->size();
+ // Round up to nearest word
+ if (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE)
+ {
+ fc_cache->byte_count += uhd::usrp::DEVICE3_LINE_SIZE - (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE);
+ }
+ fc_cache->pkt_count++;
+ return true;
+ }
+
+ // Look for a flow control message to update the space available in the buffer.
+ uhd::transport::managed_recv_buffer::sptr buff = xport->get_recv_buff(0.1);
+ if (buff)
+ {
+ uhd::transport::vrt::if_packet_info_t if_packet_info;
+ if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t);
+ const uint32_t *packet_buff = buff->cast<const uint32_t *>();
+ try {
+ fc_cache->unpack(packet_buff, if_packet_info);
+ }
+ catch(const std::exception &ex)
+ {
+ UHD_LOGGER_ERROR("TX FLOW CTRL") << "Error unpacking flow control packet: " << ex.what() << std::endl;
+ continue;
+ }
+
+ if (if_packet_info.packet_type != uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_FC)
+ {
+ UHD_LOGGER_ERROR("TX FLOW CTRL") << "Unexpected packet received by flow control handler: " << if_packet_info.packet_type << std::endl;
+ continue;
+ }
+
+ const uint32_t *payload = &packet_buff[if_packet_info.num_header_words32];
+ const uint32_t pkt_count = fc_cache->to_host(payload[0]);
+ const uint32_t byte_count = fc_cache->to_host(payload[1]);
+
+ // update the amount of space
+ fc_cache->last_byte_ack = byte_count;
+ fc_cache->last_seq_ack = pkt_count;
+
+ fc_cache->fc_received = true;
+ }
+ }
+ return false;
+}
+
+inline void tx_flow_ctrl_ack(
+ boost::shared_ptr<tx_fc_cache_t> fc_cache,
+ uhd::transport::zero_copy_if::sptr send_xport,
+ uhd::sid_t send_sid
+) {
+ if (not fc_cache->fc_received)
+ {
+ return;
+ }
+
+ // Time to send a flow control ACK packet
+ // Get a send buffer
+ uhd::transport::managed_send_buffer::sptr fc_buff = send_xport->get_send_buff(0.0);
+ if (not fc_buff) {
+ UHD_LOGGER_ERROR("tx_flow_ctrl_ack") << "timed out getting a send buffer";
+ return;
+ }
+ uint32_t *pkt = fc_buff->cast<uint32_t *>();
+
+ // Load packet info
+ uhd::transport::vrt::if_packet_info_t packet_info;
+ packet_info.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_ACK;
+ packet_info.num_payload_words32 = uhd::usrp::DEVICE3_FC_PACKET_LEN_IN_WORDS32;
+ packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t);
+ packet_info.packet_count = fc_cache->fc_ack_seqnum++;
+ packet_info.sob = false;
+ packet_info.eob = true;
+ packet_info.error = false;
+ packet_info.fc_ack = false;
+ packet_info.sid = send_sid.get();
+ packet_info.has_sid = true;
+ packet_info.has_cid = false;
+ packet_info.has_tsi = false;
+ packet_info.has_tsf = false;
+ packet_info.has_tlr = false;
+
+ // Load Header:
+ fc_cache->pack(pkt, packet_info);
+
+ // Update counters to include this packet
+ size_t fc_ack_pkt_size = sizeof(uint32_t)*(packet_info.num_packet_words32);
+ fc_cache->byte_count += fc_ack_pkt_size;
+ // Round up to nearest word
+ if (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE)
+ {
+ fc_cache->byte_count += uhd::usrp::DEVICE3_LINE_SIZE - (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE);
+ }
+ fc_cache->pkt_count++;
+
+ // Load Payload: Packet count, and byte count
+ pkt[packet_info.num_header_words32+uhd::usrp::DEVICE3_FC_PACKET_COUNT_OFFSET] =
+ fc_cache->from_host(fc_cache->pkt_count);
+ pkt[packet_info.num_header_words32+uhd::usrp::DEVICE3_FC_BYTE_COUNT_OFFSET] =
+ fc_cache->from_host(fc_cache->byte_count);
+
+ // Send the buffer over the interface
+ fc_buff->commit(fc_ack_pkt_size);
+
+ // Reset for next FC
+ fc_cache->fc_received = false;
+}
+
+#endif /* INCLUDED_DEVICE3_FLOW_CTRL_HPP */