diff options
author | Ciro Nishiguchi <ciro.nishiguchi@ni.com> | 2019-09-11 16:50:11 -0500 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 11:49:45 -0800 |
commit | f312d827602fafa21625106dafe2f209e10a22b3 (patch) | |
tree | fc805bebde263da87da3d7ceeb133014156cba3f | |
parent | 1a6368331bf441290b0d08ac233c9e5050021493 (diff) | |
download | uhd-f312d827602fafa21625106dafe2f209e10a22b3.tar.gz uhd-f312d827602fafa21625106dafe2f209e10a22b3.tar.bz2 uhd-f312d827602fafa21625106dafe2f209e10a22b3.zip |
rfnoc: Fix transport buffer reservations
Change transports to reserve the number of frame buffers they actually
need from the I/O service. Previously some I/O service clients reserved
0 buffers since they shared frame buffers with other clients, as we know
the two clients do not use the links simultaneously. This is possible
with the inline_io_service but not with a multithreaded I/O service
which queues buffer for clients before they are requested.
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp | 7 | ||||
-rw-r--r-- | host/lib/include/uhdlib/transport/inline_io_service.hpp | 13 | ||||
-rw-r--r-- | host/lib/rfnoc/chdr_ctrl_xport.cpp | 10 | ||||
-rw-r--r-- | host/lib/rfnoc/chdr_rx_data_xport.cpp | 8 | ||||
-rw-r--r-- | host/lib/rfnoc/mgmt_portal.cpp | 8 | ||||
-rw-r--r-- | host/lib/transport/inline_io_service.cpp | 85 |
6 files changed, 74 insertions, 57 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp index 726ea7f6c..2a37a5afc 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp @@ -120,6 +120,13 @@ public: void release_recv_buff(frame_buff::uptr buff); /*! + * Release a frame buffer, allowing the recv link driver to reuse it. + * + * \param buffer frame buffer to release for reuse by the link + */ + void release_mgmt_buff(frame_buff::uptr buff); + + /*! * Get this xport's EPID * * \return the source EPID for this transport diff --git a/host/lib/include/uhdlib/transport/inline_io_service.hpp b/host/lib/include/uhdlib/transport/inline_io_service.hpp index f10e7018d..f207d15a0 100644 --- a/host/lib/include/uhdlib/transport/inline_io_service.hpp +++ b/host/lib/include/uhdlib/transport/inline_io_service.hpp @@ -71,9 +71,8 @@ private: * Disconnect the sender and free resources * * \param link the link that was used for sending data - * \param num_frames number of frames to release (same as reservation) */ - void disconnect_sender(send_link_if* link, size_t num_frames); + void disconnect_sender(send_link_if* link); /*! * Connect a receiver to the link and reserve resources @@ -87,9 +86,8 @@ private: * Disconnect the receiver from the provided link and free resources * \param link the recv link that was used for reception * \param cb the callback to disassociate - * \param num_frames the number of frames that was reserved for the cb */ - void disconnect_receiver(recv_link_if* link, inline_recv_cb* cb, size_t num_frames); + void disconnect_receiver(recv_link_if* link, inline_recv_cb* cb); /* * Function to perform recv operations on a link, which is potentially @@ -103,14 +101,11 @@ private: frame_buff::uptr recv( inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms); - /* Track whether link is muxed, the callback, and buffer reservations */ + /* Track whether link is muxed and the callback */ std::unordered_map<recv_link_if*, - std::tuple<inline_recv_mux*, inline_recv_cb*, size_t>> + std::tuple<inline_recv_mux*, inline_recv_cb*>> _recv_tbl; - /* Track how many send_frames have been reserved for each link */ - std::unordered_map<send_link_if*, size_t> _send_tbl; - /* Shared ptr kept to avoid untimely release */ std::list<send_link_if::sptr> _send_links; std::list<recv_link_if::sptr> _recv_links; diff --git a/host/lib/rfnoc/chdr_ctrl_xport.cpp b/host/lib/rfnoc/chdr_ctrl_xport.cpp index 929875dbd..f9f7c9e1b 100644 --- a/host/lib/rfnoc/chdr_ctrl_xport.cpp +++ b/host/lib/rfnoc/chdr_ctrl_xport.cpp @@ -74,9 +74,8 @@ chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv, return false; }; - // No additional frames reserved specifically for this virtual interface _mgmt_recv_if = io_srv->make_recv_client( - recv_link, 0, mgmt_recv_cb, send_link_if::sptr(), 0, release_cb); + recv_link, 1, mgmt_recv_cb, send_link_if::sptr(), 0, release_cb); } /*! @@ -143,6 +142,13 @@ void chdr_ctrl_xport::release_recv_buff(frame_buff::uptr buff) _ctrl_recv_if->release_recv_buff(std::move(buff)); } +void chdr_ctrl_xport::release_mgmt_buff(frame_buff::uptr buff) +{ + // FIXME: Remove mutex when have threaded_io_service + std::lock_guard<std::mutex> lock(_mutex); + _mgmt_recv_if->release_recv_buff(std::move(buff)); +} + /*! * Get this xport's EPID */ diff --git a/host/lib/rfnoc/chdr_rx_data_xport.cpp b/host/lib/rfnoc/chdr_rx_data_xport.cpp index bcd9f7ea9..cdcd70393 100644 --- a/host/lib/rfnoc/chdr_rx_data_xport.cpp +++ b/host/lib/rfnoc/chdr_rx_data_xport.cpp @@ -143,10 +143,10 @@ chdr_rx_data_xport::fc_params_t chdr_rx_data_xport::configure_sep(io_service::sp // Create a temporary recv_io to receive the strc init auto recv_io = io_srv->make_recv_client(recv_link, - /* num_recv_frames*/ 1, + 1, // num_recv_frames recv_cb, send_link, - /* num_send_frames*/ 1, + 1, // num_send_frames fc_cb); // Create a control transport with the rx data links to send mgmt packets @@ -157,8 +157,8 @@ chdr_rx_data_xport::fc_params_t chdr_rx_data_xport::configure_sep(io_service::sp recv_link, pkt_factory, local_epid, - 0, // num_send_frames - 0); // num_recv_frames + 1, // num_send_frames + 1); // num_recv_frames // Setup a route to the EPID // Note that this may be gratuitous--The endpoint may already have been set up diff --git a/host/lib/rfnoc/mgmt_portal.cpp b/host/lib/rfnoc/mgmt_portal.cpp index 0e0997a36..1c6e2c608 100644 --- a/host/lib/rfnoc/mgmt_portal.cpp +++ b/host/lib/rfnoc/mgmt_portal.cpp @@ -1050,15 +1050,15 @@ private: // Functions // Send the transaction over the wire _send_mgmt_transaction(xport, send); - auto recv_buff = xport.get_mgmt_buff(timeout * 1000); - if (not recv_buff) { + auto mgmt_buff = xport.get_mgmt_buff(timeout * 1000); + if (not mgmt_buff) { throw uhd::io_error("Timed out getting recv buff for management transaction"); } - _recv_pkt->refresh(recv_buff->data()); + _recv_pkt->refresh(mgmt_buff->data()); mgmt_payload recv; recv.set_header(my_epid, _protover, _chdr_w); _recv_pkt->fill_payload(recv); - xport.release_recv_buff(std::move(recv_buff)); + xport.release_mgmt_buff(std::move(mgmt_buff)); return recv; } diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp index 942c99d5f..9dd0814ca 100644 --- a/host/lib/transport/inline_io_service.cpp +++ b/host/lib/transport/inline_io_service.cpp @@ -1,3 +1,9 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + #include <uhd/config.hpp> #include <uhd/exception.hpp> #include <uhd/utils/log.hpp> @@ -159,38 +165,43 @@ public: : inline_recv_cb(recv_cb, fc_link.get()) , _io_srv(io_srv) , _data_link(data_link) - , _num_recv_frames(num_recv_frames) , _fc_link(fc_link) - , _num_send_frames(num_send_frames) , _fc_cb(fc_cb) { + _num_recv_frames = num_recv_frames; + _num_send_frames = num_send_frames; } ~inline_recv_io() { - _io_srv->disconnect_receiver(_data_link.get(), this, _num_recv_frames); + _io_srv->disconnect_receiver(_data_link.get(), this); if (_fc_link) { - _io_srv->disconnect_sender(_fc_link.get(), _num_send_frames); + _io_srv->disconnect_sender(_fc_link.get()); } } frame_buff::uptr get_recv_buff(int32_t timeout_ms) { - return _io_srv->recv(this, _data_link.get(), timeout_ms); + auto buff = _io_srv->recv(this, _data_link.get(), timeout_ms); + if (buff) { + _num_frames_in_use++; + assert(_num_frames_in_use <= _num_recv_frames); + } + return buff; } void release_recv_buff(frame_buff::uptr buff) { _fc_cb(frame_buff::uptr(std::move(buff)), _data_link.get(), _fc_link.get()); + _num_frames_in_use--; } private: inline_io_service::sptr _io_srv; recv_link_if::sptr _data_link; - size_t _num_recv_frames; send_link_if::sptr _fc_link; - size_t _num_send_frames; fc_callback_t _fc_cb; + size_t _num_frames_in_use = 0; }; class inline_send_io : public virtual send_io_if, public virtual inline_recv_cb @@ -208,18 +219,18 @@ public: : inline_recv_cb(fc_cb, send_link.get()) , _io_srv(io_srv) , _send_link(send_link) - , _num_send_frames(num_send_frames) , _send_cb(send_cb) , _recv_link(recv_link) - , _num_recv_frames(num_recv_frames) { + _num_recv_frames = num_recv_frames; + _num_send_frames = num_send_frames; } ~inline_send_io() { - _io_srv->disconnect_sender(_send_link.get(), _num_send_frames); + _io_srv->disconnect_sender(_send_link.get()); if (_recv_link) { - _io_srv->disconnect_receiver(_recv_link.get(), this, _num_recv_frames); + _io_srv->disconnect_receiver(_recv_link.get(), this); } } @@ -228,6 +239,8 @@ public: /* Check initial flow control result */ frame_buff::uptr buff = _send_link->get_send_buff(timeout_ms); if (buff) { + _num_frames_in_use++; + assert(_num_frames_in_use <= _num_send_frames); return frame_buff::uptr(std::move(buff)); } return frame_buff::uptr(); @@ -241,16 +254,16 @@ public: } _send_cb(buff, _send_link.get()); } + _num_frames_in_use--; } private: inline_io_service::sptr _io_srv; send_link_if::sptr _send_link; - size_t _num_send_frames; send_callback_t _send_cb; recv_link_if::sptr _recv_link; - size_t _num_recv_frames; recv_callback_t _recv_cb; + size_t _num_frames_in_use = 0; }; inline_io_service::~inline_io_service(){}; @@ -260,7 +273,7 @@ void inline_io_service::attach_recv_link(recv_link_if::sptr link) auto link_ptr = link.get(); UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) == 0); _recv_tbl[link_ptr] = - std::tuple<inline_recv_mux*, inline_recv_cb*, size_t>(nullptr, nullptr, 0); + std::tuple<inline_recv_mux*, inline_recv_cb*>(nullptr, nullptr); _recv_links.push_back(link); }; @@ -272,9 +285,11 @@ recv_io_if::sptr inline_io_service::make_recv_client(recv_link_if::sptr data_lin recv_io_if::fc_callback_t fc_cb) { UHD_ASSERT_THROW(data_link); + UHD_ASSERT_THROW(num_recv_frames > 0); UHD_ASSERT_THROW(cb); if (fc_link) { UHD_ASSERT_THROW(fc_cb); + UHD_ASSERT_THROW(num_send_frames > 0); connect_sender(fc_link.get(), num_send_frames); } sptr io_srv = shared_from_this(); @@ -286,9 +301,7 @@ recv_io_if::sptr inline_io_service::make_recv_client(recv_link_if::sptr data_lin void inline_io_service::attach_send_link(send_link_if::sptr link) { - auto link_ptr = link.get(); - UHD_ASSERT_THROW(_send_tbl.count(link_ptr) == 0); - _send_tbl[link_ptr] = 0; + UHD_ASSERT_THROW(std::find(_send_links.begin(), _send_links.end(), link) == _send_links.end()); _send_links.push_back(link); }; @@ -300,6 +313,7 @@ send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_lin recv_callback_t recv_cb) { UHD_ASSERT_THROW(send_link); + UHD_ASSERT_THROW(num_send_frames > 0); UHD_ASSERT_THROW(send_cb); connect_sender(send_link.get(), num_send_frames); sptr io_srv = shared_from_this(); @@ -307,37 +321,37 @@ send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_lin io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb); if (recv_link) { UHD_ASSERT_THROW(recv_cb); + UHD_ASSERT_THROW(num_recv_frames > 0); connect_receiver(recv_link.get(), send_io.get(), num_recv_frames); } return send_io; } /* - * Senders are free to mux a send_link, but the total reserved send_frames - * must be less than or equal to the link's capacity + * This I/O service does not check frame reservations strictly since frames can + * be shared by multiple clients as long as they are not in use at the same + * time. */ void inline_io_service::connect_sender(send_link_if* link, size_t num_frames) { - size_t rsvd_frames = _send_tbl.at(link); size_t frame_capacity = link->get_num_send_frames(); - UHD_ASSERT_THROW(frame_capacity >= rsvd_frames + num_frames); - _send_tbl[link] = rsvd_frames + num_frames; + UHD_ASSERT_THROW(frame_capacity >= num_frames); } -void inline_io_service::disconnect_sender(send_link_if* link, size_t num_frames) +void inline_io_service::disconnect_sender(send_link_if* /*link*/) { - size_t rsvd_frames = _send_tbl.at(link); - UHD_ASSERT_THROW(rsvd_frames >= num_frames); - _send_tbl[link] = rsvd_frames - num_frames; + // No-op } void inline_io_service::connect_receiver( recv_link_if* link, inline_recv_cb* cb, size_t num_frames) { + size_t capacity = link->get_num_recv_frames(); + UHD_ASSERT_THROW(num_frames <= capacity); + inline_recv_mux* mux; inline_recv_cb* rcvr; - size_t rsvd_frames; - std::tie(mux, rcvr, rsvd_frames) = _recv_tbl.at(link); + std::tie(mux, rcvr) = _recv_tbl.at(link); if (mux) { mux->connect(cb); } else if (rcvr) { @@ -348,19 +362,15 @@ void inline_io_service::connect_receiver( } else { rcvr = cb; } - size_t capacity = link->get_num_recv_frames(); - UHD_ASSERT_THROW(rsvd_frames + num_frames <= capacity); - _recv_tbl[link] = std::make_tuple(mux, rcvr, rsvd_frames + num_frames); + _recv_tbl[link] = std::make_tuple(mux, rcvr); } void inline_io_service::disconnect_receiver( - recv_link_if* link, inline_recv_cb* cb, size_t num_frames) + recv_link_if* link, inline_recv_cb* cb) { inline_recv_mux* mux; inline_recv_cb* rcvr; - size_t rsvd_frames; - std::tie(mux, rcvr, rsvd_frames) = _recv_tbl.at(link); - UHD_ASSERT_THROW(rsvd_frames >= num_frames); + std::tie(mux, rcvr) = _recv_tbl.at(link); if (mux) { mux->disconnect(cb); if (mux->is_empty()) { @@ -370,7 +380,7 @@ void inline_io_service::disconnect_receiver( } else { rcvr = nullptr; } - _recv_tbl[link] = std::make_tuple(mux, rcvr, rsvd_frames - num_frames); + _recv_tbl[link] = std::make_tuple(mux, rcvr); } frame_buff::uptr inline_io_service::recv( @@ -378,8 +388,7 @@ frame_buff::uptr inline_io_service::recv( { inline_recv_mux* mux; inline_recv_cb* rcvr; - size_t num_frames; - std::tie(mux, rcvr, num_frames) = _recv_tbl.at(recv_link); + std::tie(mux, rcvr) = _recv_tbl.at(recv_link); if (mux) { /* Defer to mux's recv() if present */ |