aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/rfnoc
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/rfnoc')
-rw-r--r--host/lib/rfnoc/CMakeLists.txt1
-rw-r--r--host/lib/rfnoc/actions.cpp21
-rw-r--r--host/lib/rfnoc/graph.cpp78
-rw-r--r--host/lib/rfnoc/node.cpp87
4 files changed, 187 insertions, 0 deletions
diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt
index e4715a644..6aab0d499 100644
--- a/host/lib/rfnoc/CMakeLists.txt
+++ b/host/lib/rfnoc/CMakeLists.txt
@@ -11,6 +11,7 @@
LIBUHD_APPEND_SOURCES(
# Infrastructure:
+ ${CMAKE_CURRENT_SOURCE_DIR}/actions.cpp
${CMAKE_CURRENT_SOURCE_DIR}/async_msg_handler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_container.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_ctrl_base.cpp
diff --git a/host/lib/rfnoc/actions.cpp b/host/lib/rfnoc/actions.cpp
new file mode 100644
index 000000000..1f5f0f2f7
--- /dev/null
+++ b/host/lib/rfnoc/actions.cpp
@@ -0,0 +1,21 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/rfnoc/actions.hpp>
+#include <atomic>
+
+using namespace uhd::rfnoc;
+
+namespace {
+ // A static counter, which we use to uniquely label actions
+ std::atomic<size_t> action_counter{0};
+}
+
+action_info::action_info(const std::string& key_) : id(action_counter++), key(key_)
+{
+ // nop
+}
+
diff --git a/host/lib/rfnoc/graph.cpp b/host/lib/rfnoc/graph.cpp
index f90f70b43..d311a00bd 100644
--- a/host/lib/rfnoc/graph.cpp
+++ b/host/lib/rfnoc/graph.cpp
@@ -18,6 +18,7 @@ using namespace uhd::rfnoc::detail;
namespace {
const std::string LOG_ID = "RFNOC::GRAPH::DETAIL";
+constexpr unsigned MAX_ACTION_ITERATIONS = 200;
/*! Helper function to pretty-print edge info
*/
@@ -120,6 +121,15 @@ void graph_t::connect(node_ref_t src_node, node_ref_t dst_node, graph_edge_t edg
src_node, [this]() { this->resolve_all_properties(); });
node_accessor.set_resolve_all_callback(
dst_node, [this]() { this->resolve_all_properties(); });
+ // Set post action callbacks:
+ node_accessor.set_post_action_callback(
+ src_node, [this, src_node](const res_source_info& src, action_info::sptr action) {
+ this->enqueue_action(src_node, src, action);
+ });
+ node_accessor.set_post_action_callback(
+ dst_node, [this, dst_node](const res_source_info& src, action_info::sptr action) {
+ this->enqueue_action(dst_node, src, action);
+ });
// Add nodes to graph, if not already in there:
_add_node(src_node);
@@ -310,6 +320,74 @@ void graph_t::resolve_all_properties()
}
}
+void graph_t::enqueue_action(
+ node_ref_t src_node, res_source_info src_edge, action_info::sptr action)
+{
+ // First, make sure that once we start action handling, no other node from
+ // a different thread can throw in their own actions
+ std::lock_guard<std::recursive_mutex> l(_action_mutex);
+
+ // Check if we're already in the middle of handling actions. In that case,
+ // we're already in the loop below, and then all we want to do is to enqueue
+ // this action tuple. The first call to enqueue_action() within this thread
+ // context will have handling_ongoing == false.
+ const bool handling_ongoing = _action_handling_ongoing.test_and_set();
+
+ _action_queue.emplace_back(std::make_tuple(src_node, src_edge, action));
+ if (handling_ongoing) {
+ UHD_LOG_TRACE(LOG_ID,
+ "Action handling ongoing, deferring delivery of " << action->key << "#"
+ << action->id);
+ return;
+ }
+
+ unsigned iteration_count = 0;
+ while (!_action_queue.empty()) {
+ if (iteration_count++ == MAX_ACTION_ITERATIONS) {
+ throw uhd::runtime_error("Terminating action handling: Reached "
+ "recursion limit!");
+ }
+
+ // Unpack next action
+ auto& next_action = _action_queue.front();
+ node_ref_t action_src_node = std::get<0>(next_action);
+ res_source_info action_src_port = std::get<1>(next_action);
+ action_info::sptr next_action_sptr = std::get<2>(next_action);
+ _action_queue.pop_front();
+
+ // Find the node that is supposed to receive this action, and if we find
+ // something, then send the action
+ auto recipient_info =
+ _find_neighbour(_node_map.at(action_src_node), action_src_port);
+ if (recipient_info.first == nullptr) {
+ UHD_LOG_WARNING(LOG_ID,
+ "Cannot forward action "
+ << action->key << " from " << src_node->get_unique_id()
+ << ":" << src_edge.to_string() << ", no neighbour found!");
+ } else {
+ node_ref_t recipient_node = recipient_info.first;
+ res_source_info recipient_port = {
+ res_source_info::invert_edge(action_src_port.type),
+ action_src_port.type == res_source_info::INPUT_EDGE
+ ? recipient_info.second.dst_port
+ : recipient_info.second.src_port};
+ // The following call can cause other nodes to add more actions to
+ // the end of _action_queue!
+ UHD_LOG_TRACE(LOG_ID,
+ "Now delivering action " << next_action_sptr->key << "#"
+ << next_action_sptr->id);
+ node_accessor_t{}.send_action(
+ recipient_node, recipient_port, next_action_sptr);
+ }
+ }
+ UHD_LOG_TRACE(LOG_ID, "Delivered all actions, terminating action handling.");
+
+ // Release the action handling flag
+ _action_handling_ongoing.clear();
+ // Now, the _action_mutex is released, and someone else can start sending
+ // actions.
+}
+
/******************************************************************************
* Private methods
*****************************************************************************/
diff --git a/host/lib/rfnoc/node.cpp b/host/lib/rfnoc/node.cpp
index 0b724b889..d569cea4a 100644
--- a/host/lib/rfnoc/node.cpp
+++ b/host/lib/rfnoc/node.cpp
@@ -113,6 +113,27 @@ void node_t::set_prop_forwarding_policy(
_prop_fwd_policies[prop_id] = policy;
}
+void node_t::register_action_handler(const std::string& id, action_handler_t&& handler)
+{
+ if (_action_handlers.count(id)) {
+ _action_handlers.erase(id);
+ }
+ _action_handlers.emplace(id, std::move(handler));
+}
+
+void node_t::set_action_forwarding_policy(
+ node_t::forwarding_policy_t policy, const std::string& action_key)
+{
+ _action_fwd_policies[action_key] = policy;
+}
+
+void node_t::post_action(
+ const res_source_info& edge_info,
+ action_info::sptr action)
+{
+ _post_action_cb(edge_info, action);
+}
+
/*** Private methods *********************************************************/
property_base_t* node_t::_find_property(
res_source_info src_info, const std::string& id) const
@@ -403,6 +424,72 @@ void node_t::forward_edge_property(
prop_accessor.forward<false>(incoming_prop, local_prop);
}
+void node_t::receive_action(const res_source_info& src_info, action_info::sptr action)
+{
+ std::lock_guard<std::mutex> l(_action_mutex);
+ // See if the user defined an action handler for us:
+ if (_action_handlers.count(action->key)) {
+ _action_handlers.at(action->key)(src_info, action);
+ return;
+ }
+
+ // Otherwise, we need to figure out the correct default action handling:
+ const auto fwd_policy = [&](const std::string& id) {
+ if (_action_fwd_policies.count(id)) {
+ return _action_fwd_policies.at(id);
+ }
+ return _action_fwd_policies.at("");
+ }(action->key);
+
+ // Now implement custom forwarding for all forwarding policies:
+ if (fwd_policy == forwarding_policy_t::DROP) {
+ UHD_LOG_TRACE(
+ get_unique_id(), "Dropping action " << action->key);
+ }
+ if (fwd_policy == forwarding_policy_t::ONE_TO_ONE) {
+ UHD_LOG_TRACE(
+ get_unique_id(), "Forwarding action " << action->key << " to opposite port");
+ const res_source_info dst_info{
+ res_source_info::invert_edge(src_info.type), src_info.instance};
+ if (_has_port(dst_info)) {
+ post_action(dst_info, action);
+ }
+ }
+ if (fwd_policy == forwarding_policy_t::ONE_TO_FAN) {
+ UHD_LOG_TRACE(get_unique_id(),
+ "Forwarding action " << action->key << " to all opposite ports");
+ const auto new_edge_type = res_source_info::invert_edge(src_info.type);
+ const size_t num_ports = new_edge_type == res_source_info::INPUT_EDGE
+ ? get_num_input_ports()
+ : get_num_output_ports();
+ for (size_t i = 0; i < num_ports; i++) {
+ post_action({new_edge_type, i}, action);
+ }
+ }
+ if (fwd_policy == forwarding_policy_t::ONE_TO_ALL
+ || fwd_policy == forwarding_policy_t::ONE_TO_ALL_IN) {
+ UHD_LOG_TRACE(get_unique_id(),
+ "Forwarding action " << action->key << " to all input ports");
+ for (size_t i = 0; i < get_num_input_ports(); i++) {
+ if (src_info.type == res_source_info::INPUT_EDGE && i == src_info.instance) {
+ continue;
+ }
+ post_action({res_source_info::INPUT_EDGE, i}, action);
+ }
+ }
+ if (fwd_policy == forwarding_policy_t::ONE_TO_ALL
+ || fwd_policy == forwarding_policy_t::ONE_TO_ALL_OUT) {
+ UHD_LOG_TRACE(get_unique_id(),
+ "Forwarding action " << action->key << " to all output ports");
+ for (size_t i = 0; i < get_num_output_ports(); i++) {
+ if (src_info.type == res_source_info::OUTPUT_EDGE && i == src_info.instance) {
+ continue;
+ }
+ post_action({res_source_info::OUTPUT_EDGE, i}, action);
+ }
+ }
+}
+
bool node_t::_has_port(const res_source_info& port_info) const
{
return (port_info.type == res_source_info::INPUT_EDGE