diff options
Diffstat (limited to 'host/lib/rfnoc/node.cpp')
-rw-r--r-- | host/lib/rfnoc/node.cpp | 312 |
1 files changed, 303 insertions, 9 deletions
diff --git a/host/lib/rfnoc/node.cpp b/host/lib/rfnoc/node.cpp index 68ba5e283..0b724b889 100644 --- a/host/lib/rfnoc/node.cpp +++ b/host/lib/rfnoc/node.cpp @@ -4,12 +4,23 @@ // SPDX-License-Identifier: GPL-3.0-or-later // +#include <uhd/exception.hpp> #include <uhd/rfnoc/node.hpp> +#include <uhd/utils/log.hpp> #include <uhdlib/rfnoc/prop_accessor.hpp> #include <boost/format.hpp> +#include <algorithm> +#include <iostream> using namespace uhd::rfnoc; +dirtifier_t node_t::ALWAYS_DIRTY{}; + + +node_t::node_t() +{ + register_property(&ALWAYS_DIRTY); +} std::string node_t::get_unique_id() const { @@ -35,7 +46,7 @@ std::vector<std::string> node_t::get_property_ids() const } /*** Protected methods *******************************************************/ -void node_t::register_property(property_base_t* prop) +void node_t::register_property(property_base_t* prop, resolve_callback_t&& clean_callback) { std::lock_guard<std::mutex> _l(_prop_mutex); @@ -60,11 +71,13 @@ void node_t::register_property(property_base_t* prop) } _props[src_type].push_back(prop); + if (clean_callback) { + _clean_cb_registry[prop] = std::move(clean_callback); + } } -void node_t::add_property_resolver(std::set<property_base_t*>&& inputs, - std::set<property_base_t*>&& outputs, - resolver_fn_t&& resolver_fn) +void node_t::add_property_resolver( + prop_ptrs_t&& inputs, prop_ptrs_t&& outputs, resolver_fn_t&& resolver_fn) { std::lock_guard<std::mutex> _l(_prop_mutex); @@ -88,14 +101,21 @@ void node_t::add_property_resolver(std::set<property_base_t*>&& inputs, } // All good, we can store it - _prop_resolvers.push_back(std::make_tuple( - std::forward<std::set<property_base_t*>>(inputs), - std::forward<std::set<property_base_t*>>(outputs), - std::forward<resolver_fn_t>(resolver_fn))); + _prop_resolvers.push_back(std::make_tuple(std::forward<prop_ptrs_t>(inputs), + std::forward<prop_ptrs_t>(outputs), + std::forward<resolver_fn_t>(resolver_fn))); +} + + +void node_t::set_prop_forwarding_policy( + forwarding_policy_t policy, const std::string& prop_id) +{ + _prop_fwd_policies[prop_id] = policy; } /*** Private methods *********************************************************/ -property_base_t* node_t::_find_property(res_source_info src_info, const std::string& id) const +property_base_t* node_t::_find_property( + res_source_info src_info, const std::string& id) const { for (const auto& type_prop_pair : _props) { if (type_prop_pair.first != src_info.type) { @@ -117,3 +137,277 @@ uhd::utils::scope_exit::uptr node_t::_request_property_access( return prop_accessor_t{}.get_scoped_prop_access(*prop, access); } + +property_base_t* node_t::inject_edge_property( + property_base_t* blueprint, res_source_info new_src_info) +{ + // Check if a property already exists which matches the new property + // requirements. If so, we can return early: + auto new_prop = _find_property(new_src_info, blueprint->get_id()); + if (new_prop) { + return new_prop; + } + + // We need to create a new property and stash it away: + new_prop = [&]() -> property_base_t* { + auto prop = blueprint->clone(new_src_info); + auto ptr = prop.get(); + _dynamic_props.emplace(std::move(prop)); + return ptr; + }(); + register_property(new_prop); + + // Collect some info on how to do the forwarding: + const auto fwd_policy = [&](const std::string& id) { + if (_prop_fwd_policies.count(id)) { + return _prop_fwd_policies.at(id); + } + return _prop_fwd_policies.at(""); + }(new_prop->get_id()); + const size_t port_idx = new_prop->get_src_info().instance; + const auto port_type = new_prop->get_src_info().type; + UHD_ASSERT_THROW(port_type == res_source_info::INPUT_EDGE + || port_type == res_source_info::OUTPUT_EDGE); + + // Now comes the hard part: Figure out which other properties need to be + // created, and which resolvers need to be instantiated + if (fwd_policy == forwarding_policy_t::ONE_TO_ONE) { + // Figure out if there's an opposite port + const auto opposite_port_type = res_source_info::invert_edge(port_type); + if (_has_port({opposite_port_type, port_idx})) { + // Make sure that the other side's property exists: + // This is a safe recursion, because we've already created and + // registered this property. + auto opposite_prop = + inject_edge_property(new_prop, {opposite_port_type, port_idx}); + // Now add a resolver that will always forward the value from this + // property to the other one. + add_property_resolver( + {new_prop}, {opposite_prop}, [new_prop, opposite_prop]() { + prop_accessor_t{}.forward<false>(new_prop, opposite_prop); + }); + } + } + if (fwd_policy == forwarding_policy_t::ONE_TO_FAN) { + const auto opposite_port_type = res_source_info::invert_edge(port_type); + const size_t num_ports = opposite_port_type == res_source_info::INPUT_EDGE + ? get_num_input_ports() + : get_num_output_ports(); + for (size_t i = 0; i < num_ports; i++) { + auto opposite_prop = + inject_edge_property(new_prop, {opposite_port_type, i}); + // Now add a resolver that will always forward the value from this + // property to the other one. + add_property_resolver( + {new_prop}, {opposite_prop}, [new_prop, opposite_prop]() { + prop_accessor_t{}.forward<false>(new_prop, opposite_prop); + }); + } + } + if (fwd_policy == forwarding_policy_t::ONE_TO_ALL + || fwd_policy == forwarding_policy_t::ONE_TO_ALL_IN) { + // Loop through all other ports, make sure those properties exist + for (size_t other_port_idx = 0; other_port_idx < get_num_input_ports(); + other_port_idx++) { + if (port_type == res_source_info::INPUT_EDGE && other_port_idx == port_idx) { + continue; + } + inject_edge_property(new_prop, {res_source_info::INPUT_EDGE, other_port_idx}); + } + // Now add a dynamic resolver that will update all input properties. + // In order to keep this code simple, we bypass the write list and + // get access via the prop_accessor. + add_property_resolver({new_prop}, {/* empty */}, [this, new_prop, port_idx]() { + for (size_t other_port_idx = 0; other_port_idx < get_num_input_ports(); + other_port_idx++) { + if (other_port_idx == port_idx) { + continue; + } + auto prop = + _find_property({res_source_info::INPUT_EDGE, other_port_idx}, + new_prop->get_id()); + if (prop) { + prop_accessor_t{}.forward<false>(new_prop, prop); + } + } + }); + } + if (fwd_policy == forwarding_policy_t::ONE_TO_ALL + || fwd_policy == forwarding_policy_t::ONE_TO_ALL_OUT) { + // Loop through all other ports, make sure those properties exist + for (size_t other_port_idx = 0; other_port_idx < get_num_output_ports(); + other_port_idx++) { + if (port_type == res_source_info::OUTPUT_EDGE && other_port_idx == port_idx) { + continue; + } + inject_edge_property(new_prop, {res_source_info::OUTPUT_EDGE, other_port_idx}); + } + // Now add a dynamic resolver that will update all input properties. + // In order to keep this code simple, we bypass the write list and + // get access via the prop_accessor. + add_property_resolver({new_prop}, {/* empty */}, [this, new_prop, port_idx]() { + for (size_t other_port_idx = 0; other_port_idx < get_num_input_ports(); + other_port_idx++) { + if (other_port_idx == port_idx) { + continue; + } + auto prop = + _find_property({res_source_info::OUTPUT_EDGE, other_port_idx}, + new_prop->get_id()); + if (prop) { + prop_accessor_t{}.forward<false>(new_prop, prop); + } + } + }); + } + + return new_prop; +} + + +void node_t::init_props() +{ + std::lock_guard<std::mutex> _l(_prop_mutex); + + prop_accessor_t prop_accessor{}; + + for (auto& resolver_tuple : _prop_resolvers) { + // 1) Set all outputs to RWLOCKED + auto& outputs = std::get<1>(resolver_tuple); + for (auto& output : outputs) { + prop_accessor.set_access(output, property_base_t::RWLOCKED); + } + + // 2) Run the resolver + try { + std::get<2>(resolver_tuple)(); + } catch (const uhd::resolve_error& ex) { + UHD_LOGGER_WARNING(get_unique_id()) + << "Failed to initialize node. Most likely cause: Inconsistent default " + "values. Resolver threw this error: " + << ex.what(); + //throw uhd::runtime_error(std::string("Failed to initialize node ") + get_unique_id()); + } + + // 3) Set outputs back to RO + for (auto& output : outputs) { + prop_accessor.set_access(output, property_base_t::RO); + } + } + + // 4) Mark properties as clean + clean_props(); +} + + +void node_t::resolve_props() +{ + prop_accessor_t prop_accessor{}; + const prop_ptrs_t dirty_props = + filter_props([](property_base_t* prop) { return prop->is_dirty(); }); + prop_ptrs_t written_props{}; + UHD_LOG_TRACE(get_unique_id(), + "Locally resolving " << dirty_props.size() << " dirty properties."); + + // Helper to determine if any element from inputs is in dirty_props + auto in_dirty_props = [&dirty_props](const prop_ptrs_t inputs) { + return std::any_of( + inputs.cbegin(), inputs.cend(), [&dirty_props](property_base_t* prop) { + return dirty_props.count(prop) != 1; + }); + }; + + for (auto& resolver_tuple : _prop_resolvers) { + auto& inputs = std::get<0>(resolver_tuple); + auto& outputs = std::get<1>(resolver_tuple); + if (in_dirty_props(inputs)) { + continue; + } + + // Enable outputs + std::vector<uhd::utils::scope_exit::uptr> access_holder; + access_holder.reserve(outputs.size()); + for (auto& output : outputs) { + access_holder.emplace_back(prop_accessor.get_scoped_prop_access(*output, + written_props.count(output) ? property_base_t::access_t::RWLOCKED + : property_base_t::access_t::RW)); + } + + // Run resolver + std::get<2>(resolver_tuple)(); + + // Take note of outputs + written_props.insert(outputs.cbegin(), outputs.cend()); + + // RW or RWLOCKED gets released here as access_holder goes out of scope. + } +} + +void node_t::resolve_all() +{ + _resolve_all_cb(); +} + + +void node_t::clean_props() +{ + prop_accessor_t prop_accessor{}; + for (const auto& type_prop_pair : _props) { + for (const auto& prop : type_prop_pair.second) { + if (prop->is_dirty() && _clean_cb_registry.count(prop)) { + _clean_cb_registry.at(prop)(); + } + prop_accessor.mark_clean(*prop); + } + } +} + + +void node_t::forward_edge_property( + property_base_t* incoming_prop, const size_t incoming_port) +{ + UHD_ASSERT_THROW( + incoming_prop->get_src_info().type == res_source_info::INPUT_EDGE + || incoming_prop->get_src_info().type == res_source_info::OUTPUT_EDGE); + UHD_LOG_TRACE(get_unique_id(), + "Incoming edge property: `" << incoming_prop->get_id() << "`, source info: " + << incoming_prop->get_src_info().to_string()); + + // The source type of my local prop (it's the opposite of the source type + // of incoming_prop) + const auto prop_src_type = + res_source_info::invert_edge(incoming_prop->get_src_info().type); + // Set of local properties that match incoming_prop. It can be an empty set, + // or, if the node is misconfigured, a set with more than one entry. Or, if + // all is as expected, it's a set with a single entry. + auto local_prop_set = filter_props([prop_src_type, incoming_prop, incoming_port]( + property_base_t* prop) -> bool { + return prop->get_src_info().type == prop_src_type + && prop->get_src_info().instance == incoming_port + && prop->get_id() == incoming_prop->get_id(); + }); + + // If there is no such property, we're forwarding a new property + if (local_prop_set.empty()) { + UHD_LOG_TRACE(get_unique_id(), + "Received unknown incoming edge prop: " << incoming_prop->get_id()); + local_prop_set.emplace( + inject_edge_property(incoming_prop, {prop_src_type, incoming_port})); + } + // There must be either zero results, or one + UHD_ASSERT_THROW(local_prop_set.size() == 1); + + auto local_prop = *local_prop_set.begin(); + + prop_accessor_t prop_accessor{}; + prop_accessor.forward<false>(incoming_prop, local_prop); +} + +bool node_t::_has_port(const res_source_info& port_info) const +{ + return (port_info.type == res_source_info::INPUT_EDGE + && port_info.instance <= get_num_input_ports()) + || (port_info.type == res_source_info::OUTPUT_EDGE + && port_info.instance <= get_num_output_ports()); +} + |