aboutsummaryrefslogtreecommitdiffstats
path: root/host
diff options
context:
space:
mode:
Diffstat (limited to 'host')
-rw-r--r--host/include/uhd/rfnoc/constants.hpp10
-rw-r--r--host/include/uhd/rfnoc/graph.hpp4
-rw-r--r--host/include/uhd/rfnoc/sink_block_ctrl_base.hpp13
-rw-r--r--host/include/uhd/rfnoc/source_block_ctrl_base.hpp16
-rw-r--r--host/include/uhd/transport/CMakeLists.txt1
-rw-r--r--host/include/uhd/transport/vrt_if_packet.hpp4
-rw-r--r--host/lib/include/uhdlib/rfnoc/graph_impl.hpp2
-rw-r--r--host/lib/rfnoc/ctrl_iface.cpp1
-rw-r--r--host/lib/rfnoc/graph_impl.cpp31
-rw-r--r--host/lib/rfnoc/legacy_compat.cpp4
-rw-r--r--host/lib/rfnoc/sink_block_ctrl_base.cpp19
-rw-r--r--host/lib/rfnoc/source_block_ctrl_base.cpp33
-rw-r--r--host/lib/transport/CMakeLists.txt1
-rw-r--r--host/lib/transport/chdr.cpp7
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp143
-rw-r--r--host/lib/transport/super_send_packet_handler.hpp41
-rw-r--r--host/lib/transport/zero_copy_flow_ctrl.cpp5
-rw-r--r--host/lib/usrp/device3/device3_impl.hpp81
-rw-r--r--host/lib/usrp/device3/device3_io_impl.cpp686
-rw-r--r--host/lib/usrp/x300/x300_impl.cpp4
-rw-r--r--host/lib/usrp/x300/x300_io_impl.cpp14
21 files changed, 657 insertions, 463 deletions
diff --git a/host/include/uhd/rfnoc/constants.hpp b/host/include/uhd/rfnoc/constants.hpp
index 6df4c535f..bda4c6440 100644
--- a/host/include/uhd/rfnoc/constants.hpp
+++ b/host/include/uhd/rfnoc/constants.hpp
@@ -25,7 +25,7 @@ static const std::string XML_PATH_ENV = "UHD_RFNOC_DIR";
//! If the block name can't be automatically detected, this name is used
static const std::string DEFAULT_BLOCK_NAME = "Block";
static const uint64_t DEFAULT_NOC_ID = 0xFFFFFFFFFFFFFFFF;
-static const size_t NOC_SHELL_COMPAT_MAJOR = 3;
+static const size_t NOC_SHELL_COMPAT_MAJOR = 4;
static const size_t NOC_SHELL_COMPAT_MINOR = 0;
static const size_t MAX_PACKET_SIZE = 8000; // bytes
@@ -35,7 +35,7 @@ static const size_t DEFAULT_PACKET_SIZE = 1456; // bytes
static const size_t BYTES_PER_LINE = 8;
//! For flow control within a single crossbar
-static const size_t DEFAULT_FC_XBAR_PKTS_PER_ACK = 2;
+static const size_t DEFAULT_FC_XBAR_RESPONSE_FREQ = 8;
//! For flow control when data is flowing from device to host (rx)
static const size_t DEFAULT_FC_RX_RESPONSE_FREQ = 64; // ACKs per flow control window
//! For flow control when data is flowing from host to device (tx)
@@ -48,15 +48,15 @@ static const size_t DEFAULT_FC_TX_RESPONSE_FREQ = 8; // ACKs per flow control wi
static const double DEFAULT_FC_RX_SW_BUFF_FULL_FACTOR = 0.80;
// Common settings registers.
-static const uint32_t SR_FLOW_CTRL_CYCS_PER_ACK = 0;
-static const uint32_t SR_FLOW_CTRL_PKTS_PER_ACK = 1;
+static const uint32_t SR_FLOW_CTRL_BYTES_PER_ACK = 1;
static const uint32_t SR_FLOW_CTRL_WINDOW_SIZE = 2;
-static const uint32_t SR_FLOW_CTRL_WINDOW_EN = 3;
+static const uint32_t SR_FLOW_CTRL_EN = 3;
static const uint32_t SR_ERROR_POLICY = 4;
static const uint32_t SR_BLOCK_SID = 5; // TODO rename to SRC_SID
static const uint32_t SR_NEXT_DST_SID = 6;
static const uint32_t SR_RESP_IN_DST_SID = 7;
static const uint32_t SR_RESP_OUT_DST_SID = 8;
+static const uint32_t SR_FLOW_CTRL_PKT_LIMIT = 9;
static const uint32_t SR_READBACK_ADDR = 124;
static const uint32_t SR_READBACK = 127;
diff --git a/host/include/uhd/rfnoc/graph.hpp b/host/include/uhd/rfnoc/graph.hpp
index f92e6b528..d7b7cc43e 100644
--- a/host/include/uhd/rfnoc/graph.hpp
+++ b/host/include/uhd/rfnoc/graph.hpp
@@ -67,12 +67,12 @@ public:
*
* \param sink_block Sink block ID
* \param dst_block_port Destination (sink) block port
- * \param pkts_per_ack Flow controlf frequency in packets
+ * \param bytes_per_ack Flow control frequency in bytes
*/
virtual void connect_sink(
const block_id_t &sink_block,
const size_t dst_block_port,
- const size_t pkts_per_ack
+ const size_t bytes_per_ack
) = 0;
virtual std::string get_name() const = 0;
diff --git a/host/include/uhd/rfnoc/sink_block_ctrl_base.hpp b/host/include/uhd/rfnoc/sink_block_ctrl_base.hpp
index ec3f28e32..f90361cf1 100644
--- a/host/include/uhd/rfnoc/sink_block_ctrl_base.hpp
+++ b/host/include/uhd/rfnoc/sink_block_ctrl_base.hpp
@@ -70,20 +70,17 @@ public:
* send out ACKs, telling the upstream block which packets have been consumed,
* so the upstream block can increase its flow control credit.
*
- * In the default implementation, this just sets registers
- * SR_FLOW_CTRL_CYCS_PER_ACK and SR_FLOW_CTRL_PKTS_PER_ACK accordingly.
+ * In the default implementation, this just sets register SR_FLOW_CTRL_BYTES_PER_ACK
+ * accordingly.
*
* Override this function if your block has port-specific flow control settings.
*
- * \param cycles Send an ACK after this many clock cycles.
- * Setting this to zero disables this type of flow control acknowledgement.
- * \param packets Send an ACK after this many packets have been consumed.
- * Setting this to zero disables this type of flow control acknowledgement.
+ * \param bytes Send an ACK after this many bytes have been consumed.
+ * Setting this to zero disables flow control acknowledgement.
* \param block_port Set up flow control for a stream coming in on this particular block port.
*/
virtual void configure_flow_control_in(
- size_t cycles,
- size_t packets,
+ size_t bytes,
size_t block_port=0
);
diff --git a/host/include/uhd/rfnoc/source_block_ctrl_base.hpp b/host/include/uhd/rfnoc/source_block_ctrl_base.hpp
index 7d0a65107..39e7411f3 100644
--- a/host/include/uhd/rfnoc/source_block_ctrl_base.hpp
+++ b/host/include/uhd/rfnoc/source_block_ctrl_base.hpp
@@ -97,15 +97,23 @@ public:
*
* Override this function if your block has port-specific flow control settings.
*
- * \param buf_size_pkts The size of the downstream block's input FIFO size in number of packets. Setting
- * this to zero disables flow control. The block will then produce data as fast as it can.
- * \b Warning: This can cause head-of-line blocking, and potentially lock up your device!
+ * \param enable_output Enable flow control module's output. If disabled, no packets will be output
+ * regardless of flow control state.
+ * \param buf_size_bytes The size of the downstream block's input FIFO in bytes. Setting
+ * this to zero disables byte based flow control. If both byte based flow control and
+ * the packet limit are set to zero, the block will then produce data as fast as it can.
+ * \b Warning: This can cause head-of-line blocking, and potentially lock up your device!
+ * Note that the default implementation rejects a size of zero by throwing uhd::runtime_error.
+ * \param pkt_limit Limit the maximum number of packets in flight. Setting this to zero disables packet limiting.
+ * Usually kept disabled except for special case connections (such as DMA) that support only
+ * a finite number of packets in flight.
* \param block_port Specify on which outgoing port this setting is valid.
* \param sid The SID for which this is valid. This is meant for cases where the outgoing block port is
* not sufficient to set the flow control, and as such is rarely used.
*/
virtual void configure_flow_control_out(
- size_t buf_size_pkts,
+ bool enable_output,
+ size_t buf_size_bytes,
+ size_t pkt_limit=0,
size_t block_port=0,
const uhd::sid_t &sid=uhd::sid_t()
);
diff --git a/host/include/uhd/transport/CMakeLists.txt b/host/include/uhd/transport/CMakeLists.txt
index 3ce06b5b1..785e2d53d 100644
--- a/host/include/uhd/transport/CMakeLists.txt
+++ b/host/include/uhd/transport/CMakeLists.txt
@@ -20,6 +20,7 @@ UHD_INSTALL(FILES
usb_device_handle.hpp
vrt_if_packet.hpp
zero_copy.hpp
+ zero_copy_flow_ctrl.hpp
DESTINATION ${INCLUDE_DIR}/uhd/transport
COMPONENT headers
)
diff --git a/host/include/uhd/transport/vrt_if_packet.hpp b/host/include/uhd/transport/vrt_if_packet.hpp
index 579ac77d2..07792f13f 100644
--- a/host/include/uhd/transport/vrt_if_packet.hpp
+++ b/host/include/uhd/transport/vrt_if_packet.hpp
@@ -68,6 +68,8 @@ namespace vrt{
bool sob, eob;
//! This is asserted for command responses that are errors (CHDR only)
bool error;
+ //! This is asserted for flow control packets that are ACKs (CHDR only)
+ bool fc_ack;
//optional fields
//! Stream ID (SID). See uhd::sid_t
@@ -185,7 +187,7 @@ namespace vrt{
num_packet_words32(0),
packet_count(0),
sob(false), eob(false),
- error(false),
+ error(false), fc_ack(false),
has_sid(false), sid(0),
has_cid(false), cid(0),
has_tsi(false), tsi(0),
diff --git a/host/lib/include/uhdlib/rfnoc/graph_impl.hpp b/host/lib/include/uhdlib/rfnoc/graph_impl.hpp
index 182befbf4..404369618 100644
--- a/host/lib/include/uhdlib/rfnoc/graph_impl.hpp
+++ b/host/lib/include/uhdlib/rfnoc/graph_impl.hpp
@@ -55,7 +55,7 @@ public:
void connect_sink(
const block_id_t &sink_block,
const size_t dst_block_port,
- const size_t pkts_per_ack
+ const size_t bytes_per_ack
);
/************************************************************************
diff --git a/host/lib/rfnoc/ctrl_iface.cpp b/host/lib/rfnoc/ctrl_iface.cpp
index 11dfa7aaa..29e18fc3a 100644
--- a/host/lib/rfnoc/ctrl_iface.cpp
+++ b/host/lib/rfnoc/ctrl_iface.cpp
@@ -108,6 +108,7 @@ private:
packet_info.tsf = timestamp;
packet_info.sob = false;
packet_info.eob = false;
+ packet_info.fc_ack = false;
packet_info.sid = _xports.send_sid;
packet_info.has_sid = true;
packet_info.has_cid = false;
diff --git a/host/lib/rfnoc/graph_impl.cpp b/host/lib/rfnoc/graph_impl.cpp
index c361ea8f2..a2e0e64f4 100644
--- a/host/lib/rfnoc/graph_impl.cpp
+++ b/host/lib/rfnoc/graph_impl.cpp
@@ -122,9 +122,9 @@ void graph_impl::connect(
UHD_LOGGER_WARNING("RFNOC") << "Assuming max packet size for " << src->get_block_id() ;
pkt_size = uhd::rfnoc::MAX_PACKET_SIZE;
}
- // FC window (in packets) depends on FIFO size... ...and packet size.
- size_t buf_size_pkts = dst->get_fifo_size(dst_block_port) / pkt_size;
- if (buf_size_pkts == 0) {
+ // FC window (in bytes) depends on FIFO size.
+ size_t buf_size_bytes = dst->get_fifo_size(dst_block_port);
+ if (buf_size_bytes < pkt_size) {
throw uhd::runtime_error(str(
boost::format("Input FIFO for block %s is too small (%d kiB) for packets of size %d kiB\n"
"coming from block %s.")
@@ -132,19 +132,20 @@ void graph_impl::connect(
% (pkt_size / 1024) % src->get_block_id().get()
));
}
- src->configure_flow_control_out(buf_size_pkts, src_block_port);
- // On the same crossbar, use lots of FC packets
- size_t pkts_per_ack = std::min(
- uhd::rfnoc::DEFAULT_FC_XBAR_PKTS_PER_ACK,
- buf_size_pkts - 1
+ src->configure_flow_control_out(
+ true, /* enable output */
+ buf_size_bytes,
+ 0, /* no packet limit. We need to revisit this at some point. */
+ src_block_port
);
+ // On the same crossbar, use lots of FC packets
+ size_t bytes_per_response = std::ceil<size_t>(buf_size_bytes / uhd::rfnoc::DEFAULT_FC_XBAR_RESPONSE_FREQ);
// Over the network, use less or we'd flood the transport
if (sid.get_src_addr() != sid.get_dst_addr()) {
- pkts_per_ack = std::max<size_t>(buf_size_pkts / uhd::rfnoc::DEFAULT_FC_TX_RESPONSE_FREQ, 1);
+ bytes_per_response = std::ceil<size_t>(buf_size_bytes / uhd::rfnoc::DEFAULT_FC_TX_RESPONSE_FREQ);
}
dst->configure_flow_control_in(
- 0, // Default to not use cycles
- pkts_per_ack,
+ bytes_per_response,
dst_block_port
);
@@ -209,7 +210,7 @@ void graph_impl::connect_src(
void graph_impl::connect_sink(
const block_id_t &sink_block,
const size_t dst_block_port,
- const size_t pkts_per_ack
+ const size_t bytes_per_ack
) {
device3::sptr device_ptr = _device_ptr.lock();
if (not device_ptr) {
@@ -222,11 +223,7 @@ void graph_impl::connect_sink(
uhd::rfnoc::sink_block_ctrl_base::sptr dst =
device_ptr->get_block_ctrl<rfnoc::sink_block_ctrl_base>(sink_block);
- dst->configure_flow_control_in(
- 0,
- pkts_per_ack,
- dst_block_port
- );
+ dst->configure_flow_control_in(bytes_per_ack, dst_block_port);
/********************************************************************
* 5. Configure error policy
diff --git a/host/lib/rfnoc/legacy_compat.cpp b/host/lib/rfnoc/legacy_compat.cpp
index 7e9eec20e..c5bd7891a 100644
--- a/host/lib/rfnoc/legacy_compat.cpp
+++ b/host/lib/rfnoc/legacy_compat.cpp
@@ -158,10 +158,10 @@ public:
UHD_LOGGER_WARNING("RFNOC") << "[legacy_compat] No DUCs detected. You will only be able to transmit at the radio frontend rate." ;
}
if (args.has_key("skip_dram")) {
- UHD_LEGACY_LOG() << "[legacy_compat] Skipping DRAM by user request." << std::endl;
+ UHD_LEGACY_LOG() << "[legacy_compat] Skipping DRAM by user request." ;
}
if (args.has_key("skip_sram")) {
- UHD_LEGACY_LOG() << "[legacy_compat] Skipping SRAM by user request." << std::endl;
+ UHD_LEGACY_LOG() << "[legacy_compat] Skipping SRAM by user request.";
}
if (not _has_dmafifo and not _has_sramfifo) {
UHD_LOGGER_WARNING("RFNOC") << "[legacy_compat] No FIFO detected. Higher transmit rates may encounter errors.";
diff --git a/host/lib/rfnoc/sink_block_ctrl_base.cpp b/host/lib/rfnoc/sink_block_ctrl_base.cpp
index b620f917f..1562e134b 100644
--- a/host/lib/rfnoc/sink_block_ctrl_base.cpp
+++ b/host/lib/rfnoc/sink_block_ctrl_base.cpp
@@ -52,22 +52,17 @@ size_t sink_block_ctrl_base::get_fifo_size(size_t block_port) const {
}
void sink_block_ctrl_base::configure_flow_control_in(
- size_t cycles,
- size_t packets,
+ size_t bytes,
size_t block_port
) {
- UHD_RFNOC_BLOCK_TRACE() << boost::format("sink_block_ctrl_base::configure_flow_control_in(cycles=%d, packets=%d)") % cycles % packets ;
- uint32_t cycles_word = 0;
- if (cycles) {
- cycles_word = (1<<31) | cycles;
- }
- sr_write(SR_FLOW_CTRL_CYCS_PER_ACK, cycles_word, block_port);
+ UHD_RFNOC_BLOCK_TRACE() << boost::format("sink_block_ctrl_base::configure_flow_control_in(bytes=%d)") % bytes;
- uint32_t packets_word = 0;
- if (packets) {
- packets_word = (1<<31) | packets;
+ uint32_t bytes_word = 0;
+ if (bytes) {
+ // Bit 31 (the MSB of the 32-bit word) enables flow control
+ bytes_word = (1<<31) | bytes;
}
- sr_write(SR_FLOW_CTRL_PKTS_PER_ACK, packets_word, block_port);
+ sr_write(SR_FLOW_CTRL_BYTES_PER_ACK, bytes_word, block_port);
}
void sink_block_ctrl_base::set_error_policy(
diff --git a/host/lib/rfnoc/source_block_ctrl_base.cpp b/host/lib/rfnoc/source_block_ctrl_base.cpp
index 5ede899cb..afec6ba1b 100644
--- a/host/lib/rfnoc/source_block_ctrl_base.cpp
+++ b/host/lib/rfnoc/source_block_ctrl_base.cpp
@@ -77,25 +77,27 @@ void source_block_ctrl_base::set_destination(
}
void source_block_ctrl_base::configure_flow_control_out(
- size_t buf_size_pkts,
+ bool enable_fc_output,
+ size_t buf_size_bytes,
+ size_t pkt_limit,
size_t block_port,
UHD_UNUSED(const uhd::sid_t &sid)
) {
- UHD_RFNOC_BLOCK_TRACE() << "source_block_ctrl_base::configure_flow_control_out() buf_size_pkts==" << buf_size_pkts ;
- if (buf_size_pkts < 2) {
+ UHD_RFNOC_BLOCK_TRACE() << "source_block_ctrl_base::configure_flow_control_out() buf_size_bytes==" << buf_size_bytes;
+ if (buf_size_bytes == 0) {
throw uhd::runtime_error(str(
- boost::format("Invalid window size %d for block %s. Window size must at least be 2.")
- % buf_size_pkts % unique_id()
+ boost::format("Invalid window size %d for block %s. Window size cannot be 0 bytes.")
+ % buf_size_bytes % unique_id()
));
}
- //Disable the window and let all upstream data flush out
+ //Disable flow control entirely and let all upstream data flush out
//We need to do this every time the window is changed because
//a) We don't know what state the flow-control module was left in
// in the previous run (it should still be enabled)
//b) Changing the window size where data is buffered upstream may
// result in stale packets entering the stream.
- sr_write(SR_FLOW_CTRL_WINDOW_EN, 0, block_port);
+ sr_write(SR_FLOW_CTRL_EN, 0, block_port);
//Wait for data to flush out.
//In the FPGA we are guaranteed that all buffered packets are more-or-less consecutive.
@@ -107,13 +109,24 @@ void source_block_ctrl_base::configure_flow_control_out(
// module is done flushing.
std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ //Enable source flow control module and conditionally enable byte based and/or packet count
+ //based flow control
+ const bool enable_byte_fc = (buf_size_bytes != 0);
+ const bool enable_pkt_cnt_fc = (pkt_limit != 0);
+ const size_t config = enable_fc_output + (enable_byte_fc << 1) + (enable_pkt_cnt_fc << 2);
+
//Resize the FC window.
//Precondition: No data can be buffered upstream.
- sr_write(SR_FLOW_CTRL_WINDOW_SIZE, buf_size_pkts, block_port);
+ if (enable_byte_fc) {
+ sr_write(SR_FLOW_CTRL_WINDOW_SIZE, buf_size_bytes, block_port);
+ }
+ if (enable_pkt_cnt_fc) {
+ sr_write(SR_FLOW_CTRL_PKT_LIMIT, pkt_limit, block_port);
+ }
//Enable the FC window.
- //Precondition: The window size must be set.
- sr_write(SR_FLOW_CTRL_WINDOW_EN, (buf_size_pkts != 0), block_port);
+ //Precondition: The window size and/or packet limit must be set.
+ sr_write(SR_FLOW_CTRL_EN, config, block_port);
}
/***********************************************************************
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index ae903d956..15771697a 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -112,6 +112,7 @@ LIBUHD_PYTHON_GEN_SOURCE(
)
LIBUHD_APPEND_SOURCES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp
${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_recv_offload.cpp
${CMAKE_CURRENT_SOURCE_DIR}/tcp_zero_copy.cpp
${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp
diff --git a/host/lib/transport/chdr.cpp b/host/lib/transport/chdr.cpp
index 48263f57e..36f380d62 100644
--- a/host/lib/transport/chdr.cpp
+++ b/host/lib/transport/chdr.cpp
@@ -23,6 +23,7 @@ using namespace uhd::transport::vrt;
static const uint32_t HDR_FLAG_TSF = (1 << 29);
static const uint32_t HDR_FLAG_EOB = (1 << 28);
static const uint32_t HDR_FLAG_ERROR = (1 << 28);
+static const uint32_t HDR_FLAG_FCACK = (1 << 28);
/***************************************************************************/
/* Packing */
@@ -45,8 +46,8 @@ UHD_INLINE uint32_t _hdr_pack_chdr(
| (if_packet_info.packet_type << 30)
// 1 Bit: Has time
| (if_packet_info.has_tsf ? HDR_FLAG_TSF : 0)
- // 1 Bit: EOB or Error
- | ((if_packet_info.eob or if_packet_info.error) ? HDR_FLAG_EOB : 0)
+ // 1 Bit: EOB or Error or FC ACK
+ | ((if_packet_info.eob or if_packet_info.error or if_packet_info.fc_ack) ? HDR_FLAG_EOB : 0)
// 12 Bits: Sequence number
| ((if_packet_info.packet_count & 0xFFF) << 16)
// 16 Bits: Total packet length
@@ -111,6 +112,8 @@ UHD_INLINE void _hdr_unpack_chdr(
&& ((chdr & HDR_FLAG_EOB) > 0);
if_packet_info.error = (if_packet_info.packet_type == if_packet_info_t::PACKET_TYPE_RESP)
&& ((chdr & HDR_FLAG_ERROR) > 0);
+ if_packet_info.fc_ack = (if_packet_info.packet_type == if_packet_info_t::PACKET_TYPE_FC)
+ && ((chdr & HDR_FLAG_FCACK) > 0);
if_packet_info.packet_count = (chdr >> 16) & 0xFFF;
// Set packet length variables
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 921bfdfa0..c962d40e6 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -12,9 +12,9 @@
#include <uhd/exception.hpp>
#include <uhd/convert.hpp>
#include <uhd/stream.hpp>
-#include <uhd/utils/log.hpp>
#include <uhd/utils/tasks.hpp>
#include <uhd/utils/byteswap.hpp>
+#include <uhd/utils/log.hpp>
#include <uhd/types/metadata.hpp>
#include <uhd/transport/vrt_if_packet.hpp>
#include <uhd/transport/zero_copy.hpp>
@@ -22,7 +22,6 @@
#include <boost/dynamic_bitset.hpp>
#include <boost/function.hpp>
#include <boost/format.hpp>
-#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
#include <iostream>
#include <vector>
@@ -59,6 +58,7 @@ class recv_packet_handler{
public:
typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type;
typedef boost::function<void(const size_t)> handle_flowctrl_type;
+ typedef std::function<void(const uint32_t *)> handle_flowctrl_ack_type;
typedef boost::function<void(const stream_cmd_t&)> issue_stream_cmd_type;
typedef void(*vrt_unpacker_type)(const uint32_t *, vrt::if_packet_info_t &);
//typedef boost::function<void(const uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type;
@@ -102,33 +102,6 @@ public:
_header_offset_words32 = header_offset_words32;
}
- ////////////////// RFNOC ///////////////////////////
- //! Set the stream ID for a specific channel (or no SID)
- void set_xport_chan_sid(const size_t xport_chan, const bool has_sid, const uint32_t sid = 0){
- _props.at(xport_chan).has_sid = has_sid;
- _props.at(xport_chan).sid = sid;
- }
-
- //! Get the stream ID for a specific channel (or zero if no SID)
- uint32_t get_xport_chan_sid(const size_t xport_chan) const {
- if (_props.at(xport_chan).has_sid) {
- return _props.at(xport_chan).sid;
- } else {
- return 0;
- }
- }
-
- void set_terminator(uhd::rfnoc::rx_stream_terminator::sptr terminator)
- {
- _terminator = terminator;
- }
-
- uhd::rfnoc::rx_stream_terminator::sptr get_terminator()
- {
- return _terminator;
- }
- ////////////////// RFNOC ///////////////////////////
-
/*!
* Set the threshold for alignment failure.
* How many packets throw out before giving up?
@@ -183,6 +156,13 @@ public:
if (do_init) handle_flowctrl(0);
}
+ void set_xport_handle_flowctrl_ack( // Register the per-channel handler for received flow control ACK packets
+ const size_t xport_chan, // transport channel index; _props.at() throws std::out_of_range if it is invalid
+ const handle_flowctrl_ack_type &handle_flowctrl_ack // invoked with a pointer to the ACK packet's payload words
+ ) {
+ _props.at(xport_chan).handle_flowctrl_ack = handle_flowctrl_ack;
+ }
+
//! Set the conversion routine for all channels
void set_converter(const uhd::convert::id_type &id){
_num_outputs = id.num_outputs;
@@ -211,12 +191,11 @@ public:
//! Overload call to issue stream commands
void issue_stream_cmd(const stream_cmd_t &stream_cmd)
{
- // RFNoC: This needs to be checked by the radio block, once it's done. TODO remove this.
- //if (stream_cmd.stream_now
- //and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS
- //and _props.size() > 1) {
- //throw uhd::runtime_error("Attempting to do multi-channel receive with stream_now == true will result in misaligned channels. Aborting.");
- //}
+ if (size() > 1 and stream_cmd.stream_now and
+ stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS)
+ {
+ throw uhd::runtime_error("Invalid recv stream command - stream now on multiple channels in a single streamer will fail to time align.");
+ }
for (size_t i = 0; i < _props.size(); i++)
{
@@ -307,11 +286,8 @@ private:
size_t packet_count;
handle_overflow_type handle_overflow;
handle_flowctrl_type handle_flowctrl;
+ handle_flowctrl_ack_type handle_flowctrl_ack;
size_t fc_update_window;
- /////// RFNOC ///////////
- bool has_sid;
- uint32_t sid;
- /////// RFNOC ///////////
};
std::vector<xport_chan_props_type> _props;
size_t _num_outputs;
@@ -340,6 +316,7 @@ private:
buffers_info_type(const size_t size):
std::vector<per_buffer_info_type>(size),
indexes_todo(size, true),
+ alignment_time(0),
alignment_time_valid(false),
data_bytes_to_copy(0),
fragment_offset_in_samps(0)
@@ -384,8 +361,6 @@ private:
int recvd_packets;
#endif
- uhd::rfnoc::rx_stream_terminator::sptr _terminator;
-
/*******************************************************************
* Get and process a single packet from the transport:
* Receive a single packet at the given index.
@@ -398,42 +373,58 @@ private:
per_buffer_info_type &curr_buffer_info,
double timeout
){
- //get a single packet from the transport layer
managed_recv_buffer::sptr &buff = curr_buffer_info.buff;
- buff = _props[index].get_buff(timeout);
- if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR;
-
- #ifdef ERROR_INJECT_DROPPED_PACKETS
- if (++recvd_packets > 1000)
+ per_buffer_info_type &info = curr_buffer_info;
+ while (1)
{
- recvd_packets = 0;
- buff.reset();
+ //get a single packet from the transport layer
buff = _props[index].get_buff(timeout);
if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR;
- }
- #endif
- //bounds check before extract
- size_t num_packet_words32 = buff->size()/sizeof(uint32_t);
- if (num_packet_words32 <= _header_offset_words32){
- throw std::runtime_error("recv buffer smaller than vrt packet offset");
- }
+ #ifdef ERROR_INJECT_DROPPED_PACKETS
+ if (++recvd_packets > 1000)
+ {
+ recvd_packets = 0;
+ buff.reset();
+ buff = _props[index].get_buff(timeout);
+ if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR;
+ }
+ #endif
- //extract packet info
- per_buffer_info_type &info = curr_buffer_info;
- info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32;
- info.vrt_hdr = buff->cast<const uint32_t *>() + _header_offset_words32;
- _vrt_unpacker(info.vrt_hdr, info.ifpi);
- info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true
- info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32);
-
- //handle flow control
- if (_props[index].handle_flowctrl)
- {
- if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0)
+ //bounds check before extract
+ size_t num_packet_words32 = buff->size()/sizeof(uint32_t);
+ if (num_packet_words32 <= _header_offset_words32){
+ throw std::runtime_error("recv buffer smaller than vrt packet offset");
+ }
+
+ //extract packet info
+ info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32;
+ info.vrt_hdr = buff->cast<const uint32_t *>() + _header_offset_words32;
+ _vrt_unpacker(info.vrt_hdr, info.ifpi);
+ info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true
+ info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32);
+
+ //handle flow control
+ if (_props[index].handle_flowctrl)
{
- _props[index].handle_flowctrl(info.ifpi.packet_count);
+ if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0)
+ {
+ _props[index].handle_flowctrl(info.ifpi.packet_count);
+ }
}
+
+ //handle flow control ack
+ if (info.ifpi.fc_ack){
+ if (_props[index].handle_flowctrl_ack) {
+ _props[index].handle_flowctrl_ack(reinterpret_cast<const uint32_t *>(info.copy_buff));
+ }
+ // Process the next packet
+ buff.reset();
+ info.copy_buff = nullptr;
+ continue;
+ }
+
+ break;
}
//--------------------------------------------------------------
@@ -605,7 +596,7 @@ private:
rx_metadata_t metadata = curr_info.metadata;
_props[index].handle_overflow();
curr_info.metadata = metadata;
- UHD_LOG_FASTPATH("O")
+ UHD_LOG_FASTPATH("O");
}
curr_info[index].buff.reset();
curr_info[index].copy_buff = nullptr;
@@ -627,7 +618,7 @@ private:
prev_info[index].ifpi.num_payload_words32*sizeof(uint32_t)/_bytes_per_otw_item, _samp_rate);
curr_info.metadata.out_of_sequence = true;
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
- UHD_LOG_FASTPATH("D")
+ UHD_LOG_FASTPATH("D");
return;
}
@@ -635,10 +626,10 @@ private:
//too many iterations: detect alignment failure
if (iterations++ > _alignment_failure_threshold){
UHD_LOGGER_ERROR("STREAMER") << boost::format(
- "The receive packet handler failed to time-align packets. "
- "%u received packets were processed by the handler. "
- "However, a timestamp match could not be determined."
- ) % iterations;
+ "The receive packet handler failed to time-align packets.\n"
+ "%u received packets were processed by the handler.\n"
+ "However, a timestamp match could not be determined.\n"
+ ) % iterations << std::endl;
std::swap(curr_info, next_info); //save progress from curr -> next
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
_props[index].handle_overflow();
@@ -659,7 +650,7 @@ private:
}
/*******************************************************************
- * Receive a single packet:
+ * Receive a single packet on all channels
* Handles fragmentation, messages, errors, and copy-conversion.
* When no fragments are available, call the get aligned buffers.
* Then copy-convert available data into the user's IO buffers.
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp
index e824ef4e9..5cba570a7 100644
--- a/host/lib/transport/super_send_packet_handler.hpp
+++ b/host/lib/transport/super_send_packet_handler.hpp
@@ -12,7 +12,6 @@
#include <uhd/exception.hpp>
#include <uhd/convert.hpp>
#include <uhd/stream.hpp>
-#include <uhd/utils/log.hpp>
#include <uhd/utils/tasks.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/utils/thread.hpp>
@@ -49,6 +48,7 @@ namespace sph {
class send_packet_handler{
public:
typedef boost::function<managed_send_buffer::sptr(double)> get_buff_type;
+ typedef boost::function<void(void)> post_send_cb_type;
typedef boost::function<bool(uhd::async_metadata_t &, const double)> async_receiver_type;
typedef void(*vrt_packer_type)(uint32_t *, vrt::if_packet_info_t &);
//typedef boost::function<void(uint32_t *, vrt::if_packet_info_t &)> vrt_packer_type;
@@ -93,27 +93,6 @@ public:
_props.at(xport_chan).sid = sid;
}
- ///////// RFNOC ///////////////////
- //! Get the stream ID for a specific channel (or zero if no SID)
- uint32_t get_xport_chan_sid(const size_t xport_chan) const {
- if (_props.at(xport_chan).has_sid) {
- return _props.at(xport_chan).sid;
- } else {
- return 0;
- }
- }
-
- void set_terminator(uhd::rfnoc::tx_stream_terminator::sptr terminator)
- {
- _terminator = terminator;
- }
-
- uhd::rfnoc::tx_stream_terminator::sptr get_terminator()
- {
- return _terminator;
- }
- ///////// RFNOC ///////////////////
-
void set_enable_trailer(const bool enable)
{
_has_tlr = enable;
@@ -138,6 +117,15 @@ public:
_props.at(xport_chan).get_buff = get_buff;
}
+ /*!
+ * Set the callback function for post-send.
+ * The callback takes no arguments and returns nothing. When one is set for a
+ * channel, it is invoked after the send buffer for that channel has been
+ * committed and released. An empty callback means nothing is invoked.
+ * \param xport_chan which transport channel
+ * \param cb post-send callback (stored as the channel's go_postal member)
+ */
+ void set_xport_chan_post_send_cb(const size_t xport_chan, const post_send_cb_type &cb){
+ _props.at(xport_chan).go_postal = cb;
+ }
+
//! Set the conversion routine for all channels
void set_converter(const uhd::convert::id_type &id){
_num_inputs = id.num_inputs;
@@ -198,6 +186,7 @@ public:
if_packet_info.tsf = metadata.time_spec.to_ticks(_tick_rate);
if_packet_info.sob = metadata.start_of_burst;
if_packet_info.eob = metadata.end_of_burst;
+ if_packet_info.fc_ack = false; //This is a data packet
/*
* Metadata is cached when we get a send requesting a start of burst with no samples.
@@ -291,6 +280,7 @@ private:
struct xport_chan_props_type{
xport_chan_props_type(void):has_sid(false),sid(0){}
get_buff_type get_buff;
+ post_send_cb_type go_postal;
bool has_sid;
uint32_t sid;
managed_send_buffer::sptr buff;
@@ -308,8 +298,6 @@ private:
bool _cached_metadata;
uhd::tx_metadata_t _metadata_cache;
- uhd::rfnoc::tx_stream_terminator::sptr _terminator;
-
#ifdef UHD_TXRX_DEBUG_PRINTS
struct dbg_send_stat_t {
dbg_send_stat_t(long wc, size_t nspb, size_t nss, uhd::tx_metadata_t md, double to, double rate):
@@ -428,6 +416,11 @@ private:
const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32;
buff->commit(num_vita_words32*sizeof(uint32_t));
buff.reset(); //effectively a release
+
+ if (_props[index].go_postal)
+ {
+ _props[index].go_postal();
+ }
}
//! Shared variables for the worker threads
diff --git a/host/lib/transport/zero_copy_flow_ctrl.cpp b/host/lib/transport/zero_copy_flow_ctrl.cpp
index 709b9e981..25be35569 100644
--- a/host/lib/transport/zero_copy_flow_ctrl.cpp
+++ b/host/lib/transport/zero_copy_flow_ctrl.cpp
@@ -65,7 +65,7 @@ public:
zero_copy_flow_ctrl_mrb(
flow_ctrl_func flow_ctrl
) :
- _mb(nullptr),
+ _mb(NULL),
_flow_ctrl(flow_ctrl)
{
/* NOP */
@@ -80,8 +80,6 @@ public:
{
if (_mb)
{
- _mb->commit(size());
- while (_flow_ctrl and not _flow_ctrl(_mb)) {}
_mb.reset();
}
}
@@ -89,6 +87,7 @@ public:
UHD_INLINE sptr get(sptr &mb)
{
_mb = mb;
+ while (_flow_ctrl and not _flow_ctrl(_mb)) {}
return make(this, _mb->cast<void *>(), _mb->size());
}
diff --git a/host/lib/usrp/device3/device3_impl.hpp b/host/lib/usrp/device3/device3_impl.hpp
index 8ecb1f72b..580268e4a 100644
--- a/host/lib/usrp/device3/device3_impl.hpp
+++ b/host/lib/usrp/device3/device3_impl.hpp
@@ -21,6 +21,10 @@
#include <uhd/types/direction.hpp>
#include <uhd/utils/tasks.hpp>
#include <uhd/device3.hpp>
+#include "../../transport/super_send_packet_handler.hpp"
+#include "../../transport/super_recv_packet_handler.hpp"
+#include <uhdlib/rfnoc/tx_stream_terminator.hpp>
+#include <uhdlib/rfnoc/rx_stream_terminator.hpp>
#include <uhdlib/rfnoc/xports.hpp>
namespace uhd { namespace usrp {
@@ -30,11 +34,83 @@ namespace uhd { namespace usrp {
**********************************************************************/
static const size_t DEVICE3_RX_FC_REQUEST_FREQ = 32; //per flow-control window
static const size_t DEVICE3_TX_FC_RESPONSE_FREQ = 8;
-static const size_t DEVICE3_TX_FC_RESPONSE_CYCLES = 0; // Cycles: Off.
+static const size_t DEVICE3_FC_PACKET_LEN_IN_WORDS32 = 2;
+static const size_t DEVICE3_FC_PACKET_COUNT_OFFSET = 0;
+static const size_t DEVICE3_FC_BYTE_COUNT_OFFSET = 1;
+static const size_t DEVICE3_LINE_SIZE = 8;
static const size_t DEVICE3_TX_MAX_HDR_LEN = uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t); // Bytes
static const size_t DEVICE3_RX_MAX_HDR_LEN = uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t); // Bytes
+// This class manages the lifetime of the TX async message handler task, transports, and terminator
+class device3_send_packet_streamer : public uhd::transport::sph::send_packet_streamer
+{
+public:
+ device3_send_packet_streamer(
+ const size_t max_num_samps,
+ const uhd::rfnoc::tx_stream_terminator::sptr terminator,
+ const both_xports_t data_xport,
+ const both_xports_t async_msg_xport
+ ) :
+ uhd::transport::sph::send_packet_streamer(max_num_samps),
+ _terminator(terminator),
+ _data_xport(data_xport),
+ _async_msg_xport(async_msg_xport)
+ {};
+
+ ~device3_send_packet_streamer()
+ {
+ // Make sure the async task is destroyed before the transports
+ _tx_async_msg_tasks.clear();
+ };
+
+ uhd::rfnoc::tx_stream_terminator::sptr get_terminator()
+ {
+ return _terminator;
+ }
+
+ void add_async_msg_task(task::sptr task)
+ {
+ _tx_async_msg_tasks.push_back(task);
+ }
+
+private:
+ uhd::rfnoc::tx_stream_terminator::sptr _terminator;
+ both_xports_t _data_xport;
+ both_xports_t _async_msg_xport;
+ std::vector<task::sptr> _tx_async_msg_tasks;
+};
+
+// This class manages the lifetime of the RX transports and terminator and provides access to both
+class device3_recv_packet_streamer : public uhd::transport::sph::recv_packet_streamer
+{
+public:
+ device3_recv_packet_streamer(
+ const size_t max_num_samps,
+ const uhd::rfnoc::rx_stream_terminator::sptr terminator,
+ const both_xports_t xport
+ ) :
+ uhd::transport::sph::recv_packet_streamer(max_num_samps),
+ _terminator(terminator),
+ _xport(xport) {};
+
+ ~device3_recv_packet_streamer() {};
+
+ both_xports_t get_xport()
+ {
+ return _xport;
+ }
+
+ uhd::rfnoc::rx_stream_terminator::sptr get_terminator()
+ {
+ return _terminator;
+ }
+
+private:
+ uhd::rfnoc::rx_stream_terminator::sptr _terminator;
+ both_xports_t _xport;
+};
+
class device3_impl : public uhd::device3, public boost::enable_shared_from_this<device3_impl>
{
public:
@@ -64,14 +140,11 @@ public:
size_t rx_fc_request_freq;
//! How often the downstream block should send ACKs per one full FC window
size_t tx_fc_response_freq;
- //! How often the downstream block should send ACKs in cycles
- size_t tx_fc_response_cycles;
stream_options_t(void)
: tx_max_len_hdr(DEVICE3_TX_MAX_HDR_LEN)
, rx_max_len_hdr(DEVICE3_RX_MAX_HDR_LEN)
, rx_fc_request_freq(DEVICE3_RX_FC_REQUEST_FREQ)
, tx_fc_response_freq(DEVICE3_TX_FC_RESPONSE_FREQ)
- , tx_fc_response_cycles(DEVICE3_TX_FC_RESPONSE_CYCLES)
{};
};
diff --git a/host/lib/usrp/device3/device3_io_impl.cpp b/host/lib/usrp/device3/device3_io_impl.cpp
index 8882552af..92865b6fe 100644
--- a/host/lib/usrp/device3/device3_io_impl.cpp
+++ b/host/lib/usrp/device3/device3_io_impl.cpp
@@ -13,8 +13,6 @@
#include <uhd/rfnoc/sink_block_ctrl_base.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/utils/log.hpp>
-#include "../../transport/super_recv_packet_handler.hpp"
-#include "../../transport/super_send_packet_handler.hpp"
#include <uhd/rfnoc/rate_node_ctrl.hpp>
#include <uhd/rfnoc/radio_ctrl.hpp>
#include <uhd/transport/zero_copy_flow_ctrl.hpp>
@@ -30,10 +28,6 @@ using namespace uhd;
using namespace uhd::usrp;
using namespace uhd::transport;
-//! CHDR uses 12-Bit sequence numbers
-static const uint32_t HW_SEQ_NUM_MASK = 0xfff;
-
-
/***********************************************************************
* Helper functions for get_?x_stream()
**********************************************************************/
@@ -146,8 +140,25 @@ void generate_channel_list(
struct rx_fc_cache_t
{
rx_fc_cache_t():
- last_seq_in(0){}
- size_t last_seq_in;
+ interval(0),
+ last_byte_count(0),
+ total_bytes_consumed(0),
+ total_packets_consumed(0),
+ seq_num(0) {}
+
+ //! Flow control interval in bytes
+ size_t interval;
+ //! Byte count at last flow control packet
+ uint32_t last_byte_count;
+ //! This will wrap around, but that's OK, because math.
+ uint32_t total_bytes_consumed;
+ //! This will wrap around, but that's OK, because math.
+ uint32_t total_packets_consumed;
+ //! Sequence number of next flow control packet
+ uint64_t seq_num;
+ sid_t sid;
+ zero_copy_if::sptr xport;
+ endianness_t endianness;
};
/*! Determine the size of the flow control window in number of packets.
@@ -180,109 +191,149 @@ static size_t get_rx_flow_control_window(
throw uhd::value_error("recv_buff_fullness must be in [0.01, 1] inclusive (1% to 100%)");
}
- size_t window_in_pkts = (static_cast<size_t>(sw_buff_size * fullness_factor) / pkt_size);
+ size_t window_in_bytes = (static_cast<size_t>(sw_buff_size * fullness_factor));
if (rx_args.has_key("max_recv_window")) {
- window_in_pkts = std::min(
- window_in_pkts,
- rx_args.cast<size_t>("max_recv_window", window_in_pkts)
+ window_in_bytes = std::min(
+ window_in_bytes,
+ rx_args.cast<size_t>("max_recv_window", window_in_bytes)
);
}
- if (window_in_pkts == 0) {
+ if (window_in_bytes < pkt_size) {
throw uhd::value_error("recv_buff_size must be larger than the recv_frame_size.");
}
- UHD_ASSERT_THROW(size_t(sw_buff_size * fullness_factor) >= pkt_size * window_in_pkts);
- return window_in_pkts;
+ UHD_ASSERT_THROW(size_t(sw_buff_size * fullness_factor) >= window_in_bytes);
+ return window_in_bytes;
}
/*! Send out RX flow control packets.
*
- * For an rx stream, this function takes care of sending back
- * a flow control packet to the source telling it which
- * packets have been consumed.
- *
- * This function should only be called by the function handling
- * the rx stream, usually recv() in super_recv_packet_handler.
+ * This function handles updating the counters for the consumed
+ * bytes and packets, determines if a flow control message is
+ * is necessary, and sends one if it is. Passing a nullptr for
+ * the buff parameter will skip the counter update.
*
- * \param sid The SID that goes into this packet. This is the reversed()
- * version of the data stream's SID.
- * \param xport A transport object over which to send the data
- * \param big_endian Endianness of the transport
- * \param seq32_state Pointer to a variable that saves the 32-Bit state
- * of the sequence numbers, since we only have 12 Bit
- * sequence numbers in CHDR.
- * \param last_seq The value to send: The last consumed packet's sequence number.
+ * \param fc_cache RX flow control state information
+ * \param buff Receive buffer. Setting to nullptr will
+ * skip the counter update.
*/
-static void handle_rx_flowctrl(
- const sid_t &sid,
- zero_copy_if::sptr xport,
- endianness_t endianness,
+static bool rx_flow_ctrl(
boost::shared_ptr<rx_fc_cache_t> fc_cache,
- const size_t last_seq
+ managed_buffer::sptr buff
) {
- static const size_t RXFC_PACKET_LEN_IN_WORDS = 2;
- static const size_t RXFC_CMD_CODE_OFFSET = 0;
- static const size_t RXFC_SEQ_NUM_OFFSET = 1;
+ // If the caller supplied a buffer
+ if (buff)
+ {
+ // Unpack the header
+ vrt::if_packet_info_t packet_info;
+ packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t);
+ const uint32_t *pkt = buff->cast<const uint32_t *>();
+ try {
+ if (fc_cache->endianness == ENDIANNESS_BIG)
+ {
+ vrt::chdr::if_hdr_unpack_be(pkt, packet_info);
+ } else {
+ vrt::chdr::if_hdr_unpack_le(pkt, packet_info);
+ }
+ }
+ catch(const std::exception &ex)
+ {
+ // Log and ignore
+ UHD_LOGGER_ERROR("RX FLOW CTRL") << "Error unpacking flow control packet: " << ex.what() << std::endl;
+ return true;
+ }
- managed_send_buffer::sptr buff = xport->get_send_buff(0.0);
- if (not buff) {
- throw uhd::runtime_error("handle_rx_flowctrl timed out getting a send buffer");
+ // Update counters assuming the buffer is a consumed packet
+ if (not packet_info.error)
+ {
+ fc_cache->total_bytes_consumed += buff->size();
+ fc_cache->total_packets_consumed++;
+ }
}
- uint32_t *pkt = buff->cast<uint32_t *>();
- // Recover sequence number. The sequence numbers handled by the streamers
- // are 12 Bits, but we want to know the 32-Bit sequence number.
- size_t &seq32 = fc_cache->last_seq_in;
- const size_t seq12 = seq32 & HW_SEQ_NUM_MASK;
- if (last_seq < seq12)
- seq32 += (HW_SEQ_NUM_MASK + 1);
- seq32 &= ~HW_SEQ_NUM_MASK;
- seq32 |= last_seq;
+ // Just return if there is no need to send a flow control packet
+ if (fc_cache->total_bytes_consumed - fc_cache->last_byte_count < fc_cache->interval)
+ {
+ return true;
+ }
- // Super-verbose mode:
- //static size_t fc_pkt_count = 0;
- //UHD_LOGGER_INFO("STREAMER") << "sending flow ctrl packet " << fc_pkt_count++ << ", acking " << str(boost::format("%04d\tseq_sw==0x%08x") % last_seq % seq32) ;
+ // Time to send a flow control packet
+ // Get a send buffer
+ managed_send_buffer::sptr fc_buff = fc_cache->xport->get_send_buff(0.0);
+ if (not fc_buff) {
+ throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer");
+ }
+ uint32_t *pkt = fc_buff->cast<uint32_t *>();
//load packet info
vrt::if_packet_info_t packet_info;
packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_FC;
- packet_info.num_payload_words32 = RXFC_PACKET_LEN_IN_WORDS;
+ packet_info.num_payload_words32 = DEVICE3_FC_PACKET_LEN_IN_WORDS32;
packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t);
- packet_info.packet_count = seq32;
+ packet_info.packet_count = fc_cache->seq_num++;
packet_info.sob = false;
packet_info.eob = false;
- packet_info.sid = sid.get();
+ packet_info.error = false;
+ packet_info.fc_ack = false;
+ packet_info.sid = fc_cache->sid.get();
packet_info.has_sid = true;
packet_info.has_cid = false;
packet_info.has_tsi = false;
packet_info.has_tsf = false;
packet_info.has_tlr = false;
- if (endianness == ENDIANNESS_BIG) {
+ if (fc_cache->endianness == ENDIANNESS_BIG) {
// Load Header:
vrt::chdr::if_hdr_pack_be(pkt, packet_info);
- // Load Payload: (the sequence number)
- pkt[packet_info.num_header_words32+RXFC_CMD_CODE_OFFSET] = uhd::htonx<uint32_t>(0);
- pkt[packet_info.num_header_words32+RXFC_SEQ_NUM_OFFSET] = uhd::htonx<uint32_t>(seq32);
+ // Load Payload: Packet count, and byte count
+ pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] =
+ uhd::htonx<uint32_t>(fc_cache->total_packets_consumed);
+ pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] =
+ uhd::htonx<uint32_t>(fc_cache->total_bytes_consumed);
} else {
// Load Header:
vrt::chdr::if_hdr_pack_le(pkt, packet_info);
- // Load Payload: (the sequence number)
- pkt[packet_info.num_header_words32+RXFC_CMD_CODE_OFFSET] = uhd::htowx<uint32_t>(0);
- pkt[packet_info.num_header_words32+RXFC_SEQ_NUM_OFFSET] = uhd::htowx<uint32_t>(seq32);
+ // Load Payload: Packet count, and byte count
+ pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] =
+ uhd::htowx<uint32_t>(fc_cache->total_packets_consumed);
+ pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] =
+ uhd::htowx<uint32_t>(fc_cache->total_bytes_consumed);
}
- //std::cout << " SID=" << std::hex << sid << " hdr bits=" << packet_info.packet_type << " seq32=" << seq32 << std::endl;
- //std::cout << "num_packet_words32: " << packet_info.num_packet_words32 << std::endl;
- //for (size_t i = 0; i < packet_info.num_packet_words32; i++) {
- //std::cout << str(boost::format("0x%08x") % pkt[i]) << " ";
- //if (i % 2) {
- //std::cout << std::endl;
- //}
- //}
-
//send the buffer over the interface
- buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32));
+ fc_buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32));
+
+ //update byte count
+ fc_cache->last_byte_count = fc_cache->total_bytes_consumed;
+
+ return true;
+}
+
+/*! Handle RX flow control ACK packets.
+ *
+ */
+static void handle_rx_flowctrl_ack(
+ boost::shared_ptr<rx_fc_cache_t> fc_cache,
+ const uint32_t *payload
+) {
+ const uint32_t pkt_count = (fc_cache->endianness == ENDIANNESS_BIG) ?
+ uhd::ntohx<uint32_t>(payload[0]) :
+ uhd::wtohx<uint32_t>(payload[0]);
+ const uint32_t byte_count = (fc_cache->endianness == ENDIANNESS_BIG) ?
+ uhd::ntohx<uint32_t>(payload[1]) :
+ uhd::wtohx<uint32_t>(payload[1]);
+ if (fc_cache->total_bytes_consumed != byte_count)
+ {
+ UHD_LOGGER_DEBUG("device3")
+ << "oh noes: byte_count==" << byte_count
+ << " total_bytes_consumed==" << fc_cache->total_bytes_consumed << std::endl
+ ;
+ }
+ fc_cache->total_bytes_consumed = byte_count;
+ fc_cache->total_packets_consumed = pkt_count; // guess we need a pkt offset too?
+
+ // This will send a flow control packet if there is a significant discrepancy
+ rx_flow_ctrl(fc_cache, nullptr);
}
/***********************************************************************
@@ -293,56 +344,51 @@ static void handle_rx_flowctrl(
//! Stores the state of TX flow control
struct tx_fc_cache_t
{
- tx_fc_cache_t(size_t capacity):
+ tx_fc_cache_t(uint32_t capacity):
+ last_byte_ack(0),
last_seq_ack(0),
- space(capacity) {}
-
- size_t last_seq_ack;
- size_t space;
+ byte_count(0),
+ pkt_count(0),
+ window_size(capacity),
+ fc_ack_seqnum(0),
+ fc_received(false) {}
+
+ uint32_t last_byte_ack;
+ uint32_t last_seq_ack;
+ uint32_t byte_count;
+ uint32_t pkt_count;
+ uint32_t window_size;
+ uint32_t fc_ack_seqnum;
+ bool fc_received;
};
-/*! Return the size of the flow control window in packets.
- *
- * If the return value of this function is F, the last tx'd packet
- * has index N and the last ack'd packet has index M, the amount of
- * FC credit we have is C = F + M - N (i.e. we can send C more packets
- * before getting another ack).
- *
- * Note: If `send_buff_size` is set in \p tx_hints, this will
- * override hw_buff_size_.
- */
-static size_t get_tx_flow_control_window(
- size_t pkt_size,
- const double hw_buff_size_,
- const device_addr_t& tx_hints
-) {
- double hw_buff_size = tx_hints.cast<double>("send_buff_size", hw_buff_size_);
- size_t window_in_pkts = (static_cast<size_t>(hw_buff_size) / pkt_size);
- if (window_in_pkts == 0) {
- throw uhd::value_error("send_buff_size must be larger than the send_frame_size.");
- }
- return window_in_pkts;
-}
-
static bool tx_flow_ctrl(
boost::shared_ptr<tx_fc_cache_t> fc_cache,
- zero_copy_if::sptr async_xport,
- uint32_t (*endian_conv)(uint32_t),
+ zero_copy_if::sptr xport,
+ uint32_t (*to_host)(uint32_t),
void (*unpack)(const uint32_t *packet_buff, vrt::if_packet_info_t &),
- managed_buffer::sptr
+ managed_buffer::sptr buff
) {
while (true)
{
// If there is space
- if (fc_cache->space)
+ if (fc_cache->window_size - (fc_cache->byte_count - fc_cache->last_byte_ack) >= buff->size())
{
// All is good - packet will be sent
- fc_cache->space--;
+ fc_cache->byte_count += buff->size();
+ // Round up to nearest word
+ if (fc_cache->byte_count % DEVICE3_LINE_SIZE)
+ {
+ fc_cache->byte_count += DEVICE3_LINE_SIZE - (fc_cache->byte_count % DEVICE3_LINE_SIZE);
+ }
+ fc_cache->pkt_count++;
return true;
}
// Look for a flow control message to update the space available in the buffer.
- managed_recv_buffer::sptr buff = async_xport->get_recv_buff();
+ // A minimal timeout is used because larger timeouts can cause the thread to be
+ // scheduled out for too long at high data rates and result in underruns.
+ managed_recv_buffer::sptr buff = xport->get_recv_buff(0.000001);
if (buff)
{
vrt::if_packet_info_t if_packet_info;
@@ -353,28 +399,94 @@ static bool tx_flow_ctrl(
}
catch(const std::exception &ex)
{
- UHD_LOG_ERROR("TX FLOW CTRL", "Error unpacking async flow control packet: " << ex.what());
+ UHD_LOGGER_ERROR("TX FLOW CTRL") << "Error unpacking flow control packet: " << ex.what() << std::endl;
continue;
}
if (if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_FC)
{
- UHD_LOG_ERROR(
- "TX FLOW CTRL",
- "Unexpected packet type received by flow control handler: " << if_packet_info.packet_type
- );
+ UHD_LOGGER_ERROR("TX FLOW CTRL") << "Unexpected packet received by flow control handler: " << if_packet_info.packet_type << std::endl;
continue;
}
+ const uint32_t *payload = &packet_buff[if_packet_info.num_header_words32];
+ const uint32_t pkt_count = to_host(payload[0]);
+ const uint32_t byte_count = to_host(payload[1]);
+
// update the amount of space
- size_t seq_ack = endian_conv(packet_buff[if_packet_info.num_header_words32+1]);
- fc_cache->space += (seq_ack - fc_cache->last_seq_ack) & HW_SEQ_NUM_MASK;
- fc_cache->last_seq_ack = seq_ack;
+ fc_cache->last_byte_ack = byte_count;
+ fc_cache->last_seq_ack = pkt_count;
+
+ fc_cache->fc_received = true;
}
}
return false;
}
+static void tx_flow_ctrl_ack(
+ boost::shared_ptr<tx_fc_cache_t> fc_cache,
+ zero_copy_if::sptr send_xport,
+ sid_t send_sid,
+ uint32_t (*from_host)(uint32_t),
+ void (*pack)(uint32_t *packet_buff, vrt::if_packet_info_t &)
+) {
+ if (not fc_cache->fc_received)
+ {
+ return;
+ }
+
+ // Time to send a flow control ACK packet
+ // Get a send buffer
+ managed_send_buffer::sptr fc_buff = send_xport->get_send_buff(0.0);
+ if (not fc_buff) {
+ UHD_LOGGER_ERROR("tx_flow_ctrl_ack") << "timed out getting a send buffer";
+ return;
+ }
+ uint32_t *pkt = fc_buff->cast<uint32_t *>();
+
+ // Load packet info
+ vrt::if_packet_info_t packet_info;
+ packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_ACK;
+ packet_info.num_payload_words32 = DEVICE3_FC_PACKET_LEN_IN_WORDS32;
+ packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t);
+ packet_info.packet_count = fc_cache->fc_ack_seqnum++;
+ packet_info.sob = false;
+ packet_info.eob = true;
+ packet_info.error = false;
+ packet_info.fc_ack = false;
+ packet_info.sid = send_sid.get();
+ packet_info.has_sid = true;
+ packet_info.has_cid = false;
+ packet_info.has_tsi = false;
+ packet_info.has_tsf = false;
+ packet_info.has_tlr = false;
+
+ // Load Header:
+ pack(pkt, packet_info);
+
+ // Update counters to include this packet
+ size_t fc_ack_pkt_size = sizeof(uint32_t)*(packet_info.num_packet_words32);
+ fc_cache->byte_count += fc_ack_pkt_size;
+ // Round up to nearest word
+ if (fc_cache->byte_count % DEVICE3_LINE_SIZE)
+ {
+ fc_cache->byte_count += DEVICE3_LINE_SIZE - (fc_cache->byte_count % DEVICE3_LINE_SIZE);
+ }
+ fc_cache->pkt_count++;
+
+ // Load Payload: Packet count, and byte count
+ pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] =
+ from_host(fc_cache->pkt_count);
+ pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] =
+ from_host(fc_cache->byte_count);
+
+ // Send the buffer over the interface
+ fc_buff->commit(fc_ack_pkt_size);
+
+ // Reset for next FC
+ fc_cache->fc_received = false;
+}
+
/***********************************************************************
* TX Async Message Functions
**********************************************************************/
@@ -445,13 +557,10 @@ static void handle_tx_async_msgs(
async_info->stream_channel
);
- // Filter out any flow control messages and cache the rest
+ // Filter out any flow control messages and cache the rest
if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL)
{
- UHD_LOG_ERROR(
- "TX ASYNC",
- "Unexpected flow control message found in async message handling"
- );
+ UHD_LOGGER_ERROR("TX ASYNC MSG") << "Unexpected flow control message found in async message handling" << std::endl;
} else {
async_info->async_queue->push_with_pop_on_full(metadata);
metadata.channel = async_info->device_channel;
@@ -474,8 +583,8 @@ void device3_impl::update_rx_streamers(double /* rate */)
{
for(const std::string &block_id: _rx_streamers.keys()) {
UHD_RX_STREAMER_LOG() << "updating RX streamer to " << block_id;
- boost::shared_ptr<sph::recv_packet_streamer> my_streamer =
- boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_rx_streamers[block_id].lock());
+ boost::shared_ptr<device3_recv_packet_streamer> my_streamer =
+ boost::dynamic_pointer_cast<device3_recv_packet_streamer>(_rx_streamers[block_id].lock());
if (my_streamer) {
double tick_rate = my_streamer->get_terminator()->get_tick_rate();
if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) {
@@ -511,12 +620,15 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)
// Note: All 'args.args' are merged into chan_args now.
// II. Iterate over all channels
- boost::shared_ptr<sph::recv_packet_streamer> my_streamer;
+ boost::shared_ptr<device3_recv_packet_streamer> my_streamer;
// The terminator's lifetime is coupled to the streamer.
// There is only one terminator. If the streamer has multiple channels,
// it will be connected to each upstream block.
rfnoc::rx_stream_terminator::sptr recv_terminator = rfnoc::rx_stream_terminator::make();
- for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) {
+ for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++)
+ {
+ // First, configure blocks and create transport
+
// Get block ID and mb index
uhd::rfnoc::block_id_t block_id = chan_list[stream_i];
UHD_RX_STREAMER_LOG() << "chan " << stream_i << " connecting to " << block_id ;
@@ -549,7 +661,36 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)
uhd::sid_t stream_address = blk_ctrl->get_address(block_port);
UHD_RX_STREAMER_LOG() << "creating rx stream " << rx_hints.to_string() ;
both_xports_t xport = make_transport(stream_address, RX_DATA, rx_hints);
- UHD_RX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec << " actual recv_buff_size = " << xport.recv_buff_size ;
+ UHD_RX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec << " actual recv_buff_size = " << xport.recv_buff_size;
+
+ // Configure the block
+ // Flow control setup
+ const size_t pkt_size = xport.recv->get_recv_frame_size();
+ // Leave one pkt_size space for overrun packets - TODO make this obsolete
+ const size_t fc_window = get_rx_flow_control_window(pkt_size, xport.recv_buff_size, rx_hints) - pkt_size;
+ const size_t fc_handle_window = std::max<size_t>(1, fc_window / stream_options.rx_fc_request_freq);
+ UHD_RX_STREAMER_LOG()<< "Flow Control Window = " << (fc_window) << ", Flow Control Handler Window = " << fc_handle_window;
+ blk_ctrl->configure_flow_control_out(
+ true,
+ fc_window,
+ rx_hints.cast<size_t>("recv_pkt_limit", 0), // On rfnoc-devel, update e300_impl::get_rx_hints() to set this to 32
+ block_port
+ );
+
+ // Add flow control transport
+ boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t());
+ fc_cache->sid = xport.send_sid;
+ fc_cache->xport = xport.send;
+ fc_cache->endianness = xport.endianness;
+ fc_cache->interval = fc_handle_window;
+ xport.recv = zero_copy_flow_ctrl::make
+ (
+ xport.recv,
+ NULL,
+ [=](managed_buffer::sptr buff) {
+ return rx_flow_ctrl(fc_cache, buff);
+ }
+ );
// Configure the block
// Note: We need to set_destination() after writing to SR_CLEAR_TX_FC.
@@ -558,8 +699,10 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)
// other settings.
blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_TX_FC, 0x1, block_port);
blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_TX_FC, 0x0, block_port);
+ // Configure routing for data
blk_ctrl->set_destination(xport.send_sid.get_src(), block_port);
+ // Configure routing for responses
blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port);
UHD_RX_STREAMER_LOG() << "resp_out_dst_sid == " << xport.send_sid.get_src() ;
@@ -570,17 +713,24 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)
node->sr_write(uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port);
}
- // To calculate the max number of samples per packet, we assume the maximum header length
- // to avoid fragmentation should the entire header be used.
- const size_t bpp = xport.recv->get_recv_frame_size() - stream_options.rx_max_len_hdr; // bytes per packet
- const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item
- const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet
- UHD_RX_STREAMER_LOG() << "spp == " << spp ;
+ // Second, configure the streamer
//make the new streamer given the samples per packet
if (not my_streamer)
- my_streamer = boost::make_shared<sph::recv_packet_streamer>(spp);
- my_streamer->resize(chan_list.size());
+ {
+ // To calculate the max number of samples per packet, we assume the maximum header length
+ // to avoid fragmentation should the entire header be used.
+ const size_t bpp = pkt_size - stream_options.rx_max_len_hdr; // bytes per packet
+ const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item
+ const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet
+ UHD_RX_STREAMER_LOG() << "spp == " << spp ;
+
+ my_streamer = boost::make_shared<device3_recv_packet_streamer>(
+ spp,
+ recv_terminator,
+ xport);
+ my_streamer->resize(chan_list.size());
+ }
//init some streamer stuff
std::string conv_endianness;
@@ -600,72 +750,51 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)
id.num_outputs = 1;
my_streamer->set_converter(id);
- //flow control setup
- const size_t pkt_size = spp * bpi + stream_options.rx_max_len_hdr;
- const size_t fc_window = get_rx_flow_control_window(pkt_size, xport.recv_buff_size, rx_hints);
- const size_t fc_handle_window = std::max<size_t>(1, fc_window / stream_options.rx_fc_request_freq);
- UHD_RX_STREAMER_LOG()<< "Flow Control Window (minus one) = " << fc_window-1 << ", Flow Control Handler Window = " << fc_handle_window ;
- blk_ctrl->configure_flow_control_out(
- fc_window-1, // Leave one space for overrun packets TODO make this obsolete
- block_port
+ // Give the streamer a functor to handle flow control ACK messages
+ my_streamer->set_xport_handle_flowctrl_ack(
+ stream_i,
+ [=](const uint32_t *payload) {
+ handle_rx_flowctrl_ack(
+ fc_cache,
+ payload
+ );
+ }
);
//Give the streamer a functor to get the recv_buffer
- //bind requires a zero_copy_if::sptr to add a streamer->xport lifetime dependency
my_streamer->set_xport_chan_get_buff(
stream_i,
- boost::bind(&zero_copy_if::get_recv_buff, xport.recv, _1),
+ [=](double timeout) {return xport.recv->get_recv_buff(timeout);},
true /*flush*/
);
//Give the streamer a functor to handle overruns
//bind requires a weak_ptr to break the a streamer->streamer circular dependency
//Using "this" is OK because we know that this device3_impl will outlive the streamer
+ boost::weak_ptr<uhd::rx_streamer> weak_ptr(my_streamer);
my_streamer->set_overflow_handler(
- stream_i,
- boost::bind(
- &uhd::rfnoc::rx_stream_terminator::handle_overrun, recv_terminator,
- boost::weak_ptr<uhd::rx_streamer>(my_streamer), stream_i
- )
- );
-
- //Give the streamer a functor to send flow control messages
- //handle_rx_flowctrl is static and has no lifetime issues
- boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t());
- my_streamer->set_xport_handle_flowctrl(
- stream_i, boost::bind(
- &handle_rx_flowctrl,
- xport.send_sid,
- xport.send,
- xport.endianness,
- fc_cache,
- _1
- ),
- fc_handle_window,
- true/*init*/
+ stream_i,
+ [=]() {
+ recv_terminator->handle_overrun(
+ weak_ptr,
+ stream_i);
+ }
);
//Give the streamer a functor issue stream cmd
- //bind requires a shared pointer to add a streamer->framer lifetime dependency
my_streamer->set_issue_stream_cmd(
stream_i,
- boost::bind(&uhd::rfnoc::source_block_ctrl_base::issue_stream_cmd, blk_ctrl, _1, block_port)
+ [=](const stream_cmd_t& stream_cmd) {blk_ctrl->issue_stream_cmd(stream_cmd, block_port);}
);
-
- // Tell the streamer which SID is valid for this channel
- my_streamer->set_xport_chan_sid(stream_i, true, xport.send_sid);
}
- // Connect the terminator to the streamer
- my_streamer->set_terminator(recv_terminator);
-
// Notify all blocks in this chain that they are connected to an active streamer
recv_terminator->set_rx_streamer(true, 0);
// Store a weak pointer to prevent a streamer->device3_impl->streamer circular dependency.
// Note that we store the streamer only once, and use its terminator's
// ID to do so.
- _rx_streamers[recv_terminator->unique_id()] = boost::weak_ptr<sph::recv_packet_streamer>(my_streamer);
+ _rx_streamers[recv_terminator->unique_id()] = boost::weak_ptr<uhd::rx_streamer>(my_streamer);
// Sets tick rate, samp rate and scaling on this streamer.
// A registered terminator is required to do this.
@@ -682,8 +811,8 @@ void device3_impl::update_tx_streamers(double /* rate */)
{
for(const std::string &block_id: _tx_streamers.keys()) {
UHD_TX_STREAMER_LOG() << "updating TX streamer: " << block_id;
- boost::shared_ptr<sph::send_packet_streamer> my_streamer =
- boost::dynamic_pointer_cast<sph::send_packet_streamer>(_tx_streamers[block_id].lock());
+ boost::shared_ptr<device3_send_packet_streamer> my_streamer =
+ boost::dynamic_pointer_cast<device3_send_packet_streamer>(_tx_streamers[block_id].lock());
if (my_streamer) {
double tick_rate = my_streamer->get_terminator()->get_tick_rate();
if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) {
@@ -705,20 +834,6 @@ void device3_impl::update_tx_streamers(double /* rate */)
}
}
-// This class manages the lifetime of the TX async message handler task and transports
-class device3_send_packet_streamer : public sph::send_packet_streamer
-{
-public:
- device3_send_packet_streamer(const size_t max_num_samps) : sph::send_packet_streamer(max_num_samps) {};
- ~device3_send_packet_streamer() {
- _tx_async_msg_task.reset(); // Make sure the async task is destroyed before the transports
- };
-
- both_xports_t _xport;
- both_xports_t _async_xport;
- task::sptr _tx_async_msg_task;
-};
-
tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
{
boost::mutex::scoped_lock lock(_transport_setup_mutex);
@@ -739,7 +854,10 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
// There is only one terminator. If the streamer has multiple channels,
// it will be connected to each downstream block.
rfnoc::tx_stream_terminator::sptr send_terminator = rfnoc::tx_stream_terminator::make();
- for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) {
+ for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++)
+ {
+ // First, configure the downstream blocks and create the transports
+
// Get block ID and mb index
uhd::rfnoc::block_id_t block_id = chan_list[stream_i];
// Update args so args.args is always valid for this particular channel:
@@ -768,87 +886,42 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
// Setup the dsp transport hints
device_addr_t tx_hints = get_tx_hints(mb_index);
- //allocate sid and create transport
+ // Allocate sid and create transport
uhd::sid_t stream_address = blk_ctrl->get_address(block_port);
UHD_TX_STREAMER_LOG() << "creating tx stream " << tx_hints.to_string() ;
both_xports_t xport = make_transport(stream_address, TX_DATA, tx_hints);
both_xports_t async_xport = make_transport(stream_address, ASYNC_MSG, device_addr_t(""));
- UHD_TX_STREAMER_LOG() << std::hex << "[TX Streamer] data_sid = " << xport.send_sid << std::dec << std::endl;
-
- // To calculate the max number of samples per packet, we assume the maximum header length
- // to avoid fragmentation should the entire header be used.
- const size_t bpp = tx_hints.cast<size_t>("bpp", xport.send->get_send_frame_size()) - stream_options.tx_max_len_hdr;
- const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item
- const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet
- UHD_TX_STREAMER_LOG() << "spp == " << spp ;
-
- //make the new streamer given the samples per packet
- if (not my_streamer)
- my_streamer = boost::make_shared<device3_send_packet_streamer>(spp);
- my_streamer->resize(chan_list.size());
- my_streamer->_xport = xport;
- my_streamer->_async_xport = async_xport;
-
- //init some streamer stuff
- std::string conv_endianness;
- if (xport.endianness == ENDIANNESS_BIG) {
- my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_be);
- conv_endianness = "be";
- } else {
- my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_le);
- conv_endianness = "le";
- }
-
- //set the converter
- uhd::convert::id_type id;
- id.input_format = args.cpu_format;
- id.num_inputs = 1;
- id.output_format = args.otw_format + "_item32_" + conv_endianness;
- id.num_outputs = 1;
- my_streamer->set_converter(id);
+ UHD_TX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec ;
- //flow control setup
- const size_t pkt_size = spp * bpi + stream_options.tx_max_len_hdr;
- // For flow control, this value is used to determine the window size in *packets*
- size_t fc_window = get_tx_flow_control_window(
- pkt_size, // This is the maximum packet size
- blk_ctrl->get_fifo_size(block_port),
- tx_hints // This can override the value reported by the block!
- );
+ // Configure flow control
+ // This disables the FC module's output, do this before configuring flow control
+ blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x1, block_port);
+ blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x0, block_port);
+ // Configure flow control on downstream block
+ const size_t fc_window = tx_hints.cast<size_t>("send_buff_size", blk_ctrl->get_fifo_size(block_port));
const size_t fc_handle_window = std::max<size_t>(1, fc_window / stream_options.tx_fc_response_freq);
UHD_TX_STREAMER_LOG() << "Flow Control Window = " << fc_window << ", Flow Control Handler Window = " << fc_handle_window ;
blk_ctrl->configure_flow_control_in(
- stream_options.tx_fc_response_cycles,
- fc_handle_window, /*pkts*/
+ fc_handle_window, /*bytes*/
block_port
);
-
- boost::shared_ptr<async_tx_info_t> async_tx_info(new async_tx_info_t());
- async_tx_info->stream_channel = args.channels[stream_i];
- async_tx_info->device_channel = mb_index;
- async_tx_info->async_queue = async_md;
- async_tx_info->old_async_queue = _async_md;
-
- boost::function<double(void)> tick_rate_retriever = boost::bind(
- &rfnoc::tick_node_ctrl::get_tick_rate,
- send_terminator,
- std::set< rfnoc::node_ctrl_base::sptr >() // Need to specify default args with bind
- );
-
- my_streamer->_tx_async_msg_task = task::make(
- boost::bind(
- &handle_tx_async_msgs,
- async_tx_info,
- my_streamer->_async_xport.recv,
- xport.endianness,
- tick_rate_retriever
- ),
- "tx_async_msgs_task"
+ // Add flow control transport
+ boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window));
+ xport.send = zero_copy_flow_ctrl::make(
+ xport.send,
+ [=](managed_buffer::sptr buff) {
+ return tx_flow_ctrl(
+ fc_cache,
+ xport.recv,
+ (xport.endianness == ENDIANNESS_BIG ? uhd::ntohx<uint32_t> : uhd::wtohx<uint32_t>),
+ (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be : vrt::chdr::if_hdr_unpack_le),
+ buff);
+ },
+ NULL
);
- blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x1, block_port);
- blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x0, block_port);
- blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), block_port);
+ // Configure return path for async messages
+ blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), block_port);
UHD_TX_STREAMER_LOG() << "resp_in_dst_sid == " << boost::format("0x%04X") % xport.recv_sid.get_dst() ;
// FIXME: Once there is a better way to map the radio block and port
@@ -863,7 +936,7 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size();
for(const boost::shared_ptr<uhd::rfnoc::radio_ctrl> &node: downstream_radio_nodes) {
if (node->get_block_id() == radio_id) {
- node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), radio_port);
+ node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), radio_port);
}
}
} else {
@@ -876,39 +949,96 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
std::vector<boost::shared_ptr<uhd::rfnoc::radio_ctrl> > downstream_radio_nodes = blk_ctrl->find_downstream_node<uhd::rfnoc::radio_ctrl>();
UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size();
for(const boost::shared_ptr<uhd::rfnoc::radio_ctrl> &node: downstream_radio_nodes) {
- node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), block_port);
+ node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), block_port);
}
}
- // Add flow control
- boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window));
- my_streamer->_xport.send = zero_copy_flow_ctrl::make(
- my_streamer->_xport.send,
- boost::bind(
- &tx_flow_ctrl,
- fc_cache,
- my_streamer->_xport.recv,
- (xport.endianness == ENDIANNESS_BIG ? uhd::ntohx<uint32_t> : uhd::wtohx<uint32_t>),
- (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be : vrt::chdr::if_hdr_unpack_le),
- _1),
- NULL);
+ // Second, configure the streamer now that the blocks and transports are configured
+
+ //make the new streamer given the samples per packet
+ if (not my_streamer)
+ {
+ // To calculate the max number of samples per packet, we assume the maximum header length
+ // to avoid fragmentation should the entire header be used.
+ const size_t bpp = tx_hints.cast<size_t>("bpp", xport.send->get_send_frame_size()) - stream_options.tx_max_len_hdr;
+ const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item
+ const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet
+ UHD_TX_STREAMER_LOG() << "spp == " << spp ;
+
+ my_streamer = boost::make_shared<device3_send_packet_streamer>(
+ spp,
+ send_terminator,
+ xport,
+ async_xport);
+ my_streamer->resize(chan_list.size());
+ }
+
+ //init some streamer stuff
+ std::string conv_endianness;
+ if (xport.endianness == ENDIANNESS_BIG) {
+ my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_be);
+ conv_endianness = "be";
+ } else {
+ my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_le);
+ conv_endianness = "le";
+ }
+
+ //set the converter
+ uhd::convert::id_type id;
+ id.input_format = args.cpu_format;
+ id.num_inputs = 1;
+ id.output_format = args.otw_format + "_item32_" + conv_endianness;
+ id.num_outputs = 1;
+ my_streamer->set_converter(id);
+
+ boost::shared_ptr<async_tx_info_t> async_tx_info(new async_tx_info_t());
+ async_tx_info->stream_channel = args.channels[stream_i];
+ async_tx_info->device_channel = mb_index;
+ async_tx_info->async_queue = async_md;
+ async_tx_info->old_async_queue = _async_md;
+
+ task::sptr async_task = task::make(
+ [=]() {
+ handle_tx_async_msgs(
+ async_tx_info,
+ async_xport.recv,
+ xport.endianness,
+ [=]() {return send_terminator->get_tick_rate();}
+ );
+ }
+ );
+ my_streamer->add_async_msg_task(async_task);
//Give the streamer a functor to get the send buffer
my_streamer->set_xport_chan_get_buff(
stream_i,
- boost::bind(&zero_copy_if::get_send_buff, my_streamer->_xport.send, _1)
+ [=](const double timeout) {
+ return xport.send->get_send_buff(timeout);
+ }
);
//Give the streamer a functor handled received async messages
my_streamer->set_async_receiver(
- boost::bind(&async_md_type::pop_with_timed_wait, async_md, _1, _2)
+ [=](uhd::async_metadata_t& md, const double timeout) {
+ return async_md->pop_with_timed_wait(md, timeout);
+ }
);
my_streamer->set_xport_chan_sid(stream_i, true, xport.send_sid);
// CHDR does not support trailers
my_streamer->set_enable_trailer(false);
- }
- // Connect the terminator to the streamer
- my_streamer->set_terminator(send_terminator);
+ my_streamer->set_xport_chan_post_send_cb(
+ stream_i,
+ [=]() {
+ tx_flow_ctrl_ack(
+ fc_cache,
+ xport.send,
+ xport.send_sid,
+ (xport.endianness == ENDIANNESS_BIG ? uhd::htonx<uint32_t> : uhd::htowx<uint32_t>),
+ (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_pack_be : vrt::chdr::if_hdr_pack_le)
+ );
+ }
+ );
+ }
// Notify all blocks in this chain that they are connected to an active streamer
send_terminator->set_tx_streamer(true, 0);
@@ -916,7 +1046,7 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
// Store a weak pointer to prevent a streamer->device3_impl->streamer circular dependency.
// Note that we store the streamer only once, and use its terminator's
// ID to do so.
- _tx_streamers[send_terminator->unique_id()] = boost::weak_ptr<sph::send_packet_streamer>(my_streamer);
+ _tx_streamers[send_terminator->unique_id()] = boost::weak_ptr<uhd::tx_streamer>(my_streamer);
// Sets tick rate, samp rate and scaling on this streamer
// A registered terminator is required to do this.
diff --git a/host/lib/usrp/x300/x300_impl.cpp b/host/lib/usrp/x300/x300_impl.cpp
index 0c8d78834..351cb4e10 100644
--- a/host/lib/usrp/x300/x300_impl.cpp
+++ b/host/lib/usrp/x300/x300_impl.cpp
@@ -1231,8 +1231,8 @@ uhd::both_xports_t x300_impl::make_transport(
? X300_PCIE_RX_DATA_FRAME_SIZE
: X300_PCIE_MSG_FRAME_SIZE;
- default_buff_args.num_send_frames =
- (xport_type == TX_DATA)
+ default_buff_args.num_send_frames =
+ (xport_type == TX_DATA)
? X300_PCIE_TX_DATA_NUM_FRAMES
: X300_PCIE_MSG_NUM_FRAMES;
diff --git a/host/lib/usrp/x300/x300_io_impl.cpp b/host/lib/usrp/x300/x300_io_impl.cpp
index 082d83185..af5aa7c9e 100644
--- a/host/lib/usrp/x300/x300_io_impl.cpp
+++ b/host/lib/usrp/x300/x300_io_impl.cpp
@@ -7,19 +7,9 @@
#include "x300_regs.hpp"
#include "x300_impl.hpp"
-#include "../../transport/super_recv_packet_handler.hpp"
-#include "../../transport/super_send_packet_handler.hpp"
-#include <uhdlib/usrp/common/async_packet_handler.hpp>
-#include <uhd/transport/nirio_zero_copy.hpp>
-#include <uhd/transport/bounded_buffer.hpp>
-#include <uhd/utils/tasks.hpp>
-#include <uhd/utils/log.hpp>
-#include <boost/bind.hpp>
-#include <boost/make_shared.hpp>
using namespace uhd;
using namespace uhd::usrp;
-using namespace uhd::transport;
/***********************************************************************
* Hooks for get_tx_stream() and get_rx_stream()
@@ -62,8 +52,8 @@ void x300_impl::post_streamer_hooks(direction_t dir)
// Loop through all tx streamers. Find all radios connected to one
// streamer. Sync those.
for(const boost::weak_ptr<uhd::tx_streamer> &streamer_w: _tx_streamers.vals()) {
- const boost::shared_ptr<sph::send_packet_streamer> streamer =
- boost::dynamic_pointer_cast<sph::send_packet_streamer>(streamer_w.lock());
+ const boost::shared_ptr<device3_send_packet_streamer> streamer =
+ boost::dynamic_pointer_cast<device3_send_packet_streamer>(streamer_w.lock());
if (not streamer) {
continue;
}