diff options
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp | 1 | ||||
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp | 8 | ||||
-rw-r--r-- | host/lib/include/uhdlib/transport/rx_streamer_impl.hpp | 13 | ||||
-rw-r--r-- | host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp | 137 | ||||
-rw-r--r-- | host/lib/rfnoc/radio_control_impl.cpp | 22 | ||||
-rw-r--r-- | host/lib/rfnoc/rfnoc_rx_streamer.cpp | 49 |
6 files changed, 158 insertions, 72 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp index 5440c1e37..5327105c8 100644 --- a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp +++ b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp @@ -324,7 +324,6 @@ private: std::unordered_map<size_t, double> _rx_bandwidth; std::vector<uhd::stream_cmd_t> _last_stream_cmd; - std::vector<bool> _restart_cont; }; }} // namespace uhd::rfnoc diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp index 1d1d0805d..d39d88f43 100644 --- a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp @@ -10,8 +10,8 @@ #include <uhd/rfnoc/node.hpp> #include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> #include <uhdlib/transport/rx_streamer_impl.hpp> -#include <string> #include <atomic> +#include <string> namespace uhd { namespace rfnoc { @@ -75,16 +75,17 @@ public: */ bool check_topology(const std::vector<size_t>& connected_inputs, const std::vector<size_t>& connected_outputs); + private: void _register_props(const size_t chan, const std::string& otw_format); void _handle_rx_event_action( const res_source_info& src, rx_event_action_info::sptr rx_event_action); - void _handle_restart_request( - const res_source_info& src, action_info::sptr rx_event_action); void _handle_stream_cmd_action( const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action); + void _handle_overrun(); + // Properties std::vector<property_t<double>> _scaling_in; std::vector<property_t<double>> _samp_rate_in; @@ -98,6 +99,7 @@ private: const uhd::stream_args_t _stream_args; std::atomic<bool> _overrun_handling_mode{false}; + size_t _overrun_channel = 0; }; }} // namespace uhd::rfnoc diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp index d3fe97c7f..cc989e8f2 100644 --- a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp +++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp @@ -196,6 +196,19 @@ protected: _zero_copy_streamer.set_tick_rate(rate); } + //! Notifies the streamer that an overrun has occured + void set_stopped_due_to_overrun() + { + _zero_copy_streamer.set_stopped_due_to_overrun(); + } + + //! Provides a callback to handle overruns + void set_overrun_handler( + typename rx_streamer_zero_copy<transport_t>::overrun_handler_t handler) + { + _zero_copy_streamer.set_overrun_handler(handler); + } + private: //! Converter and associated item sizes struct convert_info diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp index 36f568f2d..1f7320330 100644 --- a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp +++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp @@ -14,6 +14,7 @@ #include <uhdlib/transport/get_aligned_buffs.hpp> #include <boost/format.hpp> #include <vector> +#include <atomic> namespace uhd { namespace transport { @@ -26,6 +27,8 @@ template <typename transport_t> class rx_streamer_zero_copy { public: + using overrun_handler_t = std::function<void()>; + //! Constructor rx_streamer_zero_copy(const size_t num_ports) : _xports(num_ports) @@ -84,6 +87,18 @@ public: _bytes_per_item = bpi; } + //! Notifies the streamer that an overrun has occured + void set_stopped_due_to_overrun() + { + _stopped_due_to_overrun = true; + } + + //! Provides a callback to handle overruns + void set_overrun_handler(overrun_handler_t handler) + { + _overrun_handler = handler; + } + /*! * Gets a set of time-aligned buffers, one per channel. * @@ -96,35 +111,62 @@ public: rx_metadata_t& metadata, const int32_t timeout_ms) { - metadata.reset(); + // Function to set metadata based on alignment error + auto set_metadata_for_error = + [this](typename get_aligned_buffs_t::alignment_result_t error, + rx_metadata_t& metadata) { + switch (error) { + case get_aligned_buffs_t::BAD_PACKET: + metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; + break; + + case get_aligned_buffs_t::TIMEOUT: + metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; + break; + + case get_aligned_buffs_t::ALIGNMENT_FAILURE: + metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; + break; + + case get_aligned_buffs_t::SEQUENCE_ERROR: + std::tie(metadata.has_time_spec, metadata.time_spec) = + _last_read_time_info.get_next_packet_time(_samp_rate); + metadata.out_of_sequence = true; + metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + break; + + default: + UHD_THROW_INVALID_CODE_PATH(); + } + }; - switch (_get_aligned_buffs(timeout_ms)) { - case get_aligned_buffs_t::SUCCESS: - break; - - case get_aligned_buffs_t::BAD_PACKET: - metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; - return 0; - - case get_aligned_buffs_t::TIMEOUT: - metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; - return 0; - - case get_aligned_buffs_t::ALIGNMENT_FAILURE: - metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; - return 0; + metadata.reset(); - case get_aligned_buffs_t::SEQUENCE_ERROR: - metadata.has_time_spec = _last_read_time_info.has_time_spec; - metadata.time_spec = - _last_read_time_info.time_spec - + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate); - metadata.out_of_sequence = true; - metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + // Try to get buffs with a 0 timeout first. This avoids needing to check + // if radios are stopped due to overrun when packets are available. + auto result = _get_aligned_buffs(0); + + if (result == get_aligned_buffs_t::TIMEOUT) { + if (_stopped_due_to_overrun) { + // An overrun occurred and the user has read all the packets + // that were buffered prior to the overrun. Call the overrun + // handler and return overrun error. + _handle_overrun(); + std::tie(metadata.has_time_spec, metadata.time_spec) = + _last_read_time_info.get_next_packet_time(_samp_rate); + metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + _stopped_due_to_overrun = false; return 0; + } else { + // Packets were not available with zero timeout, wait for them + // to arrive using the specified timeout. + result = _get_aligned_buffs(timeout_ms); + } + } - default: - UHD_THROW_INVALID_CODE_PATH(); + if (result != get_aligned_buffs_t::SUCCESS) { + set_metadata_for_error(result, metadata); + return 0; } // Get payload pointers for each buffer and aggregate eob. We set eob to @@ -167,6 +209,34 @@ public: private: using get_aligned_buffs_t = get_aligned_buffs<transport_t>; + void _handle_overrun() + { + // Flush any remaining packets. This method is called after any channel + // times out, so here we ensure all channels are flushed prior to + // calling the overrun handler to potentially restart the radios. + for (size_t chan = 0; chan < _xports.size(); chan++) { + if (_frame_buffs[chan]) { + _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan])); + _frame_buffs[chan] = nullptr; + } + + frame_buff::uptr buff; + while (true) { + std::tie(buff, std::ignore, std::ignore) = + _xports[chan]->get_recv_buff(0); + if (!buff) { + break; + } + _xports[chan]->release_recv_buff(std::move(buff)); + } + } + + // Now call the overrun handler + if (_overrun_handler) { + _overrun_handler(); + } + } + // Information recorded by streamer about the last data packet processed, // used to create the metadata when there is a sequence error. struct last_read_time_info_t @@ -174,6 +244,16 @@ private: size_t num_samps = 0; bool has_time_spec = false; time_spec_t time_spec; + + std::tuple<bool, time_spec_t> get_next_packet_time(double samp_rate) + { + if (has_time_spec) { + return std::make_tuple( + true, time_spec + time_spec_t::from_ticks(num_samps, samp_rate)); + } else { + return std::make_tuple(false, time_spec_t()); + } + } }; // Transports for each channel @@ -200,6 +280,13 @@ private: // Information about the last data packet processed last_read_time_info_t _last_read_time_info; + + // Flag that indicates an overrun occurred. The streamer will return an + // overrun error when no more packets are available. + std::atomic<bool> _stopped_due_to_overrun{false}; + + // Callback for overrun + overrun_handler_t _overrun_handler; }; }} // namespace uhd::transport diff --git a/host/lib/rfnoc/radio_control_impl.cpp b/host/lib/rfnoc/radio_control_impl.cpp index 9d7257108..e400033b3 100644 --- a/host/lib/rfnoc/radio_control_impl.cpp +++ b/host/lib/rfnoc/radio_control_impl.cpp @@ -6,6 +6,7 @@ #include <uhd/exception.hpp> #include <uhd/utils/log.hpp> +#include <uhd/rfnoc/mb_controller.hpp> #include <uhdlib/rfnoc/radio_control_impl.hpp> #include <uhdlib/utils/compat_check.hpp> #include <map> @@ -63,6 +64,8 @@ const uint32_t radio_control_impl::regmap::RX_CMD_TIMED_POS; const uhd::fs_path radio_control_impl::DB_PATH("dboard"); const uhd::fs_path radio_control_impl::FE_PATH("frontends"); +static constexpr double OVERRUN_RESTART_DELAY = 0.05; + /**************************************************************************** * Structors ***************************************************************************/ @@ -74,7 +77,6 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args) , _spc(_radio_width & 0xFFFF) , _last_stream_cmd( get_num_output_ports(), uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS) - , _restart_cont(get_num_output_ports(), false) { uhd::assert_fpga_compat(MAJOR_COMPAT, MINOR_COMPAT, @@ -110,16 +112,6 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args) return; } issue_stream_cmd(stream_cmd_action->stream_cmd, port); - if (stream_cmd_action->stream_cmd.stream_mode - == uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS - && _restart_cont.at(port)) { - RFNOC_LOG_TRACE("Received stop command after reporting overrun, will now " - "request restart."); - _restart_cont[port] = false; - auto restart_request_action = - action_info::make(ACTION_KEY_RX_RESTART_REQ); - post_action({res_source_info::OUTPUT_EDGE, port}, restart_request_action); - } }); register_action_handler(ACTION_KEY_RX_RESTART_REQ, [this](const res_source_info& src, action_info::sptr /*action*/) { @@ -131,9 +123,10 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args) } auto stream_cmd_action = stream_cmd_action_info::make( uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS); - // FIXME - // stream_cmd_action->stream_cmd.stream_now = false; - // stream_cmd_action->stream_cmd.time_spec = get_time_now() + DELTA; + stream_cmd_action->stream_cmd.stream_now = false; + stream_cmd_action->stream_cmd.time_spec = + get_mb_controller()->get_timekeeper(0)->get_time_now() + + uhd::time_spec_t(OVERRUN_RESTART_DELAY); const size_t port = src.instance; if (port > get_num_output_ports()) { RFNOC_LOG_WARNING("Received stream command to invalid output port!"); @@ -962,7 +955,6 @@ void radio_control_impl::async_message_handler( rx_event_action->error_code = uhd::rx_metadata_t::ERROR_CODE_OVERFLOW; const bool cont_mode = _last_stream_cmd.at(chan).stream_mode == stream_cmd_t::STREAM_MODE_START_CONTINUOUS; - _restart_cont[chan] = cont_mode; rx_event_action->args["cont_mode"] = std::to_string(cont_mode); RFNOC_LOG_TRACE("Posting overrun event action message."); post_action(res_source_info{res_source_info::OUTPUT_EDGE, chan}, diff --git a/host/lib/rfnoc/rfnoc_rx_streamer.cpp b/host/lib/rfnoc/rfnoc_rx_streamer.cpp index cfa88b292..b50e2fe15 100644 --- a/host/lib/rfnoc/rfnoc_rx_streamer.cpp +++ b/host/lib/rfnoc/rfnoc_rx_streamer.cpp @@ -18,16 +18,18 @@ using namespace uhd::rfnoc; const std::string STREAMER_ID = "RxStreamer"; static std::atomic<uint64_t> streamer_inst_ctr; -rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans, - const uhd::stream_args_t stream_args) +rfnoc_rx_streamer::rfnoc_rx_streamer( + const size_t num_chans, const uhd::stream_args_t stream_args) : rx_streamer_impl<chdr_rx_data_xport>(num_chans, stream_args) , _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++)) , _stream_args(stream_args) { + set_overrun_handler([this]() { this->_handle_overrun(); }); + // No block to which to forward properties or actions set_prop_forwarding_policy(forwarding_policy_t::DROP); set_action_forwarding_policy(forwarding_policy_t::DROP); - // + register_action_handler(ACTION_KEY_RX_EVENT, [this](const res_source_info& src, action_info::sptr action) { rx_event_action_info::sptr rx_event_action = @@ -38,10 +40,6 @@ rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans, } _handle_rx_event_action(src, rx_event_action); }); - register_action_handler(ACTION_KEY_RX_RESTART_REQ, - [this](const res_source_info& src, action_info::sptr action) { - _handle_restart_request(src, action); - }); register_action_handler(ACTION_KEY_STREAM_CMD, [this](const res_source_info& src, action_info::sptr action) { stream_cmd_action_info::sptr stream_cmd_action = @@ -117,6 +115,15 @@ bool rfnoc_rx_streamer::check_topology( return node_t::check_topology(connected_inputs, connected_outputs); } +void rfnoc_rx_streamer::_handle_overrun() +{ + if (_overrun_handling_mode) { + RFNOC_LOG_TRACE("Requesting restart from overrun-reporting node..."); + post_action({res_source_info::INPUT_EDGE, _overrun_channel}, + action_info::make(ACTION_KEY_RX_RESTART_REQ)); + } +} + void rfnoc_rx_streamer::_register_props(const size_t chan, const std::string& otw_format) { @@ -178,6 +185,7 @@ void rfnoc_rx_streamer::_handle_rx_event_action( RFNOC_LOG_TRACE("Ignoring duplicate overrun message."); return; } + _overrun_channel = src.instance; RFNOC_LOG_TRACE( "Switching to overrun-handling mode: Stopping all upstream producers..."); auto stop_action = @@ -188,32 +196,17 @@ void rfnoc_rx_streamer::_handle_rx_event_action( post_action({res_source_info::INPUT_EDGE, i}, stop_action); } if (!rx_event_action->args.cast<bool>("cont_mode", false)) { - // FIXME wait until it's safe to restart radios - // If we don't need to restart, that's all we need to do + // If we don't need to restart, that's all we need to do. Clear this + // flag before setting the stopped due to overrun status below to + // avoid a potential race condition with the overrun handler. _overrun_handling_mode = false; } + // Tell the streamer to flag an overrun to the user after the data that + // was buffered prior to the overrun is read. + set_stopped_due_to_overrun(); } } -void rfnoc_rx_streamer::_handle_restart_request( - const res_source_info& src, action_info::sptr) -{ - // FIXME: Now we need to wait until it's safe to restart the radios. - // A flush would achieve this, albeit at the cost of possibly losing - // samples. - // The earliest we can restart is when the FIFOs in upstream producers - // are empty. - RFNOC_LOG_TRACE("Waiting for FIFOs to clear"); - - std::this_thread::sleep_for(100ms); - - // Once it's safe to restart the radios, we ask a radio to send us a - // stream command with its current time. - RFNOC_LOG_TRACE("Requesting restart from overrun-reporting node..."); - post_action({res_source_info::INPUT_EDGE, src.instance}, - action_info::make(ACTION_KEY_RX_RESTART_REQ)); -} - void rfnoc_rx_streamer::_handle_stream_cmd_action( const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action) { |