diff options
author | Ciro Nishiguchi <ciro.nishiguchi@ni.com> | 2019-10-28 14:28:33 -0500 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 12:21:33 -0800 |
commit | a8e286b106f19c37d6cf20de886c65f3c04da162 (patch) | |
tree | 5c1baf4310abc0e4e082c960dffabd18a9fb6a3e /host/lib | |
parent | 10b9d2688b5bcb150eec786a9ef7473f1c1c28ac (diff) | |
download | uhd-a8e286b106f19c37d6cf20de886c65f3c04da162.tar.gz uhd-a8e286b106f19c37d6cf20de886c65f3c04da162.tar.bz2 uhd-a8e286b106f19c37d6cf20de886c65f3c04da162.zip |
rfnoc: Make polling I/O service not block on flow control
Add a new method to io_service::send_io to check whether the destination
is ready for data, to make it possible to poll send_io rather than block
waiting for flow control credits.
Diffstat (limited to 'host/lib')
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp | 49 | ||||
-rw-r--r-- | host/lib/include/uhdlib/transport/inline_io_service.hpp | 6 | ||||
-rw-r--r-- | host/lib/include/uhdlib/transport/io_service.hpp | 39 | ||||
-rw-r--r-- | host/lib/include/uhdlib/transport/offload_io_service_client.hpp | 8 | ||||
-rw-r--r-- | host/lib/include/uhdlib/transport/tx_streamer_impl.hpp | 2 | ||||
-rw-r--r-- | host/lib/rfnoc/chdr_ctrl_xport.cpp | 4 | ||||
-rw-r--r-- | host/lib/rfnoc/chdr_tx_data_xport.cpp | 16 | ||||
-rw-r--r-- | host/lib/transport/inline_io_service.cpp | 63 | ||||
-rw-r--r-- | host/lib/transport/offload_io_service.cpp | 43 |
9 files changed, 162 insertions, 68 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp index 3226ba59b..47293f44a 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp @@ -188,7 +188,11 @@ public: */ buff_t::uptr get_send_buff(const int32_t timeout_ms) { - return _send_io->get_send_buff(timeout_ms); + if (_send_io->wait_for_dest_ready(_frame_size, timeout_ms)) { + return _send_io->get_send_buff(timeout_ms); + } else { + return nullptr; + } } /*! @@ -318,27 +322,23 @@ private: * \param buff the frame buffer to release * \param send_link the send link for flow control messages */ - void _send_callback(buff_t::uptr& buff, transport::send_link_if* send_link) + void _send_callback(buff_t::uptr buff, transport::send_link_if* send_link) { // If the packet size is not a multiple of the word size, then we will // still occupy an integer multiple of word size bytes in the FPGA, so // we need to calculate appropriately. const size_t packet_size_rounded = _round_pkt_size(buff->packet_size()); + send_link->release_send_buff(std::move(buff)); - if (_fc_state.dest_has_space(packet_size_rounded)) { - send_link->release_send_buff(std::move(buff)); - buff = nullptr; - - _fc_state.data_sent(packet_size_rounded); + _fc_state.data_sent(packet_size_rounded); - if (_fc_state.get_fc_resync_req_pending() - && _fc_state.dest_has_space(chdr::strc_payload::MAX_PACKET_SIZE)) { - const auto& xfer_counts = _fc_state.get_xfer_counts(); - const size_t strc_size = - _round_pkt_size(_fc_sender.send_strc_resync(send_link, xfer_counts)); - _fc_state.clear_fc_resync_req_pending(); - _fc_state.data_sent(strc_size); - } + if (_fc_state.get_fc_resync_req_pending() + && _fc_state.dest_has_space(chdr::strc_payload::MAX_PACKET_SIZE)) { + const auto& xfer_counts = _fc_state.get_xfer_counts(); + const size_t strc_size = + _round_pkt_size(_fc_sender.send_strc_resync(send_link, xfer_counts)); + _fc_state.clear_fc_resync_req_pending(); + _fc_state.data_sent(strc_size); } } @@ -347,6 +347,22 @@ private: return ((pkt_size_bytes + _chdr_w_bytes - 1) / _chdr_w_bytes) * _chdr_w_bytes; } + /*! + * Flow control callback for I/O service + * + * The I/O service invokes this callback in the send_io::wait_for_dest_ready + * method. + * + * \param num_bytes The number of bytes in the packet to be sent + * \return Whether there are enough flow control credits for num_bytes + */ + bool _fc_callback(const size_t num_bytes) + { + // No need to round num_bytes since the transport always checks for + // enough space for a full frame. + return _fc_state.dest_has_space(num_bytes); + } + // Interface to the I/O service transport::send_io_if::sptr _send_io; @@ -379,6 +395,9 @@ private: //! The CHDR width in bytes. size_t _chdr_w_bytes; + + //! The size of the send frame + size_t _frame_size; }; }} // namespace uhd::rfnoc diff --git a/host/lib/include/uhdlib/transport/inline_io_service.hpp b/host/lib/include/uhdlib/transport/inline_io_service.hpp index d4a6dbbae..b0153a951 100644 --- a/host/lib/include/uhdlib/transport/inline_io_service.hpp +++ b/host/lib/include/uhdlib/transport/inline_io_service.hpp @@ -52,7 +52,8 @@ public: send_io_if::send_callback_t send_cb, recv_link_if::sptr recv_link, size_t num_recv_frames, - recv_callback_t recv_cb); + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb); private: friend class inline_recv_io; @@ -113,8 +114,9 @@ private: * \param recv_io_cb the callback+interface initiating the operation * \param recv_link link to perform receive on * \param timeout_ms timeout to wait for a buffer on the link + * \return Whether a flow control update was received */ - void recv_flow_ctrl( + bool recv_flow_ctrl( inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms); /* Track whether link is muxed and the callback */ diff --git a/host/lib/include/uhdlib/transport/io_service.hpp b/host/lib/include/uhdlib/transport/io_service.hpp index 399b693dc..87702ee2d 100644 --- a/host/lib/include/uhdlib/transport/io_service.hpp +++ b/host/lib/include/uhdlib/transport/io_service.hpp @@ -183,19 +183,25 @@ public: using sptr = std::shared_ptr<send_io_if>; /*! - * Callback for sending the packet. Callback is responsible for calling - * release_send_buff() if it wants to send the packet. This will require - * moving the uptr's reference. If the packet will NOT be sent, the - * callback must NOT release the uptr. + * Callback for sending the packet. Callback should call release_send_buff() + * and update any internal state needed. For example, flow control state + * could be updated here, and the header could be filled out as well, like + * the packet's sequence number and/or addresses. * - * Function should update any internal state needed. For example, flow - * control state could be updated here, and the header could be filled out - * as well, like the packet's sequence number and/or addresses. + * Callbacks execute on the I/O thread! Be careful about what state is + * touched. In addition, this callback should NOT sleep. + */ + using send_callback_t = std::function<void(frame_buff::uptr, send_link_if*)>; + + /*! + * Callback to check whether a packet can be sent. For flow controlled + * links, the callback should return whether the requested number of bytes + * can be received by the destination. * * Callbacks execute on the I/O thread! Be careful about what state is * touched. In addition, this callback should NOT sleep. */ - using send_callback_t = std::function<void(frame_buff::uptr&, send_link_if*)>; + using fc_callback_t = std::function<bool(const size_t)>; /* Transport client methods */ /*! @@ -209,6 +215,19 @@ public: virtual frame_buff::uptr get_send_buff(int32_t timeout_ms) = 0; /*! + * Wait until the destination is ready for a packet. For flow controlled + * transports, this method must be called prior to release_send_buff. If + * the transport is not flow controlled, you do not need to call this + * method. + * + * \param num_bytes the number of bytes to be sent in release_send_buff + * \param timeout_ms timeout in milliseconds to wait for destination to be + * ready + * \return whether the destination is ready for the requested bytes + */ + virtual bool wait_for_dest_ready(size_t num_bytes, int32_t timeout_ms) = 0; + + /*! * Release the send buffer to the send queue. * If the frame_buff's packet_size is zero, the link will free the buffer * without sending it. @@ -307,6 +326,7 @@ public: * \param recv_link the link used to observe flow control (can be empty) * \param num_recv_frames Number of buffers to reserve in recv_link * \param recv_cb callback function for receiving packets from recv_link + * \param fc_cb callback function to check if destination is ready for data * \return a send_io_if for interfacing with the link */ virtual send_io_if::sptr make_send_client(send_link_if::sptr send_link, @@ -314,7 +334,8 @@ public: send_io_if::send_callback_t cb, recv_link_if::sptr recv_link, size_t num_recv_frames, - recv_callback_t recv_cb) = 0; + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb) = 0; /*! * Create a recv_io_if and registers the transport's callbacks. diff --git a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp index 2f606878c..d0e6bb4bd 100644 --- a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp +++ b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp @@ -132,6 +132,14 @@ public: } } + bool wait_for_dest_ready(size_t /*num_bytes*/, int32_t /*timeout_ms*/) + { + // For offload_io_service, the destination is the queue to the offload + // thread. The queue is always able to accomodate new packets since it + // is sized to fit all the frames reserved from the link. + return true; + } + frame_buff::uptr get_send_buff(int32_t timeout_ms) { if (polling) { diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp index c594dd530..42250c4b1 100644 --- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp +++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp @@ -284,7 +284,7 @@ public: (eob_on_last_packet and final_length == nsamps_to_send_remaining); num_samps_sent = _send_one_packet( - buffs, total_nsamps_sent, final_length, metadata, eov, timeout); + buffs, total_nsamps_sent, final_length, metadata, eov, timeout_ms); } // Advance sample accumulator and decrement remaining samples diff --git a/host/lib/rfnoc/chdr_ctrl_xport.cpp b/host/lib/rfnoc/chdr_ctrl_xport.cpp index 6b185efab..637d2c302 100644 --- a/host/lib/rfnoc/chdr_ctrl_xport.cpp +++ b/host/lib/rfnoc/chdr_ctrl_xport.cpp @@ -23,12 +23,12 @@ chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv, : _my_epid(my_epid), _recv_packet(pkt_factory.make_generic()) { /* Make dumb send pipe */ - send_io_if::send_callback_t send_cb = [this](frame_buff::uptr& buff, + send_io_if::send_callback_t send_cb = [this](frame_buff::uptr buff, send_link_if* link) { link->release_send_buff(std::move(buff)); }; _send_if = io_srv->make_send_client( - send_link, num_send_frames, send_cb, recv_link_if::sptr(), 0, nullptr); + send_link, num_send_frames, send_cb, recv_link_if::sptr(), 0, nullptr, nullptr); /* Make dumb recv pipe that matches management and control packets */ uhd::transport::recv_callback_t ctrl_recv_cb = [this](frame_buff::uptr& buff, diff --git a/host/lib/rfnoc/chdr_tx_data_xport.cpp b/host/lib/rfnoc/chdr_tx_data_xport.cpp index bb9d1b63e..8837e2dbe 100644 --- a/host/lib/rfnoc/chdr_tx_data_xport.cpp +++ b/host/lib/rfnoc/chdr_tx_data_xport.cpp @@ -36,6 +36,7 @@ chdr_tx_data_xport::chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv, , _fc_sender(pkt_factory, epids) , _epid(epids.first) , _chdr_w_bytes(chdr_w_to_bits(pkt_factory.get_chdr_w()) / 8) + , _frame_size(send_link->get_send_frame_size()) { UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", "Creating tx xport with local epid=" << epids.first @@ -51,8 +52,8 @@ chdr_tx_data_xport::chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv, _max_payload_size = send_link->get_send_frame_size() - pyld_offset; // Now create the send I/O we will use for data - auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) { - this->_send_callback(buff, send_link); + auto send_cb = [this](buff_t::uptr buff, transport::send_link_if* send_link) { + this->_send_callback(std::move(buff), send_link); }; auto recv_cb = [this](buff_t::uptr& buff, @@ -61,13 +62,18 @@ chdr_tx_data_xport::chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv, return this->_recv_callback(buff, recv_link, send_link); }; + auto fc_cb = [this](size_t num_bytes) { + return this->_fc_callback(num_bytes); + }; + // Needs just a single recv frame for strs packets _send_io = io_srv->make_send_client(send_link, num_send_frames, send_cb, recv_link, /* num_recv_frames */ 1, - recv_cb); + recv_cb, + fc_cb); } chdr_tx_data_xport::~chdr_tx_data_xport() @@ -96,9 +102,8 @@ static chdr_tx_data_xport::fc_params_t configure_flow_ctrl(io_service::sptr io_s chdr::chdr_packet::uptr recv_packet = pkt_factory.make_generic(); // No flow control at initialization, just release all send buffs - auto send_cb = [](frame_buff::uptr& buff, send_link_if* send_link) { + auto send_cb = [](frame_buff::uptr buff, send_link_if* send_link) { send_link->release_send_buff(std::move(buff)); - buff = nullptr; }; // For recv, just queue strs packets for recv_io to read @@ -124,6 +129,7 @@ static chdr_tx_data_xport::fc_params_t configure_flow_ctrl(io_service::sptr io_s send_cb, nullptr, 0, // num_recv_frames + nullptr, nullptr); auto recv_io = io_srv->make_recv_client(recv_link, diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp index 6449bbda8..1438699b7 100644 --- a/host/lib/transport/inline_io_service.cpp +++ b/host/lib/transport/inline_io_service.cpp @@ -144,7 +144,7 @@ public: } } - void recv_flow_ctrl(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms) + bool recv_flow_ctrl(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms) { while (true) { frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); @@ -156,7 +156,7 @@ public: rcvr_found = true; if (rcvr == cb) { assert(!buff); - return; + return true; } else if (buff) { /* NOTE: Should not overflow, by construction * Every queue can hold link->get_num_recv_frames() @@ -174,7 +174,7 @@ public: recv_link->release_recv_buff(std::move(buff)); } } else { /* Timeout */ - return; + return false; } } } @@ -250,12 +250,14 @@ public: send_callback_t send_cb, recv_link_if::sptr recv_link, size_t num_recv_frames, - recv_callback_t fc_cb) - : inline_recv_cb(fc_cb, send_link.get()) + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb) + : inline_recv_cb(recv_cb, send_link.get()) , _io_srv(io_srv) , _send_link(send_link) , _send_cb(send_cb) , _recv_link(recv_link) + , _fc_cb(fc_cb) { _num_recv_frames = num_recv_frames; _num_send_frames = num_send_frames; @@ -269,9 +271,27 @@ public: } } + bool wait_for_dest_ready(size_t num_bytes, int32_t timeout_ms) + { + if (!_recv_link) { + // If there is no flow control link, then the destination must + // always be ready for more data. + return true; + } + + while (!_fc_cb(num_bytes)) { + const bool updated = + _io_srv->recv_flow_ctrl(this, _recv_link.get(), timeout_ms); + + if (!updated) { + return false; + } + } + return true; + } + frame_buff::uptr get_send_buff(int32_t timeout_ms) { - /* Check initial flow control result */ frame_buff::uptr buff = _send_link->get_send_buff(timeout_ms); if (buff) { _num_frames_in_use++; @@ -283,19 +303,8 @@ public: void release_send_buff(frame_buff::uptr buff) { - while (buff) { - // Try to send a packet - _send_cb(buff, _send_link.get()); - if (_recv_link) { - // If the buffer was not released, use a timeout to receive - // the flow control packet, to avoid wasting CPU. - if (!buff) { - _io_srv->recv_flow_ctrl(this, _recv_link.get(), 0); - } else { - _io_srv->recv_flow_ctrl(this, _recv_link.get(), 100); - } - } - } + // Send the packet using callback + _send_cb(std::move(buff), _send_link.get()); _num_frames_in_use--; } @@ -305,6 +314,7 @@ private: send_callback_t _send_cb; recv_link_if::sptr _recv_link; recv_callback_t _recv_cb; + fc_callback_t _fc_cb; size_t _num_frames_in_use = 0; }; @@ -369,7 +379,8 @@ send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_lin send_io_if::send_callback_t send_cb, recv_link_if::sptr recv_link, size_t num_recv_frames, - recv_callback_t recv_cb) + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb) { UHD_ASSERT_THROW(send_link); UHD_ASSERT_THROW(num_send_frames > 0); @@ -377,9 +388,10 @@ send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_lin connect_sender(send_link.get(), num_send_frames); sptr io_srv = shared_from_this(); auto send_io = std::make_shared<inline_send_io>( - io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb); + io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb, fc_cb); if (recv_link) { UHD_ASSERT_THROW(recv_cb); + UHD_ASSERT_THROW(fc_cb); UHD_ASSERT_THROW(num_recv_frames > 0); connect_receiver(recv_link.get(), send_io.get(), num_recv_frames); } @@ -474,7 +486,7 @@ frame_buff::uptr inline_io_service::recv( } } -void inline_io_service::recv_flow_ctrl( +bool inline_io_service::recv_flow_ctrl( inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms) { inline_recv_mux* mux; @@ -483,8 +495,7 @@ void inline_io_service::recv_flow_ctrl( if (mux) { /* Defer to mux's recv_flow_ctrl() if present */ - mux->recv_flow_ctrl(recv_io_cb, recv_link, timeout_ms); - return; + return mux->recv_flow_ctrl(recv_io_cb, recv_link, timeout_ms); } else { assert(recv_io_cb == rcvr); } @@ -495,13 +506,13 @@ void inline_io_service::recv_flow_ctrl( if (buff) { if (rcvr->callback(buff, recv_link)) { assert(!buff); - return; + return true; } else { UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); recv_link->release_recv_buff(std::move(buff)); } } else { /* Timeout */ - return; + return false; } } } diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp index 53c8f017d..8a7895173 100644 --- a/host/lib/transport/offload_io_service.cpp +++ b/host/lib/transport/offload_io_service.cpp @@ -48,6 +48,16 @@ public: _item_sem.notify(); } + bool peek(queue_item_t& item) + { + if (_item_sem.count()) { + item = _buffer[_read_index]; + return true; + } else { + return false; + } + } + bool pop(queue_item_t& item) { if (_item_sem.try_wait()) { @@ -153,6 +163,13 @@ public: _from_offload_thread.push(queue_element); } + std::tuple<frame_buff*, bool> offload_thread_peek() + { + to_offload_thread_t queue_element; + _to_offload_thread.peek(queue_element); + return std::make_tuple(queue_element.buff, queue_element.disconnect); + } + std::tuple<frame_buff*, bool> offload_thread_pop() { to_offload_thread_t queue_element; @@ -261,7 +278,8 @@ public: send_io_if::send_callback_t send_cb, recv_link_if::sptr recv_link, size_t num_recv_frames, - recv_callback_t recv_cb); + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb); private: offload_io_service_impl(const offload_io_service_impl&) = delete; @@ -495,7 +513,8 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se send_io_if::send_callback_t send_cb, recv_link_if::sptr recv_link, size_t num_recv_frames, - recv_callback_t recv_cb) + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb) { UHD_ASSERT_THROW(_offload_thread); @@ -513,12 +532,13 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se recv_link, num_recv_frames, recv_cb, + fc_cb, port]() { frame_reservation_t frames = {recv_link, num_recv_frames, send_link, num_send_frames}; _reservation_mgr.reserve_frames(frames); auto inline_send_io = _io_srv->make_send_client( - send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb); + send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb, fc_cb); send_client_info_t client_info; client_info.inline_io = inline_send_io; @@ -666,10 +686,14 @@ void offload_io_service_impl::_do_work_polling() for (auto it = _send_clients.begin(); it != _send_clients.end();) { frame_buff* buff; bool disconnect; - std::tie(buff, disconnect) = it->port->offload_thread_pop(); + std::tie(buff, disconnect) = it->port->offload_thread_peek(); if (buff) { - _release_send_buff(*it, buff); + if (it->inline_io->wait_for_dest_ready(buff->packet_size(), 0)) { + _release_send_buff(*it, buff); + it->port->offload_thread_pop(); + } } else if (disconnect) { + it->port->offload_thread_pop(); _disconnect_send_client(*it); it = _send_clients.erase(it); // increments it continue; @@ -735,11 +759,14 @@ void offload_io_service_impl::_do_work_blocking() if (it->num_frames_in_use > 0) { frame_buff* buff; bool disconnect; - std::tie(buff, disconnect) = it->port->offload_thread_pop(blocking_timeout_ms); - + std::tie(buff, disconnect) = it->port->offload_thread_peek(); if (buff) { - _release_send_buff(*it, buff); + if (it->inline_io->wait_for_dest_ready(buff->packet_size(), blocking_timeout_ms)) { + _release_send_buff(*it, buff); + it->port->offload_thread_pop(); + } } else if (disconnect) { + it->port->offload_thread_pop(); _disconnect_send_client(*it); it = _send_clients.erase(it); // increments it continue; |