aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/rfnoc/rfnoc_rx_streamer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/rfnoc/rfnoc_rx_streamer.cpp')
-rw-r--r--host/lib/rfnoc/rfnoc_rx_streamer.cpp90
1 files changed, 90 insertions, 0 deletions
diff --git a/host/lib/rfnoc/rfnoc_rx_streamer.cpp b/host/lib/rfnoc/rfnoc_rx_streamer.cpp
index 4340faff0..cfa88b292 100644
--- a/host/lib/rfnoc/rfnoc_rx_streamer.cpp
+++ b/host/lib/rfnoc/rfnoc_rx_streamer.cpp
@@ -8,7 +8,10 @@
#include <uhdlib/rfnoc/node_accessor.hpp>
#include <uhdlib/rfnoc/rfnoc_rx_streamer.hpp>
#include <atomic>
+#include <thread>
+using namespace std::chrono_literals;
+;
using namespace uhd;
using namespace uhd::rfnoc;
@@ -24,6 +27,31 @@ rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans,
// No block to which to forward properties or actions
set_prop_forwarding_policy(forwarding_policy_t::DROP);
set_action_forwarding_policy(forwarding_policy_t::DROP);
+ //
+ register_action_handler(ACTION_KEY_RX_EVENT,
+ [this](const res_source_info& src, action_info::sptr action) {
+ rx_event_action_info::sptr rx_event_action =
+ std::dynamic_pointer_cast<rx_event_action_info>(action);
+ if (!rx_event_action) {
+ RFNOC_LOG_WARNING("Received invalid RX event action!");
+ return;
+ }
+ _handle_rx_event_action(src, rx_event_action);
+ });
+ register_action_handler(ACTION_KEY_RX_RESTART_REQ,
+ [this](const res_source_info& src, action_info::sptr action) {
+ _handle_restart_request(src, action);
+ });
+ register_action_handler(ACTION_KEY_STREAM_CMD,
+ [this](const res_source_info& src, action_info::sptr action) {
+ stream_cmd_action_info::sptr stream_cmd_action =
+ std::dynamic_pointer_cast<stream_cmd_action_info>(action);
+ if (!stream_cmd_action) {
+ RFNOC_LOG_WARNING("Received invalid stream command action!");
+ return;
+ }
+ _handle_stream_cmd_action(src, stream_cmd_action);
+ });
// Initialize properties
_scaling_in.reserve(num_chans);
@@ -139,3 +167,65 @@ void rfnoc_rx_streamer::_register_props(const size_t chan,
}
});
}
+
+void rfnoc_rx_streamer::_handle_rx_event_action(
+ const res_source_info& src, rx_event_action_info::sptr rx_event_action)
+{
+ UHD_ASSERT_THROW(src.type == res_source_info::INPUT_EDGE);
+ if (rx_event_action->error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW) {
+ RFNOC_LOG_DEBUG("Received overrun message on port " << src.instance);
+ if (_overrun_handling_mode.exchange(true)) {
+ RFNOC_LOG_TRACE("Ignoring duplicate overrun message.");
+ return;
+ }
+ RFNOC_LOG_TRACE(
+ "Switching to overrun-handling mode: Stopping all upstream producers...");
+ auto stop_action =
+ stream_cmd_action_info::make(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS);
+ // Reminder: Delivery of all of these actions is deferred until this
+ // action handler is complete.
+ for (size_t i = 0; i < get_num_input_ports(); ++i) {
+ post_action({res_source_info::INPUT_EDGE, i}, stop_action);
+ }
+ if (!rx_event_action->args.cast<bool>("cont_mode", false)) {
+ // FIXME wait until it's safe to restart radios
+ // If we don't need to restart, that's all we need to do
+ _overrun_handling_mode = false;
+ }
+ }
+}
+
+void rfnoc_rx_streamer::_handle_restart_request(
+ const res_source_info& src, action_info::sptr)
+{
+ // FIXME: Now we need to wait until it's safe to restart the radios.
+ // A flush would achieve this, albeit at the cost of possibly losing
+ // samples.
+ // The earliest we can restart is when the FIFOs in upstream producers
+ // are empty.
+ RFNOC_LOG_TRACE("Waiting for FIFOs to clear");
+
+ std::this_thread::sleep_for(100ms);
+
+ // Once it's safe to restart the radios, we ask a radio to send us a
+ // stream command with its current time.
+ RFNOC_LOG_TRACE("Requesting restart from overrun-reporting node...");
+ post_action({res_source_info::INPUT_EDGE, src.instance},
+ action_info::make(ACTION_KEY_RX_RESTART_REQ));
+}
+
+void rfnoc_rx_streamer::_handle_stream_cmd_action(
+ const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action)
+{
+ RFNOC_LOG_TRACE("Received stream command on " << src.to_string());
+ UHD_ASSERT_THROW(src.type == res_source_info::INPUT_EDGE);
+ auto start_action =
+ stream_cmd_action_info::make(stream_cmd_action->stream_cmd.stream_mode);
+ start_action->stream_cmd = stream_cmd_action->stream_cmd;
+ for (size_t i = 0; i < get_num_input_ports(); ++i) {
+ post_action({res_source_info::INPUT_EDGE, i}, start_action);
+ }
+ if (_overrun_handling_mode.exchange(false)) {
+ RFNOC_LOG_TRACE("Leaving overrun handling mode.");
+ }
+}