//
// Copyright 2019 Ettus Research, a National Instruments Brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//

#include <uhd/exception.hpp>
#include <uhd/rfnoc/node.hpp>
#include <uhd/utils/log.hpp>
#include <uhdlib/rfnoc/prop_accessor.hpp>
#include <boost/format.hpp>
#include <algorithm>
#include <iostream>

using namespace uhd::rfnoc;

dirtifier_t node_t::ALWAYS_DIRTY{};


node_t::node_t()
{
    _resolve_all_cb = _default_resolve_all_cb;
    register_property(&ALWAYS_DIRTY);
}

std::string node_t::get_unique_id() const
{
    // TODO return something better
    return str(boost::format("%08X") % this);
}

std::vector<std::string> node_t::get_property_ids() const
{
    std::lock_guard<std::mutex> _l(_prop_mutex);
    if (_props.count(res_source_info::USER) == 0) {
        return {};
    }

    auto& user_props = _props.at(res_source_info::USER);
    // TODO use a range here, we're not savages
    std::vector<std::string> return_value(user_props.size());
    for (size_t i = 0; i < user_props.size(); ++i) {
        return_value[i] = user_props[i]->get_id();
    }

    return return_value;
}

void node_t::set_properties(const uhd::device_addr_t& props, const size_t instance)
{
    for (const auto& key : props.keys()) {
        std::string local_key  = key;
        size_t local_instance  = instance;
        const size_t colon_pos = key.find(':');
        if (colon_pos != std::string::npos) {
            // Extract the property ID and instance
            local_key                 = key.substr(0, colon_pos);
            std::string instance_part = key.substr(colon_pos + 1);
            try {
                local_instance = std::stoi(instance_part);
            } catch (...) {
                // If no number, or an invalid number is specified after the
                // colon, throw a value_error.
                throw uhd::value_error("Property id `" + local_key
                                       + "' contains a malformed instance override!");
            }
        }

        property_base_t* prop_ref =
            _find_property({res_source_info::USER, local_instance}, local_key);
        if (!prop_ref) {
            RFNOC_LOG_WARNING("set_properties() cannot set property `"
                              << local_key << "': No such property.");
            continue;
        }
        auto prop_access = _request_property_access(prop_ref, property_base_t::RW);
        prop_ref->set_from_str(props.get(key));
    }

    // Now trigger a property resolution. If other properties depend on modified
    // properties, they will be updated.
    resolve_all();
}

void node_t::set_command_time(uhd::time_spec_t time, const size_t instance)
{
    if (_cmd_timespecs.size() <= instance) {
        _cmd_timespecs.resize(instance + 1, uhd::time_spec_t(0.0));
    }

    _cmd_timespecs[instance] = time;
}

uhd::time_spec_t node_t::get_command_time(const size_t instance) const
{
    if (instance >= _cmd_timespecs.size()) {
        return uhd::time_spec_t::ASAP;
    }

    return _cmd_timespecs.at(instance);
}

void node_t::clear_command_time(const size_t instance)
{
    set_command_time(uhd::time_spec_t(0.0), instance);
}

/*** Protected methods *******************************************************/
void node_t::register_property(property_base_t* prop, resolve_callback_t&& clean_callback)
{
    std::lock_guard<std::mutex> _l(_prop_mutex);
    const auto src_type = prop->get_src_info().type;

    // If the map is empty for this source type, create an empty vector
    if (_props.count(src_type) == 0) {
        _props[src_type] = {};
    }

    auto prop_already_registered = [prop](const property_base_t* existing_prop) {
        return (prop == existing_prop)
               || (prop->get_src_info() == existing_prop->get_src_info()
                      && prop->get_id() == existing_prop->get_id());
    };
    if (!filter_props(prop_already_registered).empty()) {
        throw uhd::runtime_error(std::string("Attempting to double-register property: ")
                                 + prop->get_id() + "[" + prop->get_src_info().to_string()
                                 + "]");
    }

    _props[src_type].push_back(prop);
    if (clean_callback) {
        _clean_cb_registry[prop] = std::move(clean_callback);
    }

    prop_accessor_t{}.set_access(prop, property_base_t::RW);
}

