From 5a00f4d7864c6258f0d070b4753569413c7cfc4f Mon Sep 17 00:00:00 2001 From: michael-west Date: Fri, 12 Jun 2020 10:13:35 -0700 Subject: RFNoC: Add disconnect methods to graph - Added method to disconnect an edge - Added method to remove a node - Fixed algorithm to check edges during connect. Previous code was checking some edges twice and allowing duplicate edges to be created for existing edges. Signed-off-by: michael-west --- host/include/uhd/rfnoc_graph.hpp | 40 ++++- host/lib/include/uhdlib/rfnoc/graph.hpp | 29 +++- host/lib/rfnoc/graph.cpp | 139 +++++++++++------ host/lib/rfnoc/rfnoc_graph.cpp | 257 +++++++++++++++++++++++++------- 4 files changed, 364 insertions(+), 101 deletions(-) (limited to 'host') diff --git a/host/include/uhd/rfnoc_graph.hpp b/host/include/uhd/rfnoc_graph.hpp index b414cc852..fabee53c7 100644 --- a/host/include/uhd/rfnoc_graph.hpp +++ b/host/include/uhd/rfnoc_graph.hpp @@ -166,7 +166,7 @@ public: const block_id_t& dst_blk, size_t dst_port) = 0; - /*! Connect a RFNOC block with block ID \p src_block to another with block ID \p + /*! Connect a RFNoC block with block ID \p src_block to another with block ID \p * dst_block. * * Note you need to also call this on statically connected blocks if you @@ -221,6 +221,44 @@ public: size_t strm_port, uhd::transport::adapter_id_t adapter_id = uhd::transport::NULL_ADAPTER_ID) = 0; + /*! Disconnect a RFNoC block with block ID \p src_blk from another with block ID + * \p dst_blk. This will logically disconnect the blocks, but the physical + * connection will not be changed until a new connection is made on the source + * port. + * + * \param src_blk The block ID of the source block to disconnect. + * \param src_port The port of the source block to disconnect. + * \param dst_blk The block ID of the destination block to disconnect from. + * \param dst_port The port of the destination block to disconnect from. + */ + virtual void disconnect(const block_id_t& src_blk, + size_t src_port, + const block_id_t& dst_blk, + size_t dst_port) = 0; + + /*! Disconnect a streamer with ID \p streamer_id. This will logically + * disconnect the streamer, but physical connection will not be changed. + * For RX streamers, the physical connection will be changed when a new + * connection is made to the upstream source port(s). For TX streamers, + * the physical connection will be changed when the TX streamer is + * destroyed or the port(s) are connected to another block. + * + * \param streamer_id The ID of the streamer to disconnect. + */ + virtual void disconnect(const std::string& streamer_id) = 0; + + /*! Disconnect port \p port of a streamer with ID \p streamer_id. This + * will logically disconnect the streamer, but physical connection will + * not be changed. For RX streamers, the physical connection will be + * changed when a new connection is made to the upstreame source port. + * For TX streamers, the physical connection will be changed when the TX + * streamer is destroyed or the port is connected to another block. + * + * \param streamer_id The ID of the streamer. + * \param port The port to disconnect. + */ + virtual void disconnect(const std::string& streamer_id, size_t port) = 0; + /*! Enumerate all the possible host transport adapters that can be used to * receive from the specified block * diff --git a/host/lib/include/uhdlib/rfnoc/graph.hpp b/host/lib/include/uhdlib/rfnoc/graph.hpp index a3f6d4e28..fa9921cf6 100644 --- a/host/lib/include/uhdlib/rfnoc/graph.hpp +++ b/host/lib/include/uhdlib/rfnoc/graph.hpp @@ -39,11 +39,24 @@ public: */ void connect(node_ref_t src_node, node_ref_t dst_node, graph_edge_t edge_info); - // void disconnect(node_ref_t src_node, - // node_ref_t dst_node, - // const size_t src_port, - // const size_t dst_port); - // + /*! Remove a connection from the graph + * + * After this function returns, the nodes will be considered disconnected + * along the ports specified in \p edge_info. + * + * \param src_node A reference to the source node + * \param dst_node A reference to the destination node + * \param edge_info Information about the type of edge + */ + void disconnect(node_ref_t src_node, node_ref_t dst_node, graph_edge_t edge_info); + + /*! Remove a node from the graph + * + * Disconnects all edges and removes the node from the graph. + * + * \param src_node A reference to the node + */ + void remove(node_ref_t node); /*! Commit graph and run initial checks * @@ -222,6 +235,12 @@ private: */ void _add_node(node_ref_t node); + /*! Remove a node, but only if it's in the graph. + * + * If it's not there, do nothing. + */ + void _remove_node(node_ref_t node); + /*! Find the neighbouring node for \p origin based on \p port_info * * This function will check port_info to identify the port number and the diff --git a/host/lib/rfnoc/graph.cpp b/host/lib/rfnoc/graph.cpp index 56ebba5b8..2bd47777d 100644 --- a/host/lib/rfnoc/graph.cpp +++ b/host/lib/rfnoc/graph.cpp @@ -42,44 +42,6 @@ auto get_dirty_props(graph_t::node_ref_t node_ref) }); } -/*! Check that \p new_edge_info does not conflict with \p existing_edge_info - * - * \throws uhd::rfnoc_error if it does. - */ -void assert_edge_new(const graph_t::graph_edge_t& new_edge_info, - const graph_t::graph_edge_t& existing_edge_info) -{ - if (existing_edge_info == new_edge_info) { - UHD_LOG_INFO(LOG_ID, - "Ignoring repeated call to connect " - << new_edge_info.src_blockid << ":" << new_edge_info.src_port << " -> " - << new_edge_info.dst_blockid << ":" << new_edge_info.dst_port); - return; - } else if (existing_edge_info.src_port == new_edge_info.src_port - && existing_edge_info.src_blockid == new_edge_info.src_blockid - && existing_edge_info.dst_port == new_edge_info.dst_port - && existing_edge_info.dst_blockid == new_edge_info.dst_blockid) { - UHD_LOG_ERROR(LOG_ID, - "Caught attempt to modify properties of edge " - << existing_edge_info.src_blockid << ":" << existing_edge_info.src_port - << " -> " << existing_edge_info.dst_blockid << ":" - << existing_edge_info.dst_port); - throw uhd::rfnoc_error("Caught attempt to modify properties of edge!"); - } else if (new_edge_info.src_blockid == existing_edge_info.src_blockid - && new_edge_info.src_port == existing_edge_info.src_port) { - UHD_LOG_ERROR(LOG_ID, - "Attempting to reconnect output port " << existing_edge_info.src_blockid - << ":" << existing_edge_info.src_port); - throw uhd::rfnoc_error("Attempting to reconnect output port!"); - } else if (new_edge_info.dst_blockid == existing_edge_info.dst_blockid - && new_edge_info.dst_port == existing_edge_info.dst_port) { - UHD_LOG_ERROR(LOG_ID, - "Attempting to reconnect output port " << existing_edge_info.dst_blockid - << ":" << existing_edge_info.dst_port); - throw uhd::rfnoc_error("Attempting to reconnect input port!"); - } -} - } // namespace /*! Graph-filtering predicate to find dirty nodes only @@ -141,17 +103,56 @@ void graph_t::connect(node_ref_t src_node, node_ref_t dst_node, graph_edge_t edg this->enqueue_action(dst_node, src, action); }); - // Check if connection exists - // This can be optimized: Edges can appear in both out_edges and in_edges, - // and we could skip double-checking them. + // Check if edge exists auto out_edge_range = boost::out_edges(src_vertex_desc, _graph); for (auto edge_it = out_edge_range.first; edge_it != out_edge_range.second; ++edge_it) { - assert_edge_new(edge_info, boost::get(edge_property_t(), _graph, *edge_it)); + auto existing_edge_info = boost::get(edge_property_t(), _graph, *edge_it); + + // if exact edge exists, do nothing and return + if (existing_edge_info == edge_info) { + UHD_LOG_INFO(LOG_ID, + "Ignoring repeated call to connect " + << edge_info.src_blockid << ":" << edge_info.src_port << " -> " + << edge_info.dst_blockid << ":" << edge_info.dst_port); + return; + } + + // if there is already an edge for the source block and port + if (existing_edge_info.src_port == edge_info.src_port + && existing_edge_info.src_blockid == edge_info.src_blockid) { + // if same destination block and port + if (existing_edge_info.dst_port == edge_info.dst_port + && existing_edge_info.dst_blockid == edge_info.dst_blockid) { + // attempt to modify edge properties - throw an error + UHD_LOG_ERROR(LOG_ID, + "Caught attempt to modify properties of edge " + << existing_edge_info.src_blockid << ":" + << existing_edge_info.src_port << " -> " + << existing_edge_info.dst_blockid << ":" + << existing_edge_info.dst_port); + throw uhd::rfnoc_error("Caught attempt to modify properties of edge!"); + } else { + // Attempt to reconnect already connected source block and port + UHD_LOG_ERROR(LOG_ID, + "Attempting to reconnect output port " + << existing_edge_info.src_blockid << ":" + << existing_edge_info.src_port); + throw uhd::rfnoc_error("Attempting to reconnect output port!"); + } + } } auto in_edge_range = boost::in_edges(dst_vertex_desc, _graph); for (auto edge_it = in_edge_range.first; edge_it != in_edge_range.second; ++edge_it) { - assert_edge_new(edge_info, boost::get(edge_property_t(), _graph, *edge_it)); + auto existing_edge_info = boost::get(edge_property_t(), _graph, *edge_it); + if (edge_info.dst_blockid == existing_edge_info.dst_blockid + && edge_info.dst_port == existing_edge_info.dst_port) { + UHD_LOG_ERROR(LOG_ID, + "Attempting to reconnect input port " << existing_edge_info.dst_blockid + << ":" + << existing_edge_info.dst_port); + throw uhd::rfnoc_error("Attempting to reconnect input port!"); + } } // Create edge @@ -176,6 +177,35 @@ void graph_t::connect(node_ref_t src_node, node_ref_t dst_node, graph_edge_t edg } } +void graph_t::disconnect(node_ref_t src_node, node_ref_t dst_node, graph_edge_t edge_info) +{ + // Find vertex descriptor + auto src_vertex_desc = _node_map.at(src_node); + auto dst_vertex_desc = _node_map.at(dst_node); + + edge_info.src_blockid = src_node->get_unique_id(); + edge_info.dst_blockid = dst_node->get_unique_id(); + + boost::remove_out_edge_if(src_vertex_desc, + [this, edge_info](rfnoc_graph_t::edge_descriptor edge_desc) { + return (edge_info == boost::get(edge_property_t(), this->_graph, edge_desc)); + }, + _graph); + + if (boost::degree(src_vertex_desc, _graph) == 0) { + _remove_node(src_node); + } + + if (boost::degree(dst_vertex_desc, _graph) == 0) { + _remove_node(dst_node); + } +} + +void graph_t::remove(node_ref_t node) +{ + _remove_node(node); +} + void graph_t::commit() { std::lock_guard l(_release_mutex); @@ -511,6 +541,29 @@ void graph_t::_add_node(node_ref_t new_node) _node_map.emplace(new_node, boost::add_vertex(new_node, _graph)); } +void graph_t::_remove_node(node_ref_t node) +{ + if (_node_map.count(node)) { + auto vertex_desc = _node_map.at(node); + + // Remove all edges + boost::clear_vertex(vertex_desc, _graph); + + // Remove the vertex + boost::remove_vertex(vertex_desc, _graph); + _node_map.erase(node); + + // Removing the vertex changes the vertex descriptors, + // so update the node map + auto vertex_range = boost::vertices(_graph); + for (auto vertex_it = vertex_range.first; vertex_it != vertex_range.second; + vertex_it++) { + auto node = boost::get(vertex_property_t(), _graph, *vertex_it); + _node_map[node] = *vertex_it; + } + } +} + void graph_t::_forward_edge_props(graph_t::rfnoc_graph_t::vertex_descriptor origin) { diff --git a/host/lib/rfnoc/rfnoc_graph.cpp b/host/lib/rfnoc/rfnoc_graph.cpp index 6304fa467..027d27f81 100644 --- a/host/lib/rfnoc/rfnoc_graph.cpp +++ b/host/lib/rfnoc/rfnoc_graph.cpp @@ -35,6 +35,29 @@ struct block_xbar_info noc_id_t noc_id; size_t inst_num; }; + +//! Information about a specific connection (used for streamer disconnect) +struct connection_info_t +{ + detail::graph_t::node_ref_t src; + detail::graph_t::node_ref_t dst; + graph_edge_t edge; +}; + +//! Information about a streamer (used for streamer disconnect) +struct streamer_info_t +{ + detail::graph_t::node_ref_t node; + std::map connections; +}; + +//! Information about a route (used for physical connect/disconnect) +struct route_info_t +{ + graph_edge_t::edge_t edge_type; + graph_edge_t src_static_edge; + graph_edge_t dst_static_edge; +}; } // namespace class rfnoc_graph_impl : public rfnoc_graph @@ -209,6 +232,30 @@ public: skip_property_propagation); } + void disconnect(const block_id_t& src_blk, + size_t src_port, + const block_id_t& dst_blk, + size_t dst_port) + { + if (not has_block(src_blk)) { + throw uhd::lookup_error( + std::string("Cannot disconnect blocks, source block not found: ") + + src_blk.to_string()); + } + if (not has_block(dst_blk)) { + throw uhd::lookup_error( + std::string("Cannot disconnect blocks, destination block not found: ") + + dst_blk.to_string()); + } + auto edge_type = _physical_disconnect(src_blk, src_port, dst_blk, dst_port); + graph_edge_t edge_info(src_port, dst_port, edge_type, true); + auto src = get_block(src_blk); + auto dst = get_block(dst_blk); + edge_info.src_blockid = src->get_unique_id(); + edge_info.dst_blockid = dst->get_unique_id(); + _graph->disconnect(src.get(), dst.get(), edge_info); + } + void connect(uhd::tx_streamer::sptr streamer, size_t strm_port, const block_id_t& dst_blk, @@ -264,6 +311,10 @@ public: auto dst = get_block(dst_blk); graph_edge_t edge_info(strm_port, dst_port, graph_edge_t::TX_STREAM, true); _graph->connect(rfnoc_streamer.get(), dst.get(), edge_info); + + _tx_streamers[rfnoc_streamer->get_unique_id()].node = rfnoc_streamer.get(); + _tx_streamers[rfnoc_streamer->get_unique_id()].connections[strm_port] = { + rfnoc_streamer.get(), dst.get(), edge_info}; } void connect(const block_id_t& src_blk, @@ -321,20 +372,85 @@ public: auto src = get_block(src_blk); graph_edge_t edge_info(src_port, strm_port, graph_edge_t::RX_STREAM, true); _graph->connect(src.get(), rfnoc_streamer.get(), edge_info); + + _rx_streamers[rfnoc_streamer->get_unique_id()].node = rfnoc_streamer.get(); + _rx_streamers[rfnoc_streamer->get_unique_id()].connections[strm_port] = { + src.get(), rfnoc_streamer.get(), edge_info}; + } + + void disconnect(const std::string& streamer_id) + { + UHD_LOG_TRACE(LOG_ID, std::string("Disconnecting ") + streamer_id); + if (_tx_streamers.count(streamer_id)) { + // TODO: Physically disconnect all connections + // This may not be strictly necessary because the destruction of + // the xport will prevent packets from being sent to the + // destination. + + // Remove the node from the graph + _graph->remove(_tx_streamers[streamer_id].node); + + // Remove the streamer from the map + _tx_streamers.erase(streamer_id); + } else if (_rx_streamers.count(streamer_id)) { + // TODO: Physically disconnect all connections + + // Remove the node from the graph (logically disconnect) + _graph->remove(_rx_streamers[streamer_id].node); + + // Remove the streamer from the map + _rx_streamers.erase(streamer_id); + } else { + throw uhd::lookup_error( + std::string("Cannot disconnect streamer. Streamer not found: ") + + streamer_id); + } + UHD_LOG_TRACE(LOG_ID, std::string("Disconnected ") + streamer_id); + } + + void disconnect(const std::string& streamer_id, size_t port) + { + std::string id_str = streamer_id + ":" + std::to_string(port); + UHD_LOG_TRACE(LOG_ID, std::string("Disconnecting ") + id_str); + if (_tx_streamers.count(streamer_id)) { + if (_tx_streamers[streamer_id].connections.count(port)) { + auto connection = _tx_streamers[streamer_id].connections[port]; + _graph->disconnect(connection.src, connection.dst, connection.edge); + _tx_streamers[streamer_id].connections.erase(port); + } else { + throw uhd::lookup_error( + std::string("Cannot disconnect. Port not connected: ") + id_str); + } + } else if (_rx_streamers.count(streamer_id)) { + if (_rx_streamers[streamer_id].connections.count(port)) { + auto connection = _rx_streamers[streamer_id].connections[port]; + // TODO: Physically disconnect port + _graph->disconnect(connection.src, connection.dst, connection.edge); + _rx_streamers[streamer_id].connections.erase(port); + } else { + throw uhd::lookup_error( + std::string("Cannot disconnect. Port not connected: ") + id_str); + } + } else { + throw uhd::lookup_error( + std::string("Cannot disconnect streamer. Streamer not found: ") + + streamer_id); + } + UHD_LOG_TRACE(LOG_ID, std::string("Disconnected ") + id_str); } uhd::rx_streamer::sptr create_rx_streamer( - const size_t num_chans, const uhd::stream_args_t& args) + const size_t num_ports, const uhd::stream_args_t& args) { - _rx_streamers.push_back(std::make_shared(num_chans, args)); - return _rx_streamers.back(); + return std::make_shared( + num_ports, args, [this](const std::string& id) { this->disconnect(id); }); } uhd::tx_streamer::sptr create_tx_streamer( - const size_t num_chans, const uhd::stream_args_t& args) + const size_t num_ports, const uhd::stream_args_t& args) { - _tx_streamers.push_back(std::make_shared(num_chans, args)); - return _tx_streamers.back(); + return std::make_shared( + num_ports, args, [this](const std::string& id) { this->disconnect(id); }); } size_t get_num_mboards() const @@ -703,66 +819,62 @@ private: _graph->connect(src_blk.get(), dst_blk.get(), edge_info); } - /*! Internal physical connection helper + /*! Internal helper to get route information * - * Make the connections in the physical device + * Checks the validity of the route and returns route information. * * \throws uhd::routing_error - * if the blocks are statically connected to something else + * if the routing is impossible */ - graph_edge_t::edge_t _physical_connect(const block_id_t& src_blk, + route_info_t _get_route_info(const block_id_t& src_blk, size_t src_port, const block_id_t& dst_blk, size_t dst_port) { + graph_edge_t::edge_t edge_type = graph_edge_t::DYNAMIC; + const std::string src_blk_info = src_blk.to_string() + ":" + std::to_string(src_port); const std::string dst_blk_info = dst_blk.to_string() + ":" + std::to_string(dst_port); // Find the static edge for src_blk:src_port - graph_edge_t src_static_edge = _assert_edge( + auto src_static_edge = _assert_edge( _get_static_edge( [src_blk_id = src_blk.to_string(), src_port](const graph_edge_t& edge) { return edge.src_blockid == src_blk_id && edge.src_port == src_port; }), src_blk_info); + // Now find the static edge for the destination SEP + auto dst_static_edge = _assert_edge( + _get_static_edge( + [dst_blk_id = dst_blk.to_string(), dst_port](const graph_edge_t& edge) { + return edge.dst_blockid == dst_blk_id && edge.dst_port == dst_port; + }), + dst_blk_info); + // Now see if it's already connected to the destination if (src_static_edge.dst_blockid == dst_blk.to_string() && src_static_edge.dst_port == dst_port) { + // Blocks are statically connected UHD_LOG_TRACE(LOG_ID, "Blocks " << src_blk_info << " and " << dst_blk_info - << " are already statically connected, no physical connection " - "required."); - return graph_edge_t::STATIC; - } - - // If they're not statically connected, the source *must* be connected - // to an SEP, or this route is impossible - if (block_id_t(src_static_edge.dst_blockid).get_block_name() != NODE_ID_SEP) { + << " are statically connected"); + edge_type = graph_edge_t::STATIC; + } else if (block_id_t(src_static_edge.dst_blockid).get_block_name() + != NODE_ID_SEP) { + // Blocks are not statically connected and the source is not + // connected to an SEP const std::string err_msg = src_blk_info + " is neither statically connected to " + dst_blk_info + " nor to an SEP! Routing impossible."; UHD_LOG_ERROR(LOG_ID, err_msg); throw uhd::routing_error(err_msg); - } - - // OK, now we know which source SEP we have - const std::string src_sep_info = src_static_edge.dst_blockid; - const sep_addr_t src_sep_addr = _sep_map.at(src_sep_info); - - // Now find the static edge for the destination SEP - auto dst_static_edge = _assert_edge( - _get_static_edge( - [dst_blk_id = dst_blk.to_string(), dst_port](const graph_edge_t& edge) { - return edge.dst_blockid == dst_blk_id && edge.dst_port == dst_port; - }), - dst_blk_info); - - // If they're not statically connected, the source *must* be connected - // to an SEP, or this route is impossible - if (block_id_t(dst_static_edge.src_blockid).get_block_name() != NODE_ID_SEP) { + } else if (block_id_t(dst_static_edge.src_blockid).get_block_name() + != NODE_ID_SEP) { + // Blocks are not statically connected and the destination is not + // connected to an SEP const std::string err_msg = dst_blk_info + " is neither statically connected to " + src_blk_info + " nor to an SEP! Routing impossible."; @@ -770,21 +882,62 @@ private: throw uhd::routing_error(err_msg); } - // OK, now we know which destination SEP we have - const std::string dst_sep_info = dst_static_edge.src_blockid; - const sep_addr_t dst_sep_addr = _sep_map.at(dst_sep_info); + return {edge_type, src_static_edge, dst_static_edge}; + } + + /*! Internal physical connection helper + * + * Make the connections in the physical device + * + * \throws uhd::routing_error + * if the routing is impossible + */ + graph_edge_t::edge_t _physical_connect(const block_id_t& src_blk, + size_t src_port, + const block_id_t& dst_blk, + size_t dst_port) + { + auto route_info = _get_route_info(src_blk, src_port, dst_blk, dst_port); + + if (route_info.edge_type == graph_edge_t::DYNAMIC) { + const std::string src_sep_info = route_info.src_static_edge.dst_blockid; + const sep_addr_t src_sep_addr = _sep_map.at(src_sep_info); + const std::string dst_sep_info = route_info.dst_static_edge.src_blockid; + const sep_addr_t dst_sep_addr = _sep_map.at(dst_sep_info); + + auto strm_info = _gsm->create_device_to_device_data_stream( + dst_sep_addr, src_sep_addr, false, 0.1, 0.0, false); + + UHD_LOGGER_DEBUG(LOG_ID) + << boost::format( + "Data stream between EPID %d and EPID %d established " + "where downstream buffer can hold %lu bytes and %u packets") + % std::get<0>(strm_info).first % std::get<0>(strm_info).second + % std::get<1>(strm_info).bytes % std::get<1>(strm_info).packets; + } + + return route_info.edge_type; + } - // Now all we need to do is dynamically connect those SEPs - auto strm_info = _gsm->create_device_to_device_data_stream( - dst_sep_addr, src_sep_addr, false, 0.1, 0.0, false); + /*! Internal physical disconnection helper + * + * Disconnects the physical device + * + * \throws uhd::routing_error + * if the routing is impossible + */ + graph_edge_t::edge_t _physical_disconnect(const block_id_t& src_blk, + size_t src_port, + const block_id_t& dst_blk, + size_t dst_port) + { + auto route_info = _get_route_info(src_blk, src_port, dst_blk, dst_port); - UHD_LOGGER_DEBUG(LOG_ID) - << boost::format("Data stream between EPID %d and EPID %d established " - "where downstream buffer can hold %lu bytes and %u packets") - % std::get<0>(strm_info).first % std::get<0>(strm_info).second - % std::get<1>(strm_info).bytes % std::get<1>(strm_info).packets; + if (route_info.edge_type == graph_edge_t::DYNAMIC) { + // TODO: Add call into _gsm to physically disconnect the SEPs + } - return graph_edge_t::DYNAMIC; + return route_info.edge_type; } //! Flush and reset each connected port on the mboard @@ -890,11 +1043,11 @@ private: //! Reference to a packet factory object. Gets initialized just before the GSM std::unique_ptr _pkt_factory; - //! Reference to RX streamers - std::vector _rx_streamers; + //! Map from TX streamer ID to streamer info + std::map _tx_streamers; - //! Reference to TX streamers - std::vector _tx_streamers; + //! Map from RX streamer ID to streamer info + std::map _rx_streamers; }; /* class rfnoc_graph_impl */ -- cgit v1.2.3