diff options
Diffstat (limited to 'host/lib/include')
4 files changed, 130 insertions, 29 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp index 5440c1e37..5327105c8 100644 --- a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp +++ b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp @@ -324,7 +324,6 @@ private: std::unordered_map<size_t, double> _rx_bandwidth; std::vector<uhd::stream_cmd_t> _last_stream_cmd; - std::vector<bool> _restart_cont; }; }} // namespace uhd::rfnoc diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp index 1d1d0805d..d39d88f43 100644 --- a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp @@ -10,8 +10,8 @@ #include <uhd/rfnoc/node.hpp> #include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> #include <uhdlib/transport/rx_streamer_impl.hpp> -#include <string> #include <atomic> +#include <string> namespace uhd { namespace rfnoc { @@ -75,16 +75,17 @@ public: */ bool check_topology(const std::vector<size_t>& connected_inputs, const std::vector<size_t>& connected_outputs); + private: void _register_props(const size_t chan, const std::string& otw_format); void _handle_rx_event_action( const res_source_info& src, rx_event_action_info::sptr rx_event_action); - void _handle_restart_request( - const res_source_info& src, action_info::sptr rx_event_action); void _handle_stream_cmd_action( const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action); + void _handle_overrun(); + // Properties std::vector<property_t<double>> _scaling_in; std::vector<property_t<double>> _samp_rate_in; @@ -98,6 +99,7 @@ private: const uhd::stream_args_t _stream_args; std::atomic<bool> _overrun_handling_mode{false}; + size_t _overrun_channel = 0; }; }} // namespace uhd::rfnoc diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp index d3fe97c7f..cc989e8f2 100644 --- a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp +++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp @@ -196,6 +196,19 @@ protected: _zero_copy_streamer.set_tick_rate(rate); } + //! Notifies the streamer that an overrun has occured + void set_stopped_due_to_overrun() + { + _zero_copy_streamer.set_stopped_due_to_overrun(); + } + + //! Provides a callback to handle overruns + void set_overrun_handler( + typename rx_streamer_zero_copy<transport_t>::overrun_handler_t handler) + { + _zero_copy_streamer.set_overrun_handler(handler); + } + private: //! Converter and associated item sizes struct convert_info diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp index 36f568f2d..1f7320330 100644 --- a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp +++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp @@ -14,6 +14,7 @@ #include <uhdlib/transport/get_aligned_buffs.hpp> #include <boost/format.hpp> #include <vector> +#include <atomic> namespace uhd { namespace transport { @@ -26,6 +27,8 @@ template <typename transport_t> class rx_streamer_zero_copy { public: + using overrun_handler_t = std::function<void()>; + //! Constructor rx_streamer_zero_copy(const size_t num_ports) : _xports(num_ports) @@ -84,6 +87,18 @@ public: _bytes_per_item = bpi; } + //! Notifies the streamer that an overrun has occured + void set_stopped_due_to_overrun() + { + _stopped_due_to_overrun = true; + } + + //! Provides a callback to handle overruns + void set_overrun_handler(overrun_handler_t handler) + { + _overrun_handler = handler; + } + /*! * Gets a set of time-aligned buffers, one per channel. * @@ -96,35 +111,62 @@ public: rx_metadata_t& metadata, const int32_t timeout_ms) { - metadata.reset(); + // Function to set metadata based on alignment error + auto set_metadata_for_error = + [this](typename get_aligned_buffs_t::alignment_result_t error, + rx_metadata_t& metadata) { + switch (error) { + case get_aligned_buffs_t::BAD_PACKET: + metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; + break; + + case get_aligned_buffs_t::TIMEOUT: + metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; + break; + + case get_aligned_buffs_t::ALIGNMENT_FAILURE: + metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; + break; + + case get_aligned_buffs_t::SEQUENCE_ERROR: + std::tie(metadata.has_time_spec, metadata.time_spec) = + _last_read_time_info.get_next_packet_time(_samp_rate); + metadata.out_of_sequence = true; + metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + break; + + default: + UHD_THROW_INVALID_CODE_PATH(); + } + }; - switch (_get_aligned_buffs(timeout_ms)) { - case get_aligned_buffs_t::SUCCESS: - break; - - case get_aligned_buffs_t::BAD_PACKET: - metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; - return 0; - - case get_aligned_buffs_t::TIMEOUT: - metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; - return 0; - - case get_aligned_buffs_t::ALIGNMENT_FAILURE: - metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; - return 0; + metadata.reset(); - case get_aligned_buffs_t::SEQUENCE_ERROR: - metadata.has_time_spec = _last_read_time_info.has_time_spec; - metadata.time_spec = - _last_read_time_info.time_spec - + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate); - metadata.out_of_sequence = true; - metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + // Try to get buffs with a 0 timeout first. This avoids needing to check + // if radios are stopped due to overrun when packets are available. + auto result = _get_aligned_buffs(0); + + if (result == get_aligned_buffs_t::TIMEOUT) { + if (_stopped_due_to_overrun) { + // An overrun occurred and the user has read all the packets + // that were buffered prior to the overrun. Call the overrun + // handler and return overrun error. + _handle_overrun(); + std::tie(metadata.has_time_spec, metadata.time_spec) = + _last_read_time_info.get_next_packet_time(_samp_rate); + metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + _stopped_due_to_overrun = false; return 0; + } else { + // Packets were not available with zero timeout, wait for them + // to arrive using the specified timeout. + result = _get_aligned_buffs(timeout_ms); + } + } - default: - UHD_THROW_INVALID_CODE_PATH(); + if (result != get_aligned_buffs_t::SUCCESS) { + set_metadata_for_error(result, metadata); + return 0; } // Get payload pointers for each buffer and aggregate eob. We set eob to @@ -167,6 +209,34 @@ public: private: using get_aligned_buffs_t = get_aligned_buffs<transport_t>; + void _handle_overrun() + { + // Flush any remaining packets. This method is called after any channel + // times out, so here we ensure all channels are flushed prior to + // calling the overrun handler to potentially restart the radios. + for (size_t chan = 0; chan < _xports.size(); chan++) { + if (_frame_buffs[chan]) { + _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan])); + _frame_buffs[chan] = nullptr; + } + + frame_buff::uptr buff; + while (true) { + std::tie(buff, std::ignore, std::ignore) = + _xports[chan]->get_recv_buff(0); + if (!buff) { + break; + } + _xports[chan]->release_recv_buff(std::move(buff)); + } + } + + // Now call the overrun handler + if (_overrun_handler) { + _overrun_handler(); + } + } + // Information recorded by streamer about the last data packet processed, // used to create the metadata when there is a sequence error. struct last_read_time_info_t @@ -174,6 +244,16 @@ private: size_t num_samps = 0; bool has_time_spec = false; time_spec_t time_spec; + + std::tuple<bool, time_spec_t> get_next_packet_time(double samp_rate) + { + if (has_time_spec) { + return std::make_tuple( + true, time_spec + time_spec_t::from_ticks(num_samps, samp_rate)); + } else { + return std::make_tuple(false, time_spec_t()); + } + } }; // Transports for each channel @@ -200,6 +280,13 @@ private: // Information about the last data packet processed last_read_time_info_t _last_read_time_info; + + // Flag that indicates an overrun occurred. The streamer will return an + // overrun error when no more packets are available. + std::atomic<bool> _stopped_due_to_overrun{false}; + + // Callback for overrun + overrun_handler_t _overrun_handler; }; }} // namespace uhd::transport |