void node_t::add_property_resolver(
    prop_ptrs_t&& inputs, prop_ptrs_t&& outputs, resolver_fn_t&& resolver_fn)
{
    std::lock_guard<std::mutex> _l(_prop_mutex);

    // Sanity check: All inputs and outputs must be registered properties
    auto prop_is_registered = [this](property_base_t* prop) -> bool {
        return bool(this->_find_property(prop->get_src_info(), prop->get_id()));
    };
    for (const auto& prop : inputs) {
        if (!prop_is_registered(prop)) {
            throw uhd::runtime_error(
                std::string("Cannot add property resolver, input property ")
                + prop->get_id() + " is not registered!");
        }
    }
    for (const auto& prop : outputs) {
        if (!prop_is_registered(prop)) {
            throw uhd::runtime_error(
                std::string("Cannot add property resolver, output property ")
                + prop->get_id() + " is not registered!");
        }
    }

    // All good, we can store it
    _prop_resolvers.push_back(std::make_tuple(std::forward<prop_ptrs_t>(inputs),
        std::forward<prop_ptrs_t>(outputs),
        std::forward<resolver_fn_t>(resolver_fn)));
}


void node_t::set_prop_forwarding_policy(
    forwarding_policy_t policy, const std::string& prop_id)
{
    _prop_fwd_policies[prop_id] = policy;
}

void node_t::set_prop_forwarding_map(const forwarding_map_t& map)
{
    _prop_fwd_map = map;
}

void node_t::register_action_handler(const std::string& id, action_handler_t&& handler)
{
    if (_action_handlers.count(id)) {
        _action_handlers.erase(id);
    }
    _action_handlers.emplace(id, std::move(handler));
}

void node_t::set_action_forwarding_policy(
    node_t::forwarding_policy_t policy, const std::string& action_key)
{
    _action_fwd_policies[action_key] = policy;
}

void node_t::set_action_forwarding_map(const forwarding_map_t& map)
{
    _action_fwd_map = map;
}

void node_t::post_action(const res_source_info& edge_info, action_info::sptr action)
{
    _post_action_cb(edge_info, action);
}

bool node_t::check_topology(const std::vector<size_t>& connected_inputs,
    const std::vector<size_t>& connected_outputs)
{
    for (size_t port : connected_inputs) {
        if (port >= get_num_input_ports()) {
            return false;
        }
    }
    for (size_t port : connected_outputs) {
        if (port >= get_num_output_ports()) {
            return false;
        }
    }

    return true;
}

/*** Private methods *********************************************************/
property_base_t* node_t::_find_property(
    res_source_info src_info, const std::string& id) const
{
    for (const auto& type_prop_pair : _props) {
        if (type_prop_pair.first != src_info.type) {
            continue;
        }
        for (const auto& prop : type_prop_pair.second) {
            if (prop->get_id() == id && prop->get_src_info() == src_info) {
                return prop;
            }
        }
    }

    return nullptr;
}

uhd::utils::scope_exit::uptr node_t::_request_property_access(
    property_base_t* prop, property_base_t::access_t access) const
{
    return prop_accessor_t{}.get_scoped_prop_access(*prop, access);
}


