aboutsummaryrefslogtreecommitdiffstats
path: root/host
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@ettus.com>2019-05-15 10:26:44 -0700
committerMartin Braun <martin.braun@ettus.com>2019-11-26 11:49:14 -0800
commitb8a6c64d6012ab1ec0b3b843fccec2d990d440a3 (patch)
tree31a99d71af5a6aa2db2a7c9f2a7d19986a2d3856 /host
parentd6251df6347390e74784b2fbe116b0e64780547e (diff)
downloaduhd-b8a6c64d6012ab1ec0b3b843fccec2d990d440a3.tar.gz
uhd-b8a6c64d6012ab1ec0b3b843fccec2d990d440a3.tar.bz2
uhd-b8a6c64d6012ab1ec0b3b843fccec2d990d440a3.zip
rfnoc: Add action API
- Added action_info class - Allow to send actions from node to node - Allow to post actions into nodes - Allow to set default forwarding policies - Added unit tests
Diffstat (limited to 'host')
-rw-r--r--host/include/uhd/rfnoc/CMakeLists.txt1
-rw-r--r--host/include/uhd/rfnoc/actions.hpp50
-rw-r--r--host/include/uhd/rfnoc/defaults.hpp45
-rw-r--r--host/include/uhd/rfnoc/node.hpp108
-rw-r--r--host/lib/include/uhdlib/rfnoc/graph.hpp49
-rw-r--r--host/lib/include/uhdlib/rfnoc/node_accessor.hpp20
-rw-r--r--host/lib/rfnoc/CMakeLists.txt1
-rw-r--r--host/lib/rfnoc/actions.cpp21
-rw-r--r--host/lib/rfnoc/graph.cpp78
-rw-r--r--host/lib/rfnoc/node.cpp87
-rw-r--r--host/tests/CMakeLists.txt13
-rw-r--r--host/tests/actions_test.cpp81
-rw-r--r--host/tests/rfnoc_graph_mock_nodes.hpp122
13 files changed, 656 insertions, 20 deletions
diff --git a/host/include/uhd/rfnoc/CMakeLists.txt b/host/include/uhd/rfnoc/CMakeLists.txt
index 894d4fda4..052d44090 100644
--- a/host/include/uhd/rfnoc/CMakeLists.txt
+++ b/host/include/uhd/rfnoc/CMakeLists.txt
@@ -13,6 +13,7 @@ if(ENABLE_RFNOC)
blockdef.hpp
block_id.hpp
constants.hpp
+ defaults.hpp
dirtifier.hpp
graph.hpp
node_ctrl_base.hpp
diff --git a/host/include/uhd/rfnoc/actions.hpp b/host/include/uhd/rfnoc/actions.hpp
new file mode 100644
index 000000000..ac454827c
--- /dev/null
+++ b/host/include/uhd/rfnoc/actions.hpp
@@ -0,0 +1,50 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_ACTIONS_HPP
+#define INCLUDED_LIBUHD_RFNOC_ACTIONS_HPP
+
+#include <uhd/config.hpp>
+#include <string>
+#include <vector>
+#include <memory>
+
+namespace uhd { namespace rfnoc {
+
+/*! Container for an action
+ *
+ * In the RFNoC context, an action is comparable to a command. Nodes in the
+ * graph can send each other actions. action_info is the payload of such an
+ * action message. Nodes pass shared pointers to action_info objects between
+ * each other to avoid costly copies of large action_info objects.
+ */
+struct UHD_API action_info
+{
+public:
+ using sptr = std::shared_ptr<action_info>;
+ //! A unique counter for this action
+ const size_t id;
+ //! A string identifier for this action
+ std::string key;
+ //! An arbitrary payload. It is up to consumers and producers to
+ // (de-)serialize it.
+ std::vector<uint8_t> payload;
+
+ //! Factory function
+ static sptr make(const std::string& key="")
+ {
+ //return std::make_shared<action_info>(key);
+ return sptr(new action_info(key));
+ }
+
+private:
+ action_info(const std::string& key);
+};
+
+}} /* namespace uhd::rfnoc */
+
+#endif /* INCLUDED_LIBUHD_RFNOC_ACTIONS_HPP */
+
diff --git a/host/include/uhd/rfnoc/defaults.hpp b/host/include/uhd/rfnoc/defaults.hpp
new file mode 100644
index 000000000..3eb9e1d30
--- /dev/null
+++ b/host/include/uhd/rfnoc/defaults.hpp
@@ -0,0 +1,45 @@
+//
+// Copyright 2014 Ettus Research LLC
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RFNOC_DEFAULTS_HPP
+#define INCLUDED_LIBUHD_RFNOC_DEFAULTS_HPP
+
+#include <string>
+
+namespace uhd { namespace rfnoc {
+
+static const std::string CLOCK_KEY_GRAPH("__graph__");
+
+static const std::string PROP_KEY_DECIM("decim");
+static const std::string PROP_KEY_SAMP_RATE("samp_rate");
+static const std::string PROP_KEY_SCALING("scaling");
+static const std::string PROP_KEY_TYPE("type");
+static const std::string PROP_KEY_FREQ("freq");
+static const std::string PROP_KEY_TICK_RATE("tick_rate");
+static const std::string PROP_KEY_SPP("spp");
+
+static const std::string NODE_ID_SEP("SEP");
+
+using io_type_t = std::string;
+static const io_type_t IO_TYPE_SC16 = "sc16";
+
+static const std::string ACTION_KEY_STREAM_CMD("stream_cmd");
+static const std::string ACTION_KEY_RX_EVENT("rx_event");
+
+//! If the block name can't be automatically detected, this name is used
+static const std::string DEFAULT_BLOCK_NAME = "Block";
+//! This NOC-ID is used to look up the default block
+static const uint32_t DEFAULT_NOC_ID = 0xFFFFFFFF;
+static const double DEFAULT_TICK_RATE = 1.0;
+// Whenever we need a default spp value use this, unless there are some
+// block/device-specific constraints. It will keep the frame size below 1500.
+static const int DEFAULT_SPP = 1996;
+
+}} // namespace uhd::rfnoc
+
+#endif /* INCLUDED_LIBUHD_RFNOC_DEFAULTS_HPP */
+
diff --git a/host/include/uhd/rfnoc/node.hpp b/host/include/uhd/rfnoc/node.hpp
index 1e634ecea..54c66c985 100644
--- a/host/include/uhd/rfnoc/node.hpp
+++ b/host/include/uhd/rfnoc/node.hpp
@@ -7,13 +7,14 @@
#ifndef INCLUDED_LIBUHD_RFNOC_NODE_HPP
#define INCLUDED_LIBUHD_RFNOC_NODE_HPP
-#include <uhd/rfnoc/property.hpp>
+#include <uhd/rfnoc/actions.hpp>
#include <uhd/rfnoc/dirtifier.hpp>
-#include <uhd/utils/scope_exit.hpp>
+#include <uhd/rfnoc/property.hpp>
#include <uhd/utils/log.hpp>
-#include <boost/graph/adjacency_list.hpp>
+#include <uhd/utils/scope_exit.hpp>
#include <unordered_map>
#include <unordered_set>
+#include <boost/graph/adjacency_list.hpp>
#include <functional>
#include <memory>
#include <mutex>
@@ -34,6 +35,8 @@ class UHD_API node_t
public:
using resolver_fn_t = std::function<void(void)>;
using resolve_callback_t = std::function<void(void)>;
+ using action_handler_t =
+ std::function<void(const res_source_info&, action_info::sptr)>;
//! Types of property/action forwarding for those not defined by the block itself
enum class forwarding_policy_t {
@@ -134,11 +137,6 @@ public:
const prop_data_t& get_property(
const std::string& id, const size_t instance = 0) /* mutable */;
- /******************************************
- * Action Specific
- ******************************************/
- // TBW
-
protected:
/******************************************
* Internal Registration Functions
@@ -207,20 +205,58 @@ protected:
void set_prop_forwarding_policy(
forwarding_policy_t policy, const std::string& prop_id = "");
+ /******************************************
+ * Internal action forwarding
+ ******************************************/
/*! Handle a request to perform an action. The default action handler
* ignores user action and forwards port actions.
*
- * \param handler The function that is called to handle the action
+ * \param id The action ID for which this action handler is valid. The first
+ * argument to the handler will be a uhd::rfnoc::action_info::sptr,
+ * and its `id` value will match this parameter (unless the same
+ * action handler is registered multiple times).
+ * If this function was previously called with the same `id` value,
+ * the previous action handler is overwritten.
+ * \param handler The function that is called to handle the action. It needs
+ * to accept a uhd::rfnoc::res_source_info object, and a
+ * uhd::rfnoc::action_info::sptr.
*/
- // void register_action_handler(std::function<
- // void(const action_info& info, const res_source_info& src)
- //> handler);
+ void register_action_handler(const std::string& id, action_handler_t&& handler);
+
+ /*! Set an action forwarding policy
+ *
+ * Whenever this node is asked to handle an action that is not registered,
+ * this is how the node knows what to do with the action. For example, the
+ * FIFO block controller will almost always want to pass on actions to
+ * the next block.
+ *
+ * This method can be called more than once, and it will overwrite previous
+ * policies.
+ * Typically, this function should only ever be called from within the
+ * constructor.
+ *
+ * \param policy The policy that is applied (see also forwarding_policy_t).
+ * \param action_key The action key that this forwarding policy is applied
+ * to. If \p action_key is not given, it will apply to all
+ * properties, unless a different policy was given with a
+ * matching key.
+ */
+ void set_action_forwarding_policy(
+ forwarding_policy_t policy, const std::string& action_key = "");
+
+ /*! Post an action to an up- or downstream node in the graph.
+ *
+ * If the action is posted to an edge which is not connected, the action
+ * is lost.
+ *
+ * \param edge_info The edge to which this action is posted. If
+ * edge_info.type == INPUT_EDGE, the that means the action
+ * will be posted to an upstream node, on port edge_info.instance.
+ * \param action A reference to the action info object.
+ * \throws uhd::runtime_error if edge_info is not either INPUT_EDGE or OUTPUT_EDGE
+ */
+ void post_action(const res_source_info& edge_info, action_info::sptr action);
- /******************************************
- * Internal action forwarding
- ******************************************/
- // TBW
- //
//! A dirtifyer object, useful for properties that always need updating.
static dirtifier_t ALWAYS_DIRTY;
@@ -369,6 +405,27 @@ private:
property_base_t* incoming_prop, const size_t incoming_port);
/**************************************************************************
+ * Action-Related Methods
+ *************************************************************************/
+ /*! Sets a callback that this node can call if it wants to post actions to
+ * other nodes.
+ */
+ void set_post_action_callback(action_handler_t&& post_handler)
+ {
+ _post_action_cb = std::move(post_handler);
+ }
+
+ /*! This function gets called by the framework when there's a new action for
+ * this node. It will then dispatch appropriate action handlers.
+ *
+ * \param src_info Tells us on which edge this came in. If
+ * src_info.type == INPUT_EDGE, then we received this action
+ * on an input edge.
+ * \param action A reference to the action object
+ */
+ void receive_action(const res_source_info& src_info, action_info::sptr action);
+
+ /**************************************************************************
* Private helpers
*************************************************************************/
//! Return true if this node has a port that matches \p port_info
@@ -414,6 +471,23 @@ private:
std::unordered_map<std::string, forwarding_policy_t> _prop_fwd_policies{{
"", forwarding_policy_t::ONE_TO_ONE}};
+ /**************************************************************************
+ * Action-related attributes
+ *************************************************************************/
+ mutable std::mutex _action_mutex;
+
+ //! Storage for action handlers
+ std::unordered_map<std::string, action_handler_t> _action_handlers;
+
+ //! Default action forwarding policies
+ std::unordered_map<std::string, forwarding_policy_t> _action_fwd_policies{{
+ "", forwarding_policy_t::ONE_TO_ONE}};
+
+ //! Callback which allows us to post actions to other nodes in the graph
+ //
+ // The default callback will simply drop actions
+ action_handler_t _post_action_cb = [](const res_source_info&,
+ action_info::sptr) { /* nop */ };
}; // class node_t
}} /* namespace uhd::rfnoc */
diff --git a/host/lib/include/uhdlib/rfnoc/graph.hpp b/host/lib/include/uhdlib/rfnoc/graph.hpp
index f9fb7ac41..ec6309bd0 100644
--- a/host/lib/include/uhdlib/rfnoc/graph.hpp
+++ b/host/lib/include/uhdlib/rfnoc/graph.hpp
@@ -7,10 +7,12 @@
#ifndef INCLUDED_LIBUHD_GRAPH_HPP
#define INCLUDED_LIBUHD_GRAPH_HPP
+#include <uhd/rfnoc/actions.hpp>
#include <uhd/rfnoc/node.hpp>
#include <boost/graph/adjacency_list.hpp>
#include <tuple>
#include <memory>
+#include <deque>
namespace uhd { namespace rfnoc {
@@ -192,6 +194,39 @@ private:
void resolve_all_properties();
/**************************************************************************
+ * Action API
+ *************************************************************************/
+ /*! Entrypoint for action delivery
+ *
+ * When a node invokes its node_t::post_action() function, eventually that
+ * call lands here. This function acts as a mailman, that is, it figures out
+ * which edge on which node is supposed to receive this action, and delivers
+ * it via the node_t::receive_action() method.
+ * Note since this is private, nodes can't directly access this functions.
+ * We provide a lambda to nodes for that purpose.
+ *
+ * When an action is posted, that may trigger further actions. In order not
+ * to go into infinite recursion, this function is also responsible for
+ * serializing the actions. Even so, it is possible that, due to
+ * misconfiguration of nodes and their behaviour, a cascade of actions is
+ * posted that never stops. Therefore, another responsibility of this
+ * function is to track the number of follow-up messages sent, and terminate
+ * an infinite cycle of messages.
+ *
+ * \param src_node Reference to the node where the post_action() call is
+ * originating from
+ * \param src_edge The edge on that node where the action is being posted to.
+ * Note that its the edge from the node's point of view, so
+ * if src_edge.type == OUTPUT_EDGE, then the node posted to
+ * its output edge.
+ *
+ * \throws uhd::runtime_error if it has to terminate a infinite cascade of
+ * actions
+ */
+ void enqueue_action(
+ node_ref_t src_node, res_source_info src_edge, action_info::sptr action);
+
+ /**************************************************************************
* Private graph helpers
*************************************************************************/
template <typename VertexContainerType>
@@ -225,7 +260,7 @@ private:
/*! Find the neighbouring node for \p origin based on \p port_info
*
* This function will check port_info to identify the port number and the
- * direction (input or output) from \p port_info. It will then return a
+ * direction (input or output) from \p origin. It will then return a
* reference to the node that is attached to the node \p origin if such a
* node exists, and the edge info.
*
@@ -263,6 +298,18 @@ private:
// descriptor without having to traverse the graph. The rfnoc_graph_t is not
// efficient for lookups of vertices.
node_map_t _node_map;
+
+ using action_tuple_t = std::tuple<node_ref_t, res_source_info, action_info::sptr>;
+
+ //! FIFO for incoming actions
+ std::deque<action_tuple_t> _action_queue;
+
+ //! Flag to ensure serialized handling of actions
+ std::atomic_flag _action_handling_ongoing;
+
+ //! Mutex for to avoid the user from sending one message before another
+ // message is sent
+ std::recursive_mutex _action_mutex;
};
diff --git a/host/lib/include/uhdlib/rfnoc/node_accessor.hpp b/host/lib/include/uhdlib/rfnoc/node_accessor.hpp
index 554cc8f4f..827c87dd2 100644
--- a/host/lib/include/uhdlib/rfnoc/node_accessor.hpp
+++ b/host/lib/include/uhdlib/rfnoc/node_accessor.hpp
@@ -7,7 +7,9 @@
#ifndef INCLUDED_LIBUHD_NODE_ACCESSOR_HPP
#define INCLUDED_LIBUHD_NODE_ACCESSOR_HPP
+#include <uhd/rfnoc/actions.hpp>
#include <uhd/rfnoc/node.hpp>
+#include <uhd/rfnoc/res_source_info.hpp>
#include <functional>
namespace uhd { namespace rfnoc {
@@ -77,6 +79,24 @@ public:
{
dst_node->forward_edge_property(incoming_prop, dst_port);
}
+
+ /*! Set post action callback for the node
+ *
+ * See node_t::set_post_action_callback() for details.
+ */
+ void set_post_action_callback(node_t* node, node_t::action_handler_t&& post_handler)
+ {
+ node->set_post_action_callback(std::move(post_handler));
+ }
+
+ /*! Send an action to \p node
+ *
+ * This will call node_t::receive_action() (see that for details).
+ */
+ void send_action(node_t* node, const res_source_info& port_info, action_info::sptr action)
+ {
+ node->receive_action(port_info, action);
+ }
};
diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt
index e4715a644..6aab0d499 100644
--- a/host/lib/rfnoc/CMakeLists.txt
+++ b/host/lib/rfnoc/CMakeLists.txt
@@ -11,6 +11,7 @@
LIBUHD_APPEND_SOURCES(
# Infrastructure:
+ ${CMAKE_CURRENT_SOURCE_DIR}/actions.cpp
${CMAKE_CURRENT_SOURCE_DIR}/async_msg_handler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_container.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_ctrl_base.cpp
diff --git a/host/lib/rfnoc/actions.cpp b/host/lib/rfnoc/actions.cpp
new file mode 100644
index 000000000..1f5f0f2f7
--- /dev/null
+++ b/host/lib/rfnoc/actions.cpp
@@ -0,0 +1,21 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/rfnoc/actions.hpp>
+#include <atomic>
+
+using namespace uhd::rfnoc;
+
+namespace {
+ // A static counter, which we use to uniquely label actions
+ std::atomic<size_t> action_counter{0};
+}
+
+action_info::action_info(const std::string& key_) : id(action_counter++), key(key_)
+{
+ // nop
+}
+
diff --git a/host/lib/rfnoc/graph.cpp b/host/lib/rfnoc/graph.cpp
index f90f70b43..d311a00bd 100644
--- a/host/lib/rfnoc/graph.cpp
+++ b/host/lib/rfnoc/graph.cpp
@@ -18,6 +18,7 @@ using namespace uhd::rfnoc::detail;
namespace {
const std::string LOG_ID = "RFNOC::GRAPH::DETAIL";
+constexpr unsigned MAX_ACTION_ITERATIONS = 200;
/*! Helper function to pretty-print edge info
*/
@@ -120,6 +121,15 @@ void graph_t::connect(node_ref_t src_node, node_ref_t dst_node, graph_edge_t edg
src_node, [this]() { this->resolve_all_properties(); });
node_accessor.set_resolve_all_callback(
dst_node, [this]() { this->resolve_all_properties(); });
+ // Set post action callbacks:
+ node_accessor.set_post_action_callback(
+ src_node, [this, src_node](const res_source_info& src, action_info::sptr action) {
+ this->enqueue_action(src_node, src, action);
+ });
+ node_accessor.set_post_action_callback(
+ dst_node, [this, dst_node](const res_source_info& src, action_info::sptr action) {
+ this->enqueue_action(dst_node, src, action);
+ });
// Add nodes to graph, if not already in there:
_add_node(src_node);
@@ -310,6 +320,74 @@ void graph_t::resolve_all_properties()
}
}
+void graph_t::enqueue_action(
+ node_ref_t src_node, res_source_info src_edge, action_info::sptr action)
+{
+ // First, make sure that once we start action handling, no other node from
+ // a different thread can throw in their own actions
+ std::lock_guard<std::recursive_mutex> l(_action_mutex);
+
+ // Check if we're already in the middle of handling actions. In that case,
+ // we're already in the loop below, and then all we want to do is to enqueue
+ // this action tuple. The first call to enqueue_action() within this thread
+ // context will have handling_ongoing == false.
+ const bool handling_ongoing = _action_handling_ongoing.test_and_set();
+
+ _action_queue.emplace_back(std::make_tuple(src_node, src_edge, action));
+ if (handling_ongoing) {
+ UHD_LOG_TRACE(LOG_ID,
+ "Action handling ongoing, deferring delivery of " << action->key << "#"
+ << action->id);
+ return;
+ }
+
+ unsigned iteration_count = 0;
+ while (!_action_queue.empty()) {
+ if (iteration_count++ == MAX_ACTION_ITERATIONS) {
+ throw uhd::runtime_error("Terminating action handling: Reached "
+ "recursion limit!");
+ }
+
+ // Unpack next action
+ auto& next_action = _action_queue.front();
+ node_ref_t action_src_node = std::get<0>(next_action);
+ res_source_info action_src_port = std::get<1>(next_action);
+ action_info::sptr next_action_sptr = std::get<2>(next_action);
+ _action_queue.pop_front();
+
+ // Find the node that is supposed to receive this action, and if we find
+ // something, then send the action
+ auto recipient_info =
+ _find_neighbour(_node_map.at(action_src_node), action_src_port);
+ if (recipient_info.first == nullptr) {
+ UHD_LOG_WARNING(LOG_ID,
+ "Cannot forward action "
+ << action->key << " from " << src_node->get_unique_id()
+ << ":" << src_edge.to_string() << ", no neighbour found!");
+ } else {
+ node_ref_t recipient_node = recipient_info.first;
+ res_source_info recipient_port = {
+ res_source_info::invert_edge(action_src_port.type),
+ action_src_port.type == res_source_info::INPUT_EDGE
+ ? recipient_info.second.dst_port
+ : recipient_info.second.src_port};
+ // The following call can cause other nodes to add more actions to
+ // the end of _action_queue!
+ UHD_LOG_TRACE(LOG_ID,
+ "Now delivering action " << next_action_sptr->key << "#"
+ << next_action_sptr->id);
+ node_accessor_t{}.send_action(
+ recipient_node, recipient_port, next_action_sptr);
+ }
+ }
+ UHD_LOG_TRACE(LOG_ID, "Delivered all actions, terminating action handling.");
+
+ // Release the action handling flag
+ _action_handling_ongoing.clear();
+ // Now, the _action_mutex is released, and someone else can start sending
+ // actions.
+}
+
/******************************************************************************
* Private methods
*****************************************************************************/
diff --git a/host/lib/rfnoc/node.cpp b/host/lib/rfnoc/node.cpp
index 0b724b889..d569cea4a 100644
--- a/host/lib/rfnoc/node.cpp
+++ b/host/lib/rfnoc/node.cpp
@@ -113,6 +113,27 @@ void node_t::set_prop_forwarding_policy(
_prop_fwd_policies[prop_id] = policy;
}
+void node_t::register_action_handler(const std::string& id, action_handler_t&& handler)
+{
+ if (_action_handlers.count(id)) {
+ _action_handlers.erase(id);
+ }
+ _action_handlers.emplace(id, std::move(handler));
+}
+
+void node_t::set_action_forwarding_policy(
+ node_t::forwarding_policy_t policy, const std::string& action_key)
+{
+ _action_fwd_policies[action_key] = policy;
+}
+
+void node_t::post_action(
+ const res_source_info& edge_info,
+ action_info::sptr action)
+{
+ _post_action_cb(edge_info, action);
+}
+
/*** Private methods *********************************************************/
property_base_t* node_t::_find_property(
res_source_info src_info, const std::string& id) const
@@ -403,6 +424,72 @@ void node_t::forward_edge_property(
prop_accessor.forward<false>(incoming_prop, local_prop);
}
+void node_t::receive_action(const res_source_info& src_info, action_info::sptr action)
+{
+ std::lock_guard<std::mutex> l(_action_mutex);
+ // See if the user defined an action handler for us:
+ if (_action_handlers.count(action->key)) {
+ _action_handlers.at(action->key)(src_info, action);
+ return;
+ }
+
+ // Otherwise, we need to figure out the correct default action handling:
+ const auto fwd_policy = [&](const std::string& id) {
+ if (_action_fwd_policies.count(id)) {
+ return _action_fwd_policies.at(id);
+ }
+ return _action_fwd_policies.at("");
+ }(action->key);
+
+ // Now implement custom forwarding for all forwarding policies:
+ if (fwd_policy == forwarding_policy_t::DROP) {
+ UHD_LOG_TRACE(
+ get_unique_id(), "Dropping action " << action->key);
+ }
+ if (fwd_policy == forwarding_policy_t::ONE_TO_ONE) {
+ UHD_LOG_TRACE(
+ get_unique_id(), "Forwarding action " << action->key << " to opposite port");
+ const res_source_info dst_info{
+ res_source_info::invert_edge(src_info.type), src_info.instance};
+ if (_has_port(dst_info)) {
+ post_action(dst_info, action);
+ }
+ }
+ if (fwd_policy == forwarding_policy_t::ONE_TO_FAN) {
+ UHD_LOG_TRACE(get_unique_id(),
+ "Forwarding action " << action->key << " to all opposite ports");
+ const auto new_edge_type = res_source_info::invert_edge(src_info.type);
+ const size_t num_ports = new_edge_type == res_source_info::INPUT_EDGE
+ ? get_num_input_ports()
+ : get_num_output_ports();
+ for (size_t i = 0; i < num_ports; i++) {
+ post_action({new_edge_type, i}, action);
+ }
+ }
+ if (fwd_policy == forwarding_policy_t::ONE_TO_ALL
+ || fwd_policy == forwarding_policy_t::ONE_TO_ALL_IN) {
+ UHD_LOG_TRACE(get_unique_id(),
+ "Forwarding action " << action->key << " to all input ports");
+ for (size_t i = 0; i < get_num_input_ports(); i++) {
+ if (src_info.type == res_source_info::INPUT_EDGE && i == src_info.instance) {
+ continue;
+ }
+ post_action({res_source_info::INPUT_EDGE, i}, action);
+ }
+ }
+ if (fwd_policy == forwarding_policy_t::ONE_TO_ALL
+ || fwd_policy == forwarding_policy_t::ONE_TO_ALL_OUT) {
+ UHD_LOG_TRACE(get_unique_id(),
+ "Forwarding action " << action->key << " to all output ports");
+ for (size_t i = 0; i < get_num_output_ports(); i++) {
+ if (src_info.type == res_source_info::OUTPUT_EDGE && i == src_info.instance) {
+ continue;
+ }
+ post_action({res_source_info::OUTPUT_EDGE, i}, action);
+ }
+ }
+}
+
bool node_t::_has_port(const res_source_info& port_info) const
{
return (port_info.type == res_source_info::INPUT_EDGE
diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt
index f31daed50..c308abdcb 100644
--- a/host/tests/CMakeLists.txt
+++ b/host/tests/CMakeLists.txt
@@ -246,6 +246,19 @@ UHD_INSTALL(TARGETS
COMPONENT tests
)
+add_executable(actions_test
+ actions_test.cpp
+ ${CMAKE_SOURCE_DIR}/lib/rfnoc/graph.cpp
+)
+target_link_libraries(actions_test uhd ${Boost_LIBRARIES})
+UHD_ADD_TEST(actions_test actions_test)
+UHD_INSTALL(TARGETS
+ actions_test
+ RUNTIME
+ DESTINATION ${PKG_LIB_DIR}/tests
+ COMPONENT tests
+)
+
########################################################################
# demo of a loadable module
########################################################################
diff --git a/host/tests/actions_test.cpp b/host/tests/actions_test.cpp
new file mode 100644
index 000000000..c0344eacf
--- /dev/null
+++ b/host/tests/actions_test.cpp
@@ -0,0 +1,81 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/rfnoc/node.hpp>
+#include <uhd/rfnoc/actions.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhdlib/rfnoc/node_accessor.hpp>
+#include <uhdlib/rfnoc/prop_accessor.hpp>
+#include <uhdlib/rfnoc/graph.hpp>
+#include <boost/test/unit_test.hpp>
+#include <iostream>
+
+#include "rfnoc_graph_mock_nodes.hpp"
+
+
+const std::string STREAM_CMD_KEY = "stream_cmd";
+
+BOOST_AUTO_TEST_CASE(test_actions_single_node)
+{
+ node_accessor_t node_accessor{};
+
+ // Define some mock nodes:
+ mock_radio_node_t mock_radio(0);
+
+ auto stream_cmd = action_info::make(STREAM_CMD_KEY);
+ std::string cmd_payload = "START";
+ stream_cmd->payload = std::vector<uint8_t>(cmd_payload.begin(), cmd_payload.end());
+
+ auto other_cmd = action_info::make("FOO");
+
+ node_accessor.send_action(&mock_radio, {res_source_info::INPUT_EDGE, 0}, stream_cmd);
+ node_accessor.send_action(&mock_radio, {res_source_info::INPUT_EDGE, 0}, other_cmd);
+
+ mock_radio.update_fwd_policy(node_t::forwarding_policy_t::ONE_TO_ONE);
+ node_accessor.send_action(&mock_radio, {res_source_info::INPUT_EDGE, 0}, other_cmd);
+ mock_radio.update_fwd_policy(node_t::forwarding_policy_t::ONE_TO_FAN);
+ node_accessor.send_action(&mock_radio, {res_source_info::INPUT_EDGE, 0}, other_cmd);
+ mock_radio.update_fwd_policy(node_t::forwarding_policy_t::ONE_TO_ALL);
+ node_accessor.send_action(&mock_radio, {res_source_info::INPUT_EDGE, 0}, other_cmd);
+ mock_radio.update_fwd_policy(node_t::forwarding_policy_t::ONE_TO_ALL_IN);
+ node_accessor.send_action(&mock_radio, {res_source_info::INPUT_EDGE, 0}, other_cmd);
+ mock_radio.update_fwd_policy(node_t::forwarding_policy_t::ONE_TO_ALL_OUT);
+ node_accessor.send_action(&mock_radio, {res_source_info::INPUT_EDGE, 0}, other_cmd);
+}
+
+BOOST_AUTO_TEST_CASE(test_actions_simple_graph)
+{
+ node_accessor_t node_accessor{};
+ uhd::rfnoc::detail::graph_t graph{};
+
+ // Define some mock nodes:
+ mock_radio_node_t mock_rx_radio{0};
+ mock_ddc_node_t mock_ddc{};
+ mock_fifo_t mock_fifo{1};
+ mock_streamer_t mock_streamer{1};
+
+ // These init calls would normally be done by the framework
+ node_accessor.init_props(&mock_rx_radio);
+ node_accessor.init_props(&mock_ddc);
+ node_accessor.init_props(&mock_fifo);
+ node_accessor.init_props(&mock_streamer);
+
+ graph.connect(&mock_rx_radio, &mock_ddc, {0, 0, graph_edge_t::DYNAMIC, true});
+ graph.connect(&mock_ddc, &mock_fifo, {0, 0, graph_edge_t::DYNAMIC, true});
+ graph.connect(&mock_fifo, &mock_streamer, {0, 0, graph_edge_t::DYNAMIC, true});
+ graph.initialize();
+
+ // Force the DDC to actually set a decimation rate != 1
+ mock_streamer.set_property<double>("samp_rate", 10e6, 0);
+
+ uhd::stream_cmd_t num_samps_cmd(uhd::stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_DONE);
+ constexpr size_t NUM_SAMPS = 100;
+ num_samps_cmd.num_samps = NUM_SAMPS;
+
+ mock_streamer.issue_stream_cmd(num_samps_cmd, 0);
+ BOOST_CHECK_EQUAL(NUM_SAMPS * mock_ddc.get_property<int>("decim", 0),
+ mock_rx_radio.last_num_samps);
+}
diff --git a/host/tests/rfnoc_graph_mock_nodes.hpp b/host/tests/rfnoc_graph_mock_nodes.hpp
index 85e667ebd..a9d8d4e55 100644
--- a/host/tests/rfnoc_graph_mock_nodes.hpp
+++ b/host/tests/rfnoc_graph_mock_nodes.hpp
@@ -7,7 +7,9 @@
#ifndef INCLUDED_LIBUHD_TESTS_MOCK_NODES_HPP
#define INCLUDED_LIBUHD_TESTS_MOCK_NODES_HPP
+#include <uhd/rfnoc/defaults.hpp>
#include <uhd/rfnoc/node.hpp>
+#include <uhd/types/stream_cmd.hpp>
using namespace uhd::rfnoc;
@@ -82,9 +84,37 @@ public:
rssi_resolver_count++;
_rssi = static_cast<double>(rssi_resolver_count);
});
+
+
+ set_action_forwarding_policy(forwarding_policy_t::DROP);
+
+ register_action_handler(
+ "stream_cmd", [this](const res_source_info& src, action_info::sptr action) {
+ UHD_ASSERT_THROW(action->key == "stream_cmd");
+ const std::string cmd(action->payload.begin(), action->payload.end());
+ UHD_LOG_INFO(get_unique_id(),
+ "Received stream command: " << cmd << " to " << src.to_string());
+ if (cmd == "START") {
+ UHD_LOG_INFO(get_unique_id(), "Starting Stream!");
+ } else if (cmd == "STOP") {
+ UHD_LOG_INFO(get_unique_id(), "Stopping Stream!");
+ } else {
+ this->last_num_samps = std::stoul(cmd);
+ UHD_LOG_INFO(get_unique_id(),
+ "Streaming num samps: " << this->last_num_samps);
+ }
+ });
}
- std::string get_unique_id() const { return "MOCK_RADIO" + std::to_string(_radio_idx); }
+ void update_fwd_policy(forwarding_policy_t policy)
+ {
+ set_action_forwarding_policy(policy);
+ }
+
+ std::string get_unique_id() const
+ {
+ return "MOCK_RADIO" + std::to_string(_radio_idx);
+ }
size_t get_num_input_ports() const
{
@@ -101,6 +131,8 @@ public:
bool disable_samp_out_resolver = false;
double force_samp_out_value = 23e6;
+ size_t last_num_samps = 0;
+
private:
const size_t _radio_idx;
@@ -162,6 +194,31 @@ public:
decim = coerce_decim(int(samp_rate_in.get() / samp_rate_out.get()));
samp_rate_in = samp_rate_out.get() * decim.get();
});
+
+ register_action_handler(
+ "stream_cmd", [this](const res_source_info& src, action_info::sptr action) {
+ res_source_info dst_edge{
+ res_source_info::invert_edge(src.type), src.instance};
+ auto new_action = action_info::make(action->key);
+ std::string cmd(action->payload.begin(), action->payload.end());
+ if (cmd == "START" || cmd == "STOP") {
+ new_action->payload = action->payload;
+ } else {
+ unsigned long long num_samps = std::stoull(cmd);
+ if (src.type == res_source_info::OUTPUT_EDGE) {
+ num_samps *= _decim.get();
+ } else {
+ num_samps /= _decim.get();
+ }
+ std::string new_cmd = std::to_string(num_samps);
+ new_action->payload.insert(
+ new_action->payload.begin(), new_cmd.begin(), new_cmd.end());
+ }
+
+ UHD_LOG_INFO(get_unique_id(),
+ "Forwarding stream_cmd, decim is " << _decim.get());
+ post_action(dst_edge, new_action);
+ });
}
std::string get_unique_id() const { return "MOCK_DDC"; }
@@ -203,7 +260,7 @@ private:
/*! FIFO
*
- * Not much here -- we use it to test dynamic prop forwarding.
+ * Not much here -- we use it to test dynamic prop and action forwarding.
*/
class mock_fifo_t : public node_t
{
@@ -211,6 +268,7 @@ public:
mock_fifo_t(const size_t num_ports) : _num_ports(num_ports)
{
set_prop_forwarding_policy(forwarding_policy_t::ONE_TO_ONE);
+ set_action_forwarding_policy(forwarding_policy_t::ONE_TO_ONE);
}
std::string get_unique_id() const { return "MOCK_FIFO"; }
@@ -230,4 +288,64 @@ private:
const size_t _num_ports;
};
+/*! Streamer
+ *
+ * Not much here -- we use it to test dynamic prop and action forwarding.
+ */
+class mock_streamer_t : public node_t
+{
+public:
+ mock_streamer_t(const size_t num_ports) : _num_ports(num_ports)
+ {
+ set_prop_forwarding_policy(forwarding_policy_t::DROP);
+ set_action_forwarding_policy(forwarding_policy_t::DROP);
+ register_property(&_samp_rate_user);
+ register_property(&_samp_rate_in);
+ add_property_resolver({&_samp_rate_user}, {&_samp_rate_in}, [this]() {
+ UHD_LOG_INFO(get_unique_id(), "Calling resolver for `samp_rate_user'...");
+ _samp_rate_in = _samp_rate_user.get();
+ });
+ add_property_resolver({&_samp_rate_in}, {}, [this]() {
+ UHD_LOG_INFO(get_unique_id(), "Calling resolver for `samp_rate_in'...");
+ // nop
+ });
+ }
+
+ std::string get_unique_id() const
+ {
+ return "MOCK_STREAMER";
+ }
+
+ size_t get_num_input_ports() const
+ {
+ return _num_ports;
+ }
+
+ size_t get_num_output_ports() const
+ {
+ return _num_ports;
+ }
+
+ void issue_stream_cmd(uhd::stream_cmd_t stream_cmd, const size_t chan)
+ {
+ std::string cmd =
+ stream_cmd.stream_mode == uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS
+ ? "START"
+ : stream_cmd.stream_mode == uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS
+ ? "STOP"
+ : std::to_string(stream_cmd.num_samps);
+ auto scmd = action_info::make("stream_cmd");
+ scmd->payload.insert(scmd->payload.begin(), cmd.begin(), cmd.end());
+
+ post_action({res_source_info::INPUT_EDGE, chan}, scmd);
+ }
+
+private:
+ property_t<double> _samp_rate_user{
+ "samp_rate", 1e6, {res_source_info::USER}};
+ property_t<double> _samp_rate_in{
+ "samp_rate", 1e6, {res_source_info::INPUT_EDGE}};
+ const size_t _num_ports;
+};
+
#endif /* INCLUDED_LIBUHD_TESTS_MOCK_NODES_HPP */