diff options
Diffstat (limited to 'host/lib')
| -rw-r--r-- | host/lib/include/uhdlib/rfnoc/graph_impl.hpp | 2 | ||||
| -rw-r--r-- | host/lib/rfnoc/ctrl_iface.cpp | 1 | ||||
| -rw-r--r-- | host/lib/rfnoc/graph_impl.cpp | 31 | ||||
| -rw-r--r-- | host/lib/rfnoc/legacy_compat.cpp | 4 | ||||
| -rw-r--r-- | host/lib/rfnoc/sink_block_ctrl_base.cpp | 19 | ||||
| -rw-r--r-- | host/lib/rfnoc/source_block_ctrl_base.cpp | 33 | ||||
| -rw-r--r-- | host/lib/transport/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | host/lib/transport/chdr.cpp | 7 | ||||
| -rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 143 | ||||
| -rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 41 | ||||
| -rw-r--r-- | host/lib/transport/zero_copy_flow_ctrl.cpp | 5 | ||||
| -rw-r--r-- | host/lib/usrp/device3/device3_impl.hpp | 81 | ||||
| -rw-r--r-- | host/lib/usrp/device3/device3_io_impl.cpp | 686 | ||||
| -rw-r--r-- | host/lib/usrp/x300/x300_impl.cpp | 4 | ||||
| -rw-r--r-- | host/lib/usrp/x300/x300_io_impl.cpp | 14 | 
15 files changed, 629 insertions, 443 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/graph_impl.hpp b/host/lib/include/uhdlib/rfnoc/graph_impl.hpp index 182befbf4..404369618 100644 --- a/host/lib/include/uhdlib/rfnoc/graph_impl.hpp +++ b/host/lib/include/uhdlib/rfnoc/graph_impl.hpp @@ -55,7 +55,7 @@ public:      void connect_sink(              const block_id_t &sink_block,              const size_t dst_block_port, -            const size_t pkts_per_ack +            const size_t bytes_per_ack      );      /************************************************************************ diff --git a/host/lib/rfnoc/ctrl_iface.cpp b/host/lib/rfnoc/ctrl_iface.cpp index 11dfa7aaa..29e18fc3a 100644 --- a/host/lib/rfnoc/ctrl_iface.cpp +++ b/host/lib/rfnoc/ctrl_iface.cpp @@ -108,6 +108,7 @@ private:          packet_info.tsf = timestamp;          packet_info.sob = false;          packet_info.eob = false; +        packet_info.fc_ack = false;          packet_info.sid = _xports.send_sid;          packet_info.has_sid = true;          packet_info.has_cid = false; diff --git a/host/lib/rfnoc/graph_impl.cpp b/host/lib/rfnoc/graph_impl.cpp index c361ea8f2..a2e0e64f4 100644 --- a/host/lib/rfnoc/graph_impl.cpp +++ b/host/lib/rfnoc/graph_impl.cpp @@ -122,9 +122,9 @@ void graph_impl::connect(          UHD_LOGGER_WARNING("RFNOC") << "Assuming max packet size for " << src->get_block_id() ;          pkt_size = uhd::rfnoc::MAX_PACKET_SIZE;      } -    // FC window (in packets) depends on FIFO size...          ...and packet size. -    size_t buf_size_pkts = dst->get_fifo_size(dst_block_port) / pkt_size; -    if (buf_size_pkts == 0) { +    // FC window (in bytes) depends on FIFO size. +    size_t buf_size_bytes = dst->get_fifo_size(dst_block_port); +    if (buf_size_bytes < pkt_size) {          throw uhd::runtime_error(str(              boost::format("Input FIFO for block %s is too small (%d kiB) for packets of size %d kiB\n"                            "coming from block %s.") @@ -132,19 +132,20 @@ void graph_impl::connect(              % (pkt_size / 1024) % src->get_block_id().get()          ));      } -    src->configure_flow_control_out(buf_size_pkts, src_block_port); -    // On the same crossbar, use lots of FC packets -    size_t pkts_per_ack = std::min( -            uhd::rfnoc::DEFAULT_FC_XBAR_PKTS_PER_ACK, -            buf_size_pkts - 1 +    src->configure_flow_control_out( +            true, /* enable output */ +            buf_size_bytes, +            0, /* no packet limit. We need to revisit this at some point. */ +            src_block_port      ); +    // On the same crossbar, use lots of FC packets +    size_t bytes_per_response = std::ceil<size_t>(buf_size_bytes / uhd::rfnoc::DEFAULT_FC_XBAR_RESPONSE_FREQ);      // Over the network, use less or we'd flood the transport      if (sid.get_src_addr() != sid.get_dst_addr()) { -        pkts_per_ack = std::max<size_t>(buf_size_pkts / uhd::rfnoc::DEFAULT_FC_TX_RESPONSE_FREQ, 1); +        bytes_per_response = std::ceil<size_t>(buf_size_bytes / uhd::rfnoc::DEFAULT_FC_TX_RESPONSE_FREQ);      }      dst->configure_flow_control_in( -            0, // Default to not use cycles -            pkts_per_ack, +            bytes_per_response,              dst_block_port      ); @@ -209,7 +210,7 @@ void graph_impl::connect_src(  void graph_impl::connect_sink(          const block_id_t &sink_block,          const size_t dst_block_port, -        const size_t pkts_per_ack +        const size_t bytes_per_ack  ) {      device3::sptr device_ptr = _device_ptr.lock();      if (not device_ptr) { @@ -222,11 +223,7 @@ void graph_impl::connect_sink(      uhd::rfnoc::sink_block_ctrl_base::sptr dst =          device_ptr->get_block_ctrl<rfnoc::sink_block_ctrl_base>(sink_block); -    dst->configure_flow_control_in( -            0, -            pkts_per_ack, -            dst_block_port -    ); +    dst->configure_flow_control_in(bytes_per_ack, dst_block_port);      /********************************************************************       * 5. Configure error policy diff --git a/host/lib/rfnoc/legacy_compat.cpp b/host/lib/rfnoc/legacy_compat.cpp index 7e9eec20e..c5bd7891a 100644 --- a/host/lib/rfnoc/legacy_compat.cpp +++ b/host/lib/rfnoc/legacy_compat.cpp @@ -158,10 +158,10 @@ public:              UHD_LOGGER_WARNING("RFNOC") << "[legacy_compat] No DUCs detected. You will only be able to transmit at the radio frontend rate." ;          }          if (args.has_key("skip_dram")) { -            UHD_LEGACY_LOG() << "[legacy_compat] Skipping DRAM by user request." << std::endl; +            UHD_LEGACY_LOG() << "[legacy_compat] Skipping DRAM by user request." ;          }          if (args.has_key("skip_sram")) { -            UHD_LEGACY_LOG() << "[legacy_compat] Skipping SRAM by user request." << std::endl; +            UHD_LEGACY_LOG() << "[legacy_compat] Skipping SRAM by user request.";          }          if (not _has_dmafifo and not _has_sramfifo) {              UHD_LOGGER_WARNING("RFNOC") << "[legacy_compat] No FIFO detected. Higher transmit rates may encounter errors."; diff --git a/host/lib/rfnoc/sink_block_ctrl_base.cpp b/host/lib/rfnoc/sink_block_ctrl_base.cpp index b620f917f..1562e134b 100644 --- a/host/lib/rfnoc/sink_block_ctrl_base.cpp +++ b/host/lib/rfnoc/sink_block_ctrl_base.cpp @@ -52,22 +52,17 @@ size_t sink_block_ctrl_base::get_fifo_size(size_t block_port) const {  }  void sink_block_ctrl_base::configure_flow_control_in( -        size_t cycles, -        size_t packets, +        size_t bytes,          size_t block_port  ) { -    UHD_RFNOC_BLOCK_TRACE() << boost::format("sink_block_ctrl_base::configure_flow_control_in(cycles=%d, packets=%d)") % cycles % packets ; -    uint32_t cycles_word = 0; -    if (cycles) { -        cycles_word = (1<<31) | cycles; -    } -    sr_write(SR_FLOW_CTRL_CYCS_PER_ACK, cycles_word, block_port); +    UHD_RFNOC_BLOCK_TRACE() << boost::format("sink_block_ctrl_base::configure_flow_control_in(bytes=%d)") % bytes; -    uint32_t packets_word = 0; -    if (packets) { -        packets_word = (1<<31) | packets; +    uint32_t bytes_word = 0; +    if (bytes) { +        // Bit 32 enables flow control +        bytes_word = (1<<31) | bytes;      } -    sr_write(SR_FLOW_CTRL_PKTS_PER_ACK, packets_word, block_port); +    sr_write(SR_FLOW_CTRL_BYTES_PER_ACK, bytes_word, block_port);  }  void sink_block_ctrl_base::set_error_policy( diff --git a/host/lib/rfnoc/source_block_ctrl_base.cpp b/host/lib/rfnoc/source_block_ctrl_base.cpp index 5ede899cb..afec6ba1b 100644 --- a/host/lib/rfnoc/source_block_ctrl_base.cpp +++ b/host/lib/rfnoc/source_block_ctrl_base.cpp @@ -77,25 +77,27 @@ void source_block_ctrl_base::set_destination(  }  void source_block_ctrl_base::configure_flow_control_out( -            size_t buf_size_pkts, +            bool enable_fc_output, +            size_t buf_size_bytes, +            size_t pkt_limit,              size_t block_port,              UHD_UNUSED(const uhd::sid_t &sid)  ) { -    UHD_RFNOC_BLOCK_TRACE() << "source_block_ctrl_base::configure_flow_control_out() buf_size_pkts==" << buf_size_pkts ; -    if (buf_size_pkts < 2) { +    UHD_RFNOC_BLOCK_TRACE() << "source_block_ctrl_base::configure_flow_control_out() buf_size_bytes==" << buf_size_bytes; +    if (buf_size_bytes == 0) {        throw uhd::runtime_error(str( -              boost::format("Invalid window size %d for block %s. Window size must at least be 2.") -              % buf_size_pkts % unique_id() +              boost::format("Invalid window size %d for block %s. Window size cannot be 0 bytes.") +              % buf_size_bytes % unique_id()        ));      } -    //Disable the window and let all upstream data flush out +    //Disable flow control entirely and let all upstream data flush out      //We need to do this every time the window is changed because      //a) We don't know what state the flow-control module was left in      //   in the previous run (it should still be enabled)      //b) Changing the window size where data is buffered upstream may      //   result in stale packets entering the stream. -    sr_write(SR_FLOW_CTRL_WINDOW_EN, 0, block_port); +    sr_write(SR_FLOW_CTRL_EN, 0, block_port);      //Wait for data to flush out.      //In the FPGA we are guaranteed that all buffered packets are more-or-less consecutive. @@ -107,13 +109,24 @@ void source_block_ctrl_base::configure_flow_control_out(      //      module is done flushing.      std::this_thread::sleep_for(std::chrono::milliseconds(1)); +    //Enable source flow control module and conditionally enable byte based and/or packet count +    //based flow control +    const bool enable_byte_fc = (buf_size_bytes != 0); +    const bool enable_pkt_cnt_fc = (pkt_limit != 0); +    const size_t config = enable_fc_output + (enable_byte_fc << 1) + (enable_pkt_cnt_fc << 2); +      //Resize the FC window.      //Precondition: No data can be buffered upstream. -    sr_write(SR_FLOW_CTRL_WINDOW_SIZE, buf_size_pkts, block_port); +    if (enable_byte_fc) { +        sr_write(SR_FLOW_CTRL_WINDOW_SIZE, buf_size_bytes, block_port); +    } +    if (enable_pkt_cnt_fc) { +        sr_write(SR_FLOW_CTRL_PKT_LIMIT, pkt_limit, block_port); +    }      //Enable the FC window. -    //Precondition: The window size must be set. -    sr_write(SR_FLOW_CTRL_WINDOW_EN, (buf_size_pkts != 0), block_port); +    //Precondition: The window size and/or packet limit must be set. +    sr_write(SR_FLOW_CTRL_EN, config, block_port);  }  /*********************************************************************** diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index ae903d956..15771697a 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -112,6 +112,7 @@ LIBUHD_PYTHON_GEN_SOURCE(  )  LIBUHD_APPEND_SOURCES( +    ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_recv_offload.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/tcp_zero_copy.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp diff --git a/host/lib/transport/chdr.cpp b/host/lib/transport/chdr.cpp index 48263f57e..36f380d62 100644 --- a/host/lib/transport/chdr.cpp +++ b/host/lib/transport/chdr.cpp @@ -23,6 +23,7 @@ using namespace uhd::transport::vrt;  static const uint32_t HDR_FLAG_TSF = (1 << 29);  static const uint32_t HDR_FLAG_EOB = (1 << 28);  static const uint32_t HDR_FLAG_ERROR = (1 << 28); +static const uint32_t HDR_FLAG_FCACK = (1 << 28);  /***************************************************************************/  /* Packing                                                                 */ @@ -45,8 +46,8 @@ UHD_INLINE uint32_t _hdr_pack_chdr(          | (if_packet_info.packet_type << 30)          // 1 Bit: Has time          | (if_packet_info.has_tsf ? HDR_FLAG_TSF : 0) -        // 1 Bit: EOB or Error -        | ((if_packet_info.eob or if_packet_info.error) ? HDR_FLAG_EOB : 0) +        // 1 Bit: EOB or Error or FC ACK +        | ((if_packet_info.eob or if_packet_info.error or if_packet_info.fc_ack) ? HDR_FLAG_EOB : 0)          // 12 Bits: Sequence number          | ((if_packet_info.packet_count & 0xFFF) << 16)          // 16 Bits: Total packet length @@ -111,6 +112,8 @@ UHD_INLINE void _hdr_unpack_chdr(                           && ((chdr & HDR_FLAG_EOB) > 0);      if_packet_info.error = (if_packet_info.packet_type == if_packet_info_t::PACKET_TYPE_RESP)                           && ((chdr & HDR_FLAG_ERROR) > 0); +    if_packet_info.fc_ack = (if_packet_info.packet_type == if_packet_info_t::PACKET_TYPE_FC) +                         && ((chdr & HDR_FLAG_FCACK) > 0);      if_packet_info.packet_count = (chdr >> 16) & 0xFFF;      // Set packet length variables diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 921bfdfa0..c962d40e6 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -12,9 +12,9 @@  #include <uhd/exception.hpp>  #include <uhd/convert.hpp>  #include <uhd/stream.hpp> -#include <uhd/utils/log.hpp>  #include <uhd/utils/tasks.hpp>  #include <uhd/utils/byteswap.hpp> +#include <uhd/utils/log.hpp>  #include <uhd/types/metadata.hpp>  #include <uhd/transport/vrt_if_packet.hpp>  #include <uhd/transport/zero_copy.hpp> @@ -22,7 +22,6 @@  #include <boost/dynamic_bitset.hpp>  #include <boost/function.hpp>  #include <boost/format.hpp> -#include <boost/bind.hpp>  #include <boost/make_shared.hpp>  #include <iostream>  #include <vector> @@ -59,6 +58,7 @@ class recv_packet_handler{  public:      typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type;      typedef boost::function<void(const size_t)> handle_flowctrl_type; +    typedef std::function<void(const uint32_t *)> handle_flowctrl_ack_type;      typedef boost::function<void(const stream_cmd_t&)> issue_stream_cmd_type;      typedef void(*vrt_unpacker_type)(const uint32_t *, vrt::if_packet_info_t &);      //typedef boost::function<void(const uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type; @@ -102,33 +102,6 @@ public:          _header_offset_words32 = header_offset_words32;      } -    ////////////////// RFNOC /////////////////////////// -    //! Set the stream ID for a specific channel (or no SID) -    void set_xport_chan_sid(const size_t xport_chan, const bool has_sid, const uint32_t sid = 0){ -        _props.at(xport_chan).has_sid = has_sid; -        _props.at(xport_chan).sid = sid; -    } - -    //! Get the stream ID for a specific channel (or zero if no SID) -    uint32_t get_xport_chan_sid(const size_t xport_chan) const { -        if (_props.at(xport_chan).has_sid) { -            return _props.at(xport_chan).sid; -        } else { -            return 0; -        } -    } - -    void set_terminator(uhd::rfnoc::rx_stream_terminator::sptr terminator) -    { -        _terminator = terminator; -    } - -    uhd::rfnoc::rx_stream_terminator::sptr get_terminator() -    { -        return _terminator; -    } -    ////////////////// RFNOC /////////////////////////// -      /*!       * Set the threshold for alignment failure.       * How many packets throw out before giving up? @@ -183,6 +156,13 @@ public:          if (do_init) handle_flowctrl(0);      } +    void set_xport_handle_flowctrl_ack( +            const size_t xport_chan, +            const handle_flowctrl_ack_type &handle_flowctrl_ack +    ) { +        _props.at(xport_chan).handle_flowctrl_ack = handle_flowctrl_ack; +    } +      //! Set the conversion routine for all channels      void set_converter(const uhd::convert::id_type &id){          _num_outputs = id.num_outputs; @@ -211,12 +191,11 @@ public:      //! Overload call to issue stream commands      void issue_stream_cmd(const stream_cmd_t &stream_cmd)      { -        // RFNoC: This needs to be checked by the radio block, once it's done. TODO remove this. -        //if (stream_cmd.stream_now -                //and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS -                //and _props.size() > 1) { -            //throw uhd::runtime_error("Attempting to do multi-channel receive with stream_now == true will result in misaligned channels. Aborting."); -        //} +        if (size() > 1 and stream_cmd.stream_now and +            stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS) +        { +            throw uhd::runtime_error("Invalid recv stream command - stream now on multiple channels in a single streamer will fail to time align."); +        }          for (size_t i = 0; i < _props.size(); i++)          { @@ -307,11 +286,8 @@ private:          size_t packet_count;          handle_overflow_type handle_overflow;          handle_flowctrl_type handle_flowctrl; +        handle_flowctrl_ack_type handle_flowctrl_ack;          size_t fc_update_window; -	/////// RFNOC /////////// -        bool has_sid; -        uint32_t sid; -	/////// RFNOC ///////////      };      std::vector<xport_chan_props_type> _props;      size_t _num_outputs; @@ -340,6 +316,7 @@ private:          buffers_info_type(const size_t size):              std::vector<per_buffer_info_type>(size),              indexes_todo(size, true), +            alignment_time(0),              alignment_time_valid(false),              data_bytes_to_copy(0),              fragment_offset_in_samps(0) @@ -384,8 +361,6 @@ private:      int recvd_packets;      #endif -    uhd::rfnoc::rx_stream_terminator::sptr _terminator; -      /*******************************************************************       * Get and process a single packet from the transport:       * Receive a single packet at the given index. @@ -398,42 +373,58 @@ private:          per_buffer_info_type &curr_buffer_info,          double timeout      ){ -        //get a single packet from the transport layer          managed_recv_buffer::sptr &buff = curr_buffer_info.buff; -        buff = _props[index].get_buff(timeout); -        if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR; - -        #ifdef  ERROR_INJECT_DROPPED_PACKETS -        if (++recvd_packets > 1000) +        per_buffer_info_type &info = curr_buffer_info; +        while (1)          { -            recvd_packets = 0; -            buff.reset(); +            //get a single packet from the transport layer              buff = _props[index].get_buff(timeout);              if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR; -        } -        #endif -        //bounds check before extract -        size_t num_packet_words32 = buff->size()/sizeof(uint32_t); -        if (num_packet_words32 <= _header_offset_words32){ -            throw std::runtime_error("recv buffer smaller than vrt packet offset"); -        } +            #ifdef  ERROR_INJECT_DROPPED_PACKETS +            if (++recvd_packets > 1000) +            { +                recvd_packets = 0; +                buff.reset(); +                buff = _props[index].get_buff(timeout); +                if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR; +            } +            #endif -        //extract packet info -        per_buffer_info_type &info = curr_buffer_info; -        info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32; -        info.vrt_hdr = buff->cast<const uint32_t *>() + _header_offset_words32; -        _vrt_unpacker(info.vrt_hdr, info.ifpi); -        info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true -        info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32); - -        //handle flow control -        if (_props[index].handle_flowctrl) -        { -            if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0) +            //bounds check before extract +            size_t num_packet_words32 = buff->size()/sizeof(uint32_t); +            if (num_packet_words32 <= _header_offset_words32){ +                throw std::runtime_error("recv buffer smaller than vrt packet offset"); +            } + +            //extract packet info +            info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32; +            info.vrt_hdr = buff->cast<const uint32_t *>() + _header_offset_words32; +            _vrt_unpacker(info.vrt_hdr, info.ifpi); +            info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true +            info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32); + +            //handle flow control +            if (_props[index].handle_flowctrl)              { -                _props[index].handle_flowctrl(info.ifpi.packet_count); +                if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0) +                { +                    _props[index].handle_flowctrl(info.ifpi.packet_count); +                }              } + +            //handle flow control ack +            if (info.ifpi.fc_ack){ +                if (_props[index].handle_flowctrl_ack) { +                    _props[index].handle_flowctrl_ack(reinterpret_cast<const uint32_t *>(info.copy_buff)); +                } +                // Process the next packet +                buff.reset(); +                info.copy_buff = nullptr; +                continue; +            } + +            break;          }          //-------------------------------------------------------------- @@ -605,7 +596,7 @@ private:                      rx_metadata_t metadata = curr_info.metadata;                      _props[index].handle_overflow();                      curr_info.metadata = metadata; -                    UHD_LOG_FASTPATH("O") +                    UHD_LOG_FASTPATH("O");                  }                  curr_info[index].buff.reset();                  curr_info[index].copy_buff = nullptr; @@ -627,7 +618,7 @@ private:                      prev_info[index].ifpi.num_payload_words32*sizeof(uint32_t)/_bytes_per_otw_item, _samp_rate);                  curr_info.metadata.out_of_sequence = true;                  curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; -                UHD_LOG_FASTPATH("D") +                UHD_LOG_FASTPATH("D");                  return;              } @@ -635,10 +626,10 @@ private:              //too many iterations: detect alignment failure              if (iterations++ > _alignment_failure_threshold){                  UHD_LOGGER_ERROR("STREAMER") << boost::format( -                    "The receive packet handler failed to time-align packets. " -                    "%u received packets were processed by the handler. " -                    "However, a timestamp match could not be determined." -                    ) % iterations; +                    "The receive packet handler failed to time-align packets.\n" +                    "%u received packets were processed by the handler.\n" +                    "However, a timestamp match could not be determined.\n" +                ) % iterations << std::endl;                  std::swap(curr_info, next_info); //save progress from curr -> next                  curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;                  _props[index].handle_overflow(); @@ -659,7 +650,7 @@ private:      }      /******************************************************************* -     * Receive a single packet: +     * Receive a single packet on all channels       * Handles fragmentation, messages, errors, and copy-conversion.       * When no fragments are available, call the get aligned buffers.       * Then copy-convert available data into the user's IO buffers. diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index e824ef4e9..5cba570a7 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -12,7 +12,6 @@  #include <uhd/exception.hpp>  #include <uhd/convert.hpp>  #include <uhd/stream.hpp> -#include <uhd/utils/log.hpp>  #include <uhd/utils/tasks.hpp>  #include <uhd/utils/byteswap.hpp>  #include <uhd/utils/thread.hpp> @@ -49,6 +48,7 @@ namespace sph {  class send_packet_handler{  public:      typedef boost::function<managed_send_buffer::sptr(double)> get_buff_type; +    typedef boost::function<void(void)> post_send_cb_type;      typedef boost::function<bool(uhd::async_metadata_t &, const double)> async_receiver_type;      typedef void(*vrt_packer_type)(uint32_t *, vrt::if_packet_info_t &);      //typedef boost::function<void(uint32_t *, vrt::if_packet_info_t &)> vrt_packer_type; @@ -93,27 +93,6 @@ public:          _props.at(xport_chan).sid = sid;      } -    ///////// RFNOC /////////////////// -    //! Get the stream ID for a specific channel (or zero if no SID) -    uint32_t get_xport_chan_sid(const size_t xport_chan) const { -        if (_props.at(xport_chan).has_sid) { -            return _props.at(xport_chan).sid; -        } else { -            return 0; -        } -    } - -    void set_terminator(uhd::rfnoc::tx_stream_terminator::sptr terminator) -    { -        _terminator = terminator; -    } - -    uhd::rfnoc::tx_stream_terminator::sptr get_terminator() -    { -        return _terminator; -    } -    ///////// RFNOC /////////////////// -      void set_enable_trailer(const bool enable)      {          _has_tlr = enable; @@ -138,6 +117,15 @@ public:          _props.at(xport_chan).get_buff = get_buff;      } +    /*! +     * Set the callback function for post-send. +     * \param xport_chan which transport channel +     * \param cb post-send callback +     */ +    void set_xport_chan_post_send_cb(const size_t xport_chan, const post_send_cb_type &cb){ +        _props.at(xport_chan).go_postal = cb; +    } +      //! Set the conversion routine for all channels      void set_converter(const uhd::convert::id_type &id){          _num_inputs = id.num_inputs; @@ -198,6 +186,7 @@ public:          if_packet_info.tsf     = metadata.time_spec.to_ticks(_tick_rate);          if_packet_info.sob     = metadata.start_of_burst;          if_packet_info.eob     = metadata.end_of_burst; +        if_packet_info.fc_ack  = false; //This is a data packet          /*           * Metadata is cached when we get a send requesting a start of burst with no samples. @@ -291,6 +280,7 @@ private:      struct xport_chan_props_type{          xport_chan_props_type(void):has_sid(false),sid(0){}          get_buff_type get_buff; +        post_send_cb_type go_postal;          bool has_sid;          uint32_t sid;          managed_send_buffer::sptr buff; @@ -308,8 +298,6 @@ private:      bool _cached_metadata;      uhd::tx_metadata_t _metadata_cache; -    uhd::rfnoc::tx_stream_terminator::sptr _terminator; -  #ifdef UHD_TXRX_DEBUG_PRINTS      struct dbg_send_stat_t {          dbg_send_stat_t(long wc, size_t nspb, size_t nss, uhd::tx_metadata_t md, double to, double rate): @@ -428,6 +416,11 @@ private:          const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32;          buff->commit(num_vita_words32*sizeof(uint32_t));          buff.reset(); //effectively a release + +        if (_props[index].go_postal) +        { +            _props[index].go_postal(); +        }      }      //! Shared variables for the worker threads diff --git a/host/lib/transport/zero_copy_flow_ctrl.cpp b/host/lib/transport/zero_copy_flow_ctrl.cpp index 709b9e981..25be35569 100644 --- a/host/lib/transport/zero_copy_flow_ctrl.cpp +++ b/host/lib/transport/zero_copy_flow_ctrl.cpp @@ -65,7 +65,7 @@ public:      zero_copy_flow_ctrl_mrb(          flow_ctrl_func flow_ctrl      ) : -        _mb(nullptr), +        _mb(NULL),          _flow_ctrl(flow_ctrl)      {          /* NOP */ @@ -80,8 +80,6 @@ public:      {          if (_mb)          { -            _mb->commit(size()); -            while (_flow_ctrl and not _flow_ctrl(_mb)) {}              _mb.reset();          }      } @@ -89,6 +87,7 @@ public:      UHD_INLINE sptr get(sptr &mb)      {          _mb = mb; +        while (_flow_ctrl and not _flow_ctrl(_mb)) {}          return make(this, _mb->cast<void *>(), _mb->size());      } diff --git a/host/lib/usrp/device3/device3_impl.hpp b/host/lib/usrp/device3/device3_impl.hpp index 8ecb1f72b..580268e4a 100644 --- a/host/lib/usrp/device3/device3_impl.hpp +++ b/host/lib/usrp/device3/device3_impl.hpp @@ -21,6 +21,10 @@  #include <uhd/types/direction.hpp>  #include <uhd/utils/tasks.hpp>  #include <uhd/device3.hpp> +#include "../../transport/super_send_packet_handler.hpp" +#include "../../transport/super_recv_packet_handler.hpp" +#include <uhdlib/rfnoc/tx_stream_terminator.hpp> +#include <uhdlib/rfnoc/rx_stream_terminator.hpp>  #include <uhdlib/rfnoc/xports.hpp>  namespace uhd { namespace usrp { @@ -30,11 +34,83 @@ namespace uhd { namespace usrp {   **********************************************************************/  static const size_t DEVICE3_RX_FC_REQUEST_FREQ         = 32;    //per flow-control window  static const size_t DEVICE3_TX_FC_RESPONSE_FREQ        = 8; -static const size_t DEVICE3_TX_FC_RESPONSE_CYCLES      = 0;     // Cycles: Off. +static const size_t DEVICE3_FC_PACKET_LEN_IN_WORDS32   = 2; +static const size_t DEVICE3_FC_PACKET_COUNT_OFFSET     = 0; +static const size_t DEVICE3_FC_BYTE_COUNT_OFFSET       = 1; +static const size_t DEVICE3_LINE_SIZE                  = 8;  static const size_t DEVICE3_TX_MAX_HDR_LEN             = uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t);    // Bytes  static const size_t DEVICE3_RX_MAX_HDR_LEN             = uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t);    // Bytes +// This class manages the lifetime of the TX async message handler task, transports, and terminator +class device3_send_packet_streamer : public uhd::transport::sph::send_packet_streamer +{ +public: +    device3_send_packet_streamer( +            const size_t max_num_samps, +            const uhd::rfnoc::tx_stream_terminator::sptr terminator, +            const both_xports_t data_xport, +            const both_xports_t async_msg_xport +    ) : +        uhd::transport::sph::send_packet_streamer(max_num_samps), +        _terminator(terminator), +        _data_xport(data_xport), +        _async_msg_xport(async_msg_xport) +    {}; + +    ~device3_send_packet_streamer() +    { +        // Make sure the async task is destroyed before the transports +        _tx_async_msg_tasks.clear(); +    }; + +    uhd::rfnoc::tx_stream_terminator::sptr get_terminator() +    { +        return _terminator; +    } + +    void add_async_msg_task(task::sptr task) +    { +        _tx_async_msg_tasks.push_back(task); +    } + +private: +    uhd::rfnoc::tx_stream_terminator::sptr _terminator; +    both_xports_t _data_xport; +    both_xports_t _async_msg_xport; +    std::vector<task::sptr> _tx_async_msg_tasks; +}; + +// This class manages the lifetime of the RX transports and terminator and provides access to both +class device3_recv_packet_streamer : public uhd::transport::sph::recv_packet_streamer +{ +public: +    device3_recv_packet_streamer( +            const size_t max_num_samps, +            const uhd::rfnoc::rx_stream_terminator::sptr terminator, +            const both_xports_t xport +        ) : +            uhd::transport::sph::recv_packet_streamer(max_num_samps), +            _terminator(terminator), +            _xport(xport) {}; + +    ~device3_recv_packet_streamer() {}; + +    both_xports_t get_xport() +    { +        return _xport; +    } + +    uhd::rfnoc::rx_stream_terminator::sptr get_terminator() +    { +        return _terminator; +    } + +private: +    uhd::rfnoc::rx_stream_terminator::sptr _terminator; +    both_xports_t _xport; +}; +  class device3_impl : public uhd::device3, public boost::enable_shared_from_this<device3_impl>  {  public: @@ -64,14 +140,11 @@ public:          size_t rx_fc_request_freq;          //! How often the downstream block should send ACKs per one full FC window          size_t tx_fc_response_freq; -        //! How often the downstream block should send ACKs in cycles -        size_t tx_fc_response_cycles;          stream_options_t(void)              : tx_max_len_hdr(DEVICE3_TX_MAX_HDR_LEN)              , rx_max_len_hdr(DEVICE3_RX_MAX_HDR_LEN)              , rx_fc_request_freq(DEVICE3_RX_FC_REQUEST_FREQ)              , tx_fc_response_freq(DEVICE3_TX_FC_RESPONSE_FREQ) -            , tx_fc_response_cycles(DEVICE3_TX_FC_RESPONSE_CYCLES)          {};      }; diff --git a/host/lib/usrp/device3/device3_io_impl.cpp b/host/lib/usrp/device3/device3_io_impl.cpp index 8882552af..92865b6fe 100644 --- a/host/lib/usrp/device3/device3_io_impl.cpp +++ b/host/lib/usrp/device3/device3_io_impl.cpp @@ -13,8 +13,6 @@  #include <uhd/rfnoc/sink_block_ctrl_base.hpp>  #include <uhd/utils/byteswap.hpp>  #include <uhd/utils/log.hpp> -#include "../../transport/super_recv_packet_handler.hpp" -#include "../../transport/super_send_packet_handler.hpp"  #include <uhd/rfnoc/rate_node_ctrl.hpp>  #include <uhd/rfnoc/radio_ctrl.hpp>  #include <uhd/transport/zero_copy_flow_ctrl.hpp> @@ -30,10 +28,6 @@ using namespace uhd;  using namespace uhd::usrp;  using namespace uhd::transport; -//! CHDR uses 12-Bit sequence numbers -static const uint32_t HW_SEQ_NUM_MASK = 0xfff; - -  /***********************************************************************   * Helper functions for get_?x_stream()   **********************************************************************/ @@ -146,8 +140,25 @@ void generate_channel_list(  struct rx_fc_cache_t  {      rx_fc_cache_t(): -        last_seq_in(0){} -    size_t last_seq_in; +        interval(0), +        last_byte_count(0), +        total_bytes_consumed(0), +        total_packets_consumed(0), +        seq_num(0) {} + +    //! Flow control interval in bytes +    size_t interval; +    //! Byte count at last flow control packet +    uint32_t last_byte_count; +    //! This will wrap around, but that's OK, because math. +    uint32_t total_bytes_consumed; +    //! This will wrap around, but that's OK, because math. +    uint32_t total_packets_consumed; +    //! Sequence number of next flow control packet +    uint64_t seq_num; +    sid_t sid; +    zero_copy_if::sptr xport; +    endianness_t endianness;  };  /*! Determine the size of the flow control window in number of packets. @@ -180,109 +191,149 @@ static size_t get_rx_flow_control_window(          throw uhd::value_error("recv_buff_fullness must be in [0.01, 1] inclusive (1% to 100%)");      } -    size_t window_in_pkts = (static_cast<size_t>(sw_buff_size * fullness_factor) / pkt_size); +    size_t window_in_bytes = (static_cast<size_t>(sw_buff_size * fullness_factor));      if (rx_args.has_key("max_recv_window")) { -        window_in_pkts = std::min( -            window_in_pkts, -            rx_args.cast<size_t>("max_recv_window", window_in_pkts) +        window_in_bytes = std::min( +            window_in_bytes, +            rx_args.cast<size_t>("max_recv_window", window_in_bytes)          );      } -    if (window_in_pkts == 0) { +    if (window_in_bytes < pkt_size) {          throw uhd::value_error("recv_buff_size must be larger than the recv_frame_size.");      } -    UHD_ASSERT_THROW(size_t(sw_buff_size * fullness_factor) >= pkt_size * window_in_pkts); -    return window_in_pkts; +    UHD_ASSERT_THROW(size_t(sw_buff_size * fullness_factor) >= window_in_bytes); +    return window_in_bytes;  }  /*! Send out RX flow control packets.   * - * For an rx stream, this function takes care of sending back - * a flow control packet to the source telling it which - * packets have been consumed. - * - * This function should only be called by the function handling - * the rx stream, usually recv() in super_recv_packet_handler. + * This function handles updating the counters for the consumed + * bytes and packets, determines if a flow control message is + * is necessary, and sends one if it is.  Passing a nullptr for + * the buff parameter will skip the counter update.   * - * \param sid The SID that goes into this packet. This is the reversed() - *            version of the data stream's SID. - * \param xport A transport object over which to send the data - * \param big_endian Endianness of the transport - * \param seq32_state Pointer to a variable that saves the 32-Bit state - *                    of the sequence numbers, since we only have 12 Bit - *                    sequence numbers in CHDR. - * \param last_seq The value to send: The last consumed packet's sequence number. + * \param fc_cache RX flow control state information + * \param buff Receive buffer.  Setting to nullptr will + *             skip the counter update.   */ -static void handle_rx_flowctrl( -        const sid_t &sid, -        zero_copy_if::sptr xport, -        endianness_t endianness, +static bool rx_flow_ctrl(          boost::shared_ptr<rx_fc_cache_t> fc_cache, -        const size_t last_seq +        managed_buffer::sptr buff  ) { -    static const size_t RXFC_PACKET_LEN_IN_WORDS    = 2; -    static const size_t RXFC_CMD_CODE_OFFSET        = 0; -    static const size_t RXFC_SEQ_NUM_OFFSET         = 1; +    // If the caller supplied a buffer +    if (buff) +    { +        // Unpack the header +        vrt::if_packet_info_t packet_info; +        packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); +        const uint32_t *pkt = buff->cast<const uint32_t *>(); +        try { +            if (fc_cache->endianness == ENDIANNESS_BIG) +            { +                vrt::chdr::if_hdr_unpack_be(pkt, packet_info); +            } else { +                vrt::chdr::if_hdr_unpack_le(pkt, packet_info); +            } +        } +        catch(const std::exception &ex) +        { +            // Log and ignore +            UHD_LOGGER_ERROR("RX FLOW CTRL") << "Error unpacking flow control packet: " << ex.what() << std::endl; +            return true; +        } -    managed_send_buffer::sptr buff = xport->get_send_buff(0.0); -    if (not buff) { -        throw uhd::runtime_error("handle_rx_flowctrl timed out getting a send buffer"); +        // Update counters assuming the buffer is a consumed packet +        if (not packet_info.error) +        { +            fc_cache->total_bytes_consumed += buff->size(); +            fc_cache->total_packets_consumed++; +        }      } -    uint32_t *pkt = buff->cast<uint32_t *>(); -    // Recover sequence number. The sequence numbers handled by the streamers -    // are 12 Bits, but we want to know the 32-Bit sequence number. -    size_t &seq32 = fc_cache->last_seq_in; -    const size_t seq12 = seq32 & HW_SEQ_NUM_MASK; -    if (last_seq < seq12) -        seq32 += (HW_SEQ_NUM_MASK + 1); -    seq32 &= ~HW_SEQ_NUM_MASK; -    seq32 |= last_seq; +    // Just return if there is no need to send a flow control packet +    if (fc_cache->total_bytes_consumed - fc_cache->last_byte_count < fc_cache->interval) +    { +        return true; +    } -    // Super-verbose mode: -    //static size_t fc_pkt_count = 0; -    //UHD_LOGGER_INFO("STREAMER") << "sending flow ctrl packet " << fc_pkt_count++ << ", acking " << str(boost::format("%04d\tseq_sw==0x%08x") % last_seq % seq32) ; +    // Time to send a flow control packet +    // Get a send buffer +    managed_send_buffer::sptr fc_buff = fc_cache->xport->get_send_buff(0.0); +    if (not fc_buff) { +        throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer"); +    } +    uint32_t *pkt = fc_buff->cast<uint32_t *>();      //load packet info      vrt::if_packet_info_t packet_info;      packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_FC; -    packet_info.num_payload_words32 = RXFC_PACKET_LEN_IN_WORDS; +    packet_info.num_payload_words32 = DEVICE3_FC_PACKET_LEN_IN_WORDS32;      packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); -    packet_info.packet_count = seq32; +    packet_info.packet_count = fc_cache->seq_num++;      packet_info.sob = false;      packet_info.eob = false; -    packet_info.sid = sid.get(); +    packet_info.error = false; +    packet_info.fc_ack = false; +    packet_info.sid = fc_cache->sid.get();      packet_info.has_sid = true;      packet_info.has_cid = false;      packet_info.has_tsi = false;      packet_info.has_tsf = false;      packet_info.has_tlr = false; -    if (endianness == ENDIANNESS_BIG) { +    if (fc_cache->endianness == ENDIANNESS_BIG) {          // Load Header:          vrt::chdr::if_hdr_pack_be(pkt, packet_info); -        // Load Payload: (the sequence number) -        pkt[packet_info.num_header_words32+RXFC_CMD_CODE_OFFSET] = uhd::htonx<uint32_t>(0); -        pkt[packet_info.num_header_words32+RXFC_SEQ_NUM_OFFSET]  = uhd::htonx<uint32_t>(seq32); +        // Load Payload: Packet count, and byte count +        pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = +            uhd::htonx<uint32_t>(fc_cache->total_packets_consumed); +        pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = +            uhd::htonx<uint32_t>(fc_cache->total_bytes_consumed);      } else {          // Load Header:          vrt::chdr::if_hdr_pack_le(pkt, packet_info); -        // Load Payload: (the sequence number) -        pkt[packet_info.num_header_words32+RXFC_CMD_CODE_OFFSET] = uhd::htowx<uint32_t>(0); -        pkt[packet_info.num_header_words32+RXFC_SEQ_NUM_OFFSET]  = uhd::htowx<uint32_t>(seq32); +        // Load Payload: Packet count, and byte count +        pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = +            uhd::htowx<uint32_t>(fc_cache->total_packets_consumed); +        pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = +            uhd::htowx<uint32_t>(fc_cache->total_bytes_consumed);      } -    //std::cout << "  SID=" << std::hex << sid << " hdr bits=" << packet_info.packet_type << " seq32=" << seq32 << std::endl; -    //std::cout << "num_packet_words32: " << packet_info.num_packet_words32 << std::endl; -    //for (size_t i = 0; i < packet_info.num_packet_words32; i++) { -        //std::cout << str(boost::format("0x%08x") % pkt[i]) << " "; -        //if (i % 2) { -            //std::cout << std::endl; -        //} -    //} -      //send the buffer over the interface -    buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32)); +    fc_buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32)); + +    //update byte count +    fc_cache->last_byte_count = fc_cache->total_bytes_consumed; + +    return true; +} + +/*! Handle RX flow control ACK packets. + * + */ +static void handle_rx_flowctrl_ack( +        boost::shared_ptr<rx_fc_cache_t> fc_cache, +        const uint32_t *payload +) { +    const uint32_t pkt_count = (fc_cache->endianness == ENDIANNESS_BIG) ? +                                    uhd::ntohx<uint32_t>(payload[0]) : +                                    uhd::wtohx<uint32_t>(payload[0]); +    const uint32_t byte_count = (fc_cache->endianness == ENDIANNESS_BIG) ? +                                    uhd::ntohx<uint32_t>(payload[1]) : +                                    uhd::wtohx<uint32_t>(payload[1]); +    if (fc_cache->total_bytes_consumed != byte_count) +    { +        UHD_LOGGER_DEBUG("device3") +        << "oh noes: byte_count==" << byte_count +            << "  total_bytes_consumed==" << fc_cache->total_bytes_consumed << std::endl +        ; +    } +    fc_cache->total_bytes_consumed = byte_count; +    fc_cache->total_packets_consumed = pkt_count; // guess we need a pkt offset too? + +    // This will send a flow control packet if there is a significant discrepancy +    rx_flow_ctrl(fc_cache, nullptr);  }  /*********************************************************************** @@ -293,56 +344,51 @@ static void handle_rx_flowctrl(  //! Stores the state of TX flow control  struct tx_fc_cache_t  { -    tx_fc_cache_t(size_t capacity): +    tx_fc_cache_t(uint32_t capacity): +        last_byte_ack(0),          last_seq_ack(0), -        space(capacity) {} - -    size_t last_seq_ack; -    size_t space; +        byte_count(0), +        pkt_count(0), +        window_size(capacity), +        fc_ack_seqnum(0), +        fc_received(false) {} + +    uint32_t last_byte_ack; +    uint32_t last_seq_ack; +    uint32_t byte_count; +    uint32_t pkt_count; +    uint32_t window_size; +    uint32_t fc_ack_seqnum; +    bool fc_received;  }; -/*! Return the size of the flow control window in packets. - * - * If the return value of this function is F, the last tx'd packet - * has index N and the last ack'd packet has index M, the amount of - * FC credit we have is C = F + M - N (i.e. we can send C more packets - * before getting another ack). - * - * Note: If `send_buff_size` is set in \p tx_hints, this will - * override hw_buff_size_. - */ -static size_t get_tx_flow_control_window( -        size_t pkt_size, -        const double hw_buff_size_, -        const device_addr_t& tx_hints -) { -    double hw_buff_size = tx_hints.cast<double>("send_buff_size", hw_buff_size_); -    size_t window_in_pkts = (static_cast<size_t>(hw_buff_size) / pkt_size); -    if (window_in_pkts == 0) { -        throw uhd::value_error("send_buff_size must be larger than the send_frame_size."); -    } -    return window_in_pkts; -} -  static bool tx_flow_ctrl(      boost::shared_ptr<tx_fc_cache_t> fc_cache, -    zero_copy_if::sptr async_xport, -    uint32_t (*endian_conv)(uint32_t), +	zero_copy_if::sptr xport, +    uint32_t (*to_host)(uint32_t),      void (*unpack)(const uint32_t *packet_buff, vrt::if_packet_info_t &), -    managed_buffer::sptr +    managed_buffer::sptr buff  ) {      while (true)      {          // If there is space -        if (fc_cache->space) +        if (fc_cache->window_size - (fc_cache->byte_count - fc_cache->last_byte_ack) >= buff->size())          {              // All is good - packet will be sent -            fc_cache->space--; +            fc_cache->byte_count += buff->size(); +            // Round up to nearest word +            if (fc_cache->byte_count % DEVICE3_LINE_SIZE) +            { +                fc_cache->byte_count += DEVICE3_LINE_SIZE - (fc_cache->byte_count % DEVICE3_LINE_SIZE); +            } +            fc_cache->pkt_count++;              return true;          }          // Look for a flow control message to update the space available in the buffer. -        managed_recv_buffer::sptr buff = async_xport->get_recv_buff(); +        // A minimal timeout is used because larger timeouts can cause the thread to be +        // scheduled out for too long at high data rates and result in underruns. +        managed_recv_buffer::sptr buff = xport->get_recv_buff(0.000001);          if (buff)          {              vrt::if_packet_info_t if_packet_info; @@ -353,28 +399,94 @@ static bool tx_flow_ctrl(              }              catch(const std::exception &ex)              { -                UHD_LOG_ERROR("TX FLOW CTRL", "Error unpacking async flow control packet: " << ex.what()); +                UHD_LOGGER_ERROR("TX FLOW CTRL") << "Error unpacking flow control packet: " << ex.what() << std::endl;                  continue;              }              if (if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_FC)              { -                UHD_LOG_ERROR( -                    "TX FLOW CTRL", -                    "Unexpected packet type received by flow control handler: " << if_packet_info.packet_type -                ); +                UHD_LOGGER_ERROR("TX FLOW CTRL") << "Unexpected packet received by flow control handler: " << if_packet_info.packet_type << std::endl;                  continue;              } +            const uint32_t *payload = &packet_buff[if_packet_info.num_header_words32]; +            const uint32_t pkt_count = to_host(payload[0]); +            const uint32_t byte_count = to_host(payload[1]); +              // update the amount of space -            size_t seq_ack = endian_conv(packet_buff[if_packet_info.num_header_words32+1]); -            fc_cache->space += (seq_ack - fc_cache->last_seq_ack) & HW_SEQ_NUM_MASK; -            fc_cache->last_seq_ack = seq_ack; +            fc_cache->last_byte_ack = byte_count; +            fc_cache->last_seq_ack = pkt_count; + +            fc_cache->fc_received = true;          }      }      return false;  } +static void tx_flow_ctrl_ack( +    boost::shared_ptr<tx_fc_cache_t> fc_cache, +    zero_copy_if::sptr send_xport, +    sid_t send_sid, +    uint32_t (*from_host)(uint32_t), +    void (*pack)(uint32_t *packet_buff, vrt::if_packet_info_t &) +) { +    if (not fc_cache->fc_received) +    { +        return; +    } + +    // Time to send a flow control ACK packet +    // Get a send buffer +    managed_send_buffer::sptr fc_buff = send_xport->get_send_buff(0.0); +    if (not fc_buff) { +        UHD_LOGGER_ERROR("tx_flow_ctrl_ack") << "timed out getting a send buffer"; +        return; +    } +    uint32_t *pkt = fc_buff->cast<uint32_t *>(); + +    // Load packet info +    vrt::if_packet_info_t packet_info; +    packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_ACK; +    packet_info.num_payload_words32 = DEVICE3_FC_PACKET_LEN_IN_WORDS32; +    packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); +    packet_info.packet_count = fc_cache->fc_ack_seqnum++; +    packet_info.sob = false; +    packet_info.eob = true; +    packet_info.error = false; +    packet_info.fc_ack = false; +    packet_info.sid = send_sid.get(); +    packet_info.has_sid = true; +    packet_info.has_cid = false; +    packet_info.has_tsi = false; +    packet_info.has_tsf = false; +    packet_info.has_tlr = false; + +    // Load Header: +    pack(pkt, packet_info); + +    // Update counters to include this packet +    size_t fc_ack_pkt_size = sizeof(uint32_t)*(packet_info.num_packet_words32); +    fc_cache->byte_count += fc_ack_pkt_size; +    // Round up to nearest word +    if (fc_cache->byte_count % DEVICE3_LINE_SIZE) +    { +        fc_cache->byte_count += DEVICE3_LINE_SIZE - (fc_cache->byte_count % DEVICE3_LINE_SIZE); +    } +    fc_cache->pkt_count++; + +    // Load Payload: Packet count, and byte count +    pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = +        from_host(fc_cache->pkt_count); +    pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = +        from_host(fc_cache->byte_count); + +    // Send the buffer over the interface +    fc_buff->commit(fc_ack_pkt_size); + +    // Reset for next FC +    fc_cache->fc_received = false; +} +  /***********************************************************************   * TX Async Message Functions   **********************************************************************/ @@ -445,13 +557,10 @@ static void handle_tx_async_msgs(              async_info->stream_channel      ); -    // Filter out any flow control messages and cache the rest +	// Filter out any flow control messages and cache the rest      if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL)      { -        UHD_LOG_ERROR( -            "TX ASYNC", -            "Unexpected flow control message found in async message handling" -        ); +        UHD_LOGGER_ERROR("TX ASYNC MSG") << "Unexpected flow control message found in async message handling" << std::endl;      } else {          async_info->async_queue->push_with_pop_on_full(metadata);          metadata.channel = async_info->device_channel; @@ -474,8 +583,8 @@ void device3_impl::update_rx_streamers(double /* rate */)  {      for(const std::string &block_id:  _rx_streamers.keys()) {          UHD_RX_STREAMER_LOG() << "updating RX streamer to " << block_id; -        boost::shared_ptr<sph::recv_packet_streamer> my_streamer = -            boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_rx_streamers[block_id].lock()); +        boost::shared_ptr<device3_recv_packet_streamer> my_streamer = +            boost::dynamic_pointer_cast<device3_recv_packet_streamer>(_rx_streamers[block_id].lock());          if (my_streamer) {              double tick_rate = my_streamer->get_terminator()->get_tick_rate();              if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) { @@ -511,12 +620,15 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)      // Note: All 'args.args' are merged into chan_args now.      // II. Iterate over all channels -    boost::shared_ptr<sph::recv_packet_streamer> my_streamer; +    boost::shared_ptr<device3_recv_packet_streamer> my_streamer;      // The terminator's lifetime is coupled to the streamer.      // There is only one terminator. If the streamer has multiple channels,      // it will be connected to each upstream block.      rfnoc::rx_stream_terminator::sptr recv_terminator = rfnoc::rx_stream_terminator::make(); -    for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) { +    for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) +    { +        // First, configure blocks and create transport +          // Get block ID and mb index          uhd::rfnoc::block_id_t block_id = chan_list[stream_i];          UHD_RX_STREAMER_LOG() << "chan " << stream_i << " connecting to " << block_id ; @@ -549,7 +661,36 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)          uhd::sid_t stream_address = blk_ctrl->get_address(block_port);          UHD_RX_STREAMER_LOG() << "creating rx stream " << rx_hints.to_string() ;          both_xports_t xport = make_transport(stream_address, RX_DATA, rx_hints); -        UHD_RX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec << " actual recv_buff_size = " << xport.recv_buff_size ; +        UHD_RX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec << " actual recv_buff_size = " << xport.recv_buff_size; + +        // Configure the block +        // Flow control setup +        const size_t pkt_size = xport.recv->get_recv_frame_size(); +        // Leave one pkt_size space for overrun packets - TODO make this obsolete +        const size_t fc_window = get_rx_flow_control_window(pkt_size, xport.recv_buff_size, rx_hints) - pkt_size; +        const size_t fc_handle_window = std::max<size_t>(1, fc_window / stream_options.rx_fc_request_freq); +        UHD_RX_STREAMER_LOG()<< "Flow Control Window = " << (fc_window) << ", Flow Control Handler Window = " << fc_handle_window; +        blk_ctrl->configure_flow_control_out( +            true, +            fc_window, +            rx_hints.cast<size_t>("recv_pkt_limit", 0), // On rfnoc-devel, update e300_impl::get_rx_hints() to set this to 32 +            block_port +        ); + +        // Add flow control transport +        boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t()); +        fc_cache->sid = xport.send_sid; +        fc_cache->xport = xport.send; +        fc_cache->endianness = xport.endianness; +        fc_cache->interval = fc_handle_window; +        xport.recv = zero_copy_flow_ctrl::make +        ( +            xport.recv, +            NULL, +            [=](managed_buffer::sptr buff) { +                return rx_flow_ctrl(fc_cache, buff); +            } +        );          // Configure the block          // Note: We need to set_destination() after writing to SR_CLEAR_TX_FC. @@ -558,8 +699,10 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)          // other settings.          blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_TX_FC, 0x1, block_port);          blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_TX_FC, 0x0, block_port); +        // Configure routing for data          blk_ctrl->set_destination(xport.send_sid.get_src(), block_port); +        // Configure routing for responses          blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port);          UHD_RX_STREAMER_LOG() << "resp_out_dst_sid == " << xport.send_sid.get_src() ; @@ -570,17 +713,24 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)              node->sr_write(uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port);          } -        // To calculate the max number of samples per packet, we assume the maximum header length -        // to avoid fragmentation should the entire header be used. -        const size_t bpp = xport.recv->get_recv_frame_size() - stream_options.rx_max_len_hdr; // bytes per packet -        const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item -        const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet -        UHD_RX_STREAMER_LOG() << "spp == " << spp ; +        // Second, configure the streamer          //make the new streamer given the samples per packet          if (not my_streamer) -            my_streamer = boost::make_shared<sph::recv_packet_streamer>(spp); -        my_streamer->resize(chan_list.size()); +        { +            // To calculate the max number of samples per packet, we assume the maximum header length +            // to avoid fragmentation should the entire header be used. +            const size_t bpp = pkt_size - stream_options.rx_max_len_hdr; // bytes per packet +            const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item +            const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet +            UHD_RX_STREAMER_LOG() << "spp == " << spp ; + +            my_streamer = boost::make_shared<device3_recv_packet_streamer>( +                    spp, +                    recv_terminator, +                    xport); +            my_streamer->resize(chan_list.size()); +        }          //init some streamer stuff          std::string conv_endianness; @@ -600,72 +750,51 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)          id.num_outputs = 1;          my_streamer->set_converter(id); -        //flow control setup -        const size_t pkt_size = spp * bpi + stream_options.rx_max_len_hdr; -        const size_t fc_window = get_rx_flow_control_window(pkt_size, xport.recv_buff_size, rx_hints); -        const size_t fc_handle_window = std::max<size_t>(1, fc_window / stream_options.rx_fc_request_freq); -        UHD_RX_STREAMER_LOG()<< "Flow Control Window (minus one) = " << fc_window-1 << ", Flow Control Handler Window = " << fc_handle_window ; -        blk_ctrl->configure_flow_control_out( -                fc_window-1, // Leave one space for overrun packets TODO make this obsolete -                block_port +        // Give the streamer a functor to handle flow control ACK messages +        my_streamer->set_xport_handle_flowctrl_ack( +            stream_i, +            [=](const uint32_t *payload) { +                handle_rx_flowctrl_ack( +                        fc_cache, +                        payload +                ); +            }          );          //Give the streamer a functor to get the recv_buffer -        //bind requires a zero_copy_if::sptr to add a streamer->xport lifetime dependency          my_streamer->set_xport_chan_get_buff(              stream_i, -            boost::bind(&zero_copy_if::get_recv_buff, xport.recv, _1), +            [=](double timeout) {return xport.recv->get_recv_buff(timeout);},              true /*flush*/          );          //Give the streamer a functor to handle overruns          //bind requires a weak_ptr to break the a streamer->streamer circular dependency          //Using "this" is OK because we know that this device3_impl will outlive the streamer +        boost::weak_ptr<uhd::rx_streamer> weak_ptr(my_streamer);          my_streamer->set_overflow_handler( -              stream_i, -              boost::bind( -                  &uhd::rfnoc::rx_stream_terminator::handle_overrun, recv_terminator, -                  boost::weak_ptr<uhd::rx_streamer>(my_streamer), stream_i -              ) -        ); - -        //Give the streamer a functor to send flow control messages -        //handle_rx_flowctrl is static and has no lifetime issues -        boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t()); -        my_streamer->set_xport_handle_flowctrl( -            stream_i, boost::bind( -                &handle_rx_flowctrl, -                xport.send_sid, -                xport.send, -                xport.endianness, -                fc_cache, -                _1 -            ), -            fc_handle_window, -            true/*init*/ +            stream_i, +            [=]() { +                recv_terminator->handle_overrun( +                        weak_ptr, +                        stream_i); +            }          );          //Give the streamer a functor issue stream cmd -        //bind requires a shared pointer to add a streamer->framer lifetime dependency          my_streamer->set_issue_stream_cmd(              stream_i, -            boost::bind(&uhd::rfnoc::source_block_ctrl_base::issue_stream_cmd, blk_ctrl, _1, block_port) +            [=](const stream_cmd_t& stream_cmd) {blk_ctrl->issue_stream_cmd(stream_cmd, block_port);}          ); - -        // Tell the streamer which SID is valid for this channel -        my_streamer->set_xport_chan_sid(stream_i, true, xport.send_sid);      } -    // Connect the terminator to the streamer -    my_streamer->set_terminator(recv_terminator); -      // Notify all blocks in this chain that they are connected to an active streamer      recv_terminator->set_rx_streamer(true, 0);      // Store a weak pointer to prevent a streamer->device3_impl->streamer circular dependency.      // Note that we store the streamer only once, and use its terminator's      // ID to do so. -    _rx_streamers[recv_terminator->unique_id()] = boost::weak_ptr<sph::recv_packet_streamer>(my_streamer); +    _rx_streamers[recv_terminator->unique_id()] = boost::weak_ptr<uhd::rx_streamer>(my_streamer);      // Sets tick rate, samp rate and scaling on this streamer.      // A registered terminator is required to do this. @@ -682,8 +811,8 @@ void device3_impl::update_tx_streamers(double /* rate */)  {      for(const std::string &block_id:  _tx_streamers.keys()) {          UHD_TX_STREAMER_LOG() << "updating TX streamer: " << block_id; -        boost::shared_ptr<sph::send_packet_streamer> my_streamer = -            boost::dynamic_pointer_cast<sph::send_packet_streamer>(_tx_streamers[block_id].lock()); +        boost::shared_ptr<device3_send_packet_streamer> my_streamer = +            boost::dynamic_pointer_cast<device3_send_packet_streamer>(_tx_streamers[block_id].lock());          if (my_streamer) {              double tick_rate = my_streamer->get_terminator()->get_tick_rate();              if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) { @@ -705,20 +834,6 @@ void device3_impl::update_tx_streamers(double /* rate */)      }  } -// This class manages the lifetime of the TX async message handler task and transports -class device3_send_packet_streamer : public sph::send_packet_streamer -{ -public: -	device3_send_packet_streamer(const size_t max_num_samps) : sph::send_packet_streamer(max_num_samps) {}; -	~device3_send_packet_streamer() { -		_tx_async_msg_task.reset();	// Make sure the async task is destroyed before the transports -	}; - -	both_xports_t _xport; -	both_xports_t _async_xport; -	task::sptr _tx_async_msg_task; -}; -  tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)  {      boost::mutex::scoped_lock lock(_transport_setup_mutex); @@ -739,7 +854,10 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)      // There is only one terminator. If the streamer has multiple channels,      // it will be connected to each downstream block.      rfnoc::tx_stream_terminator::sptr send_terminator = rfnoc::tx_stream_terminator::make(); -    for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) { +    for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) +    { +        // First, configure the downstream blocks and create the transports +          // Get block ID and mb index          uhd::rfnoc::block_id_t block_id = chan_list[stream_i];          // Update args so args.args is always valid for this particular channel: @@ -768,87 +886,42 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)          // Setup the dsp transport hints          device_addr_t tx_hints = get_tx_hints(mb_index); -        //allocate sid and create transport +        // Allocate sid and create transport          uhd::sid_t stream_address = blk_ctrl->get_address(block_port);          UHD_TX_STREAMER_LOG() << "creating tx stream " << tx_hints.to_string() ;          both_xports_t xport = make_transport(stream_address, TX_DATA, tx_hints);          both_xports_t async_xport = make_transport(stream_address, ASYNC_MSG, device_addr_t("")); -        UHD_TX_STREAMER_LOG() << std::hex << "[TX Streamer] data_sid = " << xport.send_sid << std::dec << std::endl; - -        // To calculate the max number of samples per packet, we assume the maximum header length -        // to avoid fragmentation should the entire header be used. -        const size_t bpp = tx_hints.cast<size_t>("bpp", xport.send->get_send_frame_size()) - stream_options.tx_max_len_hdr; -        const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item -        const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet -        UHD_TX_STREAMER_LOG() << "spp == " << spp ; - -        //make the new streamer given the samples per packet -        if (not my_streamer) -            my_streamer = boost::make_shared<device3_send_packet_streamer>(spp); -        my_streamer->resize(chan_list.size()); -        my_streamer->_xport = xport; -        my_streamer->_async_xport = async_xport; - -        //init some streamer stuff -        std::string conv_endianness; -        if (xport.endianness == ENDIANNESS_BIG) { -            my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_be); -            conv_endianness = "be"; -        } else { -            my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_le); -            conv_endianness = "le"; -        } - -        //set the converter -        uhd::convert::id_type id; -        id.input_format = args.cpu_format; -        id.num_inputs = 1; -        id.output_format = args.otw_format + "_item32_" + conv_endianness; -        id.num_outputs = 1; -        my_streamer->set_converter(id); +        UHD_TX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec ; -        //flow control setup -        const size_t pkt_size = spp * bpi + stream_options.tx_max_len_hdr; -        // For flow control, this value is used to determine the window size in *packets* -        size_t fc_window = get_tx_flow_control_window( -                pkt_size, // This is the maximum packet size -                blk_ctrl->get_fifo_size(block_port), -                tx_hints // This can override the value reported by the block! -        ); +        // Configure flow control +        // This disables the FC module's output, do this before configuring flow control +        blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x1, block_port); +        blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x0, block_port); +        // Configure flow control on downstream block +        const size_t fc_window = tx_hints.cast<size_t>("send_buff_size", blk_ctrl->get_fifo_size(block_port));          const size_t fc_handle_window = std::max<size_t>(1, fc_window / stream_options.tx_fc_response_freq);          UHD_TX_STREAMER_LOG() << "Flow Control Window = " << fc_window << ", Flow Control Handler Window = " << fc_handle_window ;          blk_ctrl->configure_flow_control_in( -                stream_options.tx_fc_response_cycles, -                fc_handle_window, /*pkts*/ +                fc_handle_window, /*bytes*/                  block_port          ); - -        boost::shared_ptr<async_tx_info_t> async_tx_info(new async_tx_info_t()); -        async_tx_info->stream_channel = args.channels[stream_i]; -        async_tx_info->device_channel = mb_index; -        async_tx_info->async_queue = async_md; -        async_tx_info->old_async_queue = _async_md; - -        boost::function<double(void)> tick_rate_retriever = boost::bind( -                &rfnoc::tick_node_ctrl::get_tick_rate, -                send_terminator, -                std::set< rfnoc::node_ctrl_base::sptr >() // Need to specify default args with bind -        ); - -        my_streamer->_tx_async_msg_task = task::make( -                boost::bind( -                    &handle_tx_async_msgs, -                    async_tx_info, -                    my_streamer->_async_xport.recv, -                    xport.endianness, -                    tick_rate_retriever -                    ), -                "tx_async_msgs_task" +        // Add flow control transport +        boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window)); +        xport.send = zero_copy_flow_ctrl::make( +            xport.send, +            [=](managed_buffer::sptr buff) { +                return tx_flow_ctrl( +                    fc_cache, +                    xport.recv, +                    (xport.endianness == ENDIANNESS_BIG ? uhd::ntohx<uint32_t> : uhd::wtohx<uint32_t>), +                    (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be : vrt::chdr::if_hdr_unpack_le), +                    buff); +            }, +            NULL          ); -        blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x1, block_port); -        blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x0, block_port); -        blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), block_port); +        // Configure return path for async messages +        blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), block_port);          UHD_TX_STREAMER_LOG() << "resp_in_dst_sid == " << boost::format("0x%04X") % xport.recv_sid.get_dst() ;          // FIXME: Once there is a better way to map the radio block and port @@ -863,7 +936,7 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)              UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size();              for(const boost::shared_ptr<uhd::rfnoc::radio_ctrl> &node:  downstream_radio_nodes) {                  if (node->get_block_id() == radio_id) { -                    node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), radio_port); +                    node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), radio_port);                  }              }          } else { @@ -876,39 +949,96 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)              std::vector<boost::shared_ptr<uhd::rfnoc::radio_ctrl> > downstream_radio_nodes = blk_ctrl->find_downstream_node<uhd::rfnoc::radio_ctrl>();              UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size();              for(const boost::shared_ptr<uhd::rfnoc::radio_ctrl> &node:  downstream_radio_nodes) { -                node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), block_port); +                node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), block_port);              }          } -        // Add flow control -        boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window)); -        my_streamer->_xport.send = zero_copy_flow_ctrl::make( -            my_streamer->_xport.send, -            boost::bind( -                &tx_flow_ctrl, -                fc_cache, -                my_streamer->_xport.recv, -                (xport.endianness == ENDIANNESS_BIG ? uhd::ntohx<uint32_t> : uhd::wtohx<uint32_t>), -                (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be : vrt::chdr::if_hdr_unpack_le), -                _1), -            NULL); +        // Second, configure the streamer now that the blocks and transports are configured + +        //make the new streamer given the samples per packet +        if (not my_streamer) +        { +            // To calculate the max number of samples per packet, we assume the maximum header length +            // to avoid fragmentation should the entire header be used. +            const size_t bpp = tx_hints.cast<size_t>("bpp", xport.send->get_send_frame_size()) - stream_options.tx_max_len_hdr; +            const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item +            const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet +            UHD_TX_STREAMER_LOG() << "spp == " << spp ; + +            my_streamer = boost::make_shared<device3_send_packet_streamer>( +                    spp, +                    send_terminator, +                    xport, +                    async_xport); +            my_streamer->resize(chan_list.size()); +        } + +        //init some streamer stuff +        std::string conv_endianness; +        if (xport.endianness == ENDIANNESS_BIG) { +            my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_be); +            conv_endianness = "be"; +        } else { +            my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_le); +            conv_endianness = "le"; +        } + +        //set the converter +        uhd::convert::id_type id; +        id.input_format = args.cpu_format; +        id.num_inputs = 1; +        id.output_format = args.otw_format + "_item32_" + conv_endianness; +        id.num_outputs = 1; +        my_streamer->set_converter(id); + +        boost::shared_ptr<async_tx_info_t> async_tx_info(new async_tx_info_t()); +        async_tx_info->stream_channel = args.channels[stream_i]; +        async_tx_info->device_channel = mb_index; +        async_tx_info->async_queue = async_md; +        async_tx_info->old_async_queue = _async_md; + +        task::sptr async_task = task::make( +            [=]() { +                handle_tx_async_msgs( +                        async_tx_info, +                        async_xport.recv, +                        xport.endianness, +                        [=]() {return send_terminator->get_tick_rate();} +                ); +            } +        ); +        my_streamer->add_async_msg_task(async_task);          //Give the streamer a functor to get the send buffer          my_streamer->set_xport_chan_get_buff(              stream_i, -            boost::bind(&zero_copy_if::get_send_buff, my_streamer->_xport.send, _1) +            [=](const double timeout) { +                return xport.send->get_send_buff(timeout); +            }          );          //Give the streamer a functor handled received async messages          my_streamer->set_async_receiver( -            boost::bind(&async_md_type::pop_with_timed_wait, async_md, _1, _2) +            [=](uhd::async_metadata_t& md, const double timeout) { +                return async_md->pop_with_timed_wait(md, timeout); +            }          );          my_streamer->set_xport_chan_sid(stream_i, true, xport.send_sid);          // CHDR does not support trailers          my_streamer->set_enable_trailer(false); -    } -    // Connect the terminator to the streamer -    my_streamer->set_terminator(send_terminator); +        my_streamer->set_xport_chan_post_send_cb( +            stream_i, +            [=]() { +                tx_flow_ctrl_ack( +                    fc_cache, +                    xport.send, +                    xport.send_sid, +                    (xport.endianness == ENDIANNESS_BIG ? uhd::htonx<uint32_t> : uhd::htowx<uint32_t>), +                    (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_pack_be : vrt::chdr::if_hdr_pack_le) +                ); +            } +        ); +    }      // Notify all blocks in this chain that they are connected to an active streamer      send_terminator->set_tx_streamer(true, 0); @@ -916,7 +1046,7 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)      // Store a weak pointer to prevent a streamer->device3_impl->streamer circular dependency.      // Note that we store the streamer only once, and use its terminator's      // ID to do so. -    _tx_streamers[send_terminator->unique_id()] = boost::weak_ptr<sph::send_packet_streamer>(my_streamer); +    _tx_streamers[send_terminator->unique_id()] = boost::weak_ptr<uhd::tx_streamer>(my_streamer);      // Sets tick rate, samp rate and scaling on this streamer      // A registered terminator is required to do this. diff --git a/host/lib/usrp/x300/x300_impl.cpp b/host/lib/usrp/x300/x300_impl.cpp index 0c8d78834..351cb4e10 100644 --- a/host/lib/usrp/x300/x300_impl.cpp +++ b/host/lib/usrp/x300/x300_impl.cpp @@ -1231,8 +1231,8 @@ uhd::both_xports_t x300_impl::make_transport(                  ? X300_PCIE_RX_DATA_FRAME_SIZE                  : X300_PCIE_MSG_FRAME_SIZE; -			default_buff_args.num_send_frames = -				(xport_type == TX_DATA) +            default_buff_args.num_send_frames = +                (xport_type == TX_DATA)                  ? X300_PCIE_TX_DATA_NUM_FRAMES                  : X300_PCIE_MSG_NUM_FRAMES; diff --git a/host/lib/usrp/x300/x300_io_impl.cpp b/host/lib/usrp/x300/x300_io_impl.cpp index 082d83185..af5aa7c9e 100644 --- a/host/lib/usrp/x300/x300_io_impl.cpp +++ b/host/lib/usrp/x300/x300_io_impl.cpp @@ -7,19 +7,9 @@  #include "x300_regs.hpp"  #include "x300_impl.hpp" -#include "../../transport/super_recv_packet_handler.hpp" -#include "../../transport/super_send_packet_handler.hpp" -#include <uhdlib/usrp/common/async_packet_handler.hpp> -#include <uhd/transport/nirio_zero_copy.hpp> -#include <uhd/transport/bounded_buffer.hpp> -#include <uhd/utils/tasks.hpp> -#include <uhd/utils/log.hpp> -#include <boost/bind.hpp> -#include <boost/make_shared.hpp>  using namespace uhd;  using namespace uhd::usrp; -using namespace uhd::transport;  /***********************************************************************   * Hooks for get_tx_stream() and get_rx_stream() @@ -62,8 +52,8 @@ void x300_impl::post_streamer_hooks(direction_t dir)      // Loop through all tx streamers. Find all radios connected to one      // streamer. Sync those.      for(const boost::weak_ptr<uhd::tx_streamer> &streamer_w:  _tx_streamers.vals()) { -        const boost::shared_ptr<sph::send_packet_streamer> streamer = -            boost::dynamic_pointer_cast<sph::send_packet_streamer>(streamer_w.lock()); +        const boost::shared_ptr<device3_send_packet_streamer> streamer = +            boost::dynamic_pointer_cast<device3_send_packet_streamer>(streamer_w.lock());          if (not streamer) {              continue;          }  | 
