From d7c64eb4b13670dbc5728f994e70dca60616af63 Mon Sep 17 00:00:00 2001 From: michael-west Date: Tue, 7 Jul 2020 13:37:25 -0700 Subject: RFNoC: Add xport disconnect callbacks Transports were not disconnecting their links from the I/O service upon destruction, leaving behind inaccessible send and recv links used by nothing. This led to I/O errors after creating several transports. Added callbacks to transports to automatically disconnect their links from the I/O service when the transport is destroyed. Updated all callers to supply a disconnect callback. Signed-off-by: michael-west --- host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp | 26 +++++++++++++-------- .../include/uhdlib/rfnoc/chdr_rx_data_xport.hpp | 16 +++++++++---- .../include/uhdlib/rfnoc/chdr_tx_data_xport.hpp | 12 ++++++++-- host/lib/include/uhdlib/transport/io_service.hpp | 7 ++++++ host/lib/rfnoc/chdr_ctrl_xport.cpp | 8 +++++-- host/lib/rfnoc/chdr_rx_data_xport.cpp | 13 ++++++++--- host/lib/rfnoc/chdr_tx_data_xport.cpp | 17 ++++++++------ host/lib/usrp/mpmd/mpmd_mb_iface.cpp | 27 ++++++++++++++++------ host/lib/usrp/x300/x300_mb_iface.cpp | 27 ++++++++++++++++------ host/tests/streamer_benchmark.cpp | 12 ++++++++-- 10 files changed, 122 insertions(+), 43 deletions(-) (limited to 'host') diff --git a/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp index 38acd7a34..7d5613f5d 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp @@ -19,12 +19,13 @@ namespace uhd { namespace rfnoc { class chdr_ctrl_xport { public: - using io_service = uhd::transport::io_service; - using frame_buff = uhd::transport::frame_buff; - using send_link_if = uhd::transport::send_link_if; - using recv_link_if = uhd::transport::recv_link_if; - using send_io_if = uhd::transport::send_io_if; - using recv_io_if = uhd::transport::recv_io_if; + using io_service = uhd::transport::io_service; + using frame_buff = uhd::transport::frame_buff; + using send_link_if = uhd::transport::send_link_if; + using recv_link_if = uhd::transport::recv_link_if; + using send_io_if = uhd::transport::send_io_if; + using recv_io_if = uhd::transport::recv_io_if; + using disconnect_callback_t = uhd::transport::disconnect_callback_t; using sptr = std::shared_ptr; @@ -37,6 +38,7 @@ public: * \param my_epid The local EPID for this transport * \param num_send_frames Number of frames to reserve for TX * \param num_recv_frames Number of frames to reserve for RX + * \param disconnect Callback function to disconnect the links */ static sptr make(io_service::sptr io_srv, send_link_if::sptr send_link, @@ -44,7 +46,8 @@ public: const chdr::chdr_packet_factory& pkt_factory, sep_id_t my_epid, size_t num_send_frames, - size_t num_recv_frames) + size_t num_recv_frames, + disconnect_callback_t disconnect) { return std::make_shared(io_srv, send_link, @@ -52,7 +55,8 @@ public: pkt_factory, my_epid, num_send_frames, - num_recv_frames); + num_recv_frames, + disconnect); } /*! @@ -64,6 +68,7 @@ public: * \param my_epid The local EPID for this transport * \param num_send_frames Number of frames to reserve for TX * \param num_recv_frames Number of frames to reserve for RX + * \param disconnect Callback function to disconnect the links */ chdr_ctrl_xport(io_service::sptr io_srv, send_link_if::sptr send_link, @@ -71,7 +76,8 @@ public: const chdr::chdr_packet_factory& pkt_factory, sep_id_t my_epid, size_t num_send_frames, - size_t num_recv_frames); + size_t num_recv_frames, + disconnect_callback_t disconnect); ~chdr_ctrl_xport(); @@ -151,6 +157,8 @@ private: recv_io_if::sptr _ctrl_recv_if; recv_io_if::sptr _mgmt_recv_if; + // Disconnect callback + disconnect_callback_t _disconnect; // FIXME: Remove this when have threaded_io_service std::mutex _mutex; diff --git a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp index d6a2e6628..cb0987446 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp @@ -100,8 +100,9 @@ private: class chdr_rx_data_xport { public: - using uptr = std::unique_ptr; - using buff_t = transport::frame_buff; + using uptr = std::unique_ptr; + using buff_t = transport::frame_buff; + using disconnect_callback_t = uhd::transport::disconnect_callback_t; //! Values extracted from received RX data packets struct packet_info_t @@ -135,6 +136,7 @@ public: * \param fc_freq Frequency of flow control status messages * \param fc_headroom Headroom for flow control status messages * \param lossy_xport Whether the xport is lossy, for flow control configuration + * \param disconnect Callback function to disconnect the links * \return Parameters for xport flow control */ static fc_params_t configure_sep(uhd::transport::io_service::sptr io_srv, @@ -148,7 +150,8 @@ public: const stream_buff_params_t& recv_capacity, const stream_buff_params_t& fc_freq, const stream_buff_params_t& fc_headroom, - const bool lossy_xport); + const bool lossy_xport, + disconnect_callback_t disconnect); /*! Constructor * @@ -159,6 +162,7 @@ public: * \param epids Source and destination endpoint IDs * \param num_recv_frames Num frames to reserve from the recv link * \param fc_params Parameters for flow control + * \param disconnect Callback function to disconnect the links */ chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv, uhd::transport::recv_link_if::sptr recv_link, @@ -166,7 +170,8 @@ public: const chdr::chdr_packet_factory& pkt_factory, const uhd::rfnoc::sep_id_pair_t& epids, const size_t num_recv_frames, - const fc_params_t& fc_params); + const fc_params_t& fc_params, + disconnect_callback_t disconnect); /*! Destructor */ @@ -404,6 +409,9 @@ private: //! The CHDR width in bytes. size_t _chdr_w_bytes; + + // Disconnect callback + disconnect_callback_t _disconnect; }; }} // namespace uhd::rfnoc diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp index ed008c1db..4e8ebe24d 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp @@ -106,6 +106,7 @@ public: using buff_t = transport::frame_buff; using enqueue_async_msg_fn_t = std::function; + using disconnect_callback_t = uhd::transport::disconnect_callback_t; //! Information about data packet struct packet_info_t @@ -135,6 +136,7 @@ public: * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata * \param fc_freq_ratio Ratio to use to configure the device fc frequency * \param fc_headroom_ratio Ratio to use to configure the device fc headroom + * \param disconnect Callback function to disconnect the links * \return Parameters for xport flow control */ static fc_params_t configure_sep(uhd::transport::io_service::sptr io_srv, @@ -146,7 +148,8 @@ public: const uhd::rfnoc::sw_buff_t pyld_buff_fmt, const uhd::rfnoc::sw_buff_t mdata_buff_fmt, const double fc_freq_ratio, - const double fc_headroom_ratio); + const double fc_headroom_ratio, + disconnect_callback_t disconnect); /*! Constructor * @@ -157,6 +160,7 @@ public: * \param epids Source and destination endpoint IDs * \param num_send_frames Num frames to reserve from the send link * \param fc_params Parameters for flow control + * \param disconnect Callback function to disconnect the links */ chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv, uhd::transport::recv_link_if::sptr recv_link, @@ -164,7 +168,8 @@ public: const chdr::chdr_packet_factory& pkt_factory, const uhd::rfnoc::sep_id_pair_t& epids, const size_t num_send_frames, - const fc_params_t fc_params); + const fc_params_t fc_params, + disconnect_callback_t disconnect); /*! Destructor */ @@ -397,6 +402,9 @@ private: //! The size of the send frame size_t _frame_size; + + // Disconnect callback + disconnect_callback_t _disconnect; }; }} // namespace uhd::rfnoc diff --git a/host/lib/include/uhdlib/transport/io_service.hpp b/host/lib/include/uhdlib/transport/io_service.hpp index 62dfdfe0d..0cc2f86e3 100644 --- a/host/lib/include/uhdlib/transport/io_service.hpp +++ b/host/lib/include/uhdlib/transport/io_service.hpp @@ -77,6 +77,13 @@ namespace uhd { namespace transport { using recv_callback_t = std::function; +/*! + * Callback to disconnect links. This allows a function to be registered + * that can call back to the io_service_mgr to completely disconnect the + * links. + */ +using disconnect_callback_t = std::function; + /*! * Interface for a recv transport to request/release buffers from a link. A * recv transport is a transport with a primary purpose of receiving data, and diff --git a/host/lib/rfnoc/chdr_ctrl_xport.cpp b/host/lib/rfnoc/chdr_ctrl_xport.cpp index 8946a3b08..f8b3cf03e 100644 --- a/host/lib/rfnoc/chdr_ctrl_xport.cpp +++ b/host/lib/rfnoc/chdr_ctrl_xport.cpp @@ -19,8 +19,9 @@ chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv, const chdr::chdr_packet_factory& pkt_factory, sep_id_t my_epid, size_t num_send_frames, - size_t num_recv_frames) - : _my_epid(my_epid), _recv_packet(pkt_factory.make_generic()) + size_t num_recv_frames, + disconnect_callback_t disconnect) + : _my_epid(my_epid), _recv_packet(pkt_factory.make_generic()), _disconnect(disconnect) { /* Make dumb send pipe */ send_io_if::send_callback_t send_cb = [this]( @@ -99,6 +100,9 @@ chdr_ctrl_xport::~chdr_ctrl_xport() _send_if.reset(); _ctrl_recv_if.reset(); _mgmt_recv_if.reset(); + + // Disconnect the transport + _disconnect(); } /*! diff --git a/host/lib/rfnoc/chdr_rx_data_xport.cpp b/host/lib/rfnoc/chdr_rx_data_xport.cpp index d4476d54f..74b310fa1 100644 --- a/host/lib/rfnoc/chdr_rx_data_xport.cpp +++ b/host/lib/rfnoc/chdr_rx_data_xport.cpp @@ -36,11 +36,13 @@ chdr_rx_data_xport::chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv, const chdr::chdr_packet_factory& pkt_factory, const uhd::rfnoc::sep_id_pair_t& epids, const size_t num_recv_frames, - const fc_params_t& fc_params) + const fc_params_t& fc_params, + disconnect_callback_t disconnect) : _fc_state(epids, fc_params.freq) , _fc_sender(pkt_factory, epids) , _epid(epids.second) , _chdr_w_bytes(chdr_w_to_bits(pkt_factory.get_chdr_w()) / 8) + , _disconnect(disconnect) { UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", "Creating rx xport with local epid=" << epids.second @@ -87,6 +89,9 @@ chdr_rx_data_xport::~chdr_rx_data_xport() { // Release recv_io before allowing members needed by callbacks be destroyed _recv_io.reset(); + + // Disconnect the links + _disconnect(); } chdr_rx_data_xport::fc_params_t chdr_rx_data_xport::configure_sep(io_service::sptr io_srv, @@ -100,7 +105,8 @@ chdr_rx_data_xport::fc_params_t chdr_rx_data_xport::configure_sep(io_service::sp const stream_buff_params_t& recv_capacity, const stream_buff_params_t& fc_freq, const stream_buff_params_t& fc_headroom, - const bool lossy_xport) + const bool lossy_xport, + disconnect_callback_t disconnect) { const sep_id_t remote_epid = epids.first; const sep_id_t local_epid = epids.second; @@ -166,7 +172,8 @@ chdr_rx_data_xport::fc_params_t chdr_rx_data_xport::configure_sep(io_service::sp pkt_factory, local_epid, 1, // num_send_frames - 1); // num_recv_frames + 1, // num_recv_frames + disconnect); // Setup a route to the EPID // Note that this may be gratuitous--The endpoint may already have been set up diff --git a/host/lib/rfnoc/chdr_tx_data_xport.cpp b/host/lib/rfnoc/chdr_tx_data_xport.cpp index 530da5a47..7e926a163 100644 --- a/host/lib/rfnoc/chdr_tx_data_xport.cpp +++ b/host/lib/rfnoc/chdr_tx_data_xport.cpp @@ -31,12 +31,14 @@ chdr_tx_data_xport::chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv, const chdr::chdr_packet_factory& pkt_factory, const uhd::rfnoc::sep_id_pair_t& epids, const size_t num_send_frames, - const fc_params_t fc_params) + const fc_params_t fc_params, + disconnect_callback_t disconnect) : _fc_state(fc_params.buff_capacity) , _fc_sender(pkt_factory, epids) , _epid(epids.first) , _chdr_w_bytes(chdr_w_to_bits(pkt_factory.get_chdr_w()) / 8) , _frame_size(send_link->get_send_frame_size()) + , _disconnect(disconnect) { UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", "Creating tx xport with local epid=" << epids.first @@ -78,6 +80,9 @@ chdr_tx_data_xport::~chdr_tx_data_xport() { // Release send_io before allowing members needed by callbacks be destroyed _send_io.reset(); + + // Disconnect the transport + _disconnect(); } /* @@ -225,7 +230,8 @@ chdr_tx_data_xport::fc_params_t chdr_tx_data_xport::configure_sep(io_service::sp const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, const double fc_freq_ratio, - const double fc_headroom_ratio) + const double fc_headroom_ratio, + disconnect_callback_t disconnect) { const sep_id_t remote_epid = epids.second; const sep_id_t local_epid = epids.first; @@ -238,7 +244,8 @@ chdr_tx_data_xport::fc_params_t chdr_tx_data_xport::configure_sep(io_service::sp pkt_factory, local_epid, 1, // num_send_frames - 1); // num_recv_frames + 1, // num_recv_frames + disconnect); // Setup a route to the EPID mgmt_portal.setup_local_route(*ctrl_xport, remote_epid); @@ -246,10 +253,6 @@ chdr_tx_data_xport::fc_params_t chdr_tx_data_xport::configure_sep(io_service::sp mgmt_portal.config_local_tx_stream( *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt); - // We no longer need the control xport, release it so - // the control xport is no longer connected to the I/O service. - ctrl_xport.reset(); - return configure_flow_ctrl(io_srv, recv_link, send_link, diff --git a/host/lib/usrp/mpmd/mpmd_mb_iface.cpp b/host/lib/usrp/mpmd/mpmd_mb_iface.cpp index d4f6cc383..534d71a4d 100644 --- a/host/lib/usrp/mpmd/mpmd_mb_iface.cpp +++ b/host/lib/usrp/mpmd/mpmd_mb_iface.cpp @@ -182,7 +182,10 @@ uhd::rfnoc::chdr_ctrl_xport::sptr mpmd_mboard_impl::mpmd_mb_iface::make_ctrl_tra pkt_factory, local_epid, send_link->get_num_send_frames(), - recv_link->get_num_recv_frames()); + recv_link->get_num_recv_frames(), + [this, send_link, recv_link]() { + this->get_io_srv_mgr()->disconnect_links(recv_link, send_link); + }); return xport; } @@ -250,9 +253,11 @@ mpmd_mboard_impl::mpmd_mb_iface::make_rx_data_transport( recv_capacity, fc_freq, fc_headroom, - lossy_xport); + lossy_xport, + [this, recv_link, send_link]() { + this->get_io_srv_mgr()->disconnect_links(recv_link, send_link); + }); - get_io_srv_mgr()->disconnect_links(recv_link, send_link); cfg_io_srv.reset(); // Connect the links to an I/O service @@ -269,7 +274,10 @@ mpmd_mboard_impl::mpmd_mb_iface::make_rx_data_transport( pkt_factory, epids, recv_link->get_num_recv_frames(), - fc_params); + fc_params, + [this, recv_link, send_link]() { + this->get_io_srv_mgr()->disconnect_links(recv_link, send_link); + }); return rx_xport; } @@ -320,9 +328,11 @@ mpmd_mboard_impl::mpmd_mb_iface::make_tx_data_transport( pyld_buff_fmt, mdata_buff_fmt, fc_freq_ratio, - fc_headroom_ratio); + fc_headroom_ratio, + [this, recv_link, send_link]() { + this->get_io_srv_mgr()->disconnect_links(recv_link, send_link); + }); - get_io_srv_mgr()->disconnect_links(recv_link, send_link); cfg_io_srv.reset(); // Connect the links to an I/O service @@ -340,7 +350,10 @@ mpmd_mboard_impl::mpmd_mb_iface::make_tx_data_transport( pkt_factory, epids, send_link->get_num_send_frames(), - buff_capacity); + buff_capacity, + [this, recv_link, send_link]() { + this->get_io_srv_mgr()->disconnect_links(recv_link, send_link); + }); return tx_xport; } diff --git a/host/lib/usrp/x300/x300_mb_iface.cpp b/host/lib/usrp/x300/x300_mb_iface.cpp index 412b28a92..a6d5e794b 100644 --- a/host/lib/usrp/x300/x300_mb_iface.cpp +++ b/host/lib/usrp/x300/x300_mb_iface.cpp @@ -117,7 +117,10 @@ uhd::rfnoc::chdr_ctrl_xport::sptr x300_impl::x300_mb_iface::make_ctrl_transport( _pkt_factory, local_epid, send_link->get_num_send_frames(), - recv_link->get_num_recv_frames()); + recv_link->get_num_recv_frames(), + [this, send_link, recv_link]() { + this->get_io_srv_mgr()->disconnect_links(recv_link, send_link); + }); return xport; } @@ -182,9 +185,11 @@ uhd::rfnoc::chdr_rx_data_xport::uptr x300_impl::x300_mb_iface::make_rx_data_tran recv_capacity, fc_freq, fc_headroom, - lossy_xport); + lossy_xport, + [this, recv_link, send_link]() { + this->get_io_srv_mgr()->disconnect_links(recv_link, send_link); + }); - get_io_srv_mgr()->disconnect_links(recv_link, send_link); cfg_io_srv.reset(); // Connect the links to an I/O service @@ -202,7 +207,10 @@ uhd::rfnoc::chdr_rx_data_xport::uptr x300_impl::x300_mb_iface::make_rx_data_tran _pkt_factory, epids, recv_link->get_num_recv_frames(), - fc_params); + fc_params, + [this, recv_link, send_link]() { + this->get_io_srv_mgr()->disconnect_links(recv_link, send_link); + }); return rx_xport; } @@ -251,9 +259,11 @@ uhd::rfnoc::chdr_tx_data_xport::uptr x300_impl::x300_mb_iface::make_tx_data_tran pyld_buff_fmt, mdata_buff_fmt, fc_freq_ratio, - fc_headroom_ratio); + fc_headroom_ratio, + [this, recv_link, send_link]() { + this->get_io_srv_mgr()->disconnect_links(recv_link, send_link); + }); - get_io_srv_mgr()->disconnect_links(recv_link, send_link); cfg_io_srv.reset(); // Connect the links to an I/O service @@ -271,7 +281,10 @@ uhd::rfnoc::chdr_tx_data_xport::uptr x300_impl::x300_mb_iface::make_tx_data_tran _pkt_factory, epids, send_link->get_num_send_frames(), - buff_capacity); + buff_capacity, + [this, recv_link, send_link]() { + this->get_io_srv_mgr()->disconnect_links(recv_link, send_link); + }); return tx_xport; } diff --git a/host/tests/streamer_benchmark.cpp b/host/tests/streamer_benchmark.cpp index 706ecee10..88dd9665b 100644 --- a/host/tests/streamer_benchmark.cpp +++ b/host/tests/streamer_benchmark.cpp @@ -302,7 +302,11 @@ static std::shared_ptr make_rx_streamer_mock_link( pkt_factory, epids, send_link->get_num_send_frames(), - fc_params); + fc_params, + [io_srv = io_srv, recv_link, send_link]() { + io_srv->detach_recv_link(recv_link); + io_srv->detach_send_link(send_link); + }); streamer->connect_channel(0, std::move(xport)); return streamer; @@ -341,7 +345,11 @@ static std::shared_ptr make_tx_streamer_mock_link( pkt_factory, epids, send_link->get_num_send_frames(), - fc_params); + fc_params, + [io_srv = io_srv, recv_link, send_link]() { + io_srv->detach_recv_link(recv_link); + io_srv->detach_send_link(send_link); + }); streamer->connect_channel(0, std::move(xport)); return streamer; -- cgit v1.2.3