diff options
Diffstat (limited to 'host/lib/rfnoc/graph.cpp')
-rw-r--r-- | host/lib/rfnoc/graph.cpp | 78 |
1 files changed, 78 insertions, 0 deletions
diff --git a/host/lib/rfnoc/graph.cpp b/host/lib/rfnoc/graph.cpp index f90f70b43..d311a00bd 100644 --- a/host/lib/rfnoc/graph.cpp +++ b/host/lib/rfnoc/graph.cpp @@ -18,6 +18,7 @@ using namespace uhd::rfnoc::detail; namespace { const std::string LOG_ID = "RFNOC::GRAPH::DETAIL"; +constexpr unsigned MAX_ACTION_ITERATIONS = 200; /*! Helper function to pretty-print edge info */ @@ -120,6 +121,15 @@ void graph_t::connect(node_ref_t src_node, node_ref_t dst_node, graph_edge_t edg src_node, [this]() { this->resolve_all_properties(); }); node_accessor.set_resolve_all_callback( dst_node, [this]() { this->resolve_all_properties(); }); + // Set post action callbacks: + node_accessor.set_post_action_callback( + src_node, [this, src_node](const res_source_info& src, action_info::sptr action) { + this->enqueue_action(src_node, src, action); + }); + node_accessor.set_post_action_callback( + dst_node, [this, dst_node](const res_source_info& src, action_info::sptr action) { + this->enqueue_action(dst_node, src, action); + }); // Add nodes to graph, if not already in there: _add_node(src_node); @@ -310,6 +320,74 @@ void graph_t::resolve_all_properties() } } +void graph_t::enqueue_action( + node_ref_t src_node, res_source_info src_edge, action_info::sptr action) +{ + // First, make sure that once we start action handling, no other node from + // a different thread can throw in their own actions + std::lock_guard<std::recursive_mutex> l(_action_mutex); + + // Check if we're already in the middle of handling actions. In that case, + // we're already in the loop below, and then all we want to do is to enqueue + // this action tuple. The first call to enqueue_action() within this thread + // context will have handling_ongoing == false. + const bool handling_ongoing = _action_handling_ongoing.test_and_set(); + + _action_queue.emplace_back(std::make_tuple(src_node, src_edge, action)); + if (handling_ongoing) { + UHD_LOG_TRACE(LOG_ID, + "Action handling ongoing, deferring delivery of " << action->key << "#" + << action->id); + return; + } + + unsigned iteration_count = 0; + while (!_action_queue.empty()) { + if (iteration_count++ == MAX_ACTION_ITERATIONS) { + throw uhd::runtime_error("Terminating action handling: Reached " + "recursion limit!"); + } + + // Unpack next action + auto& next_action = _action_queue.front(); + node_ref_t action_src_node = std::get<0>(next_action); + res_source_info action_src_port = std::get<1>(next_action); + action_info::sptr next_action_sptr = std::get<2>(next_action); + _action_queue.pop_front(); + + // Find the node that is supposed to receive this action, and if we find + // something, then send the action + auto recipient_info = + _find_neighbour(_node_map.at(action_src_node), action_src_port); + if (recipient_info.first == nullptr) { + UHD_LOG_WARNING(LOG_ID, + "Cannot forward action " + << action->key << " from " << src_node->get_unique_id() + << ":" << src_edge.to_string() << ", no neighbour found!"); + } else { + node_ref_t recipient_node = recipient_info.first; + res_source_info recipient_port = { + res_source_info::invert_edge(action_src_port.type), + action_src_port.type == res_source_info::INPUT_EDGE + ? recipient_info.second.dst_port + : recipient_info.second.src_port}; + // The following call can cause other nodes to add more actions to + // the end of _action_queue! + UHD_LOG_TRACE(LOG_ID, + "Now delivering action " << next_action_sptr->key << "#" + << next_action_sptr->id); + node_accessor_t{}.send_action( + recipient_node, recipient_port, next_action_sptr); + } + } + UHD_LOG_TRACE(LOG_ID, "Delivered all actions, terminating action handling."); + + // Release the action handling flag + _action_handling_ongoing.clear(); + // Now, the _action_mutex is released, and someone else can start sending + // actions. +} + /****************************************************************************** * Private methods *****************************************************************************/ |