aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@ettus.com>2019-07-17 15:25:51 -0700
committerMartin Braun <martin.braun@ettus.com>2019-11-26 11:49:32 -0800
commitab87597e9e76854237b5d78f7d35959b14e9737b (patch)
treeffb471238391c8a42a9feadd54d7819c45d9a989 /host/lib
parent83abb81e61beec213f9f83843d6fab637a578f4a (diff)
downloaduhd-ab87597e9e76854237b5d78f7d35959b14e9737b.tar.gz
uhd-ab87597e9e76854237b5d78f7d35959b14e9737b.tar.bz2
uhd-ab87597e9e76854237b5d78f7d35959b14e9737b.zip
rfnoc: Implement overrun handling using action API
In order to enable overrun handling through the action API, a few new features are implemented: - The RX streamer can now accept stream command actions. The streamer will interpret stream command actions as a request to send stream commands upstream to all producers. - A new action type is defined ('restart request') which is understood by the radio and streamer, and is a handshake between producers and consumers. In this case, it will ask the radio to send a stream command itself. When an RX streamer receives an overrun, it will now run the following algorithm: 1. Stop all upstream producers (this was already in the code before this commit). 2. If no restart is required, Wait for the radios to have space in the downstream blocks. The radio, if it was in continuous streaming mode before the overrun, includes a flag in its initial action whether or not to restart the streaming. Also, it will wait for the stop stream command from the streamer. When it receives that, it will initiate a restart request handshake. 3. The streamer submits a restart request action upstream. This action will be received by the radio. The radio will then check the current time, and send a stream command action back downstream. 4. The RX streamer receives the stream command action, and uses it to send another stream command to all upstream producers. This way, all upstream producers receive a start command for the same time.
Diffstat (limited to 'host/lib')
-rw-r--r--host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp1
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp10
-rw-r--r--host/lib/rfnoc/graph.cpp5
-rw-r--r--host/lib/rfnoc/radio_control_impl.cpp48
-rw-r--r--host/lib/rfnoc/rfnoc_rx_streamer.cpp90
5 files changed, 151 insertions, 3 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp
index 0d10fd13b..6b50cc31c 100644
--- a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp
+++ b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp
@@ -303,6 +303,7 @@ private:
std::unordered_map<size_t, double> _rx_bandwidth;
std::vector<uhd::stream_cmd_t> _last_stream_cmd;
+ std::vector<bool> _restart_cont;
};
}} // namespace uhd::rfnoc
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
index 6ced60d19..1d1d0805d 100644
--- a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
@@ -11,6 +11,7 @@
#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
#include <uhdlib/transport/rx_streamer_impl.hpp>
#include <string>
+#include <atomic>
namespace uhd { namespace rfnoc {
@@ -77,6 +78,13 @@ public:
private:
void _register_props(const size_t chan, const std::string& otw_format);
+ void _handle_rx_event_action(
+ const res_source_info& src, rx_event_action_info::sptr rx_event_action);
+ void _handle_restart_request(
+ const res_source_info& src, action_info::sptr rx_event_action);
+ void _handle_stream_cmd_action(
+ const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action);
+
// Properties
std::vector<property_t<double>> _scaling_in;
std::vector<property_t<double>> _samp_rate_in;
@@ -88,6 +96,8 @@ private:
// Stream args provided at construction
const uhd::stream_args_t _stream_args;
+
+ std::atomic<bool> _overrun_handling_mode{false};
};
}} // namespace uhd::rfnoc
diff --git a/host/lib/rfnoc/graph.cpp b/host/lib/rfnoc/graph.cpp
index ff5fde1e9..1a25f84f9 100644
--- a/host/lib/rfnoc/graph.cpp
+++ b/host/lib/rfnoc/graph.cpp
@@ -417,8 +417,9 @@ void graph_t::enqueue_action(
// The following call can cause other nodes to add more actions to
// the end of _action_queue!
UHD_LOG_TRACE(LOG_ID,
- "Now delivering action " << next_action_sptr->key << "#"
- << next_action_sptr->id);
+ "Now delivering action "
+ << next_action_sptr->key << "#" << next_action_sptr->id << " to "
+ << recipient_node->get_unique_id() << "@" << recipient_port.to_string());
node_accessor_t{}.send_action(recipient_node, recipient_port, next_action_sptr);
}
UHD_LOG_TRACE(LOG_ID, "Delivered all actions, terminating action handling.");
diff --git a/host/lib/rfnoc/radio_control_impl.cpp b/host/lib/rfnoc/radio_control_impl.cpp
index 018f54d9e..4ed0c4b60 100644
--- a/host/lib/rfnoc/radio_control_impl.cpp
+++ b/host/lib/rfnoc/radio_control_impl.cpp
@@ -74,6 +74,7 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)
, _spc(_radio_width & 0xFFFF)
, _last_stream_cmd(
get_num_output_ports(), uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS)
+ , _restart_cont(get_num_output_ports(), false)
{
uhd::assert_fpga_compat(MAJOR_COMPAT,
MINOR_COMPAT,
@@ -99,7 +100,8 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)
"Received stream command: " << stream_cmd_action->stream_cmd.stream_mode
<< " to " << src.to_string());
if (src.type != res_source_info::OUTPUT_EDGE) {
- RFNOC_LOG_WARNING("Received stream command, but not to output port! Ignoring.");
+ RFNOC_LOG_WARNING(
+ "Received stream command, but not to output port! Ignoring.");
return;
}
const size_t port = src.instance;
@@ -108,6 +110,36 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)
return;
}
issue_stream_cmd(stream_cmd_action->stream_cmd, port);
+ if (stream_cmd_action->stream_cmd.stream_mode
+ == uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS
+ && _restart_cont.at(port)) {
+ RFNOC_LOG_TRACE("Received stop command after reporting overrun, will now "
+ "request restart.");
+ _restart_cont[port] = false;
+ auto restart_request_action =
+ action_info::make(ACTION_KEY_RX_RESTART_REQ);
+ post_action({res_source_info::OUTPUT_EDGE, port}, restart_request_action);
+ }
+ });
+ register_action_handler(ACTION_KEY_RX_RESTART_REQ,
+ [this](const res_source_info& src, action_info::sptr /*action*/) {
+ RFNOC_LOG_TRACE("Received restart request command to " << src.to_string());
+ if (src.type != res_source_info::OUTPUT_EDGE) {
+ RFNOC_LOG_WARNING(
+ "Received stream command, but not to output port! Ignoring.");
+ return;
+ }
+ auto stream_cmd_action = stream_cmd_action_info::make(
+ uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS);
+ // FIXME
+ // stream_cmd_action->stream_cmd.stream_now = false;
+ // stream_cmd_action->stream_cmd.time_spec = get_time_now() + DELTA;
+ const size_t port = src.instance;
+ if (port > get_num_output_ports()) {
+ RFNOC_LOG_WARNING("Received stream command to invalid output port!");
+ return;
+ }
+ post_action({res_source_info::OUTPUT_EDGE, port}, stream_cmd_action);
});
// Register spp properties and resolvers
_spp_prop.reserve(get_num_output_ports());
@@ -827,6 +859,11 @@ void radio_control_impl::async_message_handler(
}
switch (addr_base + addr_offset) {
case regmap::SWREG_TX_ERR: {
+ if (chan > get_num_input_ports()) {
+ RFNOC_LOG_WARNING(
+ "Cannot process TX-related async message to invalid chan " << chan);
+ return;
+ }
switch (code) {
case err_codes::ERR_TX_UNDERRUN:
UHD_LOG_FASTPATH("U");
@@ -838,11 +875,20 @@ void radio_control_impl::async_message_handler(
break;
}
case regmap::SWREG_RX_ERR: {
+ if (chan > get_num_input_ports()) {
+ RFNOC_LOG_WARNING(
+ "Cannot process RX-related async message to invalid chan " << chan);
+ return;
+ }
switch (code) {
case err_codes::ERR_RX_OVERRUN: {
UHD_LOG_FASTPATH("O");
auto rx_event_action = rx_event_action_info::make();
rx_event_action->error_code = uhd::rx_metadata_t::ERROR_CODE_OVERFLOW;
+ const bool cont_mode = _last_stream_cmd.at(chan).stream_mode
+ == stream_cmd_t::STREAM_MODE_START_CONTINUOUS;
+ _restart_cont[chan] = cont_mode;
+ rx_event_action->args["cont_mode"] = std::to_string(cont_mode);
RFNOC_LOG_TRACE("Posting overrun event action message.");
post_action(res_source_info{res_source_info::OUTPUT_EDGE, chan},
rx_event_action);
diff --git a/host/lib/rfnoc/rfnoc_rx_streamer.cpp b/host/lib/rfnoc/rfnoc_rx_streamer.cpp
index 4340faff0..cfa88b292 100644
--- a/host/lib/rfnoc/rfnoc_rx_streamer.cpp
+++ b/host/lib/rfnoc/rfnoc_rx_streamer.cpp
@@ -8,7 +8,10 @@
#include <uhdlib/rfnoc/node_accessor.hpp>
#include <uhdlib/rfnoc/rfnoc_rx_streamer.hpp>
#include <atomic>
+#include <thread>
+using namespace std::chrono_literals;
+;
using namespace uhd;
using namespace uhd::rfnoc;
@@ -24,6 +27,31 @@ rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans,
// No block to which to forward properties or actions
set_prop_forwarding_policy(forwarding_policy_t::DROP);
set_action_forwarding_policy(forwarding_policy_t::DROP);
+ //
+ register_action_handler(ACTION_KEY_RX_EVENT,
+ [this](const res_source_info& src, action_info::sptr action) {
+ rx_event_action_info::sptr rx_event_action =
+ std::dynamic_pointer_cast<rx_event_action_info>(action);
+ if (!rx_event_action) {
+ RFNOC_LOG_WARNING("Received invalid RX event action!");
+ return;
+ }
+ _handle_rx_event_action(src, rx_event_action);
+ });
+ register_action_handler(ACTION_KEY_RX_RESTART_REQ,
+ [this](const res_source_info& src, action_info::sptr action) {
+ _handle_restart_request(src, action);
+ });
+ register_action_handler(ACTION_KEY_STREAM_CMD,
+ [this](const res_source_info& src, action_info::sptr action) {
+ stream_cmd_action_info::sptr stream_cmd_action =
+ std::dynamic_pointer_cast<stream_cmd_action_info>(action);
+ if (!stream_cmd_action) {
+ RFNOC_LOG_WARNING("Received invalid stream command action!");
+ return;
+ }
+ _handle_stream_cmd_action(src, stream_cmd_action);
+ });
// Initialize properties
_scaling_in.reserve(num_chans);
@@ -139,3 +167,65 @@ void rfnoc_rx_streamer::_register_props(const size_t chan,
}
});
}
+
+void rfnoc_rx_streamer::_handle_rx_event_action(
+ const res_source_info& src, rx_event_action_info::sptr rx_event_action)
+{
+ UHD_ASSERT_THROW(src.type == res_source_info::INPUT_EDGE);
+ if (rx_event_action->error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW) {
+ RFNOC_LOG_DEBUG("Received overrun message on port " << src.instance);
+ if (_overrun_handling_mode.exchange(true)) {
+ RFNOC_LOG_TRACE("Ignoring duplicate overrun message.");
+ return;
+ }
+ RFNOC_LOG_TRACE(
+ "Switching to overrun-handling mode: Stopping all upstream producers...");
+ auto stop_action =
+ stream_cmd_action_info::make(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS);
+ // Reminder: Delivery of all of these actions is deferred until this
+ // action handler is complete.
+ for (size_t i = 0; i < get_num_input_ports(); ++i) {
+ post_action({res_source_info::INPUT_EDGE, i}, stop_action);
+ }
+ if (!rx_event_action->args.cast<bool>("cont_mode", false)) {
+ // FIXME wait until it's safe to restart radios
+ // If we don't need to restart, that's all we need to do
+ _overrun_handling_mode = false;
+ }
+ }
+}
+
+void rfnoc_rx_streamer::_handle_restart_request(
+ const res_source_info& src, action_info::sptr)
+{
+ // FIXME: Now we need to wait until it's safe to restart the radios.
+ // A flush would achieve this, albeit at the cost of possibly losing
+ // samples.
+ // The earliest we can restart is when the FIFOs in upstream producers
+ // are empty.
+ RFNOC_LOG_TRACE("Waiting for FIFOs to clear");
+
+ std::this_thread::sleep_for(100ms);
+
+ // Once it's safe to restart the radios, we ask a radio to send us a
+ // stream command with its current time.
+ RFNOC_LOG_TRACE("Requesting restart from overrun-reporting node...");
+ post_action({res_source_info::INPUT_EDGE, src.instance},
+ action_info::make(ACTION_KEY_RX_RESTART_REQ));
+}
+
+void rfnoc_rx_streamer::_handle_stream_cmd_action(
+ const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action)
+{
+ RFNOC_LOG_TRACE("Received stream command on " << src.to_string());
+ UHD_ASSERT_THROW(src.type == res_source_info::INPUT_EDGE);
+ auto start_action =
+ stream_cmd_action_info::make(stream_cmd_action->stream_cmd.stream_mode);
+ start_action->stream_cmd = stream_cmd_action->stream_cmd;
+ for (size_t i = 0; i < get_num_input_ports(); ++i) {
+ post_action({res_source_info::INPUT_EDGE, i}, start_action);
+ }
+ if (_overrun_handling_mode.exchange(false)) {
+ RFNOC_LOG_TRACE("Leaving overrun handling mode.");
+ }
+}