diff options
Diffstat (limited to 'host/lib/include/uhdlib/transport')
-rw-r--r-- | host/lib/include/uhdlib/transport/rx_streamer_impl.hpp | 13 | ||||
-rw-r--r-- | host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp | 137 |
2 files changed, 125 insertions, 25 deletions
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp index d3fe97c7f..cc989e8f2 100644 --- a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp +++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp @@ -196,6 +196,19 @@ protected: _zero_copy_streamer.set_tick_rate(rate); } + //! Notifies the streamer that an overrun has occured + void set_stopped_due_to_overrun() + { + _zero_copy_streamer.set_stopped_due_to_overrun(); + } + + //! Provides a callback to handle overruns + void set_overrun_handler( + typename rx_streamer_zero_copy<transport_t>::overrun_handler_t handler) + { + _zero_copy_streamer.set_overrun_handler(handler); + } + private: //! Converter and associated item sizes struct convert_info diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp index 36f568f2d..1f7320330 100644 --- a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp +++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp @@ -14,6 +14,7 @@ #include <uhdlib/transport/get_aligned_buffs.hpp> #include <boost/format.hpp> #include <vector> +#include <atomic> namespace uhd { namespace transport { @@ -26,6 +27,8 @@ template <typename transport_t> class rx_streamer_zero_copy { public: + using overrun_handler_t = std::function<void()>; + //! Constructor rx_streamer_zero_copy(const size_t num_ports) : _xports(num_ports) @@ -84,6 +87,18 @@ public: _bytes_per_item = bpi; } + //! Notifies the streamer that an overrun has occured + void set_stopped_due_to_overrun() + { + _stopped_due_to_overrun = true; + } + + //! Provides a callback to handle overruns + void set_overrun_handler(overrun_handler_t handler) + { + _overrun_handler = handler; + } + /*! * Gets a set of time-aligned buffers, one per channel. * @@ -96,35 +111,62 @@ public: rx_metadata_t& metadata, const int32_t timeout_ms) { - metadata.reset(); + // Function to set metadata based on alignment error + auto set_metadata_for_error = + [this](typename get_aligned_buffs_t::alignment_result_t error, + rx_metadata_t& metadata) { + switch (error) { + case get_aligned_buffs_t::BAD_PACKET: + metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; + break; + + case get_aligned_buffs_t::TIMEOUT: + metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; + break; + + case get_aligned_buffs_t::ALIGNMENT_FAILURE: + metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; + break; + + case get_aligned_buffs_t::SEQUENCE_ERROR: + std::tie(metadata.has_time_spec, metadata.time_spec) = + _last_read_time_info.get_next_packet_time(_samp_rate); + metadata.out_of_sequence = true; + metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + break; + + default: + UHD_THROW_INVALID_CODE_PATH(); + } + }; - switch (_get_aligned_buffs(timeout_ms)) { - case get_aligned_buffs_t::SUCCESS: - break; - - case get_aligned_buffs_t::BAD_PACKET: - metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; - return 0; - - case get_aligned_buffs_t::TIMEOUT: - metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; - return 0; - - case get_aligned_buffs_t::ALIGNMENT_FAILURE: - metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; - return 0; + metadata.reset(); - case get_aligned_buffs_t::SEQUENCE_ERROR: - metadata.has_time_spec = _last_read_time_info.has_time_spec; - metadata.time_spec = - _last_read_time_info.time_spec - + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate); - metadata.out_of_sequence = true; - metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + // Try to get buffs with a 0 timeout first. This avoids needing to check + // if radios are stopped due to overrun when packets are available. + auto result = _get_aligned_buffs(0); + + if (result == get_aligned_buffs_t::TIMEOUT) { + if (_stopped_due_to_overrun) { + // An overrun occurred and the user has read all the packets + // that were buffered prior to the overrun. Call the overrun + // handler and return overrun error. + _handle_overrun(); + std::tie(metadata.has_time_spec, metadata.time_spec) = + _last_read_time_info.get_next_packet_time(_samp_rate); + metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + _stopped_due_to_overrun = false; return 0; + } else { + // Packets were not available with zero timeout, wait for them + // to arrive using the specified timeout. + result = _get_aligned_buffs(timeout_ms); + } + } - default: - UHD_THROW_INVALID_CODE_PATH(); + if (result != get_aligned_buffs_t::SUCCESS) { + set_metadata_for_error(result, metadata); + return 0; } // Get payload pointers for each buffer and aggregate eob. We set eob to @@ -167,6 +209,34 @@ public: private: using get_aligned_buffs_t = get_aligned_buffs<transport_t>; + void _handle_overrun() + { + // Flush any remaining packets. This method is called after any channel + // times out, so here we ensure all channels are flushed prior to + // calling the overrun handler to potentially restart the radios. + for (size_t chan = 0; chan < _xports.size(); chan++) { + if (_frame_buffs[chan]) { + _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan])); + _frame_buffs[chan] = nullptr; + } + + frame_buff::uptr buff; + while (true) { + std::tie(buff, std::ignore, std::ignore) = + _xports[chan]->get_recv_buff(0); + if (!buff) { + break; + } + _xports[chan]->release_recv_buff(std::move(buff)); + } + } + + // Now call the overrun handler + if (_overrun_handler) { + _overrun_handler(); + } + } + // Information recorded by streamer about the last data packet processed, // used to create the metadata when there is a sequence error. struct last_read_time_info_t @@ -174,6 +244,16 @@ private: size_t num_samps = 0; bool has_time_spec = false; time_spec_t time_spec; + + std::tuple<bool, time_spec_t> get_next_packet_time(double samp_rate) + { + if (has_time_spec) { + return std::make_tuple( + true, time_spec + time_spec_t::from_ticks(num_samps, samp_rate)); + } else { + return std::make_tuple(false, time_spec_t()); + } + } }; // Transports for each channel @@ -200,6 +280,13 @@ private: // Information about the last data packet processed last_read_time_info_t _last_read_time_info; + + // Flag that indicates an overrun occurred. The streamer will return an + // overrun error when no more packets are available. + std::atomic<bool> _stopped_due_to_overrun{false}; + + // Callback for overrun + overrun_handler_t _overrun_handler; }; }} // namespace uhd::transport |