diff options
author | Martin Braun <martin.braun@ettus.com> | 2016-11-14 14:30:34 -0800 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2018-07-25 15:34:03 -0700 |
commit | 988515ab19a715773086a7a8c023ddb8249c7e37 (patch) | |
tree | 71c861c3a1d0a5e295dad5939358dd30e0a33f3b | |
parent | 8b16ab706fb4768f802ddb65a81fc26e1562cb0d (diff) | |
download | uhd-988515ab19a715773086a7a8c023ddb8249c7e37.tar.gz uhd-988515ab19a715773086a7a8c023ddb8249c7e37.tar.bz2 uhd-988515ab19a715773086a7a8c023ddb8249c7e37.zip |
Device3: Change packet-based flow control to byte-based flow control
24 files changed, 706 insertions, 468 deletions
diff --git a/fpga-src b/fpga-src -Subproject f27926410328883a315d5230146936d2d782bd0 +Subproject c398755579f01a1d020e742fb7d129950939a5d diff --git a/host/include/uhd/rfnoc/constants.hpp b/host/include/uhd/rfnoc/constants.hpp index 6df4c535f..bda4c6440 100644 --- a/host/include/uhd/rfnoc/constants.hpp +++ b/host/include/uhd/rfnoc/constants.hpp @@ -25,7 +25,7 @@ static const std::string XML_PATH_ENV = "UHD_RFNOC_DIR"; //! If the block name can't be automatically detected, this name is used static const std::string DEFAULT_BLOCK_NAME = "Block"; static const uint64_t DEFAULT_NOC_ID = 0xFFFFFFFFFFFFFFFF; -static const size_t NOC_SHELL_COMPAT_MAJOR = 3; +static const size_t NOC_SHELL_COMPAT_MAJOR = 4; static const size_t NOC_SHELL_COMPAT_MINOR = 0; static const size_t MAX_PACKET_SIZE = 8000; // bytes @@ -35,7 +35,7 @@ static const size_t DEFAULT_PACKET_SIZE = 1456; // bytes static const size_t BYTES_PER_LINE = 8; //! For flow control within a single crossbar -static const size_t DEFAULT_FC_XBAR_PKTS_PER_ACK = 2; +static const size_t DEFAULT_FC_XBAR_RESPONSE_FREQ = 8; //! For flow control when data is flowing from device to host (rx) static const size_t DEFAULT_FC_RX_RESPONSE_FREQ = 64; // ACKs per flow control window //! For flow control when data is flowing from host to device (tx) @@ -48,15 +48,15 @@ static const size_t DEFAULT_FC_TX_RESPONSE_FREQ = 8; // ACKs per flow control wi static const double DEFAULT_FC_RX_SW_BUFF_FULL_FACTOR = 0.80; // Common settings registers. -static const uint32_t SR_FLOW_CTRL_CYCS_PER_ACK = 0; -static const uint32_t SR_FLOW_CTRL_PKTS_PER_ACK = 1; +static const uint32_t SR_FLOW_CTRL_BYTES_PER_ACK = 1; static const uint32_t SR_FLOW_CTRL_WINDOW_SIZE = 2; -static const uint32_t SR_FLOW_CTRL_WINDOW_EN = 3; +static const uint32_t SR_FLOW_CTRL_EN = 3; static const uint32_t SR_ERROR_POLICY = 4; static const uint32_t SR_BLOCK_SID = 5; // TODO rename to SRC_SID static const uint32_t SR_NEXT_DST_SID = 6; static const uint32_t SR_RESP_IN_DST_SID = 7; static const uint32_t SR_RESP_OUT_DST_SID = 8; +static const uint32_t SR_FLOW_CTRL_PKT_LIMIT = 9; static const uint32_t SR_READBACK_ADDR = 124; static const uint32_t SR_READBACK = 127; diff --git a/host/include/uhd/rfnoc/graph.hpp b/host/include/uhd/rfnoc/graph.hpp index f92e6b528..d7b7cc43e 100644 --- a/host/include/uhd/rfnoc/graph.hpp +++ b/host/include/uhd/rfnoc/graph.hpp @@ -67,12 +67,12 @@ public: * * \param sink_block Sink block ID * \param dst_block_port Destination (sink) block port - * \param pkts_per_ack Flow controlf frequency in packets + * \param bytes_per_ack Flow control frequency in bytes */ virtual void connect_sink( const block_id_t &sink_block, const size_t dst_block_port, - const size_t pkts_per_ack + const size_t bytes_per_ack ) = 0; virtual std::string get_name() const = 0; diff --git a/host/include/uhd/rfnoc/sink_block_ctrl_base.hpp b/host/include/uhd/rfnoc/sink_block_ctrl_base.hpp index ec3f28e32..f90361cf1 100644 --- a/host/include/uhd/rfnoc/sink_block_ctrl_base.hpp +++ b/host/include/uhd/rfnoc/sink_block_ctrl_base.hpp @@ -70,20 +70,17 @@ public: * send out ACKs, telling the upstream block which packets have been consumed, * so the upstream block can increase his flow control credit. * - * In the default implementation, this just sets registers - * SR_FLOW_CTRL_CYCS_PER_ACK and SR_FLOW_CTRL_PKTS_PER_ACK accordingly. + * In the default implementation, this just sets register SR_FLOW_CTRL_PKTS_PER_ACK + * accordingly. * * Override this function if your block has port-specific flow control settings. * - * \param cycles Send an ACK after this many clock cycles. - * Setting this to zero disables this type of flow control acknowledgement. - * \param packets Send an ACK after this many packets have been consumed. - * Setting this to zero disables this type of flow control acknowledgement. + * \param bytes Send an ACK after this many bytes have been consumed. + * Setting this to zero disables flow control acknowledgement. * \param block_port Set up flow control for a stream coming in on this particular block port. */ virtual void configure_flow_control_in( - size_t cycles, - size_t packets, + size_t bytes, size_t block_port=0 ); diff --git a/host/include/uhd/rfnoc/source_block_ctrl_base.hpp b/host/include/uhd/rfnoc/source_block_ctrl_base.hpp index 7d0a65107..39e7411f3 100644 --- a/host/include/uhd/rfnoc/source_block_ctrl_base.hpp +++ b/host/include/uhd/rfnoc/source_block_ctrl_base.hpp @@ -97,15 +97,23 @@ public: * * Override this function if your block has port-specific flow control settings. * - * \param buf_size_pkts The size of the downstream block's input FIFO size in number of packets. Setting - * this to zero disables flow control. The block will then produce data as fast as it can. - * \b Warning: This can cause head-of-line blocking, and potentially lock up your device! + * \param enable_output Enable flow control module's output. If disabled, no packets will be output + * regardless of flow control state. + * \param buf_size_bytes The size of the downstream block's input FIFO size in number of bytes. Setting + * this to zero disables byte based flow control. If both byte based flow control and + * the packet limit are set to zero, the block will then produce data as fast as it can. + * \b Warning: This can cause head-of-line blocking, and potentially lock up your device! + * \param pkt_limit Limit the maximum number of packets in flight. Setting this to zero disables packet limiting. + * Usually kept disabled except for special case connections (such as DMA) that support only + * a finite number of packets in flight. * \param block_port Specify on which outgoing port this setting is valid. * \param sid The SID for which this is valid. This is meant for cases where the outgoing block port is * not sufficient to set the flow control, and as such is rarely used. */ virtual void configure_flow_control_out( - size_t buf_size_pkts, + bool enable_output, + size_t buf_size_bytes, + size_t pkt_limit=0, size_t block_port=0, const uhd::sid_t &sid=uhd::sid_t() ); diff --git a/host/include/uhd/transport/CMakeLists.txt b/host/include/uhd/transport/CMakeLists.txt index 3ce06b5b1..785e2d53d 100644 --- a/host/include/uhd/transport/CMakeLists.txt +++ b/host/include/uhd/transport/CMakeLists.txt @@ -20,6 +20,7 @@ UHD_INSTALL(FILES usb_device_handle.hpp vrt_if_packet.hpp zero_copy.hpp + zero_copy_flow_ctrl.hpp DESTINATION ${INCLUDE_DIR}/uhd/transport COMPONENT headers ) diff --git a/host/include/uhd/transport/vrt_if_packet.hpp b/host/include/uhd/transport/vrt_if_packet.hpp index 579ac77d2..07792f13f 100644 --- a/host/include/uhd/transport/vrt_if_packet.hpp +++ b/host/include/uhd/transport/vrt_if_packet.hpp @@ -68,6 +68,8 @@ namespace vrt{ bool sob, eob; //! This is asserted for command responses that are errors (CHDR only) bool error; + //! This is asserted for flow control packets are ACKS (CHDR only) + bool fc_ack; //optional fields //! Stream ID (SID). See uhd::sid_t @@ -185,7 +187,7 @@ namespace vrt{ num_packet_words32(0), packet_count(0), sob(false), eob(false), - error(false), + error(false), fc_ack(false), has_sid(false), sid(0), has_cid(false), cid(0), has_tsi(false), tsi(0), diff --git a/host/lib/include/uhdlib/rfnoc/graph_impl.hpp b/host/lib/include/uhdlib/rfnoc/graph_impl.hpp index 182befbf4..404369618 100644 --- a/host/lib/include/uhdlib/rfnoc/graph_impl.hpp +++ b/host/lib/include/uhdlib/rfnoc/graph_impl.hpp @@ -55,7 +55,7 @@ public: void connect_sink( const block_id_t &sink_block, const size_t dst_block_port, - const size_t pkts_per_ack + const size_t bytes_per_ack ); /************************************************************************ diff --git a/host/lib/rfnoc/ctrl_iface.cpp b/host/lib/rfnoc/ctrl_iface.cpp index 11dfa7aaa..29e18fc3a 100644 --- a/host/lib/rfnoc/ctrl_iface.cpp +++ b/host/lib/rfnoc/ctrl_iface.cpp @@ -108,6 +108,7 @@ private: packet_info.tsf = timestamp; packet_info.sob = false; packet_info.eob = false; + packet_info.fc_ack = false; packet_info.sid = _xports.send_sid; packet_info.has_sid = true; packet_info.has_cid = false; diff --git a/host/lib/rfnoc/graph_impl.cpp b/host/lib/rfnoc/graph_impl.cpp index c361ea8f2..a2e0e64f4 100644 --- a/host/lib/rfnoc/graph_impl.cpp +++ b/host/lib/rfnoc/graph_impl.cpp @@ -122,9 +122,9 @@ void graph_impl::connect( UHD_LOGGER_WARNING("RFNOC") << "Assuming max packet size for " << src->get_block_id() ; pkt_size = uhd::rfnoc::MAX_PACKET_SIZE; } - // FC window (in packets) depends on FIFO size... ...and packet size. - size_t buf_size_pkts = dst->get_fifo_size(dst_block_port) / pkt_size; - if (buf_size_pkts == 0) { + // FC window (in bytes) depends on FIFO size. + size_t buf_size_bytes = dst->get_fifo_size(dst_block_port); + if (buf_size_bytes < pkt_size) { throw uhd::runtime_error(str( boost::format("Input FIFO for block %s is too small (%d kiB) for packets of size %d kiB\n" "coming from block %s.") @@ -132,19 +132,20 @@ void graph_impl::connect( % (pkt_size / 1024) % src->get_block_id().get() )); } - src->configure_flow_control_out(buf_size_pkts, src_block_port); - // On the same crossbar, use lots of FC packets - size_t pkts_per_ack = std::min( - uhd::rfnoc::DEFAULT_FC_XBAR_PKTS_PER_ACK, - buf_size_pkts - 1 + src->configure_flow_control_out( + true, /* enable output */ + buf_size_bytes, + 0, /* no packet limit. We need to revisit this at some point. */ + src_block_port ); + // On the same crossbar, use lots of FC packets + size_t bytes_per_response = std::ceil<size_t>(buf_size_bytes / uhd::rfnoc::DEFAULT_FC_XBAR_RESPONSE_FREQ); // Over the network, use less or we'd flood the transport if (sid.get_src_addr() != sid.get_dst_addr()) { - pkts_per_ack = std::max<size_t>(buf_size_pkts / uhd::rfnoc::DEFAULT_FC_TX_RESPONSE_FREQ, 1); + bytes_per_response = std::ceil<size_t>(buf_size_bytes / uhd::rfnoc::DEFAULT_FC_TX_RESPONSE_FREQ); } dst->configure_flow_control_in( - 0, // Default to not use cycles - pkts_per_ack, + bytes_per_response, dst_block_port ); @@ -209,7 +210,7 @@ void graph_impl::connect_src( void graph_impl::connect_sink( const block_id_t &sink_block, const size_t dst_block_port, - const size_t pkts_per_ack + const size_t bytes_per_ack ) { device3::sptr device_ptr = _device_ptr.lock(); if (not device_ptr) { @@ -222,11 +223,7 @@ void graph_impl::connect_sink( uhd::rfnoc::sink_block_ctrl_base::sptr dst = device_ptr->get_block_ctrl<rfnoc::sink_block_ctrl_base>(sink_block); - dst->configure_flow_control_in( - 0, - pkts_per_ack, - dst_block_port - ); + dst->configure_flow_control_in(bytes_per_ack, dst_block_port); /******************************************************************** * 5. Configure error policy diff --git a/host/lib/rfnoc/legacy_compat.cpp b/host/lib/rfnoc/legacy_compat.cpp index 7e9eec20e..c5bd7891a 100644 --- a/host/lib/rfnoc/legacy_compat.cpp +++ b/host/lib/rfnoc/legacy_compat.cpp @@ -158,10 +158,10 @@ public: UHD_LOGGER_WARNING("RFNOC") << "[legacy_compat] No DUCs detected. You will only be able to transmit at the radio frontend rate." ; } if (args.has_key("skip_dram")) { - UHD_LEGACY_LOG() << "[legacy_compat] Skipping DRAM by user request." << std::endl; + UHD_LEGACY_LOG() << "[legacy_compat] Skipping DRAM by user request." ; } if (args.has_key("skip_sram")) { - UHD_LEGACY_LOG() << "[legacy_compat] Skipping SRAM by user request." << std::endl; + UHD_LEGACY_LOG() << "[legacy_compat] Skipping SRAM by user request."; } if (not _has_dmafifo and not _has_sramfifo) { UHD_LOGGER_WARNING("RFNOC") << "[legacy_compat] No FIFO detected. Higher transmit rates may encounter errors."; diff --git a/host/lib/rfnoc/sink_block_ctrl_base.cpp b/host/lib/rfnoc/sink_block_ctrl_base.cpp index b620f917f..1562e134b 100644 --- a/host/lib/rfnoc/sink_block_ctrl_base.cpp +++ b/host/lib/rfnoc/sink_block_ctrl_base.cpp @@ -52,22 +52,17 @@ size_t sink_block_ctrl_base::get_fifo_size(size_t block_port) const { } void sink_block_ctrl_base::configure_flow_control_in( - size_t cycles, - size_t packets, + size_t bytes, size_t block_port ) { - UHD_RFNOC_BLOCK_TRACE() << boost::format("sink_block_ctrl_base::configure_flow_control_in(cycles=%d, packets=%d)") % cycles % packets ; - uint32_t cycles_word = 0; - if (cycles) { - cycles_word = (1<<31) | cycles; - } - sr_write(SR_FLOW_CTRL_CYCS_PER_ACK, cycles_word, block_port); + UHD_RFNOC_BLOCK_TRACE() << boost::format("sink_block_ctrl_base::configure_flow_control_in(bytes=%d)") % bytes; - uint32_t packets_word = 0; - if (packets) { - packets_word = (1<<31) | packets; + uint32_t bytes_word = 0; + if (bytes) { + // Bit 32 enables flow control + bytes_word = (1<<31) | bytes; } - sr_write(SR_FLOW_CTRL_PKTS_PER_ACK, packets_word, block_port); + sr_write(SR_FLOW_CTRL_BYTES_PER_ACK, bytes_word, block_port); } void sink_block_ctrl_base::set_error_policy( diff --git a/host/lib/rfnoc/source_block_ctrl_base.cpp b/host/lib/rfnoc/source_block_ctrl_base.cpp index 5ede899cb..afec6ba1b 100644 --- a/host/lib/rfnoc/source_block_ctrl_base.cpp +++ b/host/lib/rfnoc/source_block_ctrl_base.cpp @@ -77,25 +77,27 @@ void source_block_ctrl_base::set_destination( } void source_block_ctrl_base::configure_flow_control_out( - size_t buf_size_pkts, + bool enable_fc_output, + size_t buf_size_bytes, + size_t pkt_limit, size_t block_port, UHD_UNUSED(const uhd::sid_t &sid) ) { - UHD_RFNOC_BLOCK_TRACE() << "source_block_ctrl_base::configure_flow_control_out() buf_size_pkts==" << buf_size_pkts ; - if (buf_size_pkts < 2) { + UHD_RFNOC_BLOCK_TRACE() << "source_block_ctrl_base::configure_flow_control_out() buf_size_bytes==" << buf_size_bytes; + if (buf_size_bytes == 0) { throw uhd::runtime_error(str( - boost::format("Invalid window size %d for block %s. Window size must at least be 2.") - % buf_size_pkts % unique_id() + boost::format("Invalid window size %d for block %s. Window size cannot be 0 bytes.") + % buf_size_bytes % unique_id() )); } - //Disable the window and let all upstream data flush out + //Disable flow control entirely and let all upstream data flush out //We need to do this every time the window is changed because //a) We don't know what state the flow-control module was left in // in the previous run (it should still be enabled) //b) Changing the window size where data is buffered upstream may // result in stale packets entering the stream. - sr_write(SR_FLOW_CTRL_WINDOW_EN, 0, block_port); + sr_write(SR_FLOW_CTRL_EN, 0, block_port); //Wait for data to flush out. //In the FPGA we are guaranteed that all buffered packets are more-or-less consecutive. @@ -107,13 +109,24 @@ void source_block_ctrl_base::configure_flow_control_out( // module is done flushing. std::this_thread::sleep_for(std::chrono::milliseconds(1)); + //Enable source flow control module and conditionally enable byte based and/or packet count + //based flow control + const bool enable_byte_fc = (buf_size_bytes != 0); + const bool enable_pkt_cnt_fc = (pkt_limit != 0); + const size_t config = enable_fc_output + (enable_byte_fc << 1) + (enable_pkt_cnt_fc << 2); + //Resize the FC window. //Precondition: No data can be buffered upstream. - sr_write(SR_FLOW_CTRL_WINDOW_SIZE, buf_size_pkts, block_port); + if (enable_byte_fc) { + sr_write(SR_FLOW_CTRL_WINDOW_SIZE, buf_size_bytes, block_port); + } + if (enable_pkt_cnt_fc) { + sr_write(SR_FLOW_CTRL_PKT_LIMIT, pkt_limit, block_port); + } //Enable the FC window. - //Precondition: The window size must be set. - sr_write(SR_FLOW_CTRL_WINDOW_EN, (buf_size_pkts != 0), block_port); + //Precondition: The window size and/or packet limit must be set. + sr_write(SR_FLOW_CTRL_EN, config, block_port); } /*********************************************************************** diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index ae903d956..15771697a 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -112,6 +112,7 @@ LIBUHD_PYTHON_GEN_SOURCE( ) LIBUHD_APPEND_SOURCES( + ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_recv_offload.cpp ${CMAKE_CURRENT_SOURCE_DIR}/tcp_zero_copy.cpp ${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp diff --git a/host/lib/transport/chdr.cpp b/host/lib/transport/chdr.cpp index 48263f57e..36f380d62 100644 --- a/host/lib/transport/chdr.cpp +++ b/host/lib/transport/chdr.cpp @@ -23,6 +23,7 @@ using namespace uhd::transport::vrt; static const uint32_t HDR_FLAG_TSF = (1 << 29); static const uint32_t HDR_FLAG_EOB = (1 << 28); static const uint32_t HDR_FLAG_ERROR = (1 << 28); +static const uint32_t HDR_FLAG_FCACK = (1 << 28); /***************************************************************************/ /* Packing */ @@ -45,8 +46,8 @@ UHD_INLINE uint32_t _hdr_pack_chdr( | (if_packet_info.packet_type << 30) // 1 Bit: Has time | (if_packet_info.has_tsf ? HDR_FLAG_TSF : 0) - // 1 Bit: EOB or Error - | ((if_packet_info.eob or if_packet_info.error) ? HDR_FLAG_EOB : 0) + // 1 Bit: EOB or Error or FC ACK + | ((if_packet_info.eob or if_packet_info.error or if_packet_info.fc_ack) ? HDR_FLAG_EOB : 0) // 12 Bits: Sequence number | ((if_packet_info.packet_count & 0xFFF) << 16) // 16 Bits: Total packet length @@ -111,6 +112,8 @@ UHD_INLINE void _hdr_unpack_chdr( && ((chdr & HDR_FLAG_EOB) > 0); if_packet_info.error = (if_packet_info.packet_type == if_packet_info_t::PACKET_TYPE_RESP) && ((chdr & HDR_FLAG_ERROR) > 0); + if_packet_info.fc_ack = (if_packet_info.packet_type == if_packet_info_t::PACKET_TYPE_FC) + && ((chdr & HDR_FLAG_FCACK) > 0); if_packet_info.packet_count = (chdr >> 16) & 0xFFF; // Set packet length variables diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 921bfdfa0..c962d40e6 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -12,9 +12,9 @@ #include <uhd/exception.hpp> #include <uhd/convert.hpp> #include <uhd/stream.hpp> -#include <uhd/utils/log.hpp> #include <uhd/utils/tasks.hpp> #include <uhd/utils/byteswap.hpp> +#include <uhd/utils/log.hpp> #include <uhd/types/metadata.hpp> #include <uhd/transport/vrt_if_packet.hpp> #include <uhd/transport/zero_copy.hpp> @@ -22,7 +22,6 @@ #include <boost/dynamic_bitset.hpp> #include <boost/function.hpp> #include <boost/format.hpp> -#include <boost/bind.hpp> #include <boost/make_shared.hpp> #include <iostream> #include <vector> @@ -59,6 +58,7 @@ class recv_packet_handler{ public: typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type; typedef boost::function<void(const size_t)> handle_flowctrl_type; + typedef std::function<void(const uint32_t *)> handle_flowctrl_ack_type; typedef boost::function<void(const stream_cmd_t&)> issue_stream_cmd_type; typedef void(*vrt_unpacker_type)(const uint32_t *, vrt::if_packet_info_t &); //typedef boost::function<void(const uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type; @@ -102,33 +102,6 @@ public: _header_offset_words32 = header_offset_words32; } - ////////////////// RFNOC /////////////////////////// - //! Set the stream ID for a specific channel (or no SID) - void set_xport_chan_sid(const size_t xport_chan, const bool has_sid, const uint32_t sid = 0){ - _props.at(xport_chan).has_sid = has_sid; - _props.at(xport_chan).sid = sid; - } - - //! Get the stream ID for a specific channel (or zero if no SID) - uint32_t get_xport_chan_sid(const size_t xport_chan) const { - if (_props.at(xport_chan).has_sid) { - return _props.at(xport_chan).sid; - } else { - return 0; - } - } - - void set_terminator(uhd::rfnoc::rx_stream_terminator::sptr terminator) - { - _terminator = terminator; - } - - uhd::rfnoc::rx_stream_terminator::sptr get_terminator() - { - return _terminator; - } - ////////////////// RFNOC /////////////////////////// - /*! * Set the threshold for alignment failure. * How many packets throw out before giving up? @@ -183,6 +156,13 @@ public: if (do_init) handle_flowctrl(0); } + void set_xport_handle_flowctrl_ack( + const size_t xport_chan, + const handle_flowctrl_ack_type &handle_flowctrl_ack + ) { + _props.at(xport_chan).handle_flowctrl_ack = handle_flowctrl_ack; + } + //! Set the conversion routine for all channels void set_converter(const uhd::convert::id_type &id){ _num_outputs = id.num_outputs; @@ -211,12 +191,11 @@ public: //! Overload call to issue stream commands void issue_stream_cmd(const stream_cmd_t &stream_cmd) { - // RFNoC: This needs to be checked by the radio block, once it's done. TODO remove this. - //if (stream_cmd.stream_now - //and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS - //and _props.size() > 1) { - //throw uhd::runtime_error("Attempting to do multi-channel receive with stream_now == true will result in misaligned channels. Aborting."); - //} + if (size() > 1 and stream_cmd.stream_now and + stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS) + { + throw uhd::runtime_error("Invalid recv stream command - stream now on multiple channels in a single streamer will fail to time align."); + } for (size_t i = 0; i < _props.size(); i++) { @@ -307,11 +286,8 @@ private: size_t packet_count; handle_overflow_type handle_overflow; handle_flowctrl_type handle_flowctrl; + handle_flowctrl_ack_type handle_flowctrl_ack; size_t fc_update_window; - /////// RFNOC /////////// - bool has_sid; - uint32_t sid; - /////// RFNOC /////////// }; std::vector<xport_chan_props_type> _props; size_t _num_outputs; @@ -340,6 +316,7 @@ private: buffers_info_type(const size_t size): std::vector<per_buffer_info_type>(size), indexes_todo(size, true), + alignment_time(0), alignment_time_valid(false), data_bytes_to_copy(0), fragment_offset_in_samps(0) @@ -384,8 +361,6 @@ private: int recvd_packets; #endif - uhd::rfnoc::rx_stream_terminator::sptr _terminator; - /******************************************************************* * Get and process a single packet from the transport: * Receive a single packet at the given index. @@ -398,42 +373,58 @@ private: per_buffer_info_type &curr_buffer_info, double timeout ){ - //get a single packet from the transport layer managed_recv_buffer::sptr &buff = curr_buffer_info.buff; - buff = _props[index].get_buff(timeout); - if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR; - - #ifdef ERROR_INJECT_DROPPED_PACKETS - if (++recvd_packets > 1000) + per_buffer_info_type &info = curr_buffer_info; + while (1) { - recvd_packets = 0; - buff.reset(); + //get a single packet from the transport layer buff = _props[index].get_buff(timeout); if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR; - } - #endif - //bounds check before extract - size_t num_packet_words32 = buff->size()/sizeof(uint32_t); - if (num_packet_words32 <= _header_offset_words32){ - throw std::runtime_error("recv buffer smaller than vrt packet offset"); - } + #ifdef ERROR_INJECT_DROPPED_PACKETS + if (++recvd_packets > 1000) + { + recvd_packets = 0; + buff.reset(); + buff = _props[index].get_buff(timeout); + if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR; + } + #endif - //extract packet info - per_buffer_info_type &info = curr_buffer_info; - info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32; - info.vrt_hdr = buff->cast<const uint32_t *>() + _header_offset_words32; - _vrt_unpacker(info.vrt_hdr, info.ifpi); - info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true - info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32); - - //handle flow control - if (_props[index].handle_flowctrl) - { - if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0) + //bounds check before extract + size_t num_packet_words32 = buff->size()/sizeof(uint32_t); + if (num_packet_words32 <= _header_offset_words32){ + throw std::runtime_error("recv buffer smaller than vrt packet offset"); + } + + //extract packet info + info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32; + info.vrt_hdr = buff->cast<const uint32_t *>() + _header_offset_words32; + _vrt_unpacker(info.vrt_hdr, info.ifpi); + info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true + info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32); + + //handle flow control + if (_props[index].handle_flowctrl) { - _props[index].handle_flowctrl(info.ifpi.packet_count); + if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0) + { + _props[index].handle_flowctrl(info.ifpi.packet_count); + } } + + //handle flow control ack + if (info.ifpi.fc_ack){ + if (_props[index].handle_flowctrl_ack) { + _props[index].handle_flowctrl_ack(reinterpret_cast<const uint32_t *>(info.copy_buff)); + } + // Process the next packet + buff.reset(); + info.copy_buff = nullptr; + continue; + } + + break; } //-------------------------------------------------------------- @@ -605,7 +596,7 @@ private: rx_metadata_t metadata = curr_info.metadata; _props[index].handle_overflow(); curr_info.metadata = metadata; - UHD_LOG_FASTPATH("O") + UHD_LOG_FASTPATH("O"); } curr_info[index].buff.reset(); curr_info[index].copy_buff = nullptr; @@ -627,7 +618,7 @@ private: prev_info[index].ifpi.num_payload_words32*sizeof(uint32_t)/_bytes_per_otw_item, _samp_rate); curr_info.metadata.out_of_sequence = true; curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; - UHD_LOG_FASTPATH("D") + UHD_LOG_FASTPATH("D"); return; } @@ -635,10 +626,10 @@ private: //too many iterations: detect alignment failure if (iterations++ > _alignment_failure_threshold){ UHD_LOGGER_ERROR("STREAMER") << boost::format( - "The receive packet handler failed to time-align packets. " - "%u received packets were processed by the handler. " - "However, a timestamp match could not be determined." - ) % iterations; + "The receive packet handler failed to time-align packets.\n" + "%u received packets were processed by the handler.\n" + "However, a timestamp match could not be determined.\n" + ) % iterations << std::endl; std::swap(curr_info, next_info); //save progress from curr -> next curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; _props[index].handle_overflow(); @@ -659,7 +650,7 @@ private: } /******************************************************************* - * Receive a single packet: + * Receive a single packet on all channels * Handles fragmentation, messages, errors, and copy-conversion. * When no fragments are available, call the get aligned buffers. * Then copy-convert available data into the user's IO buffers. diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index e824ef4e9..5cba570a7 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -12,7 +12,6 @@ #include <uhd/exception.hpp> #include <uhd/convert.hpp> #include <uhd/stream.hpp> -#include <uhd/utils/log.hpp> #include <uhd/utils/tasks.hpp> #include <uhd/utils/byteswap.hpp> #include <uhd/utils/thread.hpp> @@ -49,6 +48,7 @@ namespace sph { class send_packet_handler{ public: typedef boost::function<managed_send_buffer::sptr(double)> get_buff_type; + typedef boost::function<void(void)> post_send_cb_type; typedef boost::function<bool(uhd::async_metadata_t &, const double)> async_receiver_type; typedef void(*vrt_packer_type)(uint32_t *, vrt::if_packet_info_t &); //typedef boost::function<void(uint32_t *, vrt::if_packet_info_t &)> vrt_packer_type; @@ -93,27 +93,6 @@ public: _props.at(xport_chan).sid = sid; } - ///////// RFNOC /////////////////// - //! Get the stream ID for a specific channel (or zero if no SID) - uint32_t get_xport_chan_sid(const size_t xport_chan) const { - if (_props.at(xport_chan).has_sid) { - return _props.at(xport_chan).sid; - } else { - return 0; - } - } - - void set_terminator(uhd::rfnoc::tx_stream_terminator::sptr terminator) - { - _terminator = terminator; - } - - uhd::rfnoc::tx_stream_terminator::sptr get_terminator() - { - return _terminator; - } - ///////// RFNOC /////////////////// - void set_enable_trailer(const bool enable) { _has_tlr = enable; @@ -138,6 +117,15 @@ public: _props.at(xport_chan).get_buff = get_buff; } + /*! + * Set the callback function for post-send. + * \param xport_chan which transport channel + * \param cb post-send callback + */ + void set_xport_chan_post_send_cb(const size_t xport_chan, const post_send_cb_type &cb){ + _props.at(xport_chan).go_postal = cb; + } + //! Set the conversion routine for all channels void set_converter(const uhd::convert::id_type &id){ _num_inputs = id.num_inputs; @@ -198,6 +186,7 @@ public: if_packet_info.tsf = metadata.time_spec.to_ticks(_tick_rate); if_packet_info.sob = metadata.start_of_burst; if_packet_info.eob = metadata.end_of_burst; + if_packet_info.fc_ack = false; //This is a data packet /* * Metadata is cached when we get a send requesting a start of burst with no samples. @@ -291,6 +280,7 @@ private: struct xport_chan_props_type{ xport_chan_props_type(void):has_sid(false),sid(0){} get_buff_type get_buff; + post_send_cb_type go_postal; bool has_sid; uint32_t sid; managed_send_buffer::sptr buff; @@ -308,8 +298,6 @@ private: bool _cached_metadata; uhd::tx_metadata_t _metadata_cache; - uhd::rfnoc::tx_stream_terminator::sptr _terminator; - #ifdef UHD_TXRX_DEBUG_PRINTS struct dbg_send_stat_t { dbg_send_stat_t(long wc, size_t nspb, size_t nss, uhd::tx_metadata_t md, double to, double rate): @@ -428,6 +416,11 @@ private: const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32; buff->commit(num_vita_words32*sizeof(uint32_t)); buff.reset(); //effectively a release + + if (_props[index].go_postal) + { + _props[index].go_postal(); + } } //! Shared variables for the worker threads diff --git a/host/lib/transport/zero_copy_flow_ctrl.cpp b/host/lib/transport/zero_copy_flow_ctrl.cpp index 709b9e981..25be35569 100644 --- a/host/lib/transport/zero_copy_flow_ctrl.cpp +++ b/host/lib/transport/zero_copy_flow_ctrl.cpp @@ -65,7 +65,7 @@ public: zero_copy_flow_ctrl_mrb( flow_ctrl_func flow_ctrl ) : - _mb(nullptr), + _mb(NULL), _flow_ctrl(flow_ctrl) { /* NOP */ @@ -80,8 +80,6 @@ public: { if (_mb) { - _mb->commit(size()); - while (_flow_ctrl and not _flow_ctrl(_mb)) {} _mb.reset(); } } @@ -89,6 +87,7 @@ public: UHD_INLINE sptr get(sptr &mb) { _mb = mb; + while (_flow_ctrl and not _flow_ctrl(_mb)) {} return make(this, _mb->cast<void *>(), _mb->size()); } diff --git a/host/lib/usrp/device3/device3_impl.hpp b/host/lib/usrp/device3/device3_impl.hpp index 8ecb1f72b..580268e4a 100644 --- a/host/lib/usrp/device3/device3_impl.hpp +++ b/host/lib/usrp/device3/device3_impl.hpp @@ -21,6 +21,10 @@ #include <uhd/types/direction.hpp> #include <uhd/utils/tasks.hpp> #include <uhd/device3.hpp> +#include "../../transport/super_send_packet_handler.hpp" +#include "../../transport/super_recv_packet_handler.hpp" +#include <uhdlib/rfnoc/tx_stream_terminator.hpp> +#include <uhdlib/rfnoc/rx_stream_terminator.hpp> #include <uhdlib/rfnoc/xports.hpp> namespace uhd { namespace usrp { @@ -30,11 +34,83 @@ namespace uhd { namespace usrp { **********************************************************************/ static const size_t DEVICE3_RX_FC_REQUEST_FREQ = 32; //per flow-control window static const size_t DEVICE3_TX_FC_RESPONSE_FREQ = 8; -static const size_t DEVICE3_TX_FC_RESPONSE_CYCLES = 0; // Cycles: Off. +static const size_t DEVICE3_FC_PACKET_LEN_IN_WORDS32 = 2; +static const size_t DEVICE3_FC_PACKET_COUNT_OFFSET = 0; +static const size_t DEVICE3_FC_BYTE_COUNT_OFFSET = 1; +static const size_t DEVICE3_LINE_SIZE = 8; static const size_t DEVICE3_TX_MAX_HDR_LEN = uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t); // Bytes static const size_t DEVICE3_RX_MAX_HDR_LEN = uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t); // Bytes +// This class manages the lifetime of the TX async message handler task, transports, and terminator +class device3_send_packet_streamer : public uhd::transport::sph::send_packet_streamer +{ +public: + device3_send_packet_streamer( + const size_t max_num_samps, + const uhd::rfnoc::tx_stream_terminator::sptr terminator, + const both_xports_t data_xport, + const both_xports_t async_msg_xport + ) : + uhd::transport::sph::send_packet_streamer(max_num_samps), + _terminator(terminator), + _data_xport(data_xport), + _async_msg_xport(async_msg_xport) + {}; + + ~device3_send_packet_streamer() + { + // Make sure the async task is destroyed before the transports + _tx_async_msg_tasks.clear(); + }; + + uhd::rfnoc::tx_stream_terminator::sptr get_terminator() + { + return _terminator; + } + + void add_async_msg_task(task::sptr task) + { + _tx_async_msg_tasks.push_back(task); + } + +private: + uhd::rfnoc::tx_stream_terminator::sptr _terminator; + both_xports_t _data_xport; + both_xports_t _async_msg_xport; + std::vector<task::sptr> _tx_async_msg_tasks; +}; + +// This class manages the lifetime of the RX transports and terminator and provides access to both +class device3_recv_packet_streamer : public uhd::transport::sph::recv_packet_streamer +{ +public: + device3_recv_packet_streamer( + const size_t max_num_samps, + const uhd::rfnoc::rx_stream_terminator::sptr terminator, + const both_xports_t xport + ) : + uhd::transport::sph::recv_packet_streamer(max_num_samps), + _terminator(terminator), + _xport(xport) {}; + + ~device3_recv_packet_streamer() {}; + + both_xports_t get_xport() + { + return _xport; + } + + uhd::rfnoc::rx_stream_terminator::sptr get_terminator() + { + return _terminator; + } + +private: + uhd::rfnoc::rx_stream_terminator::sptr _terminator; + both_xports_t _xport; +}; + class device3_impl : public uhd::device3, public boost::enable_shared_from_this<device3_impl> { public: @@ -64,14 +140,11 @@ public: size_t rx_fc_request_freq; //! How often the downstream block should send ACKs per one full FC window size_t tx_fc_response_freq; - //! How often the downstream block should send ACKs in cycles - size_t tx_fc_response_cycles; stream_options_t(void) : tx_max_len_hdr(DEVICE3_TX_MAX_HDR_LEN) , rx_max_len_hdr(DEVICE3_RX_MAX_HDR_LEN) , rx_fc_request_freq(DEVICE3_RX_FC_REQUEST_FREQ) , tx_fc_response_freq(DEVICE3_TX_FC_RESPONSE_FREQ) - , tx_fc_response_cycles(DEVICE3_TX_FC_RESPONSE_CYCLES) {}; }; diff --git a/host/lib/usrp/device3/device3_io_impl.cpp b/host/lib/usrp/device3/device3_io_impl.cpp index 8882552af..92865b6fe 100644 --- a/host/lib/usrp/device3/device3_io_impl.cpp +++ b/host/lib/usrp/device3/device3_io_impl.cpp @@ -13,8 +13,6 @@ #include <uhd/rfnoc/sink_block_ctrl_base.hpp> #include <uhd/utils/byteswap.hpp> #include <uhd/utils/log.hpp> -#include "../../transport/super_recv_packet_handler.hpp" -#include "../../transport/super_send_packet_handler.hpp" #include <uhd/rfnoc/rate_node_ctrl.hpp> #include <uhd/rfnoc/radio_ctrl.hpp> #include <uhd/transport/zero_copy_flow_ctrl.hpp> @@ -30,10 +28,6 @@ using namespace uhd; using namespace uhd::usrp; using namespace uhd::transport; -//! CHDR uses 12-Bit sequence numbers -static const uint32_t HW_SEQ_NUM_MASK = 0xfff; - - /*********************************************************************** * Helper functions for get_?x_stream() **********************************************************************/ @@ -146,8 +140,25 @@ void generate_channel_list( struct rx_fc_cache_t { rx_fc_cache_t(): - last_seq_in(0){} - size_t last_seq_in; + interval(0), + last_byte_count(0), + total_bytes_consumed(0), + total_packets_consumed(0), + seq_num(0) {} + + //! Flow control interval in bytes + size_t interval; + //! Byte count at last flow control packet + uint32_t last_byte_count; + //! This will wrap around, but that's OK, because math. + uint32_t total_bytes_consumed; + //! This will wrap around, but that's OK, because math. + uint32_t total_packets_consumed; + //! Sequence number of next flow control packet + uint64_t seq_num; + sid_t sid; + zero_copy_if::sptr xport; + endianness_t endianness; }; /*! Determine the size of the flow control window in number of packets. @@ -180,109 +191,149 @@ static size_t get_rx_flow_control_window( throw uhd::value_error("recv_buff_fullness must be in [0.01, 1] inclusive (1% to 100%)"); } - size_t window_in_pkts = (static_cast<size_t>(sw_buff_size * fullness_factor) / pkt_size); + size_t window_in_bytes = (static_cast<size_t>(sw_buff_size * fullness_factor)); if (rx_args.has_key("max_recv_window")) { - window_in_pkts = std::min( - window_in_pkts, - rx_args.cast<size_t>("max_recv_window", window_in_pkts) + window_in_bytes = std::min( + window_in_bytes, + rx_args.cast<size_t>("max_recv_window", window_in_bytes) ); } - if (window_in_pkts == 0) { + if (window_in_bytes < pkt_size) { throw uhd::value_error("recv_buff_size must be larger than the recv_frame_size."); } - UHD_ASSERT_THROW(size_t(sw_buff_size * fullness_factor) >= pkt_size * window_in_pkts); - return window_in_pkts; + UHD_ASSERT_THROW(size_t(sw_buff_size * fullness_factor) >= window_in_bytes); + return window_in_bytes; } /*! Send out RX flow control packets. * - * For an rx stream, this function takes care of sending back - * a flow control packet to the source telling it which - * packets have been consumed. - * - * This function should only be called by the function handling - * the rx stream, usually recv() in super_recv_packet_handler. + * This function handles updating the counters for the consumed + * bytes and packets, determines if a flow control message is + * is necessary, and sends one if it is. Passing a nullptr for + * the buff parameter will skip the counter update. * - * \param sid The SID that goes into this packet. This is the reversed() - * version of the data stream's SID. - * \param xport A transport object over which to send the data - * \param big_endian Endianness of the transport - * \param seq32_state Pointer to a variable that saves the 32-Bit state - * of the sequence numbers, since we only have 12 Bit - * sequence numbers in CHDR. - * \param last_seq The value to send: The last consumed packet's sequence number. + * \param fc_cache RX flow control state information + * \param buff Receive buffer. Setting to nullptr will + * skip the counter update. */ -static void handle_rx_flowctrl( - const sid_t &sid, - zero_copy_if::sptr xport, - endianness_t endianness, +static bool rx_flow_ctrl( boost::shared_ptr<rx_fc_cache_t> fc_cache, - const size_t last_seq + managed_buffer::sptr buff ) { - static const size_t RXFC_PACKET_LEN_IN_WORDS = 2; - static const size_t RXFC_CMD_CODE_OFFSET = 0; - static const size_t RXFC_SEQ_NUM_OFFSET = 1; + // If the caller supplied a buffer + if (buff) + { + // Unpack the header + vrt::if_packet_info_t packet_info; + packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); + const uint32_t *pkt = buff->cast<const uint32_t *>(); + try { + if (fc_cache->endianness == ENDIANNESS_BIG) + { + vrt::chdr::if_hdr_unpack_be(pkt, packet_info); + } else { + vrt::chdr::if_hdr_unpack_le(pkt, packet_info); + } + } + catch(const std::exception &ex) + { + // Log and ignore + UHD_LOGGER_ERROR("RX FLOW CTRL") << "Error unpacking flow control packet: " << ex.what() << std::endl; + return true; + } - managed_send_buffer::sptr buff = xport->get_send_buff(0.0); - if (not buff) { - throw uhd::runtime_error("handle_rx_flowctrl timed out getting a send buffer"); + // Update counters assuming the buffer is a consumed packet + if (not packet_info.error) + { + fc_cache->total_bytes_consumed += buff->size(); + fc_cache->total_packets_consumed++; + } } - uint32_t *pkt = buff->cast<uint32_t *>(); - // Recover sequence number. The sequence numbers handled by the streamers - // are 12 Bits, but we want to know the 32-Bit sequence number. - size_t &seq32 = fc_cache->last_seq_in; - const size_t seq12 = seq32 & HW_SEQ_NUM_MASK; - if (last_seq < seq12) - seq32 += (HW_SEQ_NUM_MASK + 1); - seq32 &= ~HW_SEQ_NUM_MASK; - seq32 |= last_seq; + // Just return if there is no need to send a flow control packet + if (fc_cache->total_bytes_consumed - fc_cache->last_byte_count < fc_cache->interval) + { + return true; + } - // Super-verbose mode: - //static size_t fc_pkt_count = 0; - //UHD_LOGGER_INFO("STREAMER") << "sending flow ctrl packet " << fc_pkt_count++ << ", acking " << str(boost::format("%04d\tseq_sw==0x%08x") % last_seq % seq32) ; + // Time to send a flow control packet + // Get a send buffer + managed_send_buffer::sptr fc_buff = fc_cache->xport->get_send_buff(0.0); + if (not fc_buff) { + throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer"); + } + uint32_t *pkt = fc_buff->cast<uint32_t *>(); //load packet info vrt::if_packet_info_t packet_info; packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_FC; - packet_info.num_payload_words32 = RXFC_PACKET_LEN_IN_WORDS; + packet_info.num_payload_words32 = DEVICE3_FC_PACKET_LEN_IN_WORDS32; packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); - packet_info.packet_count = seq32; + packet_info.packet_count = fc_cache->seq_num++; packet_info.sob = false; packet_info.eob = false; - packet_info.sid = sid.get(); + packet_info.error = false; + packet_info.fc_ack = false; + packet_info.sid = fc_cache->sid.get(); packet_info.has_sid = true; packet_info.has_cid = false; packet_info.has_tsi = false; packet_info.has_tsf = false; packet_info.has_tlr = false; - if (endianness == ENDIANNESS_BIG) { + if (fc_cache->endianness == ENDIANNESS_BIG) { // Load Header: vrt::chdr::if_hdr_pack_be(pkt, packet_info); - // Load Payload: (the sequence number) - pkt[packet_info.num_header_words32+RXFC_CMD_CODE_OFFSET] = uhd::htonx<uint32_t>(0); - pkt[packet_info.num_header_words32+RXFC_SEQ_NUM_OFFSET] = uhd::htonx<uint32_t>(seq32); + // Load Payload: Packet count, and byte count + pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = + uhd::htonx<uint32_t>(fc_cache->total_packets_consumed); + pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = + uhd::htonx<uint32_t>(fc_cache->total_bytes_consumed); } else { // Load Header: vrt::chdr::if_hdr_pack_le(pkt, packet_info); - // Load Payload: (the sequence number) - pkt[packet_info.num_header_words32+RXFC_CMD_CODE_OFFSET] = uhd::htowx<uint32_t>(0); - pkt[packet_info.num_header_words32+RXFC_SEQ_NUM_OFFSET] = uhd::htowx<uint32_t>(seq32); + // Load Payload: Packet count, and byte count + pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = + uhd::htowx<uint32_t>(fc_cache->total_packets_consumed); + pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = + uhd::htowx<uint32_t>(fc_cache->total_bytes_consumed); } - //std::cout << " SID=" << std::hex << sid << " hdr bits=" << packet_info.packet_type << " seq32=" << seq32 << std::endl; - //std::cout << "num_packet_words32: " << packet_info.num_packet_words32 << std::endl; - //for (size_t i = 0; i < packet_info.num_packet_words32; i++) { - //std::cout << str(boost::format("0x%08x") % pkt[i]) << " "; - //if (i % 2) { - //std::cout << std::endl; - //} - //} - //send the buffer over the interface - buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32)); + fc_buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32)); + + //update byte count + fc_cache->last_byte_count = fc_cache->total_bytes_consumed; + + return true; +} + +/*! Handle RX flow control ACK packets. + * + */ +static void handle_rx_flowctrl_ack( + boost::shared_ptr<rx_fc_cache_t> fc_cache, + const uint32_t *payload +) { + const uint32_t pkt_count = (fc_cache->endianness == ENDIANNESS_BIG) ? + uhd::ntohx<uint32_t>(payload[0]) : + uhd::wtohx<uint32_t>(payload[0]); + const uint32_t byte_count = (fc_cache->endianness == ENDIANNESS_BIG) ? + uhd::ntohx<uint32_t>(payload[1]) : + uhd::wtohx<uint32_t>(payload[1]); + if (fc_cache->total_bytes_consumed != byte_count) + { + UHD_LOGGER_DEBUG("device3") + << "oh noes: byte_count==" << byte_count + << " total_bytes_consumed==" << fc_cache->total_bytes_consumed << std::endl + ; + } + fc_cache->total_bytes_consumed = byte_count; + fc_cache->total_packets_consumed = pkt_count; // guess we need a pkt offset too? + + // This will send a flow control packet if there is a significant discrepancy + rx_flow_ctrl(fc_cache, nullptr); } /*********************************************************************** @@ -293,56 +344,51 @@ static void handle_rx_flowctrl( //! Stores the state of TX flow control struct tx_fc_cache_t { - tx_fc_cache_t(size_t capacity): + tx_fc_cache_t(uint32_t capacity): + last_byte_ack(0), last_seq_ack(0), - space(capacity) {} - - size_t last_seq_ack; - size_t space; + byte_count(0), + pkt_count(0), + window_size(capacity), + fc_ack_seqnum(0), + fc_received(false) {} + + uint32_t last_byte_ack; + uint32_t last_seq_ack; + uint32_t byte_count; + uint32_t pkt_count; + uint32_t window_size; + uint32_t fc_ack_seqnum; + bool fc_received; }; -/*! Return the size of the flow control window in packets. - * - * If the return value of this function is F, the last tx'd packet - * has index N and the last ack'd packet has index M, the amount of - * FC credit we have is C = F + M - N (i.e. we can send C more packets - * before getting another ack). - * - * Note: If `send_buff_size` is set in \p tx_hints, this will - * override hw_buff_size_. - */ -static size_t get_tx_flow_control_window( - size_t pkt_size, - const double hw_buff_size_, - const device_addr_t& tx_hints -) { - double hw_buff_size = tx_hints.cast<double>("send_buff_size", hw_buff_size_); - size_t window_in_pkts = (static_cast<size_t>(hw_buff_size) / pkt_size); - if (window_in_pkts == 0) { - throw uhd::value_error("send_buff_size must be larger than the send_frame_size."); - } - return window_in_pkts; -} - static bool tx_flow_ctrl( boost::shared_ptr<tx_fc_cache_t> fc_cache, - zero_copy_if::sptr async_xport, - uint32_t (*endian_conv)(uint32_t), + zero_copy_if::sptr xport, + uint32_t (*to_host)(uint32_t), void (*unpack)(const uint32_t *packet_buff, vrt::if_packet_info_t &), - managed_buffer::sptr + managed_buffer::sptr buff ) { while (true) { // If there is space - if (fc_cache->space) + if (fc_cache->window_size - (fc_cache->byte_count - fc_cache->last_byte_ack) >= buff->size()) { // All is good - packet will be sent - fc_cache->space--; + fc_cache->byte_count += buff->size(); + // Round up to nearest word + if (fc_cache->byte_count % DEVICE3_LINE_SIZE) + { + fc_cache->byte_count += DEVICE3_LINE_SIZE - (fc_cache->byte_count % DEVICE3_LINE_SIZE); + } + fc_cache->pkt_count++; return true; } // Look for a flow control message to update the space available in the buffer. - managed_recv_buffer::sptr buff = async_xport->get_recv_buff(); + // A minimal timeout is used because larger timeouts can cause the thread to be + // scheduled out for too long at high data rates and result in underruns. + managed_recv_buffer::sptr buff = xport->get_recv_buff(0.000001); if (buff) { vrt::if_packet_info_t if_packet_info; @@ -353,28 +399,94 @@ static bool tx_flow_ctrl( } catch(const std::exception &ex) { - UHD_LOG_ERROR("TX FLOW CTRL", "Error unpacking async flow control packet: " << ex.what()); + UHD_LOGGER_ERROR("TX FLOW CTRL") << "Error unpacking flow control packet: " << ex.what() << std::endl; continue; } if (if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_FC) { - UHD_LOG_ERROR( - "TX FLOW CTRL", - "Unexpected packet type received by flow control handler: " << if_packet_info.packet_type - ); + UHD_LOGGER_ERROR("TX FLOW CTRL") << "Unexpected packet received by flow control handler: " << if_packet_info.packet_type << std::endl; continue; } + const uint32_t *payload = &packet_buff[if_packet_info.num_header_words32]; + const uint32_t pkt_count = to_host(payload[0]); + const uint32_t byte_count = to_host(payload[1]); + // update the amount of space - size_t seq_ack = endian_conv(packet_buff[if_packet_info.num_header_words32+1]); - fc_cache->space += (seq_ack - fc_cache->last_seq_ack) & HW_SEQ_NUM_MASK; - fc_cache->last_seq_ack = seq_ack; + fc_cache->last_byte_ack = byte_count; + fc_cache->last_seq_ack = pkt_count; + + fc_cache->fc_received = true; } } return false; } +static void tx_flow_ctrl_ack( + boost::shared_ptr<tx_fc_cache_t> fc_cache, + zero_copy_if::sptr send_xport, + sid_t send_sid, + uint32_t (*from_host)(uint32_t), + void (*pack)(uint32_t *packet_buff, vrt::if_packet_info_t &) +) { + if (not fc_cache->fc_received) + { + return; + } + + // Time to send a flow control ACK packet + // Get a send buffer + managed_send_buffer::sptr fc_buff = send_xport->get_send_buff(0.0); + if (not fc_buff) { + UHD_LOGGER_ERROR("tx_flow_ctrl_ack") << "timed out getting a send buffer"; + return; + } + uint32_t *pkt = fc_buff->cast<uint32_t *>(); + + // Load packet info + vrt::if_packet_info_t packet_info; + packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_ACK; + packet_info.num_payload_words32 = DEVICE3_FC_PACKET_LEN_IN_WORDS32; + packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); + packet_info.packet_count = fc_cache->fc_ack_seqnum++; + packet_info.sob = false; + packet_info.eob = true; + packet_info.error = false; + packet_info.fc_ack = false; + packet_info.sid = send_sid.get(); + packet_info.has_sid = true; + packet_info.has_cid = false; + packet_info.has_tsi = false; + packet_info.has_tsf = false; + packet_info.has_tlr = false; + + // Load Header: + pack(pkt, packet_info); + + // Update counters to include this packet + size_t fc_ack_pkt_size = sizeof(uint32_t)*(packet_info.num_packet_words32); + fc_cache->byte_count += fc_ack_pkt_size; + // Round up to nearest word + if (fc_cache->byte_count % DEVICE3_LINE_SIZE) + { + fc_cache->byte_count += DEVICE3_LINE_SIZE - (fc_cache->byte_count % DEVICE3_LINE_SIZE); + } + fc_cache->pkt_count++; + + // Load Payload: Packet count, and byte count + pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = + from_host(fc_cache->pkt_count); + pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = + from_host(fc_cache->byte_count); + + // Send the buffer over the interface + fc_buff->commit(fc_ack_pkt_size); + + // Reset for next FC + fc_cache->fc_received = false; +} + /*********************************************************************** * TX Async Message Functions **********************************************************************/ @@ -445,13 +557,10 @@ static void handle_tx_async_msgs( async_info->stream_channel ); - // Filter out any flow control messages and cache the rest + // Filter out any flow control messages and cache the rest if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) { - UHD_LOG_ERROR( - "TX ASYNC", - "Unexpected flow control message found in async message handling" - ); + UHD_LOGGER_ERROR("TX ASYNC MSG") << "Unexpected flow control message found in async message handling" << std::endl; } else { async_info->async_queue->push_with_pop_on_full(metadata); metadata.channel = async_info->device_channel; @@ -474,8 +583,8 @@ void device3_impl::update_rx_streamers(double /* rate */) { for(const std::string &block_id: _rx_streamers.keys()) { UHD_RX_STREAMER_LOG() << "updating RX streamer to " << block_id; - boost::shared_ptr<sph::recv_packet_streamer> my_streamer = - boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_rx_streamers[block_id].lock()); + boost::shared_ptr<device3_recv_packet_streamer> my_streamer = + boost::dynamic_pointer_cast<device3_recv_packet_streamer>(_rx_streamers[block_id].lock()); if (my_streamer) { double tick_rate = my_streamer->get_terminator()->get_tick_rate(); if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) { @@ -511,12 +620,15 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) // Note: All 'args.args' are merged into chan_args now. // II. Iterate over all channels - boost::shared_ptr<sph::recv_packet_streamer> my_streamer; + boost::shared_ptr<device3_recv_packet_streamer> my_streamer; // The terminator's lifetime is coupled to the streamer. // There is only one terminator. If the streamer has multiple channels, // it will be connected to each upstream block. rfnoc::rx_stream_terminator::sptr recv_terminator = rfnoc::rx_stream_terminator::make(); - for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) { + for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) + { + // First, configure blocks and create transport + // Get block ID and mb index uhd::rfnoc::block_id_t block_id = chan_list[stream_i]; UHD_RX_STREAMER_LOG() << "chan " << stream_i << " connecting to " << block_id ; @@ -549,7 +661,36 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) uhd::sid_t stream_address = blk_ctrl->get_address(block_port); UHD_RX_STREAMER_LOG() << "creating rx stream " << rx_hints.to_string() ; both_xports_t xport = make_transport(stream_address, RX_DATA, rx_hints); - UHD_RX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec << " actual recv_buff_size = " << xport.recv_buff_size ; + UHD_RX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec << " actual recv_buff_size = " << xport.recv_buff_size; + + // Configure the block + // Flow control setup + const size_t pkt_size = xport.recv->get_recv_frame_size(); + // Leave one pkt_size space for overrun packets - TODO make this obsolete + const size_t fc_window = get_rx_flow_control_window(pkt_size, xport.recv_buff_size, rx_hints) - pkt_size; + const size_t fc_handle_window = std::max<size_t>(1, fc_window / stream_options.rx_fc_request_freq); + UHD_RX_STREAMER_LOG()<< "Flow Control Window = " << (fc_window) << ", Flow Control Handler Window = " << fc_handle_window; + blk_ctrl->configure_flow_control_out( + true, + fc_window, + rx_hints.cast<size_t>("recv_pkt_limit", 0), // On rfnoc-devel, update e300_impl::get_rx_hints() to set this to 32 + block_port + ); + + // Add flow control transport + boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t()); + fc_cache->sid = xport.send_sid; + fc_cache->xport = xport.send; + fc_cache->endianness = xport.endianness; + fc_cache->interval = fc_handle_window; + xport.recv = zero_copy_flow_ctrl::make + ( + xport.recv, + NULL, + [=](managed_buffer::sptr buff) { + return rx_flow_ctrl(fc_cache, buff); + } + ); // Configure the block // Note: We need to set_destination() after writing to SR_CLEAR_TX_FC. @@ -558,8 +699,10 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) // other settings. blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_TX_FC, 0x1, block_port); blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_TX_FC, 0x0, block_port); + // Configure routing for data blk_ctrl->set_destination(xport.send_sid.get_src(), block_port); + // Configure routing for responses blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port); UHD_RX_STREAMER_LOG() << "resp_out_dst_sid == " << xport.send_sid.get_src() ; @@ -570,17 +713,24 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) node->sr_write(uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port); } - // To calculate the max number of samples per packet, we assume the maximum header length - // to avoid fragmentation should the entire header be used. - const size_t bpp = xport.recv->get_recv_frame_size() - stream_options.rx_max_len_hdr; // bytes per packet - const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item - const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet - UHD_RX_STREAMER_LOG() << "spp == " << spp ; + // Second, configure the streamer //make the new streamer given the samples per packet if (not my_streamer) - my_streamer = boost::make_shared<sph::recv_packet_streamer>(spp); - my_streamer->resize(chan_list.size()); + { + // To calculate the max number of samples per packet, we assume the maximum header length + // to avoid fragmentation should the entire header be used. + const size_t bpp = pkt_size - stream_options.rx_max_len_hdr; // bytes per packet + const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item + const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet + UHD_RX_STREAMER_LOG() << "spp == " << spp ; + + my_streamer = boost::make_shared<device3_recv_packet_streamer>( + spp, + recv_terminator, + xport); + my_streamer->resize(chan_list.size()); + } //init some streamer stuff std::string conv_endianness; @@ -600,72 +750,51 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) id.num_outputs = 1; my_streamer->set_converter(id); - //flow control setup - const size_t pkt_size = spp * bpi + stream_options.rx_max_len_hdr; - const size_t fc_window = get_rx_flow_control_window(pkt_size, xport.recv_buff_size, rx_hints); - const size_t fc_handle_window = std::max<size_t>(1, fc_window / stream_options.rx_fc_request_freq); - UHD_RX_STREAMER_LOG()<< "Flow Control Window (minus one) = " << fc_window-1 << ", Flow Control Handler Window = " << fc_handle_window ; - blk_ctrl->configure_flow_control_out( - fc_window-1, // Leave one space for overrun packets TODO make this obsolete - block_port + // Give the streamer a functor to handle flow control ACK messages + my_streamer->set_xport_handle_flowctrl_ack( + stream_i, + [=](const uint32_t *payload) { + handle_rx_flowctrl_ack( + fc_cache, + payload + ); + } ); //Give the streamer a functor to get the recv_buffer - //bind requires a zero_copy_if::sptr to add a streamer->xport lifetime dependency my_streamer->set_xport_chan_get_buff( stream_i, - boost::bind(&zero_copy_if::get_recv_buff, xport.recv, _1), + [=](double timeout) {return xport.recv->get_recv_buff(timeout);}, true /*flush*/ ); //Give the streamer a functor to handle overruns //bind requires a weak_ptr to break the a streamer->streamer circular dependency //Using "this" is OK because we know that this device3_impl will outlive the streamer + boost::weak_ptr<uhd::rx_streamer> weak_ptr(my_streamer); my_streamer->set_overflow_handler( - stream_i, - boost::bind( - &uhd::rfnoc::rx_stream_terminator::handle_overrun, recv_terminator, - boost::weak_ptr<uhd::rx_streamer>(my_streamer), stream_i - ) - ); - - //Give the streamer a functor to send flow control messages - //handle_rx_flowctrl is static and has no lifetime issues - boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t()); - my_streamer->set_xport_handle_flowctrl( - stream_i, boost::bind( - &handle_rx_flowctrl, - xport.send_sid, - xport.send, - xport.endianness, - fc_cache, - _1 - ), - fc_handle_window, - true/*init*/ + stream_i, + [=]() { + recv_terminator->handle_overrun( + weak_ptr, + stream_i); + } ); //Give the streamer a functor issue stream cmd - //bind requires a shared pointer to add a streamer->framer lifetime dependency my_streamer->set_issue_stream_cmd( stream_i, - boost::bind(&uhd::rfnoc::source_block_ctrl_base::issue_stream_cmd, blk_ctrl, _1, block_port) + [=](const stream_cmd_t& stream_cmd) {blk_ctrl->issue_stream_cmd(stream_cmd, block_port);} ); - - // Tell the streamer which SID is valid for this channel - my_streamer->set_xport_chan_sid(stream_i, true, xport.send_sid); } - // Connect the terminator to the streamer - my_streamer->set_terminator(recv_terminator); - // Notify all blocks in this chain that they are connected to an active streamer recv_terminator->set_rx_streamer(true, 0); // Store a weak pointer to prevent a streamer->device3_impl->streamer circular dependency. // Note that we store the streamer only once, and use its terminator's // ID to do so. - _rx_streamers[recv_terminator->unique_id()] = boost::weak_ptr<sph::recv_packet_streamer>(my_streamer); + _rx_streamers[recv_terminator->unique_id()] = boost::weak_ptr<uhd::rx_streamer>(my_streamer); // Sets tick rate, samp rate and scaling on this streamer. // A registered terminator is required to do this. @@ -682,8 +811,8 @@ void device3_impl::update_tx_streamers(double /* rate */) { for(const std::string &block_id: _tx_streamers.keys()) { UHD_TX_STREAMER_LOG() << "updating TX streamer: " << block_id; - boost::shared_ptr<sph::send_packet_streamer> my_streamer = - boost::dynamic_pointer_cast<sph::send_packet_streamer>(_tx_streamers[block_id].lock()); + boost::shared_ptr<device3_send_packet_streamer> my_streamer = + boost::dynamic_pointer_cast<device3_send_packet_streamer>(_tx_streamers[block_id].lock()); if (my_streamer) { double tick_rate = my_streamer->get_terminator()->get_tick_rate(); if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) { @@ -705,20 +834,6 @@ void device3_impl::update_tx_streamers(double /* rate */) } } -// This class manages the lifetime of the TX async message handler task and transports -class device3_send_packet_streamer : public sph::send_packet_streamer -{ -public: - device3_send_packet_streamer(const size_t max_num_samps) : sph::send_packet_streamer(max_num_samps) {}; - ~device3_send_packet_streamer() { - _tx_async_msg_task.reset(); // Make sure the async task is destroyed before the transports - }; - - both_xports_t _xport; - both_xports_t _async_xport; - task::sptr _tx_async_msg_task; -}; - tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) { boost::mutex::scoped_lock lock(_transport_setup_mutex); @@ -739,7 +854,10 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) // There is only one terminator. If the streamer has multiple channels, // it will be connected to each downstream block. rfnoc::tx_stream_terminator::sptr send_terminator = rfnoc::tx_stream_terminator::make(); - for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) { + for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) + { + // First, configure the downstream blocks and create the transports + // Get block ID and mb index uhd::rfnoc::block_id_t block_id = chan_list[stream_i]; // Update args so args.args is always valid for this particular channel: @@ -768,87 +886,42 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) // Setup the dsp transport hints device_addr_t tx_hints = get_tx_hints(mb_index); - //allocate sid and create transport + // Allocate sid and create transport uhd::sid_t stream_address = blk_ctrl->get_address(block_port); UHD_TX_STREAMER_LOG() << "creating tx stream " << tx_hints.to_string() ; both_xports_t xport = make_transport(stream_address, TX_DATA, tx_hints); both_xports_t async_xport = make_transport(stream_address, ASYNC_MSG, device_addr_t("")); - UHD_TX_STREAMER_LOG() << std::hex << "[TX Streamer] data_sid = " << xport.send_sid << std::dec << std::endl; - - // To calculate the max number of samples per packet, we assume the maximum header length - // to avoid fragmentation should the entire header be used. - const size_t bpp = tx_hints.cast<size_t>("bpp", xport.send->get_send_frame_size()) - stream_options.tx_max_len_hdr; - const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item - const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet - UHD_TX_STREAMER_LOG() << "spp == " << spp ; - - //make the new streamer given the samples per packet - if (not my_streamer) - my_streamer = boost::make_shared<device3_send_packet_streamer>(spp); - my_streamer->resize(chan_list.size()); - my_streamer->_xport = xport; - my_streamer->_async_xport = async_xport; - - //init some streamer stuff - std::string conv_endianness; - if (xport.endianness == ENDIANNESS_BIG) { - my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_be); - conv_endianness = "be"; - } else { - my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_le); - conv_endianness = "le"; - } - - //set the converter - uhd::convert::id_type id; - id.input_format = args.cpu_format; - id.num_inputs = 1; - id.output_format = args.otw_format + "_item32_" + conv_endianness; - id.num_outputs = 1; - my_streamer->set_converter(id); + UHD_TX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec ; - //flow control setup - const size_t pkt_size = spp * bpi + stream_options.tx_max_len_hdr; - // For flow control, this value is used to determine the window size in *packets* - size_t fc_window = get_tx_flow_control_window( - pkt_size, // This is the maximum packet size - blk_ctrl->get_fifo_size(block_port), - tx_hints // This can override the value reported by the block! - ); + // Configure flow control + // This disables the FC module's output, do this before configuring flow control + blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x1, block_port); + blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x0, block_port); + // Configure flow control on downstream block + const size_t fc_window = tx_hints.cast<size_t>("send_buff_size", blk_ctrl->get_fifo_size(block_port)); const size_t fc_handle_window = std::max<size_t>(1, fc_window / stream_options.tx_fc_response_freq); UHD_TX_STREAMER_LOG() << "Flow Control Window = " << fc_window << ", Flow Control Handler Window = " << fc_handle_window ; blk_ctrl->configure_flow_control_in( - stream_options.tx_fc_response_cycles, - fc_handle_window, /*pkts*/ + fc_handle_window, /*bytes*/ block_port ); - - boost::shared_ptr<async_tx_info_t> async_tx_info(new async_tx_info_t()); - async_tx_info->stream_channel = args.channels[stream_i]; - async_tx_info->device_channel = mb_index; - async_tx_info->async_queue = async_md; - async_tx_info->old_async_queue = _async_md; - - boost::function<double(void)> tick_rate_retriever = boost::bind( - &rfnoc::tick_node_ctrl::get_tick_rate, - send_terminator, - std::set< rfnoc::node_ctrl_base::sptr >() // Need to specify default args with bind - ); - - my_streamer->_tx_async_msg_task = task::make( - boost::bind( - &handle_tx_async_msgs, - async_tx_info, - my_streamer->_async_xport.recv, - xport.endianness, - tick_rate_retriever - ), - "tx_async_msgs_task" + // Add flow control transport + boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window)); + xport.send = zero_copy_flow_ctrl::make( + xport.send, + [=](managed_buffer::sptr buff) { + return tx_flow_ctrl( + fc_cache, + xport.recv, + (xport.endianness == ENDIANNESS_BIG ? uhd::ntohx<uint32_t> : uhd::wtohx<uint32_t>), + (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be : vrt::chdr::if_hdr_unpack_le), + buff); + }, + NULL ); - blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x1, block_port); - blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x0, block_port); - blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), block_port); + // Configure return path for async messages + blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), block_port); UHD_TX_STREAMER_LOG() << "resp_in_dst_sid == " << boost::format("0x%04X") % xport.recv_sid.get_dst() ; // FIXME: Once there is a better way to map the radio block and port @@ -863,7 +936,7 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); for(const boost::shared_ptr<uhd::rfnoc::radio_ctrl> &node: downstream_radio_nodes) { if (node->get_block_id() == radio_id) { - node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), radio_port); + node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), radio_port); } } } else { @@ -876,39 +949,96 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) std::vector<boost::shared_ptr<uhd::rfnoc::radio_ctrl> > downstream_radio_nodes = blk_ctrl->find_downstream_node<uhd::rfnoc::radio_ctrl>(); UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); for(const boost::shared_ptr<uhd::rfnoc::radio_ctrl> &node: downstream_radio_nodes) { - node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), block_port); + node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), block_port); } } - // Add flow control - boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window)); - my_streamer->_xport.send = zero_copy_flow_ctrl::make( - my_streamer->_xport.send, - boost::bind( - &tx_flow_ctrl, - fc_cache, - my_streamer->_xport.recv, - (xport.endianness == ENDIANNESS_BIG ? uhd::ntohx<uint32_t> : uhd::wtohx<uint32_t>), - (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be : vrt::chdr::if_hdr_unpack_le), - _1), - NULL); + // Second, configure the streamer now that the blocks and transports are configured + + //make the new streamer given the samples per packet + if (not my_streamer) + { + // To calculate the max number of samples per packet, we assume the maximum header length + // to avoid fragmentation should the entire header be used. + const size_t bpp = tx_hints.cast<size_t>("bpp", xport.send->get_send_frame_size()) - stream_options.tx_max_len_hdr; + const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item + const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet + UHD_TX_STREAMER_LOG() << "spp == " << spp ; + + my_streamer = boost::make_shared<device3_send_packet_streamer>( + spp, + send_terminator, + xport, + async_xport); + my_streamer->resize(chan_list.size()); + } + + //init some streamer stuff + std::string conv_endianness; + if (xport.endianness == ENDIANNESS_BIG) { + my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_be); + conv_endianness = "be"; + } else { + my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_le); + conv_endianness = "le"; + } + + //set the converter + uhd::convert::id_type id; + id.input_format = args.cpu_format; + id.num_inputs = 1; + id.output_format = args.otw_format + "_item32_" + conv_endianness; + id.num_outputs = 1; + my_streamer->set_converter(id); + + boost::shared_ptr<async_tx_info_t> async_tx_info(new async_tx_info_t()); + async_tx_info->stream_channel = args.channels[stream_i]; + async_tx_info->device_channel = mb_index; + async_tx_info->async_queue = async_md; + async_tx_info->old_async_queue = _async_md; + + task::sptr async_task = task::make( + [=]() { + handle_tx_async_msgs( + async_tx_info, + async_xport.recv, + xport.endianness, + [=]() {return send_terminator->get_tick_rate();} + ); + } + ); + my_streamer->add_async_msg_task(async_task); //Give the streamer a functor to get the send buffer my_streamer->set_xport_chan_get_buff( stream_i, - boost::bind(&zero_copy_if::get_send_buff, my_streamer->_xport.send, _1) + [=](const double timeout) { + return xport.send->get_send_buff(timeout); + } ); //Give the streamer a functor handled received async messages my_streamer->set_async_receiver( - boost::bind(&async_md_type::pop_with_timed_wait, async_md, _1, _2) + [=](uhd::async_metadata_t& md, const double timeout) { + return async_md->pop_with_timed_wait(md, timeout); + } ); my_streamer->set_xport_chan_sid(stream_i, true, xport.send_sid); // CHDR does not support trailers my_streamer->set_enable_trailer(false); - } - // Connect the terminator to the streamer - my_streamer->set_terminator(send_terminator); + my_streamer->set_xport_chan_post_send_cb( + stream_i, + [=]() { + tx_flow_ctrl_ack( + fc_cache, + xport.send, + xport.send_sid, + (xport.endianness == ENDIANNESS_BIG ? uhd::htonx<uint32_t> : uhd::htowx<uint32_t>), + (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_pack_be : vrt::chdr::if_hdr_pack_le) + ); + } + ); + } // Notify all blocks in this chain that they are connected to an active streamer send_terminator->set_tx_streamer(true, 0); @@ -916,7 +1046,7 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) // Store a weak pointer to prevent a streamer->device3_impl->streamer circular dependency. // Note that we store the streamer only once, and use its terminator's // ID to do so. - _tx_streamers[send_terminator->unique_id()] = boost::weak_ptr<sph::send_packet_streamer>(my_streamer); + _tx_streamers[send_terminator->unique_id()] = boost::weak_ptr<uhd::tx_streamer>(my_streamer); // Sets tick rate, samp rate and scaling on this streamer // A registered terminator is required to do this. diff --git a/host/lib/usrp/x300/x300_impl.cpp b/host/lib/usrp/x300/x300_impl.cpp index 0c8d78834..351cb4e10 100644 --- a/host/lib/usrp/x300/x300_impl.cpp +++ b/host/lib/usrp/x300/x300_impl.cpp @@ -1231,8 +1231,8 @@ uhd::both_xports_t x300_impl::make_transport( ? X300_PCIE_RX_DATA_FRAME_SIZE : X300_PCIE_MSG_FRAME_SIZE; - default_buff_args.num_send_frames = - (xport_type == TX_DATA) + default_buff_args.num_send_frames = + (xport_type == TX_DATA) ? X300_PCIE_TX_DATA_NUM_FRAMES : X300_PCIE_MSG_NUM_FRAMES; diff --git a/host/lib/usrp/x300/x300_io_impl.cpp b/host/lib/usrp/x300/x300_io_impl.cpp index 082d83185..af5aa7c9e 100644 --- a/host/lib/usrp/x300/x300_io_impl.cpp +++ b/host/lib/usrp/x300/x300_io_impl.cpp @@ -7,19 +7,9 @@ #include "x300_regs.hpp" #include "x300_impl.hpp" -#include "../../transport/super_recv_packet_handler.hpp" -#include "../../transport/super_send_packet_handler.hpp" -#include <uhdlib/usrp/common/async_packet_handler.hpp> -#include <uhd/transport/nirio_zero_copy.hpp> -#include <uhd/transport/bounded_buffer.hpp> -#include <uhd/utils/tasks.hpp> -#include <uhd/utils/log.hpp> -#include <boost/bind.hpp> -#include <boost/make_shared.hpp> using namespace uhd; using namespace uhd::usrp; -using namespace uhd::transport; /*********************************************************************** * Hooks for get_tx_stream() and get_rx_stream() @@ -62,8 +52,8 @@ void x300_impl::post_streamer_hooks(direction_t dir) // Loop through all tx streamers. Find all radios connected to one // streamer. Sync those. for(const boost::weak_ptr<uhd::tx_streamer> &streamer_w: _tx_streamers.vals()) { - const boost::shared_ptr<sph::send_packet_streamer> streamer = - boost::dynamic_pointer_cast<sph::send_packet_streamer>(streamer_w.lock()); + const boost::shared_ptr<device3_send_packet_streamer> streamer = + boost::dynamic_pointer_cast<device3_send_packet_streamer>(streamer_w.lock()); if (not streamer) { continue; } diff --git a/images/manifest.txt b/images/manifest.txt index 266f564ab..4978e75b6 100644 --- a/images/manifest.txt +++ b/images/manifest.txt @@ -1,18 +1,18 @@ # UHD Image Manifest File # Target hash url SHA256 # X300-Series -x3xx_x310_fpga_default fpga-f279264 x3xx/fpga-f279264/x3xx_x310_fpga_default-gf279264.zip a9d0f4b9f75bca4724333f3320989b2f9a56cf75e3a7bc569b4e868af3feb095 -x3xx_x300_fpga_default fpga-f279264 x3xx/fpga-f279264/x3xx_x300_fpga_default-gf279264.zip 872c3833dc03ed8ff5a6dea2053d7ccb353a63ba760b82c4a48c3c3b839a725c +x3xx_x310_fpga_default fpga-c398755 x3xx/fpga-c398755/x3xx_x310_fpga_default-gc398755.zip 22bf9147373bae395063ed0f2c016f2d1c47da01a40e042fc7f84e9e2eecd331 +x3xx_x300_fpga_default fpga-c398755 x3xx/fpga-c398755/x3xx_x300_fpga_default-gc398755.zip 19c5ff0109cc3c09a698bf2349a7b1f0573528a8b8d29f6e37720ec7c263efe6 # Example daughterboard targets (none currently exist) #x3xx_twinrx_cpld_default example_target #dboard_ubx_cpld_default example_target # E-Series -e3xx_e310_fpga_default fpga-f279264 e3xx/fpga-f279264/e3xx_e310_fpga_default-gf279264.zip 707428e8703b29ad9df60725f25fed78223deab5f4a7becf4ed1886b43f18f92 +e3xx_e310_fpga_default fpga-c398755 e3xx/fpga-c398755/e3xx_e310_fpga_default-gc398755.zip 707428e8703b29ad9df60725f25fed78223deab5f4a7becf4ed1886b43f18f92 e3xx_e310_fpga_rfnoc fpga-d6a878b e3xx/fpga-d6a878b/e3xx_e310_fpga_rfnoc-gd6a878b.zip 5c9b89fb6293423644868c22e914de386a9af39ff031da6800a1cf39a90ea73b # N300-Series -n3xx_n310_fpga_default fpga-f279264 n3xx/fpga-f279264/n3xx_n310_fpga_default-gf279264.zip e31c2a71014e7fdb140e00ef3f6e9814b720e0f8de4544d2bad8a50cf1c61ce1 -n3xx_n300_fpga_default fpga-f279264 n3xx/fpga-f279264/n3xx_n300_fpga_default-gf279264.zip 83f880d2b79f666a8fe660aa949cd1c1eaf5256d31036696fd270ae59491acc6 +n3xx_n310_fpga_default fpga-c398755 n3xx/fpga-c398755/n3xx_n310_fpga_default-gc398755.zip 6637bf9ce4a9c212bd30494aa5e0117e4a076a367a5370b4542053beed69e85a +n3xx_n300_fpga_default fpga-c398755 n3xx/fpga-c398755/n3xx_n300_fpga_default-gc398755.zip 2d45af7c433449a3b0f4aca90bf30b3c1bc918b452893d22c8397f086e2812d4 #n3xx_n310_fpga_aurora fpga-1107862 n3xx/fpga-1107862/n3xx_n310_fpga_aurora-g1107862.zip 3926d6b247a8f931809460d3957cec51f8407cd3f7aea6f4f3b91d1bbb427c7d #n3xx_n300_fpga_aurora fpga-1107862 n3xx/fpga-1107862/n3xx_n300_fpga_aurora-g1107862.zip e34e9343572adfba905433a1570cb394fe45207d442268d0fa400c3406253530 #n3xx_n310_cpld_default fpga-6bea23d n3xx/fpga-6bea23d/n3xx_n310_cpld_default-g6bea23d.zip 0 diff --git a/tools/dissectors/packet-chdr.c b/tools/dissectors/packet-chdr.c index 95e537518..e5dcd6e5e 100644 --- a/tools/dissectors/packet-chdr.c +++ b/tools/dissectors/packet-chdr.c @@ -41,6 +41,9 @@ static int hf_chdr_type = -1; static int hf_chdr_has_time = -1; static int hf_chdr_eob = -1; static int hf_chdr_error = -1; +static int hf_chdr_fc_ack = -1; +static int hf_chdr_fc_pktcount = -1; +static int hf_chdr_fc_bytecount = -1; static int hf_chdr_sequence = -1; static int hf_chdr_packet_size = -1; static int hf_chdr_stream_id = -1; @@ -53,6 +56,7 @@ static int hf_chdr_dst_blockport = -1; static int hf_chdr_timestamp = -1; static int hf_chdr_payload = -1; static int hf_chdr_ext_response = -1; +static int hf_chdr_ext_fc = -1; static int hf_chdr_ext_status_code = -1; static int hf_chdr_ext_seq_num = -1; static int hf_chdr_cmd = -1; @@ -63,6 +67,7 @@ static const value_string CHDR_PACKET_TYPES[] = { { 0, "Data" }, { 1, "Data (End-of-Burst)" }, { 4, "Flow Control" }, + { 5, "Flow Control (ACK)" }, { 8, "Command" }, { 12, "Response" }, { 13, "Error Response" }, @@ -72,6 +77,7 @@ static const value_string CHDR_PACKET_TYPES_SHORT[] = { { 0, "data" }, { 1, "data" }, { 4, "fc" }, + { 5, "fc" }, { 8, "cmd" }, { 12, "resp" }, { 13, "resp" }, @@ -87,6 +93,7 @@ static gint ett_chdr = -1; static gint ett_chdr_header = -1; static gint ett_chdr_id = -1; static gint ett_chdr_response = -1; +static gint ett_chdr_fc = -1; static gint ett_chdr_cmd = -1; /* Forward-declare the dissector functions */ @@ -158,6 +165,8 @@ static void dissect_chdr(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree) proto_tree *response_tree; proto_item *cmd_item; proto_tree *cmd_tree; + proto_item *fc_item; + proto_tree *fc_tree; gint len; gint flag_offset; @@ -170,6 +179,7 @@ static void dissect_chdr(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree) gboolean flag_is_resp = 0; gboolean flag_is_eob = 0; gboolean flag_is_error = 0; + gboolean flag_is_fcack = 0; uint64_t timestamp; gboolean is_network; gint endianness; @@ -213,6 +223,7 @@ static void dissect_chdr(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree) flag_is_resp = (pkt_type == 3); flag_is_eob = flag_is_data && (hdr_bits & 0x1); flag_is_error = flag_is_resp && (hdr_bits & 0x1); + flag_is_fcack = flag_is_fc && (hdr_bits & 0x1); flag_has_time = hdr_bits & 0x2; if (flag_has_time) { header_size += 8; // 64-bit timestamp. @@ -246,6 +257,9 @@ static void dissect_chdr(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree) proto_tree_add_boolean(header_tree, hf_chdr_error, tvb, flag_offset, 1, flag_is_error); /*proto_tree_add_boolean(header_tree, hf_chdr_error, tvb, flag_offset, 1, true);*/ } + if (flag_is_fc) { + proto_tree_add_boolean(header_tree, hf_chdr_fc_ack, tvb, flag_offset, 1, flag_is_fcack); + } /* These lines add sequence, packet_size and stream ID */ proto_tree_add_item(chdr_tree, hf_chdr_sequence, tvb, (is_network ? 0:2), 2, endianness); @@ -306,6 +320,11 @@ static void dissect_chdr(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree) proto_tree_add_item(response_tree, hf_chdr_ext_status_code, tvb, header_size, 4, endianness); /* This will show the 12-bits of sequence ID in the last 2 bytes */ proto_tree_add_item(response_tree, hf_chdr_ext_seq_num, tvb, (header_size + 4 + (is_network ? 2 : 0)), 2, endianness); + } else if (flag_is_fc) { + fc_item = proto_tree_add_item(chdr_tree, hf_chdr_ext_fc, tvb, header_size, 8, endianness); + fc_tree = proto_item_add_subtree(fc_item, ett_chdr_fc); + proto_tree_add_item(fc_tree, hf_chdr_fc_pktcount, tvb, header_size, 4, endianness); + proto_tree_add_item(fc_tree, hf_chdr_fc_bytecount, tvb, header_size + 4, 4, endianness); } else if (show_raw_payload) { proto_tree_add_item(chdr_tree, hf_chdr_payload, tvb, header_size, -1, ENC_NA); } @@ -347,6 +366,30 @@ void proto_register_chdr(void) NULL, 0x10, NULL, HFILL } }, + { &hf_chdr_fc_ack, + { "FC ACK Flag", "chdr.hdr.fc_ack", + FT_BOOLEAN, BASE_NONE, + NULL, 0x10, + NULL, HFILL } + }, + { &hf_chdr_ext_fc, + { "Flow Control", "chdr.fc", + FT_BYTES, BASE_NONE, + NULL, 0x0, + NULL, HFILL } + }, + { &hf_chdr_fc_bytecount, + { "FC Byte Count", "chdr.hdr.fc.byte_count", + FT_UINT32, BASE_DEC, + NULL, 0x0, + NULL, HFILL } + }, + { &hf_chdr_fc_pktcount, + { "FC Packet Count", "chdr.hdr.fc.pkt_count", + FT_UINT32, BASE_DEC, + NULL, 0x0, + NULL, HFILL } + }, { &hf_chdr_sequence, { "Sequence ID", "chdr.seq", FT_UINT16, BASE_DEC, @@ -457,6 +500,7 @@ void proto_register_chdr(void) &ett_chdr_header, &ett_chdr_id, &ett_chdr_response, + &ett_chdr_fc, &ett_chdr_cmd }; |