diff options
Diffstat (limited to 'host/lib/transport/super_recv_packet_handler.hpp')
-rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 143 |
1 files changed, 67 insertions, 76 deletions
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 921bfdfa0..c962d40e6 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -12,9 +12,9 @@ #include <uhd/exception.hpp> #include <uhd/convert.hpp> #include <uhd/stream.hpp> -#include <uhd/utils/log.hpp> #include <uhd/utils/tasks.hpp> #include <uhd/utils/byteswap.hpp> +#include <uhd/utils/log.hpp> #include <uhd/types/metadata.hpp> #include <uhd/transport/vrt_if_packet.hpp> #include <uhd/transport/zero_copy.hpp> @@ -22,7 +22,6 @@ #include <boost/dynamic_bitset.hpp> #include <boost/function.hpp> #include <boost/format.hpp> -#include <boost/bind.hpp> #include <boost/make_shared.hpp> #include <iostream> #include <vector> @@ -59,6 +58,7 @@ class recv_packet_handler{ public: typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type; typedef boost::function<void(const size_t)> handle_flowctrl_type; + typedef std::function<void(const uint32_t *)> handle_flowctrl_ack_type; typedef boost::function<void(const stream_cmd_t&)> issue_stream_cmd_type; typedef void(*vrt_unpacker_type)(const uint32_t *, vrt::if_packet_info_t &); //typedef boost::function<void(const uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type; @@ -102,33 +102,6 @@ public: _header_offset_words32 = header_offset_words32; } - ////////////////// RFNOC /////////////////////////// - //! Set the stream ID for a specific channel (or no SID) - void set_xport_chan_sid(const size_t xport_chan, const bool has_sid, const uint32_t sid = 0){ - _props.at(xport_chan).has_sid = has_sid; - _props.at(xport_chan).sid = sid; - } - - //! Get the stream ID for a specific channel (or zero if no SID) - uint32_t get_xport_chan_sid(const size_t xport_chan) const { - if (_props.at(xport_chan).has_sid) { - return _props.at(xport_chan).sid; - } else { - return 0; - } - } - - void set_terminator(uhd::rfnoc::rx_stream_terminator::sptr terminator) - { - _terminator = terminator; - } - - uhd::rfnoc::rx_stream_terminator::sptr get_terminator() - { - return _terminator; - } - ////////////////// RFNOC /////////////////////////// - /*! * Set the threshold for alignment failure. * How many packets throw out before giving up? @@ -183,6 +156,13 @@ public: if (do_init) handle_flowctrl(0); } + void set_xport_handle_flowctrl_ack( + const size_t xport_chan, + const handle_flowctrl_ack_type &handle_flowctrl_ack + ) { + _props.at(xport_chan).handle_flowctrl_ack = handle_flowctrl_ack; + } + //! Set the conversion routine for all channels void set_converter(const uhd::convert::id_type &id){ _num_outputs = id.num_outputs; @@ -211,12 +191,11 @@ public: //! Overload call to issue stream commands void issue_stream_cmd(const stream_cmd_t &stream_cmd) { - // RFNoC: This needs to be checked by the radio block, once it's done. TODO remove this. - //if (stream_cmd.stream_now - //and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS - //and _props.size() > 1) { - //throw uhd::runtime_error("Attempting to do multi-channel receive with stream_now == true will result in misaligned channels. Aborting."); - //} + if (size() > 1 and stream_cmd.stream_now and + stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS) + { + throw uhd::runtime_error("Invalid recv stream command - stream now on multiple channels in a single streamer will fail to time align."); + } for (size_t i = 0; i < _props.size(); i++) { @@ -307,11 +286,8 @@ private: size_t packet_count; handle_overflow_type handle_overflow; handle_flowctrl_type handle_flowctrl; + handle_flowctrl_ack_type handle_flowctrl_ack; size_t fc_update_window; - /////// RFNOC /////////// - bool has_sid; - uint32_t sid; - /////// RFNOC /////////// }; std::vector<xport_chan_props_type> _props; size_t _num_outputs; @@ -340,6 +316,7 @@ private: buffers_info_type(const size_t size): std::vector<per_buffer_info_type>(size), indexes_todo(size, true), + alignment_time(0), alignment_time_valid(false), data_bytes_to_copy(0), fragment_offset_in_samps(0) @@ -384,8 +361,6 @@ private: int recvd_packets; #endif - uhd::rfnoc::rx_stream_terminator::sptr _terminator; - /******************************************************************* * Get and process a single packet from the transport: * Receive a single packet at the given index. @@ -398,42 +373,58 @@ private: per_buffer_info_type &curr_buffer_info, double timeout ){ - //get a single packet from the transport layer managed_recv_buffer::sptr &buff = curr_buffer_info.buff; - buff = _props[index].get_buff(timeout); - if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR; - - #ifdef ERROR_INJECT_DROPPED_PACKETS - if (++recvd_packets > 1000) + per_buffer_info_type &info = curr_buffer_info; + while (1) { - recvd_packets = 0; - buff.reset(); + //get a single packet from the transport layer buff = _props[index].get_buff(timeout); if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR; - } - #endif - //bounds check before extract - size_t num_packet_words32 = buff->size()/sizeof(uint32_t); - if (num_packet_words32 <= _header_offset_words32){ - throw std::runtime_error("recv buffer smaller than vrt packet offset"); - } + #ifdef ERROR_INJECT_DROPPED_PACKETS + if (++recvd_packets > 1000) + { + recvd_packets = 0; + buff.reset(); + buff = _props[index].get_buff(timeout); + if (buff.get() == nullptr) return PACKET_TIMEOUT_ERROR; + } + #endif - //extract packet info - per_buffer_info_type &info = curr_buffer_info; - info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32; - info.vrt_hdr = buff->cast<const uint32_t *>() + _header_offset_words32; - _vrt_unpacker(info.vrt_hdr, info.ifpi); - info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true - info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32); - - //handle flow control - if (_props[index].handle_flowctrl) - { - if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0) + //bounds check before extract + size_t num_packet_words32 = buff->size()/sizeof(uint32_t); + if (num_packet_words32 <= _header_offset_words32){ + throw std::runtime_error("recv buffer smaller than vrt packet offset"); + } + + //extract packet info + info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32; + info.vrt_hdr = buff->cast<const uint32_t *>() + _header_offset_words32; + _vrt_unpacker(info.vrt_hdr, info.ifpi); + info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true + info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32); + + //handle flow control + if (_props[index].handle_flowctrl) { - _props[index].handle_flowctrl(info.ifpi.packet_count); + if ((info.ifpi.packet_count % _props[index].fc_update_window) == 0) + { + _props[index].handle_flowctrl(info.ifpi.packet_count); + } } + + //handle flow control ack + if (info.ifpi.fc_ack){ + if (_props[index].handle_flowctrl_ack) { + _props[index].handle_flowctrl_ack(reinterpret_cast<const uint32_t *>(info.copy_buff)); + } + // Process the next packet + buff.reset(); + info.copy_buff = nullptr; + continue; + } + + break; } //-------------------------------------------------------------- @@ -605,7 +596,7 @@ private: rx_metadata_t metadata = curr_info.metadata; _props[index].handle_overflow(); curr_info.metadata = metadata; - UHD_LOG_FASTPATH("O") + UHD_LOG_FASTPATH("O"); } curr_info[index].buff.reset(); curr_info[index].copy_buff = nullptr; @@ -627,7 +618,7 @@ private: prev_info[index].ifpi.num_payload_words32*sizeof(uint32_t)/_bytes_per_otw_item, _samp_rate); curr_info.metadata.out_of_sequence = true; curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; - UHD_LOG_FASTPATH("D") + UHD_LOG_FASTPATH("D"); return; } @@ -635,10 +626,10 @@ private: //too many iterations: detect alignment failure if (iterations++ > _alignment_failure_threshold){ UHD_LOGGER_ERROR("STREAMER") << boost::format( - "The receive packet handler failed to time-align packets. " - "%u received packets were processed by the handler. " - "However, a timestamp match could not be determined." - ) % iterations; + "The receive packet handler failed to time-align packets.\n" + "%u received packets were processed by the handler.\n" + "However, a timestamp match could not be determined.\n" + ) % iterations << std::endl; std::swap(curr_info, next_info); //save progress from curr -> next curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; _props[index].handle_overflow(); @@ -659,7 +650,7 @@ private: } /******************************************************************* - * Receive a single packet: + * Receive a single packet on all channels * Handles fragmentation, messages, errors, and copy-conversion. * When no fragments are available, call the get aligned buffers. * Then copy-convert available data into the user's IO buffers. |