diff options
Diffstat (limited to 'host/lib/rfnoc/rfnoc_rx_streamer.cpp')
-rw-r--r-- | host/lib/rfnoc/rfnoc_rx_streamer.cpp | 90 |
1 files changed, 90 insertions, 0 deletions
diff --git a/host/lib/rfnoc/rfnoc_rx_streamer.cpp b/host/lib/rfnoc/rfnoc_rx_streamer.cpp index 4340faff0..cfa88b292 100644 --- a/host/lib/rfnoc/rfnoc_rx_streamer.cpp +++ b/host/lib/rfnoc/rfnoc_rx_streamer.cpp @@ -8,7 +8,10 @@ #include <uhdlib/rfnoc/node_accessor.hpp> #include <uhdlib/rfnoc/rfnoc_rx_streamer.hpp> #include <atomic> +#include <thread> +using namespace std::chrono_literals; +; using namespace uhd; using namespace uhd::rfnoc; @@ -24,6 +27,31 @@ rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans, // No block to which to forward properties or actions set_prop_forwarding_policy(forwarding_policy_t::DROP); set_action_forwarding_policy(forwarding_policy_t::DROP); + // + register_action_handler(ACTION_KEY_RX_EVENT, + [this](const res_source_info& src, action_info::sptr action) { + rx_event_action_info::sptr rx_event_action = + std::dynamic_pointer_cast<rx_event_action_info>(action); + if (!rx_event_action) { + RFNOC_LOG_WARNING("Received invalid RX event action!"); + return; + } + _handle_rx_event_action(src, rx_event_action); + }); + register_action_handler(ACTION_KEY_RX_RESTART_REQ, + [this](const res_source_info& src, action_info::sptr action) { + _handle_restart_request(src, action); + }); + register_action_handler(ACTION_KEY_STREAM_CMD, + [this](const res_source_info& src, action_info::sptr action) { + stream_cmd_action_info::sptr stream_cmd_action = + std::dynamic_pointer_cast<stream_cmd_action_info>(action); + if (!stream_cmd_action) { + RFNOC_LOG_WARNING("Received invalid stream command action!"); + return; + } + _handle_stream_cmd_action(src, stream_cmd_action); + }); // Initialize properties _scaling_in.reserve(num_chans); @@ -139,3 +167,65 @@ void rfnoc_rx_streamer::_register_props(const size_t chan, } }); } + +void rfnoc_rx_streamer::_handle_rx_event_action( + const res_source_info& src, rx_event_action_info::sptr rx_event_action) +{ + UHD_ASSERT_THROW(src.type == res_source_info::INPUT_EDGE); + if (rx_event_action->error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW) { + RFNOC_LOG_DEBUG("Received overrun message on port " << src.instance); + if (_overrun_handling_mode.exchange(true)) { + RFNOC_LOG_TRACE("Ignoring duplicate overrun message."); + return; + } + RFNOC_LOG_TRACE( + "Switching to overrun-handling mode: Stopping all upstream producers..."); + auto stop_action = + stream_cmd_action_info::make(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS); + // Reminder: Delivery of all of these actions is deferred until this + // action handler is complete. + for (size_t i = 0; i < get_num_input_ports(); ++i) { + post_action({res_source_info::INPUT_EDGE, i}, stop_action); + } + if (!rx_event_action->args.cast<bool>("cont_mode", false)) { + // FIXME wait until it's safe to restart radios + // If we don't need to restart, that's all we need to do + _overrun_handling_mode = false; + } + } +} + +void rfnoc_rx_streamer::_handle_restart_request( + const res_source_info& src, action_info::sptr) +{ + // FIXME: Now we need to wait until it's safe to restart the radios. + // A flush would achieve this, albeit at the cost of possibly losing + // samples. + // The earliest we can restart is when the FIFOs in upstream producers + // are empty. + RFNOC_LOG_TRACE("Waiting for FIFOs to clear"); + + std::this_thread::sleep_for(100ms); + + // Once it's safe to restart the radios, we ask a radio to send us a + // stream command with its current time. + RFNOC_LOG_TRACE("Requesting restart from overrun-reporting node..."); + post_action({res_source_info::INPUT_EDGE, src.instance}, + action_info::make(ACTION_KEY_RX_RESTART_REQ)); +} + +void rfnoc_rx_streamer::_handle_stream_cmd_action( + const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action) +{ + RFNOC_LOG_TRACE("Received stream command on " << src.to_string()); + UHD_ASSERT_THROW(src.type == res_source_info::INPUT_EDGE); + auto start_action = + stream_cmd_action_info::make(stream_cmd_action->stream_cmd.stream_mode); + start_action->stream_cmd = stream_cmd_action->stream_cmd; + for (size_t i = 0; i < get_num_input_ports(); ++i) { + post_action({res_source_info::INPUT_EDGE, i}, start_action); + } + if (_overrun_handling_mode.exchange(false)) { + RFNOC_LOG_TRACE("Leaving overrun handling mode."); + } +} |