property_base_t* node_t::inject_edge_property(
    property_base_t* blueprint, res_source_info new_src_info)
{
    // Check if a property already exists which matches the new property
    // requirements. If so, we can return early:
    auto new_prop = _find_property(new_src_info, blueprint->get_id());
    if (new_prop) {
        return new_prop;
    }

    // We need to create a new property and stash it away:
    new_prop = [&]() -> property_base_t* {
        auto prop = blueprint->clone(new_src_info);
        auto ptr  = prop.get();
        _dynamic_props.emplace(std::move(prop));
        return ptr;
    }();
    register_property(new_prop);

    // Collect some info on how to do the forwarding:
    const auto fwd_policy = [&](const std::string& id) {
        if (_prop_fwd_policies.count(id)) {
            return _prop_fwd_policies.at(id);
        }
        return _prop_fwd_policies.at("");
    }(new_prop->get_id());
    const size_t port_idx = new_prop->get_src_info().instance;
    const auto port_type  = new_prop->get_src_info().type;
    UHD_ASSERT_THROW(port_type == res_source_info::INPUT_EDGE
                     || port_type == res_source_info::OUTPUT_EDGE);

    // Now comes the hard part: Figure out which other properties need to be
    // created, and which resolvers need to be instantiated
    if (fwd_policy == forwarding_policy_t::ONE_TO_ONE) {
        // Figure out if there's an opposite port
        const auto opposite_port_type = res_source_info::invert_edge(port_type);
        if (_has_port({opposite_port_type, port_idx})) {
            // Make sure that the other side's property exists:
            // This is a safe recursion, because we've already created and
            // registered this property.
            auto opposite_prop =
                inject_edge_property(new_prop, {opposite_port_type, port_idx});
            // Now add a resolver that will always forward the value from this
            // property to the other one.
            add_property_resolver(
                {new_prop}, {opposite_prop}, [new_prop, opposite_prop]() {
                    prop_accessor_t{}.forward<false>(new_prop, opposite_prop);
                });
        }
    }
    if (fwd_policy == forwarding_policy_t::ONE_TO_FAN) {
        const auto opposite_port_type = res_source_info::invert_edge(port_type);
        const size_t num_ports        = opposite_port_type == res_source_info::INPUT_EDGE
                                     ? get_num_input_ports()
                                     : get_num_output_ports();
        for (size_t i = 0; i < num_ports; i++) {
            auto opposite_prop = inject_edge_property(new_prop, {opposite_port_type, i});
            // Now add a resolver that will always forward the value from this
            // property to the other one.
            add_property_resolver(
                {new_prop}, {opposite_prop}, [new_prop, opposite_prop]() {
                    prop_accessor_t{}.forward<false>(new_prop, opposite_prop);
                });
        }
    }
    if (fwd_policy == forwarding_policy_t::ONE_TO_ALL
        || fwd_policy == forwarding_policy_t::ONE_TO_ALL_IN) {
        // Loop through all other ports, make sure those properties exist
        for (size_t other_port_idx = 0; other_port_idx < get_num_input_ports();
             other_port_idx++) {
            if (port_type == res_source_info::INPUT_EDGE && other_port_idx == port_idx) {
                continue;
            }
            inject_edge_property(new_prop, {res_source_info::INPUT_EDGE, other_port_idx});
        }
        // Now add a dynamic resolver that will update all input properties.
        // In order to keep this code simple, we bypass the write list and
        // get access via the prop_accessor.
        add_property_resolver({new_prop}, {/* empty */}, [this, new_prop, port_idx]() {
            for (size_t other_port_idx = 0; other_port_idx < get_num_input_ports();
                 other_port_idx++) {
                if (other_port_idx == port_idx) {
                    continue;
                }
                auto prop = _find_property(
                    {res_source_info::INPUT_EDGE, other_port_idx}, new_prop->get_id());
                if (prop) {
                    prop_accessor_t{}.forward<false>(new_prop, prop);
                }
            }
        });
    }
    if (fwd_policy == forwarding_policy_t::ONE_TO_ALL
        || fwd_policy == forwarding_policy_t::ONE_TO_ALL_OUT) {
        // Loop through all other ports, make sure those properties exist
        for (size_t other_port_idx = 0; other_port_idx < get_num_output_ports();
             other_port_idx++) {
            if (port_type == res_source_info::OUTPUT_EDGE && other_port_idx == port_idx) {
                continue;
            }
            inject_edge_property(
                new_prop, {res_source_info::OUTPUT_EDGE, other_port_idx});
        }
        // Now add a dynamic resolver that will update all input properties.
        // In order to keep this code simple, we bypass the write list and
        // get access via the prop_accessor.
        add_property_resolver({new_prop}, {/* empty */}, [this, new_prop, port_idx]() {
            for (size_t other_port_idx = 0; other_port_idx < get_num_input_ports();
                 other_port_idx++) {
                if (other_port_idx == port_idx) {
                    continue;
                }
                auto prop = _find_property(
                    {res_source_info::OUTPUT_EDGE, other_port_idx}, new_prop->get_id());
                if (prop) {
                    prop_accessor_t{}.forward<false>(new_prop, prop);
                }
            }
        });
    }
    if (fwd_policy == forwarding_policy_t::USE_MAP) {
        const auto src_info   = new_prop->get_src_info();
        const auto& map_entry = _prop_fwd_map.find(src_info);
        if (map_entry != _prop_fwd_map.end()) {
            for (const auto& dst : map_entry->second) {
                if (!_has_port(dst)) {
                    throw uhd::rfnoc_error("Destination port " + dst.to_string()
                                           + " in prop map does not exist");
                }
                auto next_prop = inject_edge_property(new_prop, dst);
                // Now add a resolver that will always forward the value from this
                // property to the other one.
                add_property_resolver({new_prop}, {next_prop}, [new_prop, next_prop]() {
                    prop_accessor_t{}.forward<false>(new_prop, next_prop);
                });
            }
        } else {
            RFNOC_LOG_TRACE("Dropping incoming prop on " << src_info.to_string()
                                                         << " (no destinations in map)");
        }
    }

    return new_prop;
}


