diff options
Diffstat (limited to 'host/lib/transport/super_recv_packet_handler.hpp')
-rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 73 |
1 files changed, 60 insertions, 13 deletions
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 541d9f3bc..5ca1da687 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -28,6 +28,9 @@ #include <uhd/types/metadata.hpp> #include <uhd/transport/vrt_if_packet.hpp> #include <uhd/transport/zero_copy.hpp> +#ifdef DEVICE3_STREAMER +# include "../rfnoc/rx_stream_terminator.hpp" +#endif #include <boost/dynamic_bitset.hpp> #include <boost/foreach.hpp> #include <boost/function.hpp> @@ -112,6 +115,35 @@ public: _header_offset_words32 = header_offset_words32; } + ////////////////// RFNOC /////////////////////////// + //! Set the stream ID for a specific channel (or no SID) + void set_xport_chan_sid(const size_t xport_chan, const bool has_sid, const boost::uint32_t sid = 0){ + _props.at(xport_chan).has_sid = has_sid; + _props.at(xport_chan).sid = sid; + } + + //! Get the stream ID for a specific channel (or zero if no SID) + boost::uint32_t get_xport_chan_sid(const size_t xport_chan) const { + if (_props.at(xport_chan).has_sid) { + return _props.at(xport_chan).sid; + } else { + return 0; + } + } + + #ifdef DEVICE3_STREAMER + void set_terminator(uhd::rfnoc::rx_stream_terminator::sptr terminator) + { + _terminator = terminator; + } + + uhd::rfnoc::rx_stream_terminator::sptr get_terminator() + { + return _terminator; + } + #endif + ////////////////// RFNOC /////////////////////////// + /*! * Set the threshold for alignment failure. * How many packets throw out before giving up? @@ -194,11 +226,12 @@ public: //! Overload call to issue stream commands void issue_stream_cmd(const stream_cmd_t &stream_cmd) { - if (stream_cmd.stream_now - and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS - and _props.size() > 1) { - throw uhd::runtime_error("Attempting to do multi-channel receive with stream_now == true will result in misaligned channels. Aborting."); - } + // RFNoC: This needs to be checked by the radio block, once it's done. TODO remove this. + //if (stream_cmd.stream_now + //and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS + //and _props.size() > 1) { + //throw uhd::runtime_error("Attempting to do multi-channel receive with stream_now == true will result in misaligned channels. Aborting."); + //} for (size_t i = 0; i < _props.size(); i++) { @@ -288,6 +321,10 @@ private: handle_overflow_type handle_overflow; handle_flowctrl_type handle_flowctrl; size_t fc_update_window; + /////// RFNOC /////////// + bool has_sid; + boost::uint32_t sid; + /////// RFNOC /////////// }; std::vector<xport_chan_props_type> _props; size_t _num_outputs; @@ -360,6 +397,10 @@ private: int recvd_packets; #endif + #ifdef DEVICE3_STREAMER + uhd::rfnoc::rx_stream_terminator::sptr _terminator; + #endif + /******************************************************************* * Get and process a single packet from the transport: * Receive a single packet at the given index. @@ -426,6 +467,7 @@ private: const size_t expected_packet_count = _props[index].packet_count; _props[index].packet_count = (info.ifpi.packet_count + 1) & seq_mask; if (expected_packet_count != info.ifpi.packet_count){ + //UHD_MSG(status) << "expected: " << expected_packet_count << " got: " << info.ifpi.packet_count << std::endl; if (_props[index].handle_flowctrl) { // Always update flow control in this case, because we don't // know which packet was dropped and what state the upstream @@ -447,6 +489,10 @@ private: void _flush_all(double timeout) { + get_prev_buffer_info().reset(); + get_curr_buffer_info().reset(); + get_next_buffer_info().reset(); + for (size_t i = 0; i < _props.size(); i++) { per_buffer_info_type prev_buffer_info, curr_buffer_info; @@ -467,9 +513,6 @@ private: curr_buffer_info.reset(); } } - get_prev_buffer_info().reset(); - get_curr_buffer_info().reset(); - get_next_buffer_info().reset(); } /******************************************************************* @@ -567,16 +610,20 @@ private: curr_info.metadata.time_spec = next_info[index].time; curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi)); if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){ + // Not sending flow control would cause timeouts due to source flow control locking up. + // Send first as the overrun handler may flush the receive buffers which could contain + // packets with sequence numbers after this packet's sequence number! + if(_props[index].handle_flowctrl) { + _props[index].handle_flowctrl(next_info[index].ifpi.packet_count); + } + rx_metadata_t metadata = curr_info.metadata; _props[index].handle_overflow(); curr_info.metadata = metadata; UHD_MSG(fastpath) << "O"; - - // Not sending flow control would cause timeouts due to source flow control locking up - if(_props[index].handle_flowctrl) { - _props[index].handle_flowctrl(next_info[index].ifpi.packet_count); - } } + curr_info[index].buff.reset(); + curr_info[index].copy_buff = NULL; return; case PACKET_TIMEOUT_ERROR: |