diff options
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/CMakeLists.txt | 1 | ||||
-rw-r--r-- | host/lib/transport/chdr.cpp | 7 | ||||
-rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 143 | ||||
-rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 41 | ||||
-rw-r--r-- | host/lib/transport/zero_copy_flow_ctrl.cpp | 5 |
5 files changed, 92 insertions, 105 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index ae903d956..15771697a 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -112,6 +112,7 @@ LIBUHD_PYTHON_GEN_SOURCE( ) LIBUHD_APPEND_SOURCES( + ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_recv_offload.cpp ${CMAKE_CURRENT_SOURCE_DIR}/tcp_zero_copy.cpp ${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp diff --git a/host/lib/transport/chdr.cpp b/host/lib/transport/chdr.cpp index 48263f57e..36f380d62 100644 --- a/host/lib/transport/chdr.cpp +++ b/host/lib/transport/chdr.cpp @@ -23,6 +23,7 @@ using namespace uhd::transport::vrt; static const uint32_t HDR_FLAG_TSF = (1 << 29); static const uint32_t HDR_FLAG_EOB = (1 << 28); static const uint32_t HDR_FLAG_ERROR = (1 << 28); +static const uint32_t HDR_FLAG_FCACK = (1 << 28); /***************************************************************************/ /* Packing */ @@ -45,8 +46,8 @@ UHD_INLINE uint32_t _hdr_pack_chdr( | (if_packet_info.packet_type << 30) // 1 Bit: Has time | (if_packet_info.has_tsf ? HDR_FLAG_TSF : 0) - // 1 Bit: EOB or Error - | ((if_packet_info.eob or if_packet_info.error) ? HDR_FLAG_EOB : 0) + // 1 Bit: EOB or Error or FC ACK + | ((if_packet_info.eob or if_packet_info.error or if_packet_info.fc_ack) ? HDR_FLAG_EOB : 0) // 12 Bits: Sequence number | ((if_packet_info.packet_count & 0xFFF) << 16) // 16 Bits: Total packet length @@ -111,6 +112,8 @@ UHD_INLINE void _hdr_unpack_chdr( && ((chdr & HDR_FLAG_EOB) > 0); if_packet_info.error = (if_packet_info.packet_type == if_packet_info_t::PACKET_TYPE_RESP) && ((chdr & HDR_FLAG_ERROR) > 0); + if_packet_info.fc_ack = (if_packet_info.packet_type == if_packet_info_t::PACKET_TYPE_FC) + && ((chdr & HDR_FLAG_FCACK) > 0); if_packet_info.packet_count = (chdr >> 16) & 0xFFF; // Set packet length variables diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 921bfdfa0..c962d40e6 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -12,9 +12,9 @@ #include <uhd/exception.hpp> #include <uhd/convert.hpp> #include <uhd/stream.hpp> -#include <uhd/utils/log.hpp> #include <uhd/utils/tasks.hpp> #include <uhd/utils/byteswap.hpp> +#include <uhd/utils/log.hpp> #include <uhd/types/metadata.hpp> #include <uhd/transport/vrt_if_packet.hpp> #include <uhd/transport/zero_copy.hpp> @@ -22,7 +22,6 @@ #include <boost/dynamic_bitset.hpp> #include <boost/function.hpp> #include <boost/format.hpp> -#include <boost/bind.hpp> #include <boost/make_shared.hpp> #include <iostream> #include <vector> @@ -59,6 +58,7 @@ class recv_packet_handler{ public: typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type; typedef boost::function<void(const size_t)> handle_flowctrl_type; + typedef std::function<void(const uint32_t *)> handle_flowctrl_ack_type; typedef boost::function<void(const stream_cmd_t&)> issue_stream_cmd_type; typedef void(*vrt_unpacker_type)(const uint32_t *, vrt::if_packet_info_t &); //typedef boost::function<void(const uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type; @@ -102,33 +102,6 @@ public: _header_offset_words32 = header_offset_words32; } - ////////////////// RFNOC /////////////////////////// - //! Set the stream ID for a specific channel (or no SID) - void set_xport_chan_sid(const size_t xport_chan, const bool has_sid, const uint32_t sid = 0){ - _props.at(xport_chan).has_sid = has_sid; - _props.at(xport_chan).sid = sid; - } - - //! Get the stream ID for a specific channel (or zero if no SID) - uint32_t get_xport_chan_sid(const size_t xport_chan) const { - if (_props.at(xport_chan).has_sid) { - return _props.at(xport_chan).sid; - } else { - return 0; - } - } - - void set_terminator(uhd::rfnoc::rx_stream_terminator::sptr terminator) - { - _terminator = terminator; - } - - uhd::rfnoc::rx_stream_terminator::sptr get_terminator() - { - return _terminator; - } - ////////////////// RFNOC /////////////////////////// - /*! * Set the threshold for alignment failure. * How many packets throw out before giving up? @@ -183,6 +156,13 @@ public: if (do_init) handle_flowctrl(0); } + void set_xport_handle_flowctrl_ack( + const size_t xport_chan, + const handle_flowctrl_ack_type &handle_flowctrl_ack + ) { + _props.at(xport_chan).handle_flowctrl_ack = handle_flowctrl_ack; + } + //! Set the conversion routine for all channels void set_converter(const uhd::convert::id_type &id){ _num_outputs = id.num_outputs; @@ -211,12 +191,11 @@ public: //! Overload call to issue stream commands void issue_stream_cmd(const stream_cmd_t &stream_cmd) { - // RFNoC: This needs to be checked by the radio block, once it's done. TODO remove this. - //if (stream_cmd.stream_now - //and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS - //and _props.size() > 1) { - //throw uhd::runtime_error("Attempting to do multi-channel receive with stream_now == true will result in misaligned channels. Aborting."); - //} + if (size() > 1 and stream_cmd.stream_now and + stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS) + { + throw uhd::runtime_error("Invalid recv stream command - stream now on multiple channels in a single streamer will fail to time align."); + } for (size_t i = 0; i < _props.size(); i++) { @@ -307,11 +286,8 @@ private: size_t packet_count; handle_overflow_type handle_overflow; handle_flowctrl_type handle_flowctrl; + handle_flowctrl_ack_type handle_flowctrl_ack; size_t fc_update_window; - /////// RFNOC /////////// - bool has_sid; - uint32_t sid; - /////// RFNOC /////////// }; std::vector<xport_chan_props_type> _props; size_t _num_outputs; @@ -340,6 +316,7 @@ private: buffers_info_type(const size_t size): std::vector<per_buffer_info_type>(size), indexes_todo(size, true), + alignment_time(0), alignment_time_valid(false), data_bytes_to_copy(0), fragment_offset_in_samps(0) @@ -384,8 +361,6 @@ private: int recvd_packets; #endif - uhd::rfnoc::rx_stream_terminator::sptr _terminator; - /******************************************************************* * Get and process a single packet from the transport: * Receive a single packet at the given index. @@ -398,42 +373,58 @@ private: per_buffer_info_type &curr_buffer_info, double timeout ){ - //get a single packet from the transport layer managed_recv_buffer::sptr &buff = curr_buffer_info.buff; - buff = _props[index].get_buff(timeout); - if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR; - - #ifdef ERROR_INJECT_DROPPED_PACKETS - if (++recvd_packets > 1000) + per_buffer_info_type &info = curr_buffer_info; + while (1) { - recvd_packets = 0; - buff.reset(); + //get a single packet from the transport layer buff = _props[index].get_buff(timeout); if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR; - } - #endif - //bounds check before extract - size_t num_packet_words32 = buff->size()/sizeof(uint32_t); - if (num_packet_words32 <= _header_offset_words32){ - throw std::runtime_error("recv buffer smaller than vrt packet offset"); - } + #ifdef ERROR_INJECT_DROPPED_PACKETS + if (++recvd_packets > 1000) + { + recvd_packets = 0; + buff.reset(); + buff = _props[index].get_buff(timeout); + if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR; + } + #endif - //extract packet info - per_buffer_info_type &info = curr_buffer_info; - info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32; - info.vrt_hdr = buff->cast<const uint32_t *>() + _header_offset_words32; - _vrt_unpacker(info.vrt_hdr, info.ifpi); - info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true - info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32); - - //handle flow control - if (_props[index].handle_flowctrl) - { - if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0) + //bounds check before extract + size_t num_packet_words32 = buff->size()/sizeof(uint32_t); + if (num_packet_words32 <= _header_offset_words32){ + throw std::runtime_error("recv buffer smaller than vrt packet offset"); + } + + //extract packet info + info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32; + info.vrt_hdr = buff->cast<const uint32_t *>() + _header_offset_words32; + _vrt_unpacker(info.vrt_hdr, info.ifpi); + info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true + info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32); + + //handle flow control + if (_props[index].handle_flowctrl) { - _props[index].handle_flowctrl(info.ifpi.packet_count); + if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0) + { + _props[index].handle_flowctrl(info.ifpi.packet_count); + } } + + //handle flow control ack + if (info.ifpi.fc_ack){ + if (_props[index].handle_flowctrl_ack) { + _props[index].handle_flowctrl_ack(reinterpret_cast<const uint32_t *>(info.copy_buff)); + } + // Process the next packet + buff.reset(); + info.copy_buff = nullptr; + continue; + } + + break; } //-------------------------------------------------------------- @@ -605,7 +596,7 @@ private: rx_metadata_t metadata = curr_info.metadata; _props[index].handle_overflow(); curr_info.metadata = metadata; - UHD_LOG_FASTPATH("O") + UHD_LOG_FASTPATH("O"); } curr_info[index].buff.reset(); curr_info[index].copy_buff = nullptr; @@ -627,7 +618,7 @@ private: prev_info[index].ifpi.num_payload_words32*sizeof(uint32_t)/_bytes_per_otw_item, _samp_rate); curr_info.metadata.out_of_sequence = true; curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; - UHD_LOG_FASTPATH("D") + UHD_LOG_FASTPATH("D"); return; } @@ -635,10 +626,10 @@ private: //too many iterations: detect alignment failure if (iterations++ > _alignment_failure_threshold){ UHD_LOGGER_ERROR("STREAMER") << boost::format( - "The receive packet handler failed to time-align packets. " - "%u received packets were processed by the handler. " - "However, a timestamp match could not be determined." - ) % iterations; + "The receive packet handler failed to time-align packets.\n" + "%u received packets were processed by the handler.\n" + "However, a timestamp match could not be determined.\n" + ) % iterations << std::endl; std::swap(curr_info, next_info); //save progress from curr -> next curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; _props[index].handle_overflow(); @@ -659,7 +650,7 @@ private: } /******************************************************************* - * Receive a single packet: + * Receive a single packet on all channels * Handles fragmentation, messages, errors, and copy-conversion. * When no fragments are available, call the get aligned buffers. * Then copy-convert available data into the user's IO buffers. diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index e824ef4e9..5cba570a7 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -12,7 +12,6 @@ #include <uhd/exception.hpp> #include <uhd/convert.hpp> #include <uhd/stream.hpp> -#include <uhd/utils/log.hpp> #include <uhd/utils/tasks.hpp> #include <uhd/utils/byteswap.hpp> #include <uhd/utils/thread.hpp> @@ -49,6 +48,7 @@ namespace sph { class send_packet_handler{ public: typedef boost::function<managed_send_buffer::sptr(double)> get_buff_type; + typedef boost::function<void(void)> post_send_cb_type; typedef boost::function<bool(uhd::async_metadata_t &, const double)> async_receiver_type; typedef void(*vrt_packer_type)(uint32_t *, vrt::if_packet_info_t &); //typedef boost::function<void(uint32_t *, vrt::if_packet_info_t &)> vrt_packer_type; @@ -93,27 +93,6 @@ public: _props.at(xport_chan).sid = sid; } - ///////// RFNOC /////////////////// - //! Get the stream ID for a specific channel (or zero if no SID) - uint32_t get_xport_chan_sid(const size_t xport_chan) const { - if (_props.at(xport_chan).has_sid) { - return _props.at(xport_chan).sid; - } else { - return 0; - } - } - - void set_terminator(uhd::rfnoc::tx_stream_terminator::sptr terminator) - { - _terminator = terminator; - } - - uhd::rfnoc::tx_stream_terminator::sptr get_terminator() - { - return _terminator; - } - ///////// RFNOC /////////////////// - void set_enable_trailer(const bool enable) { _has_tlr = enable; @@ -138,6 +117,15 @@ public: _props.at(xport_chan).get_buff = get_buff; } + /*! + * Set the callback function for post-send. + * \param xport_chan which transport channel + * \param cb post-send callback + */ + void set_xport_chan_post_send_cb(const size_t xport_chan, const post_send_cb_type &cb){ + _props.at(xport_chan).go_postal = cb; + } + //! Set the conversion routine for all channels void set_converter(const uhd::convert::id_type &id){ _num_inputs = id.num_inputs; @@ -198,6 +186,7 @@ public: if_packet_info.tsf = metadata.time_spec.to_ticks(_tick_rate); if_packet_info.sob = metadata.start_of_burst; if_packet_info.eob = metadata.end_of_burst; + if_packet_info.fc_ack = false; //This is a data packet /* * Metadata is cached when we get a send requesting a start of burst with no samples. @@ -291,6 +280,7 @@ private: struct xport_chan_props_type{ xport_chan_props_type(void):has_sid(false),sid(0){} get_buff_type get_buff; + post_send_cb_type go_postal; bool has_sid; uint32_t sid; managed_send_buffer::sptr buff; @@ -308,8 +298,6 @@ private: bool _cached_metadata; uhd::tx_metadata_t _metadata_cache; - uhd::rfnoc::tx_stream_terminator::sptr _terminator; - #ifdef UHD_TXRX_DEBUG_PRINTS struct dbg_send_stat_t { dbg_send_stat_t(long wc, size_t nspb, size_t nss, uhd::tx_metadata_t md, double to, double rate): @@ -428,6 +416,11 @@ private: const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32; buff->commit(num_vita_words32*sizeof(uint32_t)); buff.reset(); //effectively a release + + if (_props[index].go_postal) + { + _props[index].go_postal(); + } } //! Shared variables for the worker threads diff --git a/host/lib/transport/zero_copy_flow_ctrl.cpp b/host/lib/transport/zero_copy_flow_ctrl.cpp index 709b9e981..25be35569 100644 --- a/host/lib/transport/zero_copy_flow_ctrl.cpp +++ b/host/lib/transport/zero_copy_flow_ctrl.cpp @@ -65,7 +65,7 @@ public: zero_copy_flow_ctrl_mrb( flow_ctrl_func flow_ctrl ) : - _mb(nullptr), + _mb(NULL), _flow_ctrl(flow_ctrl) { /* NOP */ @@ -80,8 +80,6 @@ public: { if (_mb) { - _mb->commit(size()); - while (_flow_ctrl and not _flow_ctrl(_mb)) {} _mb.reset(); } } @@ -89,6 +87,7 @@ public: UHD_INLINE sptr get(sptr &mb) { _mb = mb; + while (_flow_ctrl and not _flow_ctrl(_mb)) {} return make(this, _mb->cast<void *>(), _mb->size()); } |