void node_t::init_props()
{
    std::lock_guard<std::mutex> _l(_prop_mutex);

    prop_accessor_t prop_accessor{};

    for (auto& resolver_tuple : _prop_resolvers) {
        // 1) Set all outputs to RWLOCKED
        auto& outputs = std::get<1>(resolver_tuple);
        for (auto& output : outputs) {
            prop_accessor.set_access(output, property_base_t::RWLOCKED);
        }

        // 2) Run the resolver
        try {
            std::get<2>(resolver_tuple)();
        } catch (const uhd::resolve_error& ex) {
            UHD_LOGGER_WARNING(get_unique_id())
                << "Failed to initialize node. Most likely cause: Inconsistent default "
                   "values. Resolver threw this error: "
                << ex.what();
            // throw uhd::runtime_error(std::string("Failed to initialize node ") +
            // get_unique_id());
        }

        // 3) Set outputs back to RO
        for (auto& output : outputs) {
            prop_accessor.set_access(output, property_base_t::RO);
        }
    }

    // 4) Mark properties as clean and read-only
    clean_props();
}


void node_t::resolve_props()
{
    prop_accessor_t prop_accessor{};
    const prop_ptrs_t initial_dirty_props =
        filter_props([](property_base_t* prop) { return prop->is_dirty(); });
    std::list<property_base_t*> all_dirty_props(
        initial_dirty_props.cbegin(), initial_dirty_props.cend());
    prop_ptrs_t processed_props{};
    prop_ptrs_t written_props{};
    RFNOC_LOG_TRACE("Locally resolving " << all_dirty_props.size()
                                         << " dirty properties plus dependencies.");

    // Loop through all dirty properties. The list can be amended during the
    // loop execution.
    for (auto it = all_dirty_props.begin(); it != all_dirty_props.end(); ++it) {
        auto current_input_prop = *it;
        if (processed_props.count(current_input_prop)) {
            continue;
        }
        // Find all resolvers that take this dirty property as an input:
        for (auto& resolver_tuple : _prop_resolvers) {
            auto& inputs  = std::get<0>(resolver_tuple);
            auto& outputs = std::get<1>(resolver_tuple);
            if (!inputs.count(current_input_prop)) {
                continue;
            }

            // Enable outputs
            std::vector<uhd::utils::scope_exit::uptr> access_holder;
            access_holder.reserve(outputs.size());
            for (auto& output : outputs) {
                access_holder.emplace_back(prop_accessor.get_scoped_prop_access(*output,
                    written_props.count(output) ? property_base_t::access_t::RWLOCKED
                                                : property_base_t::access_t::RW));
            }

            // Run resolver
            std::get<2>(resolver_tuple)();

            // Take note of outputs
            written_props.insert(outputs.cbegin(), outputs.cend());

            // Add all outputs that are dirty to the list, unless they have
            // already been processed
            for (auto& output_prop : outputs) {
                if (output_prop->is_dirty() && processed_props.count(output_prop) == 0) {
                    all_dirty_props.push_back(output_prop);
                }
            }

            // RW or RWLOCKED gets released here as access_holder goes out of scope.
        }
        processed_props.insert(current_input_prop);
    }
}

