diff options
Diffstat (limited to 'host/lib/include/uhdlib')
5 files changed, 77 insertions, 27 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp index 3226ba59b..47293f44a 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp @@ -188,7 +188,11 @@ public: */ buff_t::uptr get_send_buff(const int32_t timeout_ms) { - return _send_io->get_send_buff(timeout_ms); + if (_send_io->wait_for_dest_ready(_frame_size, timeout_ms)) { + return _send_io->get_send_buff(timeout_ms); + } else { + return nullptr; + } } /*! @@ -318,27 +322,23 @@ private: * \param buff the frame buffer to release * \param send_link the send link for flow control messages */ - void _send_callback(buff_t::uptr& buff, transport::send_link_if* send_link) + void _send_callback(buff_t::uptr buff, transport::send_link_if* send_link) { // If the packet size is not a multiple of the word size, then we will // still occupy an integer multiple of word size bytes in the FPGA, so // we need to calculate appropriately. const size_t packet_size_rounded = _round_pkt_size(buff->packet_size()); + send_link->release_send_buff(std::move(buff)); - if (_fc_state.dest_has_space(packet_size_rounded)) { - send_link->release_send_buff(std::move(buff)); - buff = nullptr; - - _fc_state.data_sent(packet_size_rounded); + _fc_state.data_sent(packet_size_rounded); - if (_fc_state.get_fc_resync_req_pending() - && _fc_state.dest_has_space(chdr::strc_payload::MAX_PACKET_SIZE)) { - const auto& xfer_counts = _fc_state.get_xfer_counts(); - const size_t strc_size = - _round_pkt_size(_fc_sender.send_strc_resync(send_link, xfer_counts)); - _fc_state.clear_fc_resync_req_pending(); - _fc_state.data_sent(strc_size); - } + if (_fc_state.get_fc_resync_req_pending() + && _fc_state.dest_has_space(chdr::strc_payload::MAX_PACKET_SIZE)) { + const auto& xfer_counts = _fc_state.get_xfer_counts(); + const size_t strc_size = + _round_pkt_size(_fc_sender.send_strc_resync(send_link, xfer_counts)); + _fc_state.clear_fc_resync_req_pending(); + _fc_state.data_sent(strc_size); } } @@ -347,6 +347,22 @@ private: return ((pkt_size_bytes + _chdr_w_bytes - 1) / _chdr_w_bytes) * _chdr_w_bytes; } + /*! + * Flow control callback for I/O service + * + * The I/O service invokes this callback in the send_io::wait_for_dest_ready + * method. + * + * \param num_bytes The number of bytes in the packet to be sent + * \return Whether there are enough flow control credits for num_bytes + */ + bool _fc_callback(const size_t num_bytes) + { + // No need to round num_bytes since the transport always checks for + // enough space for a full frame. + return _fc_state.dest_has_space(num_bytes); + } + // Interface to the I/O service transport::send_io_if::sptr _send_io; @@ -379,6 +395,9 @@ private: //! The CHDR width in bytes. size_t _chdr_w_bytes; + + //! The size of the send frame + size_t _frame_size; }; }} // namespace uhd::rfnoc diff --git a/host/lib/include/uhdlib/transport/inline_io_service.hpp b/host/lib/include/uhdlib/transport/inline_io_service.hpp index d4a6dbbae..b0153a951 100644 --- a/host/lib/include/uhdlib/transport/inline_io_service.hpp +++ b/host/lib/include/uhdlib/transport/inline_io_service.hpp @@ -52,7 +52,8 @@ public: send_io_if::send_callback_t send_cb, recv_link_if::sptr recv_link, size_t num_recv_frames, - recv_callback_t recv_cb); + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb); private: friend class inline_recv_io; @@ -113,8 +114,9 @@ private: * \param recv_io_cb the callback+interface initiating the operation * \param recv_link link to perform receive on * \param timeout_ms timeout to wait for a buffer on the link + * \return Whether a flow control update was received */ - void recv_flow_ctrl( + bool recv_flow_ctrl( inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms); /* Track whether link is muxed and the callback */ diff --git a/host/lib/include/uhdlib/transport/io_service.hpp b/host/lib/include/uhdlib/transport/io_service.hpp index 399b693dc..87702ee2d 100644 --- a/host/lib/include/uhdlib/transport/io_service.hpp +++ b/host/lib/include/uhdlib/transport/io_service.hpp @@ -183,19 +183,25 @@ public: using sptr = std::shared_ptr<send_io_if>; /*! - * Callback for sending the packet. Callback is responsible for calling - * release_send_buff() if it wants to send the packet. This will require - * moving the uptr's reference. If the packet will NOT be sent, the - * callback must NOT release the uptr. + * Callback for sending the packet. Callback should call release_send_buff() + * and update any internal state needed. For example, flow control state + * could be updated here, and the header could be filled out as well, like + * the packet's sequence number and/or addresses. * - * Function should update any internal state needed. For example, flow - * control state could be updated here, and the header could be filled out - * as well, like the packet's sequence number and/or addresses. + * Callbacks execute on the I/O thread! Be careful about what state is + * touched. In addition, this callback should NOT sleep. + */ + using send_callback_t = std::function<void(frame_buff::uptr, send_link_if*)>; + + /*! + * Callback to check whether a packet can be sent. For flow controlled + * links, the callback should return whether the requested number of bytes + * can be received by the destination. * * Callbacks execute on the I/O thread! Be careful about what state is * touched. In addition, this callback should NOT sleep. */ - using send_callback_t = std::function<void(frame_buff::uptr&, send_link_if*)>; + using fc_callback_t = std::function<bool(const size_t)>; /* Transport client methods */ /*! @@ -209,6 +215,19 @@ public: virtual frame_buff::uptr get_send_buff(int32_t timeout_ms) = 0; /*! + * Wait until the destination is ready for a packet. For flow controlled + * transports, this method must be called prior to release_send_buff. If + * the transport is not flow controlled, you do not need to call this + * method. + * + * \param num_bytes the number of bytes to be sent in release_send_buff + * \param timeout_ms timeout in milliseconds to wait for destination to be + * ready + * \return whether the destination is ready for the requested bytes + */ + virtual bool wait_for_dest_ready(size_t num_bytes, int32_t timeout_ms) = 0; + + /*! * Release the send buffer to the send queue. * If the frame_buff's packet_size is zero, the link will free the buffer * without sending it. @@ -307,6 +326,7 @@ public: * \param recv_link the link used to observe flow control (can be empty) * \param num_recv_frames Number of buffers to reserve in recv_link * \param recv_cb callback function for receiving packets from recv_link + * \param fc_cb callback function to check if destination is ready for data * \return a send_io_if for interfacing with the link */ virtual send_io_if::sptr make_send_client(send_link_if::sptr send_link, @@ -314,7 +334,8 @@ public: send_io_if::send_callback_t cb, recv_link_if::sptr recv_link, size_t num_recv_frames, - recv_callback_t recv_cb) = 0; + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb) = 0; /*! * Create a recv_io_if and registers the transport's callbacks. diff --git a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp index 2f606878c..d0e6bb4bd 100644 --- a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp +++ b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp @@ -132,6 +132,14 @@ public: } } + bool wait_for_dest_ready(size_t /*num_bytes*/, int32_t /*timeout_ms*/) + { + // For offload_io_service, the destination is the queue to the offload + // thread. The queue is always able to accomodate new packets since it + // is sized to fit all the frames reserved from the link. + return true; + } + frame_buff::uptr get_send_buff(int32_t timeout_ms) { if (polling) { diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp index c594dd530..42250c4b1 100644 --- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp +++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp @@ -284,7 +284,7 @@ public: (eob_on_last_packet and final_length == nsamps_to_send_remaining); num_samps_sent = _send_one_packet( - buffs, total_nsamps_sent, final_length, metadata, eov, timeout); + buffs, total_nsamps_sent, final_length, metadata, eov, timeout_ms); } // Advance sample accumulator and decrement remaining samples |