aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/include/uhdlib
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/include/uhdlib')
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp49
-rw-r--r--host/lib/include/uhdlib/transport/inline_io_service.hpp6
-rw-r--r--host/lib/include/uhdlib/transport/io_service.hpp39
-rw-r--r--host/lib/include/uhdlib/transport/offload_io_service_client.hpp8
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_impl.hpp2
5 files changed, 77 insertions, 27 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
index 3226ba59b..47293f44a 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
@@ -188,7 +188,11 @@ public:
*/
buff_t::uptr get_send_buff(const int32_t timeout_ms)
{
- return _send_io->get_send_buff(timeout_ms);
+ if (_send_io->wait_for_dest_ready(_frame_size, timeout_ms)) {
+ return _send_io->get_send_buff(timeout_ms);
+ } else {
+ return nullptr;
+ }
}
/*!
@@ -318,27 +322,23 @@ private:
* \param buff the frame buffer to release
* \param send_link the send link for flow control messages
*/
- void _send_callback(buff_t::uptr& buff, transport::send_link_if* send_link)
+ void _send_callback(buff_t::uptr buff, transport::send_link_if* send_link)
{
// If the packet size is not a multiple of the word size, then we will
// still occupy an integer multiple of word size bytes in the FPGA, so
// we need to calculate appropriately.
const size_t packet_size_rounded = _round_pkt_size(buff->packet_size());
+ send_link->release_send_buff(std::move(buff));
- if (_fc_state.dest_has_space(packet_size_rounded)) {
- send_link->release_send_buff(std::move(buff));
- buff = nullptr;
-
- _fc_state.data_sent(packet_size_rounded);
+ _fc_state.data_sent(packet_size_rounded);
- if (_fc_state.get_fc_resync_req_pending()
- && _fc_state.dest_has_space(chdr::strc_payload::MAX_PACKET_SIZE)) {
- const auto& xfer_counts = _fc_state.get_xfer_counts();
- const size_t strc_size =
- _round_pkt_size(_fc_sender.send_strc_resync(send_link, xfer_counts));
- _fc_state.clear_fc_resync_req_pending();
- _fc_state.data_sent(strc_size);
- }
+ if (_fc_state.get_fc_resync_req_pending()
+ && _fc_state.dest_has_space(chdr::strc_payload::MAX_PACKET_SIZE)) {
+ const auto& xfer_counts = _fc_state.get_xfer_counts();
+ const size_t strc_size =
+ _round_pkt_size(_fc_sender.send_strc_resync(send_link, xfer_counts));
+ _fc_state.clear_fc_resync_req_pending();
+ _fc_state.data_sent(strc_size);
}
}
@@ -347,6 +347,22 @@ private:
return ((pkt_size_bytes + _chdr_w_bytes - 1) / _chdr_w_bytes) * _chdr_w_bytes;
}
+ /*!
+ * Flow control callback for I/O service
+ *
+ * The I/O service invokes this callback in the send_io::wait_for_dest_ready
+ * method.
+ *
+ * \param num_bytes The number of bytes in the packet to be sent
+ * \return Whether there are enough flow control credits for num_bytes
+ */
+ bool _fc_callback(const size_t num_bytes)
+ {
+ // No need to round num_bytes since the transport always checks for
+ // enough space for a full frame.
+ return _fc_state.dest_has_space(num_bytes);
+ }
+
// Interface to the I/O service
transport::send_io_if::sptr _send_io;
@@ -379,6 +395,9 @@ private:
//! The CHDR width in bytes.
size_t _chdr_w_bytes;
+
+ //! The size of the send frame
+ size_t _frame_size;
};
}} // namespace uhd::rfnoc
diff --git a/host/lib/include/uhdlib/transport/inline_io_service.hpp b/host/lib/include/uhdlib/transport/inline_io_service.hpp
index d4a6dbbae..b0153a951 100644
--- a/host/lib/include/uhdlib/transport/inline_io_service.hpp
+++ b/host/lib/include/uhdlib/transport/inline_io_service.hpp
@@ -52,7 +52,8 @@ public:
send_io_if::send_callback_t send_cb,
recv_link_if::sptr recv_link,
size_t num_recv_frames,
- recv_callback_t recv_cb);
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb);
private:
friend class inline_recv_io;
@@ -113,8 +114,9 @@ private:
* \param recv_io_cb the callback+interface initiating the operation
* \param recv_link link to perform receive on
* \param timeout_ms timeout to wait for a buffer on the link
+ * \return Whether a flow control update was received
*/
- void recv_flow_ctrl(
+ bool recv_flow_ctrl(
inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms);
/* Track whether link is muxed and the callback */
diff --git a/host/lib/include/uhdlib/transport/io_service.hpp b/host/lib/include/uhdlib/transport/io_service.hpp
index 399b693dc..87702ee2d 100644
--- a/host/lib/include/uhdlib/transport/io_service.hpp
+++ b/host/lib/include/uhdlib/transport/io_service.hpp
@@ -183,19 +183,25 @@ public:
using sptr = std::shared_ptr<send_io_if>;
/*!
- * Callback for sending the packet. Callback is responsible for calling
- * release_send_buff() if it wants to send the packet. This will require
- * moving the uptr's reference. If the packet will NOT be sent, the
- * callback must NOT release the uptr.
+ * Callback for sending the packet. Callback should call release_send_buff()
+ * and update any internal state needed. For example, flow control state
+ * could be updated here, and the header could be filled out as well, like
+ * the packet's sequence number and/or addresses.
*
- * Function should update any internal state needed. For example, flow
- * control state could be updated here, and the header could be filled out
- * as well, like the packet's sequence number and/or addresses.
+ * Callbacks execute on the I/O thread! Be careful about what state is
+ * touched. In addition, this callback should NOT sleep.
+ */
+ using send_callback_t = std::function<void(frame_buff::uptr, send_link_if*)>;
+
+ /*!
+ * Callback to check whether a packet can be sent. For flow controlled
+ * links, the callback should return whether the requested number of bytes
+ * can be received by the destination.
*
* Callbacks execute on the I/O thread! Be careful about what state is
* touched. In addition, this callback should NOT sleep.
*/
- using send_callback_t = std::function<void(frame_buff::uptr&, send_link_if*)>;
+ using fc_callback_t = std::function<bool(const size_t)>;
/* Transport client methods */
/*!
@@ -209,6 +215,19 @@ public:
virtual frame_buff::uptr get_send_buff(int32_t timeout_ms) = 0;
/*!
+ * Wait until the destination is ready for a packet. For flow controlled
+ * transports, this method must be called prior to release_send_buff. If
+ * the transport is not flow controlled, you do not need to call this
+ * method.
+ *
+ * \param num_bytes the number of bytes to be sent in release_send_buff
+ * \param timeout_ms timeout in milliseconds to wait for destination to be
+ * ready
+ * \return whether the destination is ready for the requested bytes
+ */
+ virtual bool wait_for_dest_ready(size_t num_bytes, int32_t timeout_ms) = 0;
+
+ /*!
* Release the send buffer to the send queue.
* If the frame_buff's packet_size is zero, the link will free the buffer
* without sending it.
@@ -307,6 +326,7 @@ public:
* \param recv_link the link used to observe flow control (can be empty)
* \param num_recv_frames Number of buffers to reserve in recv_link
* \param recv_cb callback function for receiving packets from recv_link
+ * \param fc_cb callback function to check if destination is ready for data
* \return a send_io_if for interfacing with the link
*/
virtual send_io_if::sptr make_send_client(send_link_if::sptr send_link,
@@ -314,7 +334,8 @@ public:
send_io_if::send_callback_t cb,
recv_link_if::sptr recv_link,
size_t num_recv_frames,
- recv_callback_t recv_cb) = 0;
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb) = 0;
/*!
* Create a recv_io_if and registers the transport's callbacks.
diff --git a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp
index 2f606878c..d0e6bb4bd 100644
--- a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp
+++ b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp
@@ -132,6 +132,14 @@ public:
}
}
+ bool wait_for_dest_ready(size_t /*num_bytes*/, int32_t /*timeout_ms*/)
+ {
+ // For offload_io_service, the destination is the queue to the offload
+ // thread. The queue is always able to accomodate new packets since it
+ // is sized to fit all the frames reserved from the link.
+ return true;
+ }
+
frame_buff::uptr get_send_buff(int32_t timeout_ms)
{
if (polling) {
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
index c594dd530..42250c4b1 100644
--- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
+++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
@@ -284,7 +284,7 @@ public:
(eob_on_last_packet and final_length == nsamps_to_send_remaining);
num_samps_sent = _send_one_packet(
- buffs, total_nsamps_sent, final_length, metadata, eov, timeout);
+ buffs, total_nsamps_sent, final_length, metadata, eov, timeout_ms);
}
// Advance sample accumulator and decrement remaining samples