void node_t::resolve_all()
{
    _resolve_all_cb();
}


void node_t::clean_props()
{
    prop_accessor_t prop_accessor{};
    for (const auto& type_prop_pair : _props) {
        for (const auto& prop : type_prop_pair.second) {
            if (prop->is_valid() && prop->is_dirty() && _clean_cb_registry.count(prop)) {
                _clean_cb_registry.at(prop)();
            }
            prop_accessor.mark_clean(*prop);
            prop_accessor.set_access(prop, property_base_t::RO);
        }
    }
}


void node_t::forward_edge_property(
    property_base_t* incoming_prop, const size_t incoming_port)
{
    UHD_ASSERT_THROW(
        incoming_prop->get_src_info().type == res_source_info::INPUT_EDGE
        || incoming_prop->get_src_info().type == res_source_info::OUTPUT_EDGE);
    RFNOC_LOG_TRACE("Incoming edge property: `"
                    << incoming_prop->get_id()
                    << "`, source info: " << incoming_prop->get_src_info().to_string());

    // Don't forward properties that are not yet valid
    if (!incoming_prop->is_valid()) {
        RFNOC_LOG_TRACE("Skipped empty edge property: `"
                        << incoming_prop->get_id() << "`, source info: "
                        << incoming_prop->get_src_info().to_string());
        return;
    }

    // The source type of my local prop (it's the opposite of the source type
    // of incoming_prop)
    const auto prop_src_type =
        res_source_info::invert_edge(incoming_prop->get_src_info().type);
    // Set of local properties that match incoming_prop. It can be an empty set,
    // or, if the node is misconfigured, a set with more than one entry. Or, if
    // all is as expected, it's a set with a single entry.
    auto local_prop_set = filter_props(
        [prop_src_type, incoming_prop, incoming_port](property_base_t* prop) -> bool {
            return prop->get_src_info().type == prop_src_type
                   && prop->get_src_info().instance == incoming_port
                   && prop->get_id() == incoming_prop->get_id();
        });

    // If there is no such property, we're forwarding a new property
    if (local_prop_set.empty()) {
        RFNOC_LOG_TRACE(
            "Received unknown incoming edge prop: " << incoming_prop->get_id());
        local_prop_set.emplace(
            inject_edge_property(incoming_prop, {prop_src_type, incoming_port}));
    }
    // There must be either zero results, or one
    UHD_ASSERT_THROW(local_prop_set.size() == 1);

    auto local_prop = *local_prop_set.begin();

    prop_accessor_t prop_accessor{};
    prop_accessor.forward<false>(incoming_prop, local_prop);
}

