aboutsummaryrefslogtreecommitdiffstats
path: root/host
diff options
context:
space:
mode:
Diffstat (limited to 'host')
-rw-r--r--host/include/uhd/rfnoc/defaults.hpp1
-rw-r--r--host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp1
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp10
-rw-r--r--host/lib/rfnoc/graph.cpp5
-rw-r--r--host/lib/rfnoc/radio_control_impl.cpp48
-rw-r--r--host/lib/rfnoc/rfnoc_rx_streamer.cpp90
6 files changed, 152 insertions, 3 deletions
diff --git a/host/include/uhd/rfnoc/defaults.hpp b/host/include/uhd/rfnoc/defaults.hpp
index 696d31f30..e1046ada2 100644
--- a/host/include/uhd/rfnoc/defaults.hpp
+++ b/host/include/uhd/rfnoc/defaults.hpp
@@ -29,6 +29,7 @@ static const io_type_t IO_TYPE_SC16 = "sc16";
static const std::string ACTION_KEY_STREAM_CMD("stream_cmd");
static const std::string ACTION_KEY_RX_EVENT("rx_event");
+static const std::string ACTION_KEY_RX_RESTART_REQ("restart_request");
//! If the block name can't be automatically detected, this name is used
static const std::string DEFAULT_BLOCK_NAME = "Block";
diff --git a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp
index 0d10fd13b..6b50cc31c 100644
--- a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp
+++ b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp
@@ -303,6 +303,7 @@ private:
std::unordered_map<size_t, double> _rx_bandwidth;
std::vector<uhd::stream_cmd_t> _last_stream_cmd;
+ std::vector<bool> _restart_cont;
};
}} // namespace uhd::rfnoc
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
index 6ced60d19..1d1d0805d 100644
--- a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
@@ -11,6 +11,7 @@
#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
#include <uhdlib/transport/rx_streamer_impl.hpp>
#include <string>
+#include <atomic>
namespace uhd { namespace rfnoc {
@@ -77,6 +78,13 @@ public:
private:
void _register_props(const size_t chan, const std::string& otw_format);
+ void _handle_rx_event_action(
+ const res_source_info& src, rx_event_action_info::sptr rx_event_action);
+ void _handle_restart_request(
+ const res_source_info& src, action_info::sptr rx_event_action);
+ void _handle_stream_cmd_action(
+ const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action);
+
// Properties
std::vector<property_t<double>> _scaling_in;
std::vector<property_t<double>> _samp_rate_in;
@@ -88,6 +96,8 @@ private:
// Stream args provided at construction
const uhd::stream_args_t _stream_args;
+
+ std::atomic<bool> _overrun_handling_mode{false};
};
}} // namespace uhd::rfnoc
diff --git a/host/lib/rfnoc/graph.cpp b/host/lib/rfnoc/graph.cpp
index ff5fde1e9..1a25f84f9 100644
--- a/host/lib/rfnoc/graph.cpp
+++ b/host/lib/rfnoc/graph.cpp
@@ -417,8 +417,9 @@ void graph_t::enqueue_action(
// The following call can cause other nodes to add more actions to
// the end of _action_queue!
UHD_LOG_TRACE(LOG_ID,
- "Now delivering action " << next_action_sptr->key << "#"
- << next_action_sptr->id);
+ "Now delivering action "
+ << next_action_sptr->key << "#" << next_action_sptr->id << " to "
+ << recipient_node->get_unique_id() << "@" << recipient_port.to_string());
node_accessor_t{}.send_action(recipient_node, recipient_port, next_action_sptr);
}
UHD_LOG_TRACE(LOG_ID, "Delivered all actions, terminating action handling.");
diff --git a/host/lib/rfnoc/radio_control_impl.cpp b/host/lib/rfnoc/radio_control_impl.cpp
index 018f54d9e..4ed0c4b60 100644
--- a/host/lib/rfnoc/radio_control_impl.cpp
+++ b/host/lib/rfnoc/radio_control_impl.cpp
@@ -74,6 +74,7 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)
, _spc(_radio_width & 0xFFFF)
, _last_stream_cmd(
get_num_output_ports(), uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS)
+ , _restart_cont(get_num_output_ports(), false)
{
uhd::assert_fpga_compat(MAJOR_COMPAT,
MINOR_COMPAT,
@@ -99,7 +100,8 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)
"Received stream command: " << stream_cmd_action->stream_cmd.stream_mode
<< " to " << src.to_string());
if (src.type != res_source_info::OUTPUT_EDGE) {
- RFNOC_LOG_WARNING("Received stream command, but not to output port! Ignoring.");
+ RFNOC_LOG_WARNING(
+ "Received stream command, but not to output port! Ignoring.");
return;
}
const size_t port = src.instance;
@@ -108,6 +110,36 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)
return;
}
issue_stream_cmd(stream_cmd_action->stream_cmd, port);
+ if (stream_cmd_action->stream_cmd.stream_mode
+ == uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS
+ && _restart_cont.at(port)) {
+ RFNOC_LOG_TRACE("Received stop command after reporting overrun, will now "
+ "request restart.");
+ _restart_cont[port] = false;
+ auto restart_request_action =
+ action_info::make(ACTION_KEY_RX_RESTART_REQ);
+ post_action({res_source_info::OUTPUT_EDGE, port}, restart_request_action);
+ }
+ });
+ register_action_handler(ACTION_KEY_RX_RESTART_REQ,
+ [this](const res_source_info& src, action_info::sptr /*action*/) {
+ RFNOC_LOG_TRACE("Received restart request command to " << src.to_string());
+ if (src.type != res_source_info::OUTPUT_EDGE) {
+ RFNOC_LOG_WARNING(
+ "Received stream command, but not to output port! Ignoring.");
+ return;
+ }
+ auto stream_cmd_action = stream_cmd_action_info::make(
+ uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS);
+ // FIXME
+ // stream_cmd_action->stream_cmd.stream_now = false;
+ // stream_cmd_action->stream_cmd.time_spec = get_time_now() + DELTA;
+ const size_t port = src.instance;
+ if (port > get_num_output_ports()) {
+ RFNOC_LOG_WARNING("Received stream command to invalid output port!");
+ return;
+ }
+ post_action({res_source_info::OUTPUT_EDGE, port}, stream_cmd_action);
});
// Register spp properties and resolvers
_spp_prop.reserve(get_num_output_ports());
@@ -827,6 +859,11 @@ void radio_control_impl::async_message_handler(
}
switch (addr_base + addr_offset) {
case regmap::SWREG_TX_ERR: {
+ if (chan > get_num_input_ports()) {
+ RFNOC_LOG_WARNING(
+ "Cannot process TX-related async message to invalid chan " << chan);
+ return;
+ }
switch (code) {
case err_codes::ERR_TX_UNDERRUN:
UHD_LOG_FASTPATH("U");
@@ -838,11 +875,20 @@ void radio_control_impl::async_message_handler(
break;
}
case regmap::SWREG_RX_ERR: {
+ if (chan > get_num_input_ports()) {
+ RFNOC_LOG_WARNING(
+ "Cannot process RX-related async message to invalid chan " << chan);
+ return;
+ }
switch (code) {
case err_codes::ERR_RX_OVERRUN: {
UHD_LOG_FASTPATH("O");
auto rx_event_action = rx_event_action_info::make();
rx_event_action->error_code = uhd::rx_metadata_t::ERROR_CODE_OVERFLOW;
+ const bool cont_mode = _last_stream_cmd.at(chan).stream_mode
+ == stream_cmd_t::STREAM_MODE_START_CONTINUOUS;
+ _restart_cont[chan] = cont_mode;
+ rx_event_action->args["cont_mode"] = std::to_string(cont_mode);
RFNOC_LOG_TRACE("Posting overrun event action message.");
post_action(res_source_info{res_source_info::OUTPUT_EDGE, chan},
rx_event_action);
diff --git a/host/lib/rfnoc/rfnoc_rx_streamer.cpp b/host/lib/rfnoc/rfnoc_rx_streamer.cpp
index 4340faff0..cfa88b292 100644
--- a/host/lib/rfnoc/rfnoc_rx_streamer.cpp
+++ b/host/lib/rfnoc/rfnoc_rx_streamer.cpp
@@ -8,7 +8,10 @@
#include <uhdlib/rfnoc/node_accessor.hpp>
#include <uhdlib/rfnoc/rfnoc_rx_streamer.hpp>
#include <atomic>
+#include <thread>
+using namespace std::chrono_literals;
+;
using namespace uhd;
using namespace uhd::rfnoc;
@@ -24,6 +27,31 @@ rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans,
// No block to which to forward properties or actions
set_prop_forwarding_policy(forwarding_policy_t::DROP);
set_action_forwarding_policy(forwarding_policy_t::DROP);
+ //
+ register_action_handler(ACTION_KEY_RX_EVENT,
+ [this](const res_source_info& src, action_info::sptr action) {
+ rx_event_action_info::sptr rx_event_action =
+ std::dynamic_pointer_cast<rx_event_action_info>(action);
+ if (!rx_event_action) {
+ RFNOC_LOG_WARNING("Received invalid RX event action!");
+ return;
+ }
+ _handle_rx_event_action(src, rx_event_action);
+ });
+ register_action_handler(ACTION_KEY_RX_RESTART_REQ,
+ [this](const res_source_info& src, action_info::sptr action) {
+ _handle_restart_request(src, action);
+ });
+ register_action_handler(ACTION_KEY_STREAM_CMD,
+ [this](const res_source_info& src, action_info::sptr action) {
+ stream_cmd_action_info::sptr stream_cmd_action =
+ std::dynamic_pointer_cast<stream_cmd_action_info>(action);
+ if (!stream_cmd_action) {
+ RFNOC_LOG_WARNING("Received invalid stream command action!");
+ return;
+ }
+ _handle_stream_cmd_action(src, stream_cmd_action);
+ });
// Initialize properties
_scaling_in.reserve(num_chans);
@@ -139,3 +167,65 @@ void rfnoc_rx_streamer::_register_props(const size_t chan,
}
});
}
+
+void rfnoc_rx_streamer::_handle_rx_event_action(
+ const res_source_info& src, rx_event_action_info::sptr rx_event_action)
+{
+ UHD_ASSERT_THROW(src.type == res_source_info::INPUT_EDGE);
+ if (rx_event_action->error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW) {
+ RFNOC_LOG_DEBUG("Received overrun message on port " << src.instance);
+ if (_overrun_handling_mode.exchange(true)) {
+ RFNOC_LOG_TRACE("Ignoring duplicate overrun message.");
+ return;
+ }
+ RFNOC_LOG_TRACE(
+ "Switching to overrun-handling mode: Stopping all upstream producers...");
+ auto stop_action =
+ stream_cmd_action_info::make(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS);
+ // Reminder: Delivery of all of these actions is deferred until this
+ // action handler is complete.
+ for (size_t i = 0; i < get_num_input_ports(); ++i) {
+ post_action({res_source_info::INPUT_EDGE, i}, stop_action);
+ }
+ if (!rx_event_action->args.cast<bool>("cont_mode", false)) {
+ // FIXME wait until it's safe to restart radios
+ // If we don't need to restart, that's all we need to do
+ _overrun_handling_mode = false;
+ }
+ }
+}
+
+void rfnoc_rx_streamer::_handle_restart_request(
+ const res_source_info& src, action_info::sptr)
+{
+ // FIXME: Now we need to wait until it's safe to restart the radios.
+ // A flush would achieve this, albeit at the cost of possibly losing
+ // samples.
+ // The earliest we can restart is when the FIFOs in upstream producers
+ // are empty.
+ RFNOC_LOG_TRACE("Waiting for FIFOs to clear");
+
+ std::this_thread::sleep_for(100ms);
+
+ // Once it's safe to restart the radios, we ask a radio to send us a
+ // stream command with its current time.
+ RFNOC_LOG_TRACE("Requesting restart from overrun-reporting node...");
+ post_action({res_source_info::INPUT_EDGE, src.instance},
+ action_info::make(ACTION_KEY_RX_RESTART_REQ));
+}
+
+void rfnoc_rx_streamer::_handle_stream_cmd_action(
+ const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action)
+{
+ RFNOC_LOG_TRACE("Received stream command on " << src.to_string());
+ UHD_ASSERT_THROW(src.type == res_source_info::INPUT_EDGE);
+ auto start_action =
+ stream_cmd_action_info::make(stream_cmd_action->stream_cmd.stream_mode);
+ start_action->stream_cmd = stream_cmd_action->stream_cmd;
+ for (size_t i = 0; i < get_num_input_ports(); ++i) {
+ post_action({res_source_info::INPUT_EDGE, i}, start_action);
+ }
+ if (_overrun_handling_mode.exchange(false)) {
+ RFNOC_LOG_TRACE("Leaving overrun handling mode.");
+ }
+}