path: root/host/lib/experts/expert_container.cpp
diff options
Diffstat (limited to 'host/lib/experts/expert_container.cpp')
1 files changed, 528 insertions, 0 deletions
diff --git a/host/lib/experts/expert_container.cpp b/host/lib/experts/expert_container.cpp
new file mode 100644
index 000000000..dc98ee4ed
--- /dev/null
+++ b/host/lib/experts/expert_container.cpp
@@ -0,0 +1,528 @@
+// Copyright 2016 Ettus Research
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// GNU General Public License for more details.
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+#include "expert_container.hpp"
+#include <uhd/exception.hpp>
+#include <uhd/utils/msg.hpp>
+#include <boost/format.hpp>
+#include <boost/foreach.hpp>
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread.hpp>
+#include <boost/graph/graph_traits.hpp>
+#include <boost/graph/depth_first_search.hpp>
+#include <boost/graph/topological_sort.hpp>
+#include <boost/graph/adjacency_list.hpp>
+#define EX_LOG(depth, str) _log(depth, str)
+#define EX_LOG(depth, str)
+namespace uhd { namespace experts {
+typedef boost::adjacency_list<
+ boost::vecS, //Container used to represent the edge-list for each of the vertices.
+ boost::vecS, //container used to represent the vertex-list of the graph.
+ boost::directedS, //Directionality of graph
+ dag_vertex_t*, //Storage for each vertex
+ boost::no_property, //Storage for each edge
+ boost::no_property, //Storage for graph object
+ boost::listS //Container used to represent the edge-list for the graph.
+> expert_graph_t;
+typedef std::map<std::string, expert_graph_t::vertex_descriptor> vertex_map_t;
+typedef std::list<expert_graph_t::vertex_descriptor> node_queue_t;
+typedef boost::graph_traits<expert_graph_t>::edge_iterator edge_iter;
+typedef boost::graph_traits<expert_graph_t>::vertex_iterator vertex_iter;
+class expert_container_impl : public expert_container
+private: //Visitor class for cycle detection algorithm
+ struct cycle_det_visitor : public boost::dfs_visitor<>
+ {
+ cycle_det_visitor(std::vector<std::string>& back_edges):
+ _back_edges(back_edges) {}
+ template <class Edge, class Graph>
+ void back_edge(Edge u, const Graph& g) {
+ _back_edges.push_back(
+ g[boost::source(u,g)]->get_name() + "->" + g[boost::target(u,g)]->get_name());
+ }
+ private: std::vector<std::string>& _back_edges;
+ };
+ expert_container_impl(const std::string& name):
+ _name(name)
+ {
+ }
+ ~expert_container_impl()
+ {
+ clear();
+ }
+ const std::string& get_name() const
+ {
+ return _name;
+ }
+ void resolve_all(bool force = false)
+ {
+ boost::lock_guard<boost::recursive_mutex> resolve_lock(_resolve_mutex);
+ boost::lock_guard<boost::mutex> lock(_mutex);
+ EX_LOG(0, str(boost::format("resolve_all(%s)") % (force?"force":"")));
+ _resolve_helper("", "", force);
+ }
+ void resolve_from(const std::string& node_name)
+ {
+ boost::lock_guard<boost::recursive_mutex> resolve_lock(_resolve_mutex);
+ boost::lock_guard<boost::mutex> lock(_mutex);
+ EX_LOG(0, str(boost::format("resolve_from(%s)") % node_name));
+ _resolve_helper(node_name, "", false);
+ }
+ void resolve_to(const std::string& node_name)
+ {
+ boost::lock_guard<boost::recursive_mutex> resolve_lock(_resolve_mutex);
+ boost::lock_guard<boost::mutex> lock(_mutex);
+ EX_LOG(0, str(boost::format("resolve_to(%s)") % node_name));
+ _resolve_helper("", node_name, false);
+ }
+ dag_vertex_t& retrieve(const std::string& name) const
+ {
+ try {
+ expert_graph_t::vertex_descriptor vertex = _lookup_vertex(name);
+ return _get_vertex(vertex);
+ } catch(std::exception&) {
+ throw uhd::lookup_error("failed to find node " + name + " in expert graph");
+ }
+ }
+ const dag_vertex_t& lookup(const std::string& name) const
+ {
+ return retrieve(name);
+ }
+ const node_retriever_t& node_retriever() const
+ {
+ return *this;
+ }
+ std::string to_dot() const
+ {
+ static const std::string DATA_SHAPE("ellipse");
+ static const std::string WORKER_SHAPE("box");
+ std::string dot_str;
+ dot_str += "digraph uhd_experts_" + _name + " {\n rankdir=LR;\n";
+ // Iterate through the vertices and print them out
+ for (std::pair<vertex_iter, vertex_iter> vi = boost::vertices(_expert_dag);
+ vi.first != vi.second;
+ ++vi.first
+ ) {
+ const dag_vertex_t& vertex = _get_vertex(*vi.first);
+ if (vertex.get_class() != CLASS_WORKER) {
+ dot_str += str(boost::format(" %d [label=\"%s\",shape=%s,xlabel=\"%s\"];\n") %
+ boost::uint32_t(*vi.first) % vertex.get_name() %
+ DATA_SHAPE % vertex.get_dtype());
+ } else {
+ dot_str += str(boost::format(" %d [label=\"%s\",shape=%s];\n") %
+ boost::uint32_t(*vi.first) % vertex.get_name() % WORKER_SHAPE);
+ }
+ }
+ // Iterate through the edges and print them out
+ for (std::pair<edge_iter, edge_iter> ei = boost::edges(_expert_dag);
+ ei.first != ei.second;
+ ++ei.first
+ ) {
+ dot_str += str(boost::format(" %d -> %d;\n") %
+ boost::uint32_t(boost::source(*(ei.first), _expert_dag)) %
+ boost::uint32_t(boost::target(*(ei.first), _expert_dag)));
+ }
+ dot_str += "}\n";
+ return dot_str;
+ }
+ void debug_audit() const
+ {
+ EX_LOG(0, "debug_audit()");
+ //Test 1: Check for cycles in graph
+ std::vector<std::string> back_edges;
+ cycle_det_visitor cdet_vis(back_edges);
+ boost::depth_first_search(_expert_dag, boost::visitor(cdet_vis));
+ if (back_edges.empty()) {
+ EX_LOG(1, "cycle check ... PASSED");
+ } else {
+ EX_LOG(1, "cycle check ... ERROR!!!");
+ BOOST_FOREACH(const std::string& e, back_edges) {
+ EX_LOG(2, "back edge: " + e);
+ }
+ }
+ back_edges.clear();
+ //Test 2: Check data node input and output edges
+ std::vector<std::string> data_node_issues;
+ BOOST_FOREACH(const vertex_map_t::value_type& v, _datanode_map) {
+ size_t in_count = 0, out_count = 0;
+ for (std::pair<edge_iter, edge_iter> ei = boost::edges(_expert_dag);
+ ei.first != ei.second;
+ ++ei.first
+ ) {
+ if (boost::target(*(ei.first), _expert_dag) == v.second)
+ in_count++;
+ if (boost::source(*(ei.first), _expert_dag) == v.second)
+ out_count++;
+ }
+ bool prop_unused = false;
+ if (in_count > 1) {
+ data_node_issues.push_back(v.first + ": multiple writers (workers)");
+ } else if (in_count > 0) {
+ if (_expert_dag[v.second]->get_class() == CLASS_PROPERTY) {
+ data_node_issues.push_back(v.first + ": multiple writers (worker and property tree)");
+ }
+ } else {
+ if (_expert_dag[v.second]->get_class() != CLASS_PROPERTY) {
+ data_node_issues.push_back(v.first + ": unreachable (will always hold initial value)");
+ } else if (_expert_dag[v.second]->get_class() == CLASS_PROPERTY and not _expert_dag[v.second]->has_write_callback()) {
+ if (out_count > 0) {
+ data_node_issues.push_back(v.first + ": needs explicit resolve after write");
+ } else {
+ data_node_issues.push_back(v.first + ": unused (no readers or writers)");
+ prop_unused = true;
+ }
+ }
+ }
+ if (out_count < 1) {
+ if (_expert_dag[v.second]->get_class() != CLASS_PROPERTY) {
+ data_node_issues.push_back(v.first + ": unused (is not read by any worker)");
+ } else if (_expert_dag[v.second]->get_class() == CLASS_PROPERTY and not _expert_dag[v.second]->has_read_callback()) {
+ if (not prop_unused) {
+ data_node_issues.push_back(v.first + ": needs explicit resolve to read");
+ }
+ }
+ }
+ }
+ if (data_node_issues.empty()) {
+ EX_LOG(1, "data node check ... PASSED");
+ } else {
+ EX_LOG(1, "data node check ... WARNING!");
+ BOOST_FOREACH(const std::string& i, data_node_issues) {
+ EX_LOG(2, i);
+ }
+ }
+ data_node_issues.clear();
+ //Test 3: Check worker node input and output edges
+ std::vector<std::string> worker_issues;
+ BOOST_FOREACH(const vertex_map_t::value_type& v, _worker_map) {
+ size_t in_count = 0, out_count = 0;
+ for (std::pair<edge_iter, edge_iter> ei = boost::edges(_expert_dag);
+ ei.first != ei.second;
+ ++ei.first
+ ) {
+ if (boost::target(*(ei.first), _expert_dag) == v.second)
+ in_count++;
+ if (boost::source(*(ei.first), _expert_dag) == v.second)
+ out_count++;
+ }
+ if (in_count < 1) {
+ worker_issues.push_back(v.first + ": no inputs (will never resolve)");
+ }
+ if (out_count < 1) {
+ worker_issues.push_back(v.first + ": no outputs");
+ }
+ }
+ if (worker_issues.empty()) {
+ EX_LOG(1, "worker check ... PASSED");
+ } else {
+ EX_LOG(1, "worker check ... WARNING!");
+ BOOST_FOREACH(const std::string& i, worker_issues) {
+ EX_LOG(2, i);
+ }
+ }
+ worker_issues.clear();
+ }
+ inline boost::recursive_mutex& resolve_mutex() {
+ return _resolve_mutex;
+ }
+ void add_data_node(dag_vertex_t* data_node, auto_resolve_mode_t resolve_mode)
+ {
+ boost::lock_guard<boost::mutex> lock(_mutex);
+ //Sanity check node pointer
+ if (data_node == NULL) {
+ throw uhd::runtime_error("NULL data node passed into expert container for registration.");
+ }
+ //Sanity check the data node and ensure that it is not already in this graph
+ EX_LOG(0, str(boost::format("add_data_node(%s)") % data_node->get_name()));
+ if (data_node->get_class() == CLASS_WORKER) {
+ throw uhd::runtime_error("Supplied node " + data_node->get_name() + " is not a data/property node.");
+ delete data_node;
+ }
+ if (_datanode_map.find(data_node->get_name()) != _datanode_map.end()) {
+ throw uhd::runtime_error("Data node with name " + data_node->get_name() + " already exists");
+ delete data_node;
+ }
+ try {
+ //Add a vertex in this graph for the data node
+ expert_graph_t::vertex_descriptor gr_node = boost::add_vertex(data_node, _expert_dag);
+ EX_LOG(1, str(boost::format("added vertex %s") % data_node->get_name()));
+ _datanode_map.insert(vertex_map_t::value_type(data_node->get_name(), gr_node));
+ //Add resolve callbacks
+ if (resolve_mode == AUTO_RESOLVE_ON_WRITE or resolve_mode == AUTO_RESOLVE_ON_READ_WRITE) {
+ EX_LOG(2, str(boost::format("added write callback")));
+ data_node->set_write_callback(boost::bind(&expert_container_impl::resolve_from, this, _1));
+ }
+ if (resolve_mode == AUTO_RESOLVE_ON_READ or resolve_mode == AUTO_RESOLVE_ON_READ_WRITE) {
+ EX_LOG(2, str(boost::format("added read callback")));
+ data_node->set_read_callback(boost::bind(&expert_container_impl::resolve_to, this, _1));
+ }
+ } catch (...) {
+ clear();
+ throw uhd::assertion_error("Unknown unrecoverable error adding data node. Cleared expert container.");
+ }
+ }
+ void add_worker(worker_node_t* worker)
+ {
+ boost::lock_guard<boost::mutex> lock(_mutex);
+ //Sanity check node pointer
+ if (worker == NULL) {
+ throw uhd::runtime_error("NULL worker passed into expert container for registration.");
+ }
+ //Sanity check the data node and ensure that it is not already in this graph
+ EX_LOG(0, str(boost::format("add_worker(%s)") % worker->get_name()));
+ if (worker->get_class() != CLASS_WORKER) {
+ throw uhd::runtime_error("Supplied node " + worker->get_name() + " is not a worker node.");
+ delete worker;
+ }
+ if (_worker_map.find(worker->get_name()) != _worker_map.end()) {
+ throw uhd::runtime_error("Resolver with name " + worker->get_name() + " already exists.");
+ delete worker;
+ }
+ try {
+ //Add a vertex in this graph for the worker node
+ expert_graph_t::vertex_descriptor gr_node = boost::add_vertex(worker, _expert_dag);
+ EX_LOG(1, str(boost::format("added vertex %s") % worker->get_name()));
+ _worker_map.insert(vertex_map_t::value_type(worker->get_name(), gr_node));
+ //For each input, add an edge from the input to this node
+ BOOST_FOREACH(const std::string& node_name, worker->get_inputs()) {
+ vertex_map_t::const_iterator node = _datanode_map.find(node_name);
+ if (node != _datanode_map.end()) {
+ boost::add_edge((*node).second, gr_node, _expert_dag);
+ EX_LOG(1, str(boost::format("added edge %s->%s") % _expert_dag[(*node).second]->get_name() % _expert_dag[gr_node]->get_name()));
+ } else {
+ throw uhd::runtime_error("Data node with name " + node_name + " was not found");
+ }
+ }
+ //For each output, add an edge from this node to the output
+ BOOST_FOREACH(const std::string& node_name, worker->get_outputs()) {
+ vertex_map_t::const_iterator node = _datanode_map.find(node_name);
+ if (node != _datanode_map.end()) {
+ boost::add_edge(gr_node, (*node).second, _expert_dag);
+ EX_LOG(1, str(boost::format("added edge %s->%s") % _expert_dag[gr_node]->get_name() % _expert_dag[(*node).second]->get_name()));
+ } else {
+ throw uhd::runtime_error("Data node with name " + node_name + " was not found");
+ }
+ }
+ } catch (uhd::runtime_error& ex) {
+ clear();
+ //Promote runtime_error to assertion_error
+ throw uhd::assertion_error(std::string(ex.what()) + " (Cleared expert container because error is unrecoverable).");
+ } catch (...) {
+ clear();
+ throw uhd::assertion_error("Unknown unrecoverable error adding worker. Cleared expert container.");
+ }
+ }
+ void clear()
+ {
+ boost::lock_guard<boost::mutex> lock(_mutex);
+ EX_LOG(0, "clear()");
+ // Iterate through the vertices and release their node storage
+ typedef boost::graph_traits<expert_graph_t>::vertex_iterator vertex_iter;
+ for (std::pair<vertex_iter, vertex_iter> vi = boost::vertices(_expert_dag);
+ vi.first != vi.second;
+ ++vi.first
+ ) {
+ try {
+ delete _expert_dag[*vi.first];
+ _expert_dag[*vi.first] = NULL;
+ } catch (...) {
+ //If a dag_vertex is a worker, it has a virtual dtor which
+ //can possibly throw an exception. We will not let that
+ //terminate clear() and leave things in a bad state.
+ }
+ }
+ //The following calls will not throw because they all contain
+ //intrinsic types.
+ // Release all vertices and edges in the DAG
+ _expert_dag.clear();
+ // Release all nodes in the map
+ _worker_map.clear();
+ _datanode_map.clear();
+ }
+ void _resolve_helper(std::string start, std::string stop, bool force)
+ {
+ //Sort the graph topologically. This ensures that for all dependencies, the dependant
+ //is always after all of its dependencies.
+ node_queue_t sorted_nodes;
+ try {
+ boost::topological_sort(_expert_dag, std::front_inserter(sorted_nodes));
+ } catch (boost::not_a_dag&) {
+ std::vector<std::string> back_edges;
+ cycle_det_visitor cdet_vis(back_edges);
+ boost::depth_first_search(_expert_dag, boost::visitor(cdet_vis));
+ if (not back_edges.empty()) {
+ std::string edges;
+ BOOST_FOREACH(const std::string& e, back_edges) {
+ edges += "* " + e + "";
+ }
+ throw uhd::runtime_error("Cannot resolve expert because it has at least one cycle!\n"
+ "The following back-edges were found:" + edges);
+ }
+ }
+ if (sorted_nodes.empty()) return;
+ //Determine the start and stop node. If one is not explicitly specified then
+ //resolve everything
+ expert_graph_t::vertex_descriptor start_vertex = sorted_nodes.front();
+ expert_graph_t::vertex_descriptor stop_vertex = sorted_nodes.back();
+ if (not start.empty()) start_vertex = _lookup_vertex(start);
+ if (not stop.empty()) stop_vertex = _lookup_vertex(stop);
+ //First Pass: Resolve all nodes if they are dirty, in a topological order
+ std::list<dag_vertex_t*> resolved_workers;
+ bool start_node_encountered = false;
+ for (node_queue_t::iterator node_iter = sorted_nodes.begin();
+ node_iter != sorted_nodes.end();
+ ++node_iter
+ ) {
+ //Determine if we are at or beyond the starting node
+ if (*node_iter == start_vertex) start_node_encountered = true;
+ //Only resolve if the starting node has passed
+ if (start_node_encountered) {
+ dag_vertex_t& node = _get_vertex(*node_iter);
+ if (force or node.is_dirty()) {
+ node.resolve();
+ if (node.get_class() == CLASS_WORKER) {
+ resolved_workers.push_back(&node);
+ }
+ EX_LOG(1, str(boost::format("resolved node %s (%s)") % node.get_name() % (node.is_dirty()?"dirty":"clean")));
+ } else {
+ EX_LOG(1, str(boost::format("skipped node %s (%s)") % node.get_name() % (node.is_dirty()?"dirty":"clean")));
+ }
+ }
+ //Determine if we are beyond the stop node
+ if (*node_iter == stop_vertex) break;
+ }
+ //Second Pass: Mark all the workers clean. The policy is that a worker will mark all of
+ //its dependencies clean so after this step all data nodes that are not consumed by a worker
+ //will remain dirty (as they should because no one has consumed their value)
+ for (std::list<dag_vertex_t*>::iterator worker = resolved_workers.begin();
+ worker != resolved_workers.end();
+ ++worker
+ ) {
+ (*worker)->mark_clean();
+ }
+ }
+ expert_graph_t::vertex_descriptor _lookup_vertex(const std::string& name) const
+ {
+ expert_graph_t::vertex_descriptor vertex;
+ //Look for node in the data-node map
+ vertex_map_t::const_iterator vertex_iter = _datanode_map.find(name);
+ if (vertex_iter != _datanode_map.end()) {
+ vertex = (*vertex_iter).second;
+ } else {
+ //If not found, look in the worker-node map
+ vertex_iter = _worker_map.find(name);
+ if (vertex_iter != _worker_map.end()) {
+ vertex = (*vertex_iter).second;
+ } else {
+ throw uhd::lookup_error("Could not find node with name " + name);
+ }
+ }
+ return vertex;
+ }
+ dag_vertex_t& _get_vertex(expert_graph_t::vertex_descriptor desc) const {
+ //Requirement: Node must exist in expert graph
+ dag_vertex_t* vertex_ptr = _expert_dag[desc];
+ if (vertex_ptr) {
+ return *vertex_ptr;
+ } else {
+ throw uhd::assertion_error("Expert graph malformed. Found a NULL node.");
+ }
+ }
+ void _log(size_t depth, const std::string& str) const
+ {
+ std::string indents;
+ for (size_t i = 0; i < depth; i++) indents += "- ";
+ UHD_MSG(fastpath) << "[expert::" + _name + "] " << indents << str << std::endl;
+ }
+ const std::string _name;
+ expert_graph_t _expert_dag; //The primary graph data structure as an adjacency list
+ vertex_map_t _worker_map; //A map from vertex name to vertex descriptor for workers
+ vertex_map_t _datanode_map; //A map from vertex name to vertex descriptor for data nodes
+ boost::mutex _mutex;
+ boost::recursive_mutex _resolve_mutex;
+expert_container::sptr expert_container::make(const std::string& name)
+ return boost::make_shared<expert_container_impl>(name);