From cb99b720eee76dcf83ac89a2107f516443372b7f Mon Sep 17 00:00:00 2001 From: Aaron Rossetto Date: Tue, 19 May 2020 14:16:23 -0500 Subject: rfnoc: Add USE_MAP prop/action forwarding policy This commit adds a new forwarding policy for properties and actions, USE_MAP. This forwarding policy causes the node to consult a user-provided map to determine how to forward the property or action. The map's key is the source edge of the incoming property or action, while the value is a list of destination edges to which the property should be propagated or action should be forwarded. It allows clients to construct sophisticated forwarding behaviors for specialized blocks, such as a split stream block that needs to forward properties and actions only to specific output edges based on the incoming edge. --- host/lib/rfnoc/node.cpp | 78 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 61 insertions(+), 17 deletions(-) (limited to 'host/lib/rfnoc') diff --git a/host/lib/rfnoc/node.cpp b/host/lib/rfnoc/node.cpp index 7cc656a26..0abbb0d3b 100644 --- a/host/lib/rfnoc/node.cpp +++ b/host/lib/rfnoc/node.cpp @@ -154,6 +154,11 @@ void node_t::set_prop_forwarding_policy( _prop_fwd_policies[prop_id] = policy; } +void node_t::set_prop_forwarding_map(const forwarding_map_t& map) +{ + _prop_fwd_map = map; +} + void node_t::register_action_handler(const std::string& id, action_handler_t&& handler) { if (_action_handlers.count(id)) { @@ -168,6 +173,11 @@ void node_t::set_action_forwarding_policy( _action_fwd_policies[action_key] = policy; } +void node_t::set_action_forwarding_map(const forwarding_map_t& map) +{ + _action_fwd_map = map; +} + void node_t::post_action(const res_source_info& edge_info, action_info::sptr action) { _post_action_cb(edge_info, action); @@ -335,6 +345,27 @@ property_base_t* node_t::inject_edge_property( } }); } + if (fwd_policy == forwarding_policy_t::USE_MAP) { + const auto src_info = new_prop->get_src_info(); + const auto& map_entry = _prop_fwd_map.find(src_info); + if (map_entry != _prop_fwd_map.end()) { + for (const auto& dst : map_entry->second) { + if (!_has_port(dst)) { + throw uhd::rfnoc_error("Destination port " + dst.to_string() + + " in prop map does not exist"); + } + auto next_prop = inject_edge_property(new_prop, dst); + // Now add a resolver that will always forward the value from this + // property to the other one. + add_property_resolver({new_prop}, {next_prop}, [new_prop, next_prop]() { + prop_accessor_t{}.forward(new_prop, next_prop); + }); + } + } else { + RFNOC_LOG_TRACE("Dropping incoming prop on " << src_info.to_string() + << " (no destinations in map)"); + } + } return new_prop; } @@ -459,16 +490,15 @@ void node_t::forward_edge_property( UHD_ASSERT_THROW( incoming_prop->get_src_info().type == res_source_info::INPUT_EDGE || incoming_prop->get_src_info().type == res_source_info::OUTPUT_EDGE); - UHD_LOG_TRACE(get_unique_id(), - "Incoming edge property: `" << incoming_prop->get_id() << "`, source info: " - << incoming_prop->get_src_info().to_string()); + RFNOC_LOG_TRACE("Incoming edge property: `" + << incoming_prop->get_id() + << "`, source info: " << incoming_prop->get_src_info().to_string()); // Don't forward properties that are not yet valid if (!incoming_prop->is_valid()) { - UHD_LOG_TRACE(get_unique_id(), - "Skipped empty edge property: `" - << incoming_prop->get_id() - << "`, source info: " << incoming_prop->get_src_info().to_string()); + RFNOC_LOG_TRACE("Skipped empty edge property: `" + << incoming_prop->get_id() << "`, source info: " + << incoming_prop->get_src_info().to_string()); return; } @@ -488,7 +518,7 @@ void node_t::forward_edge_property( // If there is no such property, we're forwarding a new property if (local_prop_set.empty()) { - UHD_LOG_TRACE(get_unique_id(), + RFNOC_LOG_TRACE( "Received unknown incoming edge prop: " << incoming_prop->get_id()); local_prop_set.emplace( inject_edge_property(incoming_prop, {prop_src_type, incoming_port})); @@ -527,11 +557,10 @@ void node_t::receive_action(const res_source_info& src_info, action_info::sptr a // Now implement custom forwarding for all forwarding policies: if (fwd_policy == forwarding_policy_t::DROP) { - UHD_LOG_TRACE(get_unique_id(), "Dropping action " << action->key); + RFNOC_LOG_TRACE("Dropping action " << action->key); } if (fwd_policy == forwarding_policy_t::ONE_TO_ONE) { - UHD_LOG_TRACE( - get_unique_id(), "Forwarding action " << action->key << " to opposite port"); + RFNOC_LOG_TRACE("Forwarding action " << action->key << " to opposite port"); const res_source_info dst_info{ res_source_info::invert_edge(src_info.type), src_info.instance}; if (_has_port(dst_info)) { @@ -539,8 +568,7 @@ void node_t::receive_action(const res_source_info& src_info, action_info::sptr a } } if (fwd_policy == forwarding_policy_t::ONE_TO_FAN) { - UHD_LOG_TRACE(get_unique_id(), - "Forwarding action " << action->key << " to all opposite ports"); + RFNOC_LOG_TRACE("Forwarding action " << action->key << " to all opposite ports"); const auto new_edge_type = res_source_info::invert_edge(src_info.type); const size_t num_ports = new_edge_type == res_source_info::INPUT_EDGE ? get_num_input_ports() @@ -551,8 +579,7 @@ void node_t::receive_action(const res_source_info& src_info, action_info::sptr a } if (fwd_policy == forwarding_policy_t::ONE_TO_ALL || fwd_policy == forwarding_policy_t::ONE_TO_ALL_IN) { - UHD_LOG_TRACE(get_unique_id(), - "Forwarding action " << action->key << " to all input ports"); + RFNOC_LOG_TRACE("Forwarding action " << action->key << " to all input ports"); for (size_t i = 0; i < get_num_input_ports(); i++) { if (src_info.type == res_source_info::INPUT_EDGE && i == src_info.instance) { continue; @@ -562,8 +589,7 @@ void node_t::receive_action(const res_source_info& src_info, action_info::sptr a } if (fwd_policy == forwarding_policy_t::ONE_TO_ALL || fwd_policy == forwarding_policy_t::ONE_TO_ALL_OUT) { - UHD_LOG_TRACE(get_unique_id(), - "Forwarding action " << action->key << " to all output ports"); + RFNOC_LOG_TRACE("Forwarding action " << action->key << " to all output ports"); for (size_t i = 0; i < get_num_output_ports(); i++) { if (src_info.type == res_source_info::OUTPUT_EDGE && i == src_info.instance) { continue; @@ -571,6 +597,24 @@ void node_t::receive_action(const res_source_info& src_info, action_info::sptr a post_action({res_source_info::OUTPUT_EDGE, i}, action); } } + if (fwd_policy == forwarding_policy_t::USE_MAP) { + const auto& map_entry = _action_fwd_map.find(src_info); + if (map_entry != _action_fwd_map.end()) { + for (const auto& dst : map_entry->second) { + if (!_has_port(dst)) { + throw uhd::rfnoc_error("Destination port " + dst.to_string() + + " in action map does not exist"); + } + RFNOC_LOG_TRACE( + "Forwarding action " << action->key << " to " << dst.to_string()); + post_action(dst, action); + } + } else { + RFNOC_LOG_TRACE("Dropping action " << action->key << " on " + << src_info.to_string() + << " (no destinations in map)"); + } + } } void node_t::shutdown() -- cgit v1.2.3