void node_t::receive_action(const res_source_info& src_info, action_info::sptr action)
{
    std::lock_guard<std::mutex> l(_action_mutex);
    // See if the user defined an action handler for us:
    if (_action_handlers.count(action->key)) {
        _action_handlers.at(action->key)(src_info, action);
        return;
    }

    // We won't forward actions if they were for us
    if (src_info.type == res_source_info::USER) {
        RFNOC_LOG_TRACE("Dropping USER action " << action->key << "#" << action->id);
        return;
    }

    // Otherwise, we need to figure out the correct default action handling:
    const auto fwd_policy = [&](const std::string& id) {
        if (_action_fwd_policies.count(id)) {
            return _action_fwd_policies.at(id);
        }
        return _action_fwd_policies.at("");
    }(action->key);

    // Now implement custom forwarding for all forwarding policies:
    if (fwd_policy == forwarding_policy_t::DROP) {
        RFNOC_LOG_TRACE("Dropping action " << action->key);
    }
    if (fwd_policy == forwarding_policy_t::ONE_TO_ONE) {
        RFNOC_LOG_TRACE("Forwarding action " << action->key << " to opposite port");
        const res_source_info dst_info{
            res_source_info::invert_edge(src_info.type), src_info.instance};
        if (_has_port(dst_info)) {
            post_action(dst_info, action);
        }
    }
    if (fwd_policy == forwarding_policy_t::ONE_TO_FAN) {
        RFNOC_LOG_TRACE("Forwarding action " << action->key << " to all opposite ports");
        const auto new_edge_type = res_source_info::invert_edge(src_info.type);
        const size_t num_ports   = new_edge_type == res_source_info::INPUT_EDGE
                                     ? get_num_input_ports()
                                     : get_num_output_ports();
        for (size_t i = 0; i < num_ports; i++) {
            post_action({new_edge_type, i}, action);
        }
    }
    if (fwd_policy == forwarding_policy_t::ONE_TO_ALL
        || fwd_policy == forwarding_policy_t::ONE_TO_ALL_IN) {
        RFNOC_LOG_TRACE("Forwarding action " << action->key << " to all input ports");
        for (size_t i = 0; i < get_num_input_ports(); i++) {
            if (src_info.type == res_source_info::INPUT_EDGE && i == src_info.instance) {
                continue;
            }
            post_action({res_source_info::INPUT_EDGE, i}, action);
        }
    }
    if (fwd_policy == forwarding_policy_t::ONE_TO_ALL
        || fwd_policy == forwarding_policy_t::ONE_TO_ALL_OUT) {
        RFNOC_LOG_TRACE("Forwarding action " << action->key << " to all output ports");
        for (size_t i = 0; i < get_num_output_ports(); i++) {
            if (src_info.type == res_source_info::OUTPUT_EDGE && i == src_info.instance) {
                continue;
            }
            post_action({res_source_info::OUTPUT_EDGE, i}, action);
        }
    }
    if (fwd_policy == forwarding_policy_t::USE_MAP) {
        const auto& map_entry = _action_fwd_map.find(src_info);
        if (map_entry != _action_fwd_map.end()) {
            for (const auto& dst : map_entry->second) {
                if (!_has_port(dst)) {
                    throw uhd::rfnoc_error("Destination port " + dst.to_string()
                                           + " in action map does not exist");
                }
                RFNOC_LOG_TRACE(
                    "Forwarding action " << action->key << " to " << dst.to_string());
                post_action(dst, action);
            }
        } else {
            RFNOC_LOG_TRACE("Dropping action " << action->key << " on "
                                               << src_info.to_string()
                                               << " (no destinations in map)");
        }
    }
}

void node_t::shutdown()
{
    RFNOC_LOG_DEBUG("shutdown() not implemented.");
}

bool node_t::_has_port(const res_source_info& port_info) const
{
    return (port_info.type == res_source_info::INPUT_EDGE
               && port_info.instance < get_num_input_ports())
           || (port_info.type == res_source_info::OUTPUT_EDGE
                  && port_info.instance < get_num_output_ports());
}