diff options
Diffstat (limited to 'host/lib')
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/graph.hpp | 49 | ||||
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/node_accessor.hpp | 20 | ||||
-rw-r--r-- | host/lib/rfnoc/CMakeLists.txt | 1 | ||||
-rw-r--r-- | host/lib/rfnoc/actions.cpp | 21 | ||||
-rw-r--r-- | host/lib/rfnoc/graph.cpp | 78 | ||||
-rw-r--r-- | host/lib/rfnoc/node.cpp | 87 |
6 files changed, 255 insertions, 1 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/graph.hpp b/host/lib/include/uhdlib/rfnoc/graph.hpp index f9fb7ac41..ec6309bd0 100644 --- a/host/lib/include/uhdlib/rfnoc/graph.hpp +++ b/host/lib/include/uhdlib/rfnoc/graph.hpp @@ -7,10 +7,12 @@ #ifndef INCLUDED_LIBUHD_GRAPH_HPP #define INCLUDED_LIBUHD_GRAPH_HPP +#include <uhd/rfnoc/actions.hpp> #include <uhd/rfnoc/node.hpp> #include <boost/graph/adjacency_list.hpp> #include <tuple> #include <memory> +#include <deque> namespace uhd { namespace rfnoc { @@ -192,6 +194,39 @@ private: void resolve_all_properties(); /************************************************************************** + * Action API + *************************************************************************/ + /*! Entrypoint for action delivery + * + * When a node invokes its node_t::post_action() function, eventually that + * call lands here. This function acts as a mailman, that is, it figures out + * which edge on which node is supposed to receive this action, and delivers + * it via the node_t::receive_action() method. + * Note since this is private, nodes can't directly access this functions. + * We provide a lambda to nodes for that purpose. + * + * When an action is posted, that may trigger further actions. In order not + * to go into infinite recursion, this function is also responsible for + * serializing the actions. Even so, it is possible that, due to + * misconfiguration of nodes and their behaviour, a cascade of actions is + * posted that never stops. Therefore, another responsibility of this + * function is to track the number of follow-up messages sent, and terminate + * an infinite cycle of messages. + * + * \param src_node Reference to the node where the post_action() call is + * originating from + * \param src_edge The edge on that node where the action is being posted to. + * Note that its the edge from the node's point of view, so + * if src_edge.type == OUTPUT_EDGE, then the node posted to + * its output edge. + * + * \throws uhd::runtime_error if it has to terminate a infinite cascade of + * actions + */ + void enqueue_action( + node_ref_t src_node, res_source_info src_edge, action_info::sptr action); + + /************************************************************************** * Private graph helpers *************************************************************************/ template <typename VertexContainerType> @@ -225,7 +260,7 @@ private: /*! Find the neighbouring node for \p origin based on \p port_info * * This function will check port_info to identify the port number and the - * direction (input or output) from \p port_info. It will then return a + * direction (input or output) from \p origin. It will then return a * reference to the node that is attached to the node \p origin if such a * node exists, and the edge info. * @@ -263,6 +298,18 @@ private: // descriptor without having to traverse the graph. The rfnoc_graph_t is not // efficient for lookups of vertices. node_map_t _node_map; + + using action_tuple_t = std::tuple<node_ref_t, res_source_info, action_info::sptr>; + + //! FIFO for incoming actions + std::deque<action_tuple_t> _action_queue; + + //! Flag to ensure serialized handling of actions + std::atomic_flag _action_handling_ongoing; + + //! Mutex for to avoid the user from sending one message before another + // message is sent + std::recursive_mutex _action_mutex; }; diff --git a/host/lib/include/uhdlib/rfnoc/node_accessor.hpp b/host/lib/include/uhdlib/rfnoc/node_accessor.hpp index 554cc8f4f..827c87dd2 100644 --- a/host/lib/include/uhdlib/rfnoc/node_accessor.hpp +++ b/host/lib/include/uhdlib/rfnoc/node_accessor.hpp @@ -7,7 +7,9 @@ #ifndef INCLUDED_LIBUHD_NODE_ACCESSOR_HPP #define INCLUDED_LIBUHD_NODE_ACCESSOR_HPP +#include <uhd/rfnoc/actions.hpp> #include <uhd/rfnoc/node.hpp> +#include <uhd/rfnoc/res_source_info.hpp> #include <functional> namespace uhd { namespace rfnoc { @@ -77,6 +79,24 @@ public: { dst_node->forward_edge_property(incoming_prop, dst_port); } + + /*! Set post action callback for the node + * + * See node_t::set_post_action_callback() for details. + */ + void set_post_action_callback(node_t* node, node_t::action_handler_t&& post_handler) + { + node->set_post_action_callback(std::move(post_handler)); + } + + /*! Send an action to \p node + * + * This will call node_t::receive_action() (see that for details). + */ + void send_action(node_t* node, const res_source_info& port_info, action_info::sptr action) + { + node->receive_action(port_info, action); + } }; diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt index e4715a644..6aab0d499 100644 --- a/host/lib/rfnoc/CMakeLists.txt +++ b/host/lib/rfnoc/CMakeLists.txt @@ -11,6 +11,7 @@ LIBUHD_APPEND_SOURCES( # Infrastructure: + ${CMAKE_CURRENT_SOURCE_DIR}/actions.cpp ${CMAKE_CURRENT_SOURCE_DIR}/async_msg_handler.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block_container.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block_ctrl_base.cpp diff --git a/host/lib/rfnoc/actions.cpp b/host/lib/rfnoc/actions.cpp new file mode 100644 index 000000000..1f5f0f2f7 --- /dev/null +++ b/host/lib/rfnoc/actions.cpp @@ -0,0 +1,21 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/rfnoc/actions.hpp> +#include <atomic> + +using namespace uhd::rfnoc; + +namespace { + // A static counter, which we use to uniquely label actions + std::atomic<size_t> action_counter{0}; +} + +action_info::action_info(const std::string& key_) : id(action_counter++), key(key_) +{ + // nop +} + diff --git a/host/lib/rfnoc/graph.cpp b/host/lib/rfnoc/graph.cpp index f90f70b43..d311a00bd 100644 --- a/host/lib/rfnoc/graph.cpp +++ b/host/lib/rfnoc/graph.cpp @@ -18,6 +18,7 @@ using namespace uhd::rfnoc::detail; namespace { const std::string LOG_ID = "RFNOC::GRAPH::DETAIL"; +constexpr unsigned MAX_ACTION_ITERATIONS = 200; /*! Helper function to pretty-print edge info */ @@ -120,6 +121,15 @@ void graph_t::connect(node_ref_t src_node, node_ref_t dst_node, graph_edge_t edg src_node, [this]() { this->resolve_all_properties(); }); node_accessor.set_resolve_all_callback( dst_node, [this]() { this->resolve_all_properties(); }); + // Set post action callbacks: + node_accessor.set_post_action_callback( + src_node, [this, src_node](const res_source_info& src, action_info::sptr action) { + this->enqueue_action(src_node, src, action); + }); + node_accessor.set_post_action_callback( + dst_node, [this, dst_node](const res_source_info& src, action_info::sptr action) { + this->enqueue_action(dst_node, src, action); + }); // Add nodes to graph, if not already in there: _add_node(src_node); @@ -310,6 +320,74 @@ void graph_t::resolve_all_properties() } } +void graph_t::enqueue_action( + node_ref_t src_node, res_source_info src_edge, action_info::sptr action) +{ + // First, make sure that once we start action handling, no other node from + // a different thread can throw in their own actions + std::lock_guard<std::recursive_mutex> l(_action_mutex); + + // Check if we're already in the middle of handling actions. In that case, + // we're already in the loop below, and then all we want to do is to enqueue + // this action tuple. The first call to enqueue_action() within this thread + // context will have handling_ongoing == false. + const bool handling_ongoing = _action_handling_ongoing.test_and_set(); + + _action_queue.emplace_back(std::make_tuple(src_node, src_edge, action)); + if (handling_ongoing) { + UHD_LOG_TRACE(LOG_ID, + "Action handling ongoing, deferring delivery of " << action->key << "#" + << action->id); + return; + } + + unsigned iteration_count = 0; + while (!_action_queue.empty()) { + if (iteration_count++ == MAX_ACTION_ITERATIONS) { + throw uhd::runtime_error("Terminating action handling: Reached " + "recursion limit!"); + } + + // Unpack next action + auto& next_action = _action_queue.front(); + node_ref_t action_src_node = std::get<0>(next_action); + res_source_info action_src_port = std::get<1>(next_action); + action_info::sptr next_action_sptr = std::get<2>(next_action); + _action_queue.pop_front(); + + // Find the node that is supposed to receive this action, and if we find + // something, then send the action + auto recipient_info = + _find_neighbour(_node_map.at(action_src_node), action_src_port); + if (recipient_info.first == nullptr) { + UHD_LOG_WARNING(LOG_ID, + "Cannot forward action " + << action->key << " from " << src_node->get_unique_id() + << ":" << src_edge.to_string() << ", no neighbour found!"); + } else { + node_ref_t recipient_node = recipient_info.first; + res_source_info recipient_port = { + res_source_info::invert_edge(action_src_port.type), + action_src_port.type == res_source_info::INPUT_EDGE + ? recipient_info.second.dst_port + : recipient_info.second.src_port}; + // The following call can cause other nodes to add more actions to + // the end of _action_queue! + UHD_LOG_TRACE(LOG_ID, + "Now delivering action " << next_action_sptr->key << "#" + << next_action_sptr->id); + node_accessor_t{}.send_action( + recipient_node, recipient_port, next_action_sptr); + } + } + UHD_LOG_TRACE(LOG_ID, "Delivered all actions, terminating action handling."); + + // Release the action handling flag + _action_handling_ongoing.clear(); + // Now, the _action_mutex is released, and someone else can start sending + // actions. +} + /****************************************************************************** * Private methods *****************************************************************************/ diff --git a/host/lib/rfnoc/node.cpp b/host/lib/rfnoc/node.cpp index 0b724b889..d569cea4a 100644 --- a/host/lib/rfnoc/node.cpp +++ b/host/lib/rfnoc/node.cpp @@ -113,6 +113,27 @@ void node_t::set_prop_forwarding_policy( _prop_fwd_policies[prop_id] = policy; } +void node_t::register_action_handler(const std::string& id, action_handler_t&& handler) +{ + if (_action_handlers.count(id)) { + _action_handlers.erase(id); + } + _action_handlers.emplace(id, std::move(handler)); +} + +void node_t::set_action_forwarding_policy( + node_t::forwarding_policy_t policy, const std::string& action_key) +{ + _action_fwd_policies[action_key] = policy; +} + +void node_t::post_action( + const res_source_info& edge_info, + action_info::sptr action) +{ + _post_action_cb(edge_info, action); +} + /*** Private methods *********************************************************/ property_base_t* node_t::_find_property( res_source_info src_info, const std::string& id) const @@ -403,6 +424,72 @@ void node_t::forward_edge_property( prop_accessor.forward<false>(incoming_prop, local_prop); } +void node_t::receive_action(const res_source_info& src_info, action_info::sptr action) +{ + std::lock_guard<std::mutex> l(_action_mutex); + // See if the user defined an action handler for us: + if (_action_handlers.count(action->key)) { + _action_handlers.at(action->key)(src_info, action); + return; + } + + // Otherwise, we need to figure out the correct default action handling: + const auto fwd_policy = [&](const std::string& id) { + if (_action_fwd_policies.count(id)) { + return _action_fwd_policies.at(id); + } + return _action_fwd_policies.at(""); + }(action->key); + + // Now implement custom forwarding for all forwarding policies: + if (fwd_policy == forwarding_policy_t::DROP) { + UHD_LOG_TRACE( + get_unique_id(), "Dropping action " << action->key); + } + if (fwd_policy == forwarding_policy_t::ONE_TO_ONE) { + UHD_LOG_TRACE( + get_unique_id(), "Forwarding action " << action->key << " to opposite port"); + const res_source_info dst_info{ + res_source_info::invert_edge(src_info.type), src_info.instance}; + if (_has_port(dst_info)) { + post_action(dst_info, action); + } + } + if (fwd_policy == forwarding_policy_t::ONE_TO_FAN) { + UHD_LOG_TRACE(get_unique_id(), + "Forwarding action " << action->key << " to all opposite ports"); + const auto new_edge_type = res_source_info::invert_edge(src_info.type); + const size_t num_ports = new_edge_type == res_source_info::INPUT_EDGE + ? get_num_input_ports() + : get_num_output_ports(); + for (size_t i = 0; i < num_ports; i++) { + post_action({new_edge_type, i}, action); + } + } + if (fwd_policy == forwarding_policy_t::ONE_TO_ALL + || fwd_policy == forwarding_policy_t::ONE_TO_ALL_IN) { + UHD_LOG_TRACE(get_unique_id(), + "Forwarding action " << action->key << " to all input ports"); + for (size_t i = 0; i < get_num_input_ports(); i++) { + if (src_info.type == res_source_info::INPUT_EDGE && i == src_info.instance) { + continue; + } + post_action({res_source_info::INPUT_EDGE, i}, action); + } + } + if (fwd_policy == forwarding_policy_t::ONE_TO_ALL + || fwd_policy == forwarding_policy_t::ONE_TO_ALL_OUT) { + UHD_LOG_TRACE(get_unique_id(), + "Forwarding action " << action->key << " to all output ports"); + for (size_t i = 0; i < get_num_output_ports(); i++) { + if (src_info.type == res_source_info::OUTPUT_EDGE && i == src_info.instance) { + continue; + } + post_action({res_source_info::OUTPUT_EDGE, i}, action); + } + } +} + bool node_t::_has_port(const res_source_info& port_info) const { return (port_info.type == res_source_info::INPUT_EDGE |