aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp49
-rw-r--r--host/lib/include/uhdlib/transport/inline_io_service.hpp6
-rw-r--r--host/lib/include/uhdlib/transport/io_service.hpp39
-rw-r--r--host/lib/include/uhdlib/transport/offload_io_service_client.hpp8
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_impl.hpp2
-rw-r--r--host/lib/rfnoc/chdr_ctrl_xport.cpp4
-rw-r--r--host/lib/rfnoc/chdr_tx_data_xport.cpp16
-rw-r--r--host/lib/transport/inline_io_service.cpp63
-rw-r--r--host/lib/transport/offload_io_service.cpp43
-rw-r--r--host/tests/common/mock_transport.hpp25
-rw-r--r--host/tests/offload_io_srv_test.cpp24
11 files changed, 193 insertions, 86 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
index 3226ba59b..47293f44a 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
@@ -188,7 +188,11 @@ public:
*/
buff_t::uptr get_send_buff(const int32_t timeout_ms)
{
- return _send_io->get_send_buff(timeout_ms);
+ if (_send_io->wait_for_dest_ready(_frame_size, timeout_ms)) {
+ return _send_io->get_send_buff(timeout_ms);
+ } else {
+ return nullptr;
+ }
}
/*!
@@ -318,27 +322,23 @@ private:
* \param buff the frame buffer to release
* \param send_link the send link for flow control messages
*/
- void _send_callback(buff_t::uptr& buff, transport::send_link_if* send_link)
+ void _send_callback(buff_t::uptr buff, transport::send_link_if* send_link)
{
// If the packet size is not a multiple of the word size, then we will
// still occupy an integer multiple of word size bytes in the FPGA, so
// we need to calculate appropriately.
const size_t packet_size_rounded = _round_pkt_size(buff->packet_size());
+ send_link->release_send_buff(std::move(buff));
- if (_fc_state.dest_has_space(packet_size_rounded)) {
- send_link->release_send_buff(std::move(buff));
- buff = nullptr;
-
- _fc_state.data_sent(packet_size_rounded);
+ _fc_state.data_sent(packet_size_rounded);
- if (_fc_state.get_fc_resync_req_pending()
- && _fc_state.dest_has_space(chdr::strc_payload::MAX_PACKET_SIZE)) {
- const auto& xfer_counts = _fc_state.get_xfer_counts();
- const size_t strc_size =
- _round_pkt_size(_fc_sender.send_strc_resync(send_link, xfer_counts));
- _fc_state.clear_fc_resync_req_pending();
- _fc_state.data_sent(strc_size);
- }
+ if (_fc_state.get_fc_resync_req_pending()
+ && _fc_state.dest_has_space(chdr::strc_payload::MAX_PACKET_SIZE)) {
+ const auto& xfer_counts = _fc_state.get_xfer_counts();
+ const size_t strc_size =
+ _round_pkt_size(_fc_sender.send_strc_resync(send_link, xfer_counts));
+ _fc_state.clear_fc_resync_req_pending();
+ _fc_state.data_sent(strc_size);
}
}
@@ -347,6 +347,22 @@ private:
return ((pkt_size_bytes + _chdr_w_bytes - 1) / _chdr_w_bytes) * _chdr_w_bytes;
}
+ /*!
+ * Flow control callback for I/O service
+ *
+ * The I/O service invokes this callback in the send_io::wait_for_dest_ready
+ * method.
+ *
+ * \param num_bytes The number of bytes in the packet to be sent
+ * \return Whether there are enough flow control credits for num_bytes
+ */
+ bool _fc_callback(const size_t num_bytes)
+ {
+ // No need to round num_bytes since the transport always checks for
+ // enough space for a full frame.
+ return _fc_state.dest_has_space(num_bytes);
+ }
+
// Interface to the I/O service
transport::send_io_if::sptr _send_io;
@@ -379,6 +395,9 @@ private:
//! The CHDR width in bytes.
size_t _chdr_w_bytes;
+
+ //! The size of the send frame
+ size_t _frame_size;
};
}} // namespace uhd::rfnoc
diff --git a/host/lib/include/uhdlib/transport/inline_io_service.hpp b/host/lib/include/uhdlib/transport/inline_io_service.hpp
index d4a6dbbae..b0153a951 100644
--- a/host/lib/include/uhdlib/transport/inline_io_service.hpp
+++ b/host/lib/include/uhdlib/transport/inline_io_service.hpp
@@ -52,7 +52,8 @@ public:
send_io_if::send_callback_t send_cb,
recv_link_if::sptr recv_link,
size_t num_recv_frames,
- recv_callback_t recv_cb);
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb);
private:
friend class inline_recv_io;
@@ -113,8 +114,9 @@ private:
* \param recv_io_cb the callback+interface initiating the operation
* \param recv_link link to perform receive on
* \param timeout_ms timeout to wait for a buffer on the link
+ * \return Whether a flow control update was received
*/
- void recv_flow_ctrl(
+ bool recv_flow_ctrl(
inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms);
/* Track whether link is muxed and the callback */
diff --git a/host/lib/include/uhdlib/transport/io_service.hpp b/host/lib/include/uhdlib/transport/io_service.hpp
index 399b693dc..87702ee2d 100644
--- a/host/lib/include/uhdlib/transport/io_service.hpp
+++ b/host/lib/include/uhdlib/transport/io_service.hpp
@@ -183,19 +183,25 @@ public:
using sptr = std::shared_ptr<send_io_if>;
/*!
- * Callback for sending the packet. Callback is responsible for calling
- * release_send_buff() if it wants to send the packet. This will require
- * moving the uptr's reference. If the packet will NOT be sent, the
- * callback must NOT release the uptr.
+ * Callback for sending the packet. Callback should call release_send_buff()
+ * and update any internal state needed. For example, flow control state
+ * could be updated here, and the header could be filled out as well, like
+ * the packet's sequence number and/or addresses.
*
- * Function should update any internal state needed. For example, flow
- * control state could be updated here, and the header could be filled out
- * as well, like the packet's sequence number and/or addresses.
+ * Callbacks execute on the I/O thread! Be careful about what state is
+ * touched. In addition, this callback should NOT sleep.
+ */
+ using send_callback_t = std::function<void(frame_buff::uptr, send_link_if*)>;
+
+ /*!
+ * Callback to check whether a packet can be sent. For flow controlled
+ * links, the callback should return whether the requested number of bytes
+ * can be received by the destination.
*
* Callbacks execute on the I/O thread! Be careful about what state is
* touched. In addition, this callback should NOT sleep.
*/
- using send_callback_t = std::function<void(frame_buff::uptr&, send_link_if*)>;
+ using fc_callback_t = std::function<bool(const size_t)>;
/* Transport client methods */
/*!
@@ -209,6 +215,19 @@ public:
virtual frame_buff::uptr get_send_buff(int32_t timeout_ms) = 0;
/*!
+ * Wait until the destination is ready for a packet. For flow controlled
+ * transports, this method must be called prior to release_send_buff. If
+ * the transport is not flow controlled, you do not need to call this
+ * method.
+ *
+ * \param num_bytes the number of bytes to be sent in release_send_buff
+ * \param timeout_ms timeout in milliseconds to wait for destination to be
+ * ready
+ * \return whether the destination is ready for the requested bytes
+ */
+ virtual bool wait_for_dest_ready(size_t num_bytes, int32_t timeout_ms) = 0;
+
+ /*!
* Release the send buffer to the send queue.
* If the frame_buff's packet_size is zero, the link will free the buffer
* without sending it.
@@ -307,6 +326,7 @@ public:
* \param recv_link the link used to observe flow control (can be empty)
* \param num_recv_frames Number of buffers to reserve in recv_link
* \param recv_cb callback function for receiving packets from recv_link
+ * \param fc_cb callback function to check if destination is ready for data
* \return a send_io_if for interfacing with the link
*/
virtual send_io_if::sptr make_send_client(send_link_if::sptr send_link,
@@ -314,7 +334,8 @@ public:
send_io_if::send_callback_t cb,
recv_link_if::sptr recv_link,
size_t num_recv_frames,
- recv_callback_t recv_cb) = 0;
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb) = 0;
/*!
* Create a recv_io_if and registers the transport's callbacks.
diff --git a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp
index 2f606878c..d0e6bb4bd 100644
--- a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp
+++ b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp
@@ -132,6 +132,14 @@ public:
}
}
+ bool wait_for_dest_ready(size_t /*num_bytes*/, int32_t /*timeout_ms*/)
+ {
+ // For offload_io_service, the destination is the queue to the offload
+ // thread. The queue is always able to accomodate new packets since it
+ // is sized to fit all the frames reserved from the link.
+ return true;
+ }
+
frame_buff::uptr get_send_buff(int32_t timeout_ms)
{
if (polling) {
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
index c594dd530..42250c4b1 100644
--- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
+++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
@@ -284,7 +284,7 @@ public:
(eob_on_last_packet and final_length == nsamps_to_send_remaining);
num_samps_sent = _send_one_packet(
- buffs, total_nsamps_sent, final_length, metadata, eov, timeout);
+ buffs, total_nsamps_sent, final_length, metadata, eov, timeout_ms);
}
// Advance sample accumulator and decrement remaining samples
diff --git a/host/lib/rfnoc/chdr_ctrl_xport.cpp b/host/lib/rfnoc/chdr_ctrl_xport.cpp
index 6b185efab..637d2c302 100644
--- a/host/lib/rfnoc/chdr_ctrl_xport.cpp
+++ b/host/lib/rfnoc/chdr_ctrl_xport.cpp
@@ -23,12 +23,12 @@ chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv,
: _my_epid(my_epid), _recv_packet(pkt_factory.make_generic())
{
/* Make dumb send pipe */
- send_io_if::send_callback_t send_cb = [this](frame_buff::uptr& buff,
+ send_io_if::send_callback_t send_cb = [this](frame_buff::uptr buff,
send_link_if* link) {
link->release_send_buff(std::move(buff));
};
_send_if = io_srv->make_send_client(
- send_link, num_send_frames, send_cb, recv_link_if::sptr(), 0, nullptr);
+ send_link, num_send_frames, send_cb, recv_link_if::sptr(), 0, nullptr, nullptr);
/* Make dumb recv pipe that matches management and control packets */
uhd::transport::recv_callback_t ctrl_recv_cb = [this](frame_buff::uptr& buff,
diff --git a/host/lib/rfnoc/chdr_tx_data_xport.cpp b/host/lib/rfnoc/chdr_tx_data_xport.cpp
index bb9d1b63e..8837e2dbe 100644
--- a/host/lib/rfnoc/chdr_tx_data_xport.cpp
+++ b/host/lib/rfnoc/chdr_tx_data_xport.cpp
@@ -36,6 +36,7 @@ chdr_tx_data_xport::chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv,
, _fc_sender(pkt_factory, epids)
, _epid(epids.first)
, _chdr_w_bytes(chdr_w_to_bits(pkt_factory.get_chdr_w()) / 8)
+ , _frame_size(send_link->get_send_frame_size())
{
UHD_LOG_TRACE("XPORT::TX_DATA_XPORT",
"Creating tx xport with local epid=" << epids.first
@@ -51,8 +52,8 @@ chdr_tx_data_xport::chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv,
_max_payload_size = send_link->get_send_frame_size() - pyld_offset;
// Now create the send I/O we will use for data
- auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) {
- this->_send_callback(buff, send_link);
+ auto send_cb = [this](buff_t::uptr buff, transport::send_link_if* send_link) {
+ this->_send_callback(std::move(buff), send_link);
};
auto recv_cb = [this](buff_t::uptr& buff,
@@ -61,13 +62,18 @@ chdr_tx_data_xport::chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv,
return this->_recv_callback(buff, recv_link, send_link);
};
+ auto fc_cb = [this](size_t num_bytes) {
+ return this->_fc_callback(num_bytes);
+ };
+
// Needs just a single recv frame for strs packets
_send_io = io_srv->make_send_client(send_link,
num_send_frames,
send_cb,
recv_link,
/* num_recv_frames */ 1,
- recv_cb);
+ recv_cb,
+ fc_cb);
}
chdr_tx_data_xport::~chdr_tx_data_xport()
@@ -96,9 +102,8 @@ static chdr_tx_data_xport::fc_params_t configure_flow_ctrl(io_service::sptr io_s
chdr::chdr_packet::uptr recv_packet = pkt_factory.make_generic();
// No flow control at initialization, just release all send buffs
- auto send_cb = [](frame_buff::uptr& buff, send_link_if* send_link) {
+ auto send_cb = [](frame_buff::uptr buff, send_link_if* send_link) {
send_link->release_send_buff(std::move(buff));
- buff = nullptr;
};
// For recv, just queue strs packets for recv_io to read
@@ -124,6 +129,7 @@ static chdr_tx_data_xport::fc_params_t configure_flow_ctrl(io_service::sptr io_s
send_cb,
nullptr,
0, // num_recv_frames
+ nullptr,
nullptr);
auto recv_io = io_srv->make_recv_client(recv_link,
diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp
index 6449bbda8..1438699b7 100644
--- a/host/lib/transport/inline_io_service.cpp
+++ b/host/lib/transport/inline_io_service.cpp
@@ -144,7 +144,7 @@ public:
}
}
- void recv_flow_ctrl(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms)
+ bool recv_flow_ctrl(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms)
{
while (true) {
frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms);
@@ -156,7 +156,7 @@ public:
rcvr_found = true;
if (rcvr == cb) {
assert(!buff);
- return;
+ return true;
} else if (buff) {
/* NOTE: Should not overflow, by construction
* Every queue can hold link->get_num_recv_frames()
@@ -174,7 +174,7 @@ public:
recv_link->release_recv_buff(std::move(buff));
}
} else { /* Timeout */
- return;
+ return false;
}
}
}
@@ -250,12 +250,14 @@ public:
send_callback_t send_cb,
recv_link_if::sptr recv_link,
size_t num_recv_frames,
- recv_callback_t fc_cb)
- : inline_recv_cb(fc_cb, send_link.get())
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb)
+ : inline_recv_cb(recv_cb, send_link.get())
, _io_srv(io_srv)
, _send_link(send_link)
, _send_cb(send_cb)
, _recv_link(recv_link)
+ , _fc_cb(fc_cb)
{
_num_recv_frames = num_recv_frames;
_num_send_frames = num_send_frames;
@@ -269,9 +271,27 @@ public:
}
}
+ bool wait_for_dest_ready(size_t num_bytes, int32_t timeout_ms)
+ {
+ if (!_recv_link) {
+ // If there is no flow control link, then the destination must
+ // always be ready for more data.
+ return true;
+ }
+
+ while (!_fc_cb(num_bytes)) {
+ const bool updated =
+ _io_srv->recv_flow_ctrl(this, _recv_link.get(), timeout_ms);
+
+ if (!updated) {
+ return false;
+ }
+ }
+ return true;
+ }
+
frame_buff::uptr get_send_buff(int32_t timeout_ms)
{
- /* Check initial flow control result */
frame_buff::uptr buff = _send_link->get_send_buff(timeout_ms);
if (buff) {
_num_frames_in_use++;
@@ -283,19 +303,8 @@ public:
void release_send_buff(frame_buff::uptr buff)
{
- while (buff) {
- // Try to send a packet
- _send_cb(buff, _send_link.get());
- if (_recv_link) {
- // If the buffer was not released, use a timeout to receive
- // the flow control packet, to avoid wasting CPU.
- if (!buff) {
- _io_srv->recv_flow_ctrl(this, _recv_link.get(), 0);
- } else {
- _io_srv->recv_flow_ctrl(this, _recv_link.get(), 100);
- }
- }
- }
+ // Send the packet using callback
+ _send_cb(std::move(buff), _send_link.get());
_num_frames_in_use--;
}
@@ -305,6 +314,7 @@ private:
send_callback_t _send_cb;
recv_link_if::sptr _recv_link;
recv_callback_t _recv_cb;
+ fc_callback_t _fc_cb;
size_t _num_frames_in_use = 0;
};
@@ -369,7 +379,8 @@ send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_lin
send_io_if::send_callback_t send_cb,
recv_link_if::sptr recv_link,
size_t num_recv_frames,
- recv_callback_t recv_cb)
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb)
{
UHD_ASSERT_THROW(send_link);
UHD_ASSERT_THROW(num_send_frames > 0);
@@ -377,9 +388,10 @@ send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_lin
connect_sender(send_link.get(), num_send_frames);
sptr io_srv = shared_from_this();
auto send_io = std::make_shared<inline_send_io>(
- io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb);
+ io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb, fc_cb);
if (recv_link) {
UHD_ASSERT_THROW(recv_cb);
+ UHD_ASSERT_THROW(fc_cb);
UHD_ASSERT_THROW(num_recv_frames > 0);
connect_receiver(recv_link.get(), send_io.get(), num_recv_frames);
}
@@ -474,7 +486,7 @@ frame_buff::uptr inline_io_service::recv(
}
}
-void inline_io_service::recv_flow_ctrl(
+bool inline_io_service::recv_flow_ctrl(
inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms)
{
inline_recv_mux* mux;
@@ -483,8 +495,7 @@ void inline_io_service::recv_flow_ctrl(
if (mux) {
/* Defer to mux's recv_flow_ctrl() if present */
- mux->recv_flow_ctrl(recv_io_cb, recv_link, timeout_ms);
- return;
+ return mux->recv_flow_ctrl(recv_io_cb, recv_link, timeout_ms);
} else {
assert(recv_io_cb == rcvr);
}
@@ -495,13 +506,13 @@ void inline_io_service::recv_flow_ctrl(
if (buff) {
if (rcvr->callback(buff, recv_link)) {
assert(!buff);
- return;
+ return true;
} else {
UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver");
recv_link->release_recv_buff(std::move(buff));
}
} else { /* Timeout */
- return;
+ return false;
}
}
}
diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp
index 53c8f017d..8a7895173 100644
--- a/host/lib/transport/offload_io_service.cpp
+++ b/host/lib/transport/offload_io_service.cpp
@@ -48,6 +48,16 @@ public:
_item_sem.notify();
}
+ bool peek(queue_item_t& item)
+ {
+ if (_item_sem.count()) {
+ item = _buffer[_read_index];
+ return true;
+ } else {
+ return false;
+ }
+ }
+
bool pop(queue_item_t& item)
{
if (_item_sem.try_wait()) {
@@ -153,6 +163,13 @@ public:
_from_offload_thread.push(queue_element);
}
+ std::tuple<frame_buff*, bool> offload_thread_peek()
+ {
+ to_offload_thread_t queue_element;
+ _to_offload_thread.peek(queue_element);
+ return std::make_tuple(queue_element.buff, queue_element.disconnect);
+ }
+
std::tuple<frame_buff*, bool> offload_thread_pop()
{
to_offload_thread_t queue_element;
@@ -261,7 +278,8 @@ public:
send_io_if::send_callback_t send_cb,
recv_link_if::sptr recv_link,
size_t num_recv_frames,
- recv_callback_t recv_cb);
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb);
private:
offload_io_service_impl(const offload_io_service_impl&) = delete;
@@ -495,7 +513,8 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se
send_io_if::send_callback_t send_cb,
recv_link_if::sptr recv_link,
size_t num_recv_frames,
- recv_callback_t recv_cb)
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb)
{
UHD_ASSERT_THROW(_offload_thread);
@@ -513,12 +532,13 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se
recv_link,
num_recv_frames,
recv_cb,
+ fc_cb,
port]() {
frame_reservation_t frames = {recv_link, num_recv_frames, send_link, num_send_frames};
_reservation_mgr.reserve_frames(frames);
auto inline_send_io = _io_srv->make_send_client(
- send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb);
+ send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb, fc_cb);
send_client_info_t client_info;
client_info.inline_io = inline_send_io;
@@ -666,10 +686,14 @@ void offload_io_service_impl::_do_work_polling()
for (auto it = _send_clients.begin(); it != _send_clients.end();) {
frame_buff* buff;
bool disconnect;
- std::tie(buff, disconnect) = it->port->offload_thread_pop();
+ std::tie(buff, disconnect) = it->port->offload_thread_peek();
if (buff) {
- _release_send_buff(*it, buff);
+ if (it->inline_io->wait_for_dest_ready(buff->packet_size(), 0)) {
+ _release_send_buff(*it, buff);
+ it->port->offload_thread_pop();
+ }
} else if (disconnect) {
+ it->port->offload_thread_pop();
_disconnect_send_client(*it);
it = _send_clients.erase(it); // increments it
continue;
@@ -735,11 +759,14 @@ void offload_io_service_impl::_do_work_blocking()
if (it->num_frames_in_use > 0) {
frame_buff* buff;
bool disconnect;
- std::tie(buff, disconnect) = it->port->offload_thread_pop(blocking_timeout_ms);
-
+ std::tie(buff, disconnect) = it->port->offload_thread_peek();
if (buff) {
- _release_send_buff(*it, buff);
+ if (it->inline_io->wait_for_dest_ready(buff->packet_size(), blocking_timeout_ms)) {
+ _release_send_buff(*it, buff);
+ it->port->offload_thread_pop();
+ }
} else if (disconnect) {
+ it->port->offload_thread_pop();
_disconnect_send_client(*it);
it = _send_clients.erase(it); // increments it
continue;
diff --git a/host/tests/common/mock_transport.hpp b/host/tests/common/mock_transport.hpp
index 321f22830..a8dc761f1 100644
--- a/host/tests/common/mock_transport.hpp
+++ b/host/tests/common/mock_transport.hpp
@@ -40,12 +40,13 @@ public:
uint16_t src_addr,
uint32_t credits)
: _credits(credits)
+ , _frame_size(send_link->get_send_frame_size())
{
_send_addr = (dst_addr << 16) | (src_addr << 0);
_recv_addr = (src_addr << 16) | (dst_addr << 0);
/* Make message client for sending side-band messages */
- send_io_if::send_callback_t msg_send_cb = [this](frame_buff::uptr& buff,
+ send_io_if::send_callback_t msg_send_cb = [this](frame_buff::uptr buff,
send_link_if* link) {
uint32_t* data = (uint32_t*)buff->data();
data[ADDR_OFFSET] = this->_send_addr;
@@ -53,21 +54,25 @@ public:
link->release_send_buff(std::move(buff));
};
_msg_if = io_srv->make_send_client(
- send_link, MSG_BUFFS, msg_send_cb, recv_link_if::sptr(), 0, nullptr);
+ send_link, MSG_BUFFS, msg_send_cb, recv_link_if::sptr(), 0, nullptr, nullptr);
/* Make client for sending streaming data */
- send_io_if::send_callback_t send_cb = [this](frame_buff::uptr& buff,
+ send_io_if::send_callback_t send_cb = [this](frame_buff::uptr buff,
send_link_if* link) {
- this->send_buff(buff, link);
+ this->send_buff(std::move(buff), link);
};
recv_callback_t recv_cb = [this](frame_buff::uptr& buff,
recv_link_if* link,
send_link_if* /*send_link*/) {
return this->recv_buff(buff, link);
};
+ send_io_if::fc_callback_t fc_cb = [this](size_t) {
+ return this->_seqno < this->_ackno + this->_credits;
+ };
+
/* Pretend get 1 flow control message per sent packet */
_send_if = io_srv->make_send_client(
- send_link, credits, send_cb, recv_link, credits, recv_cb);
+ send_link, credits, send_cb, recv_link, credits, recv_cb, fc_cb);
}
~mock_send_transport() {}
@@ -99,6 +104,10 @@ public:
*/
frame_buff::uptr get_data_buff(int32_t timeout_ms)
{
+ if (!_send_if->wait_for_dest_ready(_frame_size, timeout_ms)) {
+ return frame_buff::uptr();
+ }
+
frame_buff::uptr buff = _send_if->get_send_buff(timeout_ms);
if (!buff) {
return frame_buff::uptr();
@@ -138,11 +147,8 @@ public:
* Callbacks execute on the I/O thread! Be careful about what state is
* touched. In addition, this callback should NOT sleep.
*/
- void send_buff(frame_buff::uptr& buff, send_link_if* send_link)
+ void send_buff(frame_buff::uptr buff, send_link_if* send_link)
{
- if (_seqno >= _ackno + _credits) {
- return;
- }
uint32_t* data = (uint32_t*)buff->data();
data[ADDR_OFFSET] = _send_addr;
data[SEQNO_OFFSET] = _seqno;
@@ -202,6 +208,7 @@ private:
send_io_if::sptr _send_if;
uint32_t _seqno = 0;
uint32_t _ackno = 0;
+ size_t _frame_size;
};
/*!
diff --git a/host/tests/offload_io_srv_test.cpp b/host/tests/offload_io_srv_test.cpp
index fbf9668be..806d4a3dc 100644
--- a/host/tests/offload_io_srv_test.cpp
+++ b/host/tests/offload_io_srv_test.cpp
@@ -76,6 +76,11 @@ public:
return _link->get_send_buff(timeout_ms);
}
+ bool wait_for_dest_ready(size_t, int32_t)
+ {
+ return true;
+ }
+
void release_send_buff(frame_buff::uptr buff)
{
_link->release_send_buff(std::move(buff));
@@ -108,7 +113,8 @@ public:
send_io_if::send_callback_t /*cb*/,
recv_link_if::sptr /*recv_link*/,
size_t /*num_recv_frames*/,
- recv_callback_t /*recv_cb*/)
+ recv_callback_t /*recv_cb*/,
+ send_io_if::fc_callback_t /*fc_cb*/)
{
return std::make_shared<mock_send_io>(send_link);
}
@@ -156,7 +162,7 @@ BOOST_AUTO_TEST_CASE(test_construction)
auto send_link = make_send_link(5);
io_srv->attach_send_link(send_link);
auto send_client =
- io_srv->make_send_client(send_link, 5, nullptr, nullptr, 0, nullptr);
+ io_srv->make_send_client(send_link, 5, nullptr, nullptr, 0, nullptr, nullptr);
}
for (const auto wait_mode : wait_modes) {
params_t params{{}, RECV_ONLY, wait_mode};
@@ -181,7 +187,7 @@ BOOST_AUTO_TEST_CASE(test_construction_with_options)
auto recv_link = make_recv_link(5);
io_srv->attach_recv_link(recv_link);
auto send_client =
- io_srv->make_send_client(send_link, 5, nullptr, nullptr, 0, nullptr);
+ io_srv->make_send_client(send_link, 5, nullptr, nullptr, 0, nullptr, nullptr);
auto recv_client =
io_srv->make_recv_client(recv_link, 5, nullptr, nullptr, 0, nullptr);
}
@@ -195,7 +201,7 @@ BOOST_AUTO_TEST_CASE(test_send)
auto send_link = make_send_link(5);
io_srv->attach_send_link(send_link);
auto send_client =
- io_srv->make_send_client(send_link, 1, nullptr, nullptr, 0, nullptr);
+ io_srv->make_send_client(send_link, 1, nullptr, nullptr, 0, nullptr, nullptr);
for (size_t i = 0; i < 10; i++) {
auto buff = send_client->get_send_buff(100);
@@ -244,7 +250,7 @@ BOOST_AUTO_TEST_CASE(test_send_recv)
io_srv->attach_recv_link(recv_link);
auto send_client =
- io_srv->make_send_client(send_link, 1, nullptr, nullptr, 0, nullptr);
+ io_srv->make_send_client(send_link, 1, nullptr, nullptr, 0, nullptr, nullptr);
auto recv_client =
io_srv->make_recv_client(recv_link, 1, nullptr, nullptr, 0, nullptr);
@@ -262,7 +268,7 @@ BOOST_AUTO_TEST_CASE(test_send_recv)
auto recv_client2 =
io_srv->make_recv_client(recv_link, 1, nullptr, nullptr, 0, nullptr);
auto send_client2 =
- io_srv->make_send_client(send_link, 1, nullptr, nullptr, 0, nullptr);
+ io_srv->make_send_client(send_link, 1, nullptr, nullptr, 0, nullptr, nullptr);
for (size_t i = 0; i < 5; i++) {
mock_io_srv->allocate_recv_frames(1, 1);
recv_client2->release_recv_buff(recv_client2->get_recv_buff(100));
@@ -297,11 +303,11 @@ BOOST_AUTO_TEST_CASE(test_attach_detach)
auto recv_client0 =
io_srv->make_recv_client(recv_link0, 1, nullptr, nullptr, 0, nullptr);
auto send_client0 =
- io_srv->make_send_client(send_link0, 1, nullptr, nullptr, 0, nullptr);
+ io_srv->make_send_client(send_link0, 1, nullptr, nullptr, 0, nullptr, nullptr);
auto recv_client1 =
io_srv->make_recv_client(recv_link1, 1, nullptr, nullptr, 0, nullptr);
auto send_client1 =
- io_srv->make_send_client(send_link1, 1, nullptr, nullptr, 0, nullptr);
+ io_srv->make_send_client(send_link1, 1, nullptr, nullptr, 0, nullptr, nullptr);
recv_link0->push_back_recv_packet(
boost::shared_array<uint8_t>(new uint8_t[FRAME_SIZE]), FRAME_SIZE);
@@ -337,7 +343,7 @@ BOOST_AUTO_TEST_CASE(test_attach_detach)
auto recv_client2 =
io_srv->make_recv_client(recv_link0, 1, nullptr, nullptr, 0, nullptr);
auto send_client2 =
- io_srv->make_send_client(send_link0, 1, nullptr, nullptr, 0, nullptr);
+ io_srv->make_send_client(send_link0, 1, nullptr, nullptr, 0, nullptr, nullptr);
recv_link0->push_back_recv_packet(
boost::shared_array<uint8_t>(new uint8_t[FRAME_SIZE]), FRAME_SIZE);