diff options
author | Martin Braun <martin.braun@ettus.com> | 2016-11-14 14:30:34 -0800 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2018-07-25 15:34:03 -0700 |
commit | 988515ab19a715773086a7a8c023ddb8249c7e37 (patch) | |
tree | 71c861c3a1d0a5e295dad5939358dd30e0a33f3b /host/lib/usrp/device3 | |
parent | 8b16ab706fb4768f802ddb65a81fc26e1562cb0d (diff) | |
download | uhd-988515ab19a715773086a7a8c023ddb8249c7e37.tar.gz uhd-988515ab19a715773086a7a8c023ddb8249c7e37.tar.bz2 uhd-988515ab19a715773086a7a8c023ddb8249c7e37.zip |
Device3: Change packet-based flow control to byte-based flow control
Diffstat (limited to 'host/lib/usrp/device3')
-rw-r--r-- | host/lib/usrp/device3/device3_impl.hpp | 81 | ||||
-rw-r--r-- | host/lib/usrp/device3/device3_io_impl.cpp | 686 |
2 files changed, 485 insertions, 282 deletions
diff --git a/host/lib/usrp/device3/device3_impl.hpp b/host/lib/usrp/device3/device3_impl.hpp index 8ecb1f72b..580268e4a 100644 --- a/host/lib/usrp/device3/device3_impl.hpp +++ b/host/lib/usrp/device3/device3_impl.hpp @@ -21,6 +21,10 @@ #include <uhd/types/direction.hpp> #include <uhd/utils/tasks.hpp> #include <uhd/device3.hpp> +#include "../../transport/super_send_packet_handler.hpp" +#include "../../transport/super_recv_packet_handler.hpp" +#include <uhdlib/rfnoc/tx_stream_terminator.hpp> +#include <uhdlib/rfnoc/rx_stream_terminator.hpp> #include <uhdlib/rfnoc/xports.hpp> namespace uhd { namespace usrp { @@ -30,11 +34,83 @@ namespace uhd { namespace usrp { **********************************************************************/ static const size_t DEVICE3_RX_FC_REQUEST_FREQ = 32; //per flow-control window static const size_t DEVICE3_TX_FC_RESPONSE_FREQ = 8; -static const size_t DEVICE3_TX_FC_RESPONSE_CYCLES = 0; // Cycles: Off. +static const size_t DEVICE3_FC_PACKET_LEN_IN_WORDS32 = 2; +static const size_t DEVICE3_FC_PACKET_COUNT_OFFSET = 0; +static const size_t DEVICE3_FC_BYTE_COUNT_OFFSET = 1; +static const size_t DEVICE3_LINE_SIZE = 8; static const size_t DEVICE3_TX_MAX_HDR_LEN = uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t); // Bytes static const size_t DEVICE3_RX_MAX_HDR_LEN = uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t); // Bytes +// This class manages the lifetime of the TX async message handler task, transports, and terminator +class device3_send_packet_streamer : public uhd::transport::sph::send_packet_streamer +{ +public: + device3_send_packet_streamer( + const size_t max_num_samps, + const uhd::rfnoc::tx_stream_terminator::sptr terminator, + const both_xports_t data_xport, + const both_xports_t async_msg_xport + ) : + uhd::transport::sph::send_packet_streamer(max_num_samps), + _terminator(terminator), + _data_xport(data_xport), + _async_msg_xport(async_msg_xport) + {}; + + ~device3_send_packet_streamer() + { + // Make sure the async task is destroyed before the transports + _tx_async_msg_tasks.clear(); + }; + + uhd::rfnoc::tx_stream_terminator::sptr get_terminator() + { + return _terminator; + } + + void add_async_msg_task(task::sptr task) + { + _tx_async_msg_tasks.push_back(task); + } + +private: + uhd::rfnoc::tx_stream_terminator::sptr _terminator; + both_xports_t _data_xport; + both_xports_t _async_msg_xport; + std::vector<task::sptr> _tx_async_msg_tasks; +}; + +// This class manages the lifetime of the RX transports and terminator and provides access to both +class device3_recv_packet_streamer : public uhd::transport::sph::recv_packet_streamer +{ +public: + device3_recv_packet_streamer( + const size_t max_num_samps, + const uhd::rfnoc::rx_stream_terminator::sptr terminator, + const both_xports_t xport + ) : + uhd::transport::sph::recv_packet_streamer(max_num_samps), + _terminator(terminator), + _xport(xport) {}; + + ~device3_recv_packet_streamer() {}; + + both_xports_t get_xport() + { + return _xport; + } + + uhd::rfnoc::rx_stream_terminator::sptr get_terminator() + { + return _terminator; + } + +private: + uhd::rfnoc::rx_stream_terminator::sptr _terminator; + both_xports_t _xport; +}; + class device3_impl : public uhd::device3, public boost::enable_shared_from_this<device3_impl> { public: @@ -64,14 +140,11 @@ public: size_t rx_fc_request_freq; //! How often the downstream block should send ACKs per one full FC window size_t tx_fc_response_freq; - //! How often the downstream block should send ACKs in cycles - size_t tx_fc_response_cycles; stream_options_t(void) : tx_max_len_hdr(DEVICE3_TX_MAX_HDR_LEN) , rx_max_len_hdr(DEVICE3_RX_MAX_HDR_LEN) , rx_fc_request_freq(DEVICE3_RX_FC_REQUEST_FREQ) , tx_fc_response_freq(DEVICE3_TX_FC_RESPONSE_FREQ) - , tx_fc_response_cycles(DEVICE3_TX_FC_RESPONSE_CYCLES) {}; }; diff --git a/host/lib/usrp/device3/device3_io_impl.cpp b/host/lib/usrp/device3/device3_io_impl.cpp index 8882552af..92865b6fe 100644 --- a/host/lib/usrp/device3/device3_io_impl.cpp +++ b/host/lib/usrp/device3/device3_io_impl.cpp @@ -13,8 +13,6 @@ #include <uhd/rfnoc/sink_block_ctrl_base.hpp> #include <uhd/utils/byteswap.hpp> #include <uhd/utils/log.hpp> -#include "../../transport/super_recv_packet_handler.hpp" -#include "../../transport/super_send_packet_handler.hpp" #include <uhd/rfnoc/rate_node_ctrl.hpp> #include <uhd/rfnoc/radio_ctrl.hpp> #include <uhd/transport/zero_copy_flow_ctrl.hpp> @@ -30,10 +28,6 @@ using namespace uhd; using namespace uhd::usrp; using namespace uhd::transport; -//! CHDR uses 12-Bit sequence numbers -static const uint32_t HW_SEQ_NUM_MASK = 0xfff; - - /*********************************************************************** * Helper functions for get_?x_stream() **********************************************************************/ @@ -146,8 +140,25 @@ void generate_channel_list( struct rx_fc_cache_t { rx_fc_cache_t(): - last_seq_in(0){} - size_t last_seq_in; + interval(0), + last_byte_count(0), + total_bytes_consumed(0), + total_packets_consumed(0), + seq_num(0) {} + + //! Flow control interval in bytes + size_t interval; + //! Byte count at last flow control packet + uint32_t last_byte_count; + //! This will wrap around, but that's OK, because math. + uint32_t total_bytes_consumed; + //! This will wrap around, but that's OK, because math. + uint32_t total_packets_consumed; + //! Sequence number of next flow control packet + uint64_t seq_num; + sid_t sid; + zero_copy_if::sptr xport; + endianness_t endianness; }; /*! Determine the size of the flow control window in number of packets. @@ -180,109 +191,149 @@ static size_t get_rx_flow_control_window( throw uhd::value_error("recv_buff_fullness must be in [0.01, 1] inclusive (1% to 100%)"); } - size_t window_in_pkts = (static_cast<size_t>(sw_buff_size * fullness_factor) / pkt_size); + size_t window_in_bytes = (static_cast<size_t>(sw_buff_size * fullness_factor)); if (rx_args.has_key("max_recv_window")) { - window_in_pkts = std::min( - window_in_pkts, - rx_args.cast<size_t>("max_recv_window", window_in_pkts) + window_in_bytes = std::min( + window_in_bytes, + rx_args.cast<size_t>("max_recv_window", window_in_bytes) ); } - if (window_in_pkts == 0) { + if (window_in_bytes < pkt_size) { throw uhd::value_error("recv_buff_size must be larger than the recv_frame_size."); } - UHD_ASSERT_THROW(size_t(sw_buff_size * fullness_factor) >= pkt_size * window_in_pkts); - return window_in_pkts; + UHD_ASSERT_THROW(size_t(sw_buff_size * fullness_factor) >= window_in_bytes); + return window_in_bytes; } /*! Send out RX flow control packets. * - * For an rx stream, this function takes care of sending back - * a flow control packet to the source telling it which - * packets have been consumed. - * - * This function should only be called by the function handling - * the rx stream, usually recv() in super_recv_packet_handler. + * This function handles updating the counters for the consumed + * bytes and packets, determines if a flow control message is + * is necessary, and sends one if it is. Passing a nullptr for + * the buff parameter will skip the counter update. * - * \param sid The SID that goes into this packet. This is the reversed() - * version of the data stream's SID. - * \param xport A transport object over which to send the data - * \param big_endian Endianness of the transport - * \param seq32_state Pointer to a variable that saves the 32-Bit state - * of the sequence numbers, since we only have 12 Bit - * sequence numbers in CHDR. - * \param last_seq The value to send: The last consumed packet's sequence number. + * \param fc_cache RX flow control state information + * \param buff Receive buffer. Setting to nullptr will + * skip the counter update. */ -static void handle_rx_flowctrl( - const sid_t &sid, - zero_copy_if::sptr xport, - endianness_t endianness, +static bool rx_flow_ctrl( boost::shared_ptr<rx_fc_cache_t> fc_cache, - const size_t last_seq + managed_buffer::sptr buff ) { - static const size_t RXFC_PACKET_LEN_IN_WORDS = 2; - static const size_t RXFC_CMD_CODE_OFFSET = 0; - static const size_t RXFC_SEQ_NUM_OFFSET = 1; + // If the caller supplied a buffer + if (buff) + { + // Unpack the header + vrt::if_packet_info_t packet_info; + packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); + const uint32_t *pkt = buff->cast<const uint32_t *>(); + try { + if (fc_cache->endianness == ENDIANNESS_BIG) + { + vrt::chdr::if_hdr_unpack_be(pkt, packet_info); + } else { + vrt::chdr::if_hdr_unpack_le(pkt, packet_info); + } + } + catch(const std::exception &ex) + { + // Log and ignore + UHD_LOGGER_ERROR("RX FLOW CTRL") << "Error unpacking flow control packet: " << ex.what() << std::endl; + return true; + } - managed_send_buffer::sptr buff = xport->get_send_buff(0.0); - if (not buff) { - throw uhd::runtime_error("handle_rx_flowctrl timed out getting a send buffer"); + // Update counters assuming the buffer is a consumed packet + if (not packet_info.error) + { + fc_cache->total_bytes_consumed += buff->size(); + fc_cache->total_packets_consumed++; + } } - uint32_t *pkt = buff->cast<uint32_t *>(); - // Recover sequence number. The sequence numbers handled by the streamers - // are 12 Bits, but we want to know the 32-Bit sequence number. - size_t &seq32 = fc_cache->last_seq_in; - const size_t seq12 = seq32 & HW_SEQ_NUM_MASK; - if (last_seq < seq12) - seq32 += (HW_SEQ_NUM_MASK + 1); - seq32 &= ~HW_SEQ_NUM_MASK; - seq32 |= last_seq; + // Just return if there is no need to send a flow control packet + if (fc_cache->total_bytes_consumed - fc_cache->last_byte_count < fc_cache->interval) + { + return true; + } - // Super-verbose mode: - //static size_t fc_pkt_count = 0; - //UHD_LOGGER_INFO("STREAMER") << "sending flow ctrl packet " << fc_pkt_count++ << ", acking " << str(boost::format("%04d\tseq_sw==0x%08x") % last_seq % seq32) ; + // Time to send a flow control packet + // Get a send buffer + managed_send_buffer::sptr fc_buff = fc_cache->xport->get_send_buff(0.0); + if (not fc_buff) { + throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer"); + } + uint32_t *pkt = fc_buff->cast<uint32_t *>(); //load packet info vrt::if_packet_info_t packet_info; packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_FC; - packet_info.num_payload_words32 = RXFC_PACKET_LEN_IN_WORDS; + packet_info.num_payload_words32 = DEVICE3_FC_PACKET_LEN_IN_WORDS32; packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); - packet_info.packet_count = seq32; + packet_info.packet_count = fc_cache->seq_num++; packet_info.sob = false; packet_info.eob = false; - packet_info.sid = sid.get(); + packet_info.error = false; + packet_info.fc_ack = false; + packet_info.sid = fc_cache->sid.get(); packet_info.has_sid = true; packet_info.has_cid = false; packet_info.has_tsi = false; packet_info.has_tsf = false; packet_info.has_tlr = false; - if (endianness == ENDIANNESS_BIG) { + if (fc_cache->endianness == ENDIANNESS_BIG) { // Load Header: vrt::chdr::if_hdr_pack_be(pkt, packet_info); - // Load Payload: (the sequence number) - pkt[packet_info.num_header_words32+RXFC_CMD_CODE_OFFSET] = uhd::htonx<uint32_t>(0); - pkt[packet_info.num_header_words32+RXFC_SEQ_NUM_OFFSET] = uhd::htonx<uint32_t>(seq32); + // Load Payload: Packet count, and byte count + pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = + uhd::htonx<uint32_t>(fc_cache->total_packets_consumed); + pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = + uhd::htonx<uint32_t>(fc_cache->total_bytes_consumed); } else { // Load Header: vrt::chdr::if_hdr_pack_le(pkt, packet_info); - // Load Payload: (the sequence number) - pkt[packet_info.num_header_words32+RXFC_CMD_CODE_OFFSET] = uhd::htowx<uint32_t>(0); - pkt[packet_info.num_header_words32+RXFC_SEQ_NUM_OFFSET] = uhd::htowx<uint32_t>(seq32); + // Load Payload: Packet count, and byte count + pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = + uhd::htowx<uint32_t>(fc_cache->total_packets_consumed); + pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = + uhd::htowx<uint32_t>(fc_cache->total_bytes_consumed); } - //std::cout << " SID=" << std::hex << sid << " hdr bits=" << packet_info.packet_type << " seq32=" << seq32 << std::endl; - //std::cout << "num_packet_words32: " << packet_info.num_packet_words32 << std::endl; - //for (size_t i = 0; i < packet_info.num_packet_words32; i++) { - //std::cout << str(boost::format("0x%08x") % pkt[i]) << " "; - //if (i % 2) { - //std::cout << std::endl; - //} - //} - //send the buffer over the interface - buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32)); + fc_buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32)); + + //update byte count + fc_cache->last_byte_count = fc_cache->total_bytes_consumed; + + return true; +} + +/*! Handle RX flow control ACK packets. + * + */ +static void handle_rx_flowctrl_ack( + boost::shared_ptr<rx_fc_cache_t> fc_cache, + const uint32_t *payload +) { + const uint32_t pkt_count = (fc_cache->endianness == ENDIANNESS_BIG) ? + uhd::ntohx<uint32_t>(payload[0]) : + uhd::wtohx<uint32_t>(payload[0]); + const uint32_t byte_count = (fc_cache->endianness == ENDIANNESS_BIG) ? + uhd::ntohx<uint32_t>(payload[1]) : + uhd::wtohx<uint32_t>(payload[1]); + if (fc_cache->total_bytes_consumed != byte_count) + { + UHD_LOGGER_DEBUG("device3") + << "oh noes: byte_count==" << byte_count + << " total_bytes_consumed==" << fc_cache->total_bytes_consumed << std::endl + ; + } + fc_cache->total_bytes_consumed = byte_count; + fc_cache->total_packets_consumed = pkt_count; // guess we need a pkt offset too? + + // This will send a flow control packet if there is a significant discrepancy + rx_flow_ctrl(fc_cache, nullptr); } /*********************************************************************** @@ -293,56 +344,51 @@ static void handle_rx_flowctrl( //! Stores the state of TX flow control struct tx_fc_cache_t { - tx_fc_cache_t(size_t capacity): + tx_fc_cache_t(uint32_t capacity): + last_byte_ack(0), last_seq_ack(0), - space(capacity) {} - - size_t last_seq_ack; - size_t space; + byte_count(0), + pkt_count(0), + window_size(capacity), + fc_ack_seqnum(0), + fc_received(false) {} + + uint32_t last_byte_ack; + uint32_t last_seq_ack; + uint32_t byte_count; + uint32_t pkt_count; + uint32_t window_size; + uint32_t fc_ack_seqnum; + bool fc_received; }; -/*! Return the size of the flow control window in packets. - * - * If the return value of this function is F, the last tx'd packet - * has index N and the last ack'd packet has index M, the amount of - * FC credit we have is C = F + M - N (i.e. we can send C more packets - * before getting another ack). - * - * Note: If `send_buff_size` is set in \p tx_hints, this will - * override hw_buff_size_. - */ -static size_t get_tx_flow_control_window( - size_t pkt_size, - const double hw_buff_size_, - const device_addr_t& tx_hints -) { - double hw_buff_size = tx_hints.cast<double>("send_buff_size", hw_buff_size_); - size_t window_in_pkts = (static_cast<size_t>(hw_buff_size) / pkt_size); - if (window_in_pkts == 0) { - throw uhd::value_error("send_buff_size must be larger than the send_frame_size."); - } - return window_in_pkts; -} - static bool tx_flow_ctrl( boost::shared_ptr<tx_fc_cache_t> fc_cache, - zero_copy_if::sptr async_xport, - uint32_t (*endian_conv)(uint32_t), + zero_copy_if::sptr xport, + uint32_t (*to_host)(uint32_t), void (*unpack)(const uint32_t *packet_buff, vrt::if_packet_info_t &), - managed_buffer::sptr + managed_buffer::sptr buff ) { while (true) { // If there is space - if (fc_cache->space) + if (fc_cache->window_size - (fc_cache->byte_count - fc_cache->last_byte_ack) >= buff->size()) { // All is good - packet will be sent - fc_cache->space--; + fc_cache->byte_count += buff->size(); + // Round up to nearest word + if (fc_cache->byte_count % DEVICE3_LINE_SIZE) + { + fc_cache->byte_count += DEVICE3_LINE_SIZE - (fc_cache->byte_count % DEVICE3_LINE_SIZE); + } + fc_cache->pkt_count++; return true; } // Look for a flow control message to update the space available in the buffer. - managed_recv_buffer::sptr buff = async_xport->get_recv_buff(); + // A minimal timeout is used because larger timeouts can cause the thread to be + // scheduled out for too long at high data rates and result in underruns. + managed_recv_buffer::sptr buff = xport->get_recv_buff(0.000001); if (buff) { vrt::if_packet_info_t if_packet_info; @@ -353,28 +399,94 @@ static bool tx_flow_ctrl( } catch(const std::exception &ex) { - UHD_LOG_ERROR("TX FLOW CTRL", "Error unpacking async flow control packet: " << ex.what()); + UHD_LOGGER_ERROR("TX FLOW CTRL") << "Error unpacking flow control packet: " << ex.what() << std::endl; continue; } if (if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_FC) { - UHD_LOG_ERROR( - "TX FLOW CTRL", - "Unexpected packet type received by flow control handler: " << if_packet_info.packet_type - ); + UHD_LOGGER_ERROR("TX FLOW CTRL") << "Unexpected packet received by flow control handler: " << if_packet_info.packet_type << std::endl; continue; } + const uint32_t *payload = &packet_buff[if_packet_info.num_header_words32]; + const uint32_t pkt_count = to_host(payload[0]); + const uint32_t byte_count = to_host(payload[1]); + // update the amount of space - size_t seq_ack = endian_conv(packet_buff[if_packet_info.num_header_words32+1]); - fc_cache->space += (seq_ack - fc_cache->last_seq_ack) & HW_SEQ_NUM_MASK; - fc_cache->last_seq_ack = seq_ack; + fc_cache->last_byte_ack = byte_count; + fc_cache->last_seq_ack = pkt_count; + + fc_cache->fc_received = true; } } return false; } +static void tx_flow_ctrl_ack( + boost::shared_ptr<tx_fc_cache_t> fc_cache, + zero_copy_if::sptr send_xport, + sid_t send_sid, + uint32_t (*from_host)(uint32_t), + void (*pack)(uint32_t *packet_buff, vrt::if_packet_info_t &) +) { + if (not fc_cache->fc_received) + { + return; + } + + // Time to send a flow control ACK packet + // Get a send buffer + managed_send_buffer::sptr fc_buff = send_xport->get_send_buff(0.0); + if (not fc_buff) { + UHD_LOGGER_ERROR("tx_flow_ctrl_ack") << "timed out getting a send buffer"; + return; + } + uint32_t *pkt = fc_buff->cast<uint32_t *>(); + + // Load packet info + vrt::if_packet_info_t packet_info; + packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_ACK; + packet_info.num_payload_words32 = DEVICE3_FC_PACKET_LEN_IN_WORDS32; + packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); + packet_info.packet_count = fc_cache->fc_ack_seqnum++; + packet_info.sob = false; + packet_info.eob = true; + packet_info.error = false; + packet_info.fc_ack = false; + packet_info.sid = send_sid.get(); + packet_info.has_sid = true; + packet_info.has_cid = false; + packet_info.has_tsi = false; + packet_info.has_tsf = false; + packet_info.has_tlr = false; + + // Load Header: + pack(pkt, packet_info); + + // Update counters to include this packet + size_t fc_ack_pkt_size = sizeof(uint32_t)*(packet_info.num_packet_words32); + fc_cache->byte_count += fc_ack_pkt_size; + // Round up to nearest word + if (fc_cache->byte_count % DEVICE3_LINE_SIZE) + { + fc_cache->byte_count += DEVICE3_LINE_SIZE - (fc_cache->byte_count % DEVICE3_LINE_SIZE); + } + fc_cache->pkt_count++; + + // Load Payload: Packet count, and byte count + pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = + from_host(fc_cache->pkt_count); + pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = + from_host(fc_cache->byte_count); + + // Send the buffer over the interface + fc_buff->commit(fc_ack_pkt_size); + + // Reset for next FC + fc_cache->fc_received = false; +} + /*********************************************************************** * TX Async Message Functions **********************************************************************/ @@ -445,13 +557,10 @@ static void handle_tx_async_msgs( async_info->stream_channel ); - // Filter out any flow control messages and cache the rest + // Filter out any flow control messages and cache the rest if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) { - UHD_LOG_ERROR( - "TX ASYNC", - "Unexpected flow control message found in async message handling" - ); + UHD_LOGGER_ERROR("TX ASYNC MSG") << "Unexpected flow control message found in async message handling" << std::endl; } else { async_info->async_queue->push_with_pop_on_full(metadata); metadata.channel = async_info->device_channel; @@ -474,8 +583,8 @@ void device3_impl::update_rx_streamers(double /* rate */) { for(const std::string &block_id: _rx_streamers.keys()) { UHD_RX_STREAMER_LOG() << "updating RX streamer to " << block_id; - boost::shared_ptr<sph::recv_packet_streamer> my_streamer = - boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_rx_streamers[block_id].lock()); + boost::shared_ptr<device3_recv_packet_streamer> my_streamer = + boost::dynamic_pointer_cast<device3_recv_packet_streamer>(_rx_streamers[block_id].lock()); if (my_streamer) { double tick_rate = my_streamer->get_terminator()->get_tick_rate(); if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) { @@ -511,12 +620,15 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) // Note: All 'args.args' are merged into chan_args now. // II. Iterate over all channels - boost::shared_ptr<sph::recv_packet_streamer> my_streamer; + boost::shared_ptr<device3_recv_packet_streamer> my_streamer; // The terminator's lifetime is coupled to the streamer. // There is only one terminator. If the streamer has multiple channels, // it will be connected to each upstream block. rfnoc::rx_stream_terminator::sptr recv_terminator = rfnoc::rx_stream_terminator::make(); - for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) { + for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) + { + // First, configure blocks and create transport + // Get block ID and mb index uhd::rfnoc::block_id_t block_id = chan_list[stream_i]; UHD_RX_STREAMER_LOG() << "chan " << stream_i << " connecting to " << block_id ; @@ -549,7 +661,36 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) uhd::sid_t stream_address = blk_ctrl->get_address(block_port); UHD_RX_STREAMER_LOG() << "creating rx stream " << rx_hints.to_string() ; both_xports_t xport = make_transport(stream_address, RX_DATA, rx_hints); - UHD_RX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec << " actual recv_buff_size = " << xport.recv_buff_size ; + UHD_RX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec << " actual recv_buff_size = " << xport.recv_buff_size; + + // Configure the block + // Flow control setup + const size_t pkt_size = xport.recv->get_recv_frame_size(); + // Leave one pkt_size space for overrun packets - TODO make this obsolete + const size_t fc_window = get_rx_flow_control_window(pkt_size, xport.recv_buff_size, rx_hints) - pkt_size; + const size_t fc_handle_window = std::max<size_t>(1, fc_window / stream_options.rx_fc_request_freq); + UHD_RX_STREAMER_LOG()<< "Flow Control Window = " << (fc_window) << ", Flow Control Handler Window = " << fc_handle_window; + blk_ctrl->configure_flow_control_out( + true, + fc_window, + rx_hints.cast<size_t>("recv_pkt_limit", 0), // On rfnoc-devel, update e300_impl::get_rx_hints() to set this to 32 + block_port + ); + + // Add flow control transport + boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t()); + fc_cache->sid = xport.send_sid; + fc_cache->xport = xport.send; + fc_cache->endianness = xport.endianness; + fc_cache->interval = fc_handle_window; + xport.recv = zero_copy_flow_ctrl::make + ( + xport.recv, + NULL, + [=](managed_buffer::sptr buff) { + return rx_flow_ctrl(fc_cache, buff); + } + ); // Configure the block // Note: We need to set_destination() after writing to SR_CLEAR_TX_FC. @@ -558,8 +699,10 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) // other settings. blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_TX_FC, 0x1, block_port); blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_TX_FC, 0x0, block_port); + // Configure routing for data blk_ctrl->set_destination(xport.send_sid.get_src(), block_port); + // Configure routing for responses blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port); UHD_RX_STREAMER_LOG() << "resp_out_dst_sid == " << xport.send_sid.get_src() ; @@ -570,17 +713,24 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) node->sr_write(uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port); } - // To calculate the max number of samples per packet, we assume the maximum header length - // to avoid fragmentation should the entire header be used. - const size_t bpp = xport.recv->get_recv_frame_size() - stream_options.rx_max_len_hdr; // bytes per packet - const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item - const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet - UHD_RX_STREAMER_LOG() << "spp == " << spp ; + // Second, configure the streamer //make the new streamer given the samples per packet if (not my_streamer) - my_streamer = boost::make_shared<sph::recv_packet_streamer>(spp); - my_streamer->resize(chan_list.size()); + { + // To calculate the max number of samples per packet, we assume the maximum header length + // to avoid fragmentation should the entire header be used. + const size_t bpp = pkt_size - stream_options.rx_max_len_hdr; // bytes per packet + const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item + const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet + UHD_RX_STREAMER_LOG() << "spp == " << spp ; + + my_streamer = boost::make_shared<device3_recv_packet_streamer>( + spp, + recv_terminator, + xport); + my_streamer->resize(chan_list.size()); + } //init some streamer stuff std::string conv_endianness; @@ -600,72 +750,51 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) id.num_outputs = 1; my_streamer->set_converter(id); - //flow control setup - const size_t pkt_size = spp * bpi + stream_options.rx_max_len_hdr; - const size_t fc_window = get_rx_flow_control_window(pkt_size, xport.recv_buff_size, rx_hints); - const size_t fc_handle_window = std::max<size_t>(1, fc_window / stream_options.rx_fc_request_freq); - UHD_RX_STREAMER_LOG()<< "Flow Control Window (minus one) = " << fc_window-1 << ", Flow Control Handler Window = " << fc_handle_window ; - blk_ctrl->configure_flow_control_out( - fc_window-1, // Leave one space for overrun packets TODO make this obsolete - block_port + // Give the streamer a functor to handle flow control ACK messages + my_streamer->set_xport_handle_flowctrl_ack( + stream_i, + [=](const uint32_t *payload) { + handle_rx_flowctrl_ack( + fc_cache, + payload + ); + } ); //Give the streamer a functor to get the recv_buffer - //bind requires a zero_copy_if::sptr to add a streamer->xport lifetime dependency my_streamer->set_xport_chan_get_buff( stream_i, - boost::bind(&zero_copy_if::get_recv_buff, xport.recv, _1), + [=](double timeout) {return xport.recv->get_recv_buff(timeout);}, true /*flush*/ ); //Give the streamer a functor to handle overruns //bind requires a weak_ptr to break the a streamer->streamer circular dependency //Using "this" is OK because we know that this device3_impl will outlive the streamer + boost::weak_ptr<uhd::rx_streamer> weak_ptr(my_streamer); my_streamer->set_overflow_handler( - stream_i, - boost::bind( - &uhd::rfnoc::rx_stream_terminator::handle_overrun, recv_terminator, - boost::weak_ptr<uhd::rx_streamer>(my_streamer), stream_i - ) - ); - - //Give the streamer a functor to send flow control messages - //handle_rx_flowctrl is static and has no lifetime issues - boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t()); - my_streamer->set_xport_handle_flowctrl( - stream_i, boost::bind( - &handle_rx_flowctrl, - xport.send_sid, - xport.send, - xport.endianness, - fc_cache, - _1 - ), - fc_handle_window, - true/*init*/ + stream_i, + [=]() { + recv_terminator->handle_overrun( + weak_ptr, + stream_i); + } ); //Give the streamer a functor issue stream cmd - //bind requires a shared pointer to add a streamer->framer lifetime dependency my_streamer->set_issue_stream_cmd( stream_i, - boost::bind(&uhd::rfnoc::source_block_ctrl_base::issue_stream_cmd, blk_ctrl, _1, block_port) + [=](const stream_cmd_t& stream_cmd) {blk_ctrl->issue_stream_cmd(stream_cmd, block_port);} ); - - // Tell the streamer which SID is valid for this channel - my_streamer->set_xport_chan_sid(stream_i, true, xport.send_sid); } - // Connect the terminator to the streamer - my_streamer->set_terminator(recv_terminator); - // Notify all blocks in this chain that they are connected to an active streamer recv_terminator->set_rx_streamer(true, 0); // Store a weak pointer to prevent a streamer->device3_impl->streamer circular dependency. // Note that we store the streamer only once, and use its terminator's // ID to do so. - _rx_streamers[recv_terminator->unique_id()] = boost::weak_ptr<sph::recv_packet_streamer>(my_streamer); + _rx_streamers[recv_terminator->unique_id()] = boost::weak_ptr<uhd::rx_streamer>(my_streamer); // Sets tick rate, samp rate and scaling on this streamer. // A registered terminator is required to do this. @@ -682,8 +811,8 @@ void device3_impl::update_tx_streamers(double /* rate */) { for(const std::string &block_id: _tx_streamers.keys()) { UHD_TX_STREAMER_LOG() << "updating TX streamer: " << block_id; - boost::shared_ptr<sph::send_packet_streamer> my_streamer = - boost::dynamic_pointer_cast<sph::send_packet_streamer>(_tx_streamers[block_id].lock()); + boost::shared_ptr<device3_send_packet_streamer> my_streamer = + boost::dynamic_pointer_cast<device3_send_packet_streamer>(_tx_streamers[block_id].lock()); if (my_streamer) { double tick_rate = my_streamer->get_terminator()->get_tick_rate(); if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) { @@ -705,20 +834,6 @@ void device3_impl::update_tx_streamers(double /* rate */) } } -// This class manages the lifetime of the TX async message handler task and transports -class device3_send_packet_streamer : public sph::send_packet_streamer -{ -public: - device3_send_packet_streamer(const size_t max_num_samps) : sph::send_packet_streamer(max_num_samps) {}; - ~device3_send_packet_streamer() { - _tx_async_msg_task.reset(); // Make sure the async task is destroyed before the transports - }; - - both_xports_t _xport; - both_xports_t _async_xport; - task::sptr _tx_async_msg_task; -}; - tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) { boost::mutex::scoped_lock lock(_transport_setup_mutex); @@ -739,7 +854,10 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) // There is only one terminator. If the streamer has multiple channels, // it will be connected to each downstream block. rfnoc::tx_stream_terminator::sptr send_terminator = rfnoc::tx_stream_terminator::make(); - for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) { + for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) + { + // First, configure the downstream blocks and create the transports + // Get block ID and mb index uhd::rfnoc::block_id_t block_id = chan_list[stream_i]; // Update args so args.args is always valid for this particular channel: @@ -768,87 +886,42 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) // Setup the dsp transport hints device_addr_t tx_hints = get_tx_hints(mb_index); - //allocate sid and create transport + // Allocate sid and create transport uhd::sid_t stream_address = blk_ctrl->get_address(block_port); UHD_TX_STREAMER_LOG() << "creating tx stream " << tx_hints.to_string() ; both_xports_t xport = make_transport(stream_address, TX_DATA, tx_hints); both_xports_t async_xport = make_transport(stream_address, ASYNC_MSG, device_addr_t("")); - UHD_TX_STREAMER_LOG() << std::hex << "[TX Streamer] data_sid = " << xport.send_sid << std::dec << std::endl; - - // To calculate the max number of samples per packet, we assume the maximum header length - // to avoid fragmentation should the entire header be used. - const size_t bpp = tx_hints.cast<size_t>("bpp", xport.send->get_send_frame_size()) - stream_options.tx_max_len_hdr; - const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item - const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet - UHD_TX_STREAMER_LOG() << "spp == " << spp ; - - //make the new streamer given the samples per packet - if (not my_streamer) - my_streamer = boost::make_shared<device3_send_packet_streamer>(spp); - my_streamer->resize(chan_list.size()); - my_streamer->_xport = xport; - my_streamer->_async_xport = async_xport; - - //init some streamer stuff - std::string conv_endianness; - if (xport.endianness == ENDIANNESS_BIG) { - my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_be); - conv_endianness = "be"; - } else { - my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_le); - conv_endianness = "le"; - } - - //set the converter - uhd::convert::id_type id; - id.input_format = args.cpu_format; - id.num_inputs = 1; - id.output_format = args.otw_format + "_item32_" + conv_endianness; - id.num_outputs = 1; - my_streamer->set_converter(id); + UHD_TX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec ; - //flow control setup - const size_t pkt_size = spp * bpi + stream_options.tx_max_len_hdr; - // For flow control, this value is used to determine the window size in *packets* - size_t fc_window = get_tx_flow_control_window( - pkt_size, // This is the maximum packet size - blk_ctrl->get_fifo_size(block_port), - tx_hints // This can override the value reported by the block! - ); + // Configure flow control + // This disables the FC module's output, do this before configuring flow control + blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x1, block_port); + blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x0, block_port); + // Configure flow control on downstream block + const size_t fc_window = tx_hints.cast<size_t>("send_buff_size", blk_ctrl->get_fifo_size(block_port)); const size_t fc_handle_window = std::max<size_t>(1, fc_window / stream_options.tx_fc_response_freq); UHD_TX_STREAMER_LOG() << "Flow Control Window = " << fc_window << ", Flow Control Handler Window = " << fc_handle_window ; blk_ctrl->configure_flow_control_in( - stream_options.tx_fc_response_cycles, - fc_handle_window, /*pkts*/ + fc_handle_window, /*bytes*/ block_port ); - - boost::shared_ptr<async_tx_info_t> async_tx_info(new async_tx_info_t()); - async_tx_info->stream_channel = args.channels[stream_i]; - async_tx_info->device_channel = mb_index; - async_tx_info->async_queue = async_md; - async_tx_info->old_async_queue = _async_md; - - boost::function<double(void)> tick_rate_retriever = boost::bind( - &rfnoc::tick_node_ctrl::get_tick_rate, - send_terminator, - std::set< rfnoc::node_ctrl_base::sptr >() // Need to specify default args with bind - ); - - my_streamer->_tx_async_msg_task = task::make( - boost::bind( - &handle_tx_async_msgs, - async_tx_info, - my_streamer->_async_xport.recv, - xport.endianness, - tick_rate_retriever - ), - "tx_async_msgs_task" + // Add flow control transport + boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window)); + xport.send = zero_copy_flow_ctrl::make( + xport.send, + [=](managed_buffer::sptr buff) { + return tx_flow_ctrl( + fc_cache, + xport.recv, + (xport.endianness == ENDIANNESS_BIG ? uhd::ntohx<uint32_t> : uhd::wtohx<uint32_t>), + (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be : vrt::chdr::if_hdr_unpack_le), + buff); + }, + NULL ); - blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x1, block_port); - blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x0, block_port); - blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), block_port); + // Configure return path for async messages + blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), block_port); UHD_TX_STREAMER_LOG() << "resp_in_dst_sid == " << boost::format("0x%04X") % xport.recv_sid.get_dst() ; // FIXME: Once there is a better way to map the radio block and port @@ -863,7 +936,7 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); for(const boost::shared_ptr<uhd::rfnoc::radio_ctrl> &node: downstream_radio_nodes) { if (node->get_block_id() == radio_id) { - node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), radio_port); + node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), radio_port); } } } else { @@ -876,39 +949,96 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) std::vector<boost::shared_ptr<uhd::rfnoc::radio_ctrl> > downstream_radio_nodes = blk_ctrl->find_downstream_node<uhd::rfnoc::radio_ctrl>(); UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); for(const boost::shared_ptr<uhd::rfnoc::radio_ctrl> &node: downstream_radio_nodes) { - node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), block_port); + node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), block_port); } } - // Add flow control - boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window)); - my_streamer->_xport.send = zero_copy_flow_ctrl::make( - my_streamer->_xport.send, - boost::bind( - &tx_flow_ctrl, - fc_cache, - my_streamer->_xport.recv, - (xport.endianness == ENDIANNESS_BIG ? uhd::ntohx<uint32_t> : uhd::wtohx<uint32_t>), - (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be : vrt::chdr::if_hdr_unpack_le), - _1), - NULL); + // Second, configure the streamer now that the blocks and transports are configured + + //make the new streamer given the samples per packet + if (not my_streamer) + { + // To calculate the max number of samples per packet, we assume the maximum header length + // to avoid fragmentation should the entire header be used. + const size_t bpp = tx_hints.cast<size_t>("bpp", xport.send->get_send_frame_size()) - stream_options.tx_max_len_hdr; + const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item + const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet + UHD_TX_STREAMER_LOG() << "spp == " << spp ; + + my_streamer = boost::make_shared<device3_send_packet_streamer>( + spp, + send_terminator, + xport, + async_xport); + my_streamer->resize(chan_list.size()); + } + + //init some streamer stuff + std::string conv_endianness; + if (xport.endianness == ENDIANNESS_BIG) { + my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_be); + conv_endianness = "be"; + } else { + my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_le); + conv_endianness = "le"; + } + + //set the converter + uhd::convert::id_type id; + id.input_format = args.cpu_format; + id.num_inputs = 1; + id.output_format = args.otw_format + "_item32_" + conv_endianness; + id.num_outputs = 1; + my_streamer->set_converter(id); + + boost::shared_ptr<async_tx_info_t> async_tx_info(new async_tx_info_t()); + async_tx_info->stream_channel = args.channels[stream_i]; + async_tx_info->device_channel = mb_index; + async_tx_info->async_queue = async_md; + async_tx_info->old_async_queue = _async_md; + + task::sptr async_task = task::make( + [=]() { + handle_tx_async_msgs( + async_tx_info, + async_xport.recv, + xport.endianness, + [=]() {return send_terminator->get_tick_rate();} + ); + } + ); + my_streamer->add_async_msg_task(async_task); //Give the streamer a functor to get the send buffer my_streamer->set_xport_chan_get_buff( stream_i, - boost::bind(&zero_copy_if::get_send_buff, my_streamer->_xport.send, _1) + [=](const double timeout) { + return xport.send->get_send_buff(timeout); + } ); //Give the streamer a functor handled received async messages my_streamer->set_async_receiver( - boost::bind(&async_md_type::pop_with_timed_wait, async_md, _1, _2) + [=](uhd::async_metadata_t& md, const double timeout) { + return async_md->pop_with_timed_wait(md, timeout); + } ); my_streamer->set_xport_chan_sid(stream_i, true, xport.send_sid); // CHDR does not support trailers my_streamer->set_enable_trailer(false); - } - // Connect the terminator to the streamer - my_streamer->set_terminator(send_terminator); + my_streamer->set_xport_chan_post_send_cb( + stream_i, + [=]() { + tx_flow_ctrl_ack( + fc_cache, + xport.send, + xport.send_sid, + (xport.endianness == ENDIANNESS_BIG ? uhd::htonx<uint32_t> : uhd::htowx<uint32_t>), + (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_pack_be : vrt::chdr::if_hdr_pack_le) + ); + } + ); + } // Notify all blocks in this chain that they are connected to an active streamer send_terminator->set_tx_streamer(true, 0); @@ -916,7 +1046,7 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) // Store a weak pointer to prevent a streamer->device3_impl->streamer circular dependency. // Note that we store the streamer only once, and use its terminator's // ID to do so. - _tx_streamers[send_terminator->unique_id()] = boost::weak_ptr<sph::send_packet_streamer>(my_streamer); + _tx_streamers[send_terminator->unique_id()] = boost::weak_ptr<uhd::tx_streamer>(my_streamer); // Sets tick rate, samp rate and scaling on this streamer // A registered terminator is required to do this. |