aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/inline_io_service.cpp63
-rw-r--r--host/lib/transport/offload_io_service.cpp43
2 files changed, 72 insertions, 34 deletions
diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp
index 6449bbda8..1438699b7 100644
--- a/host/lib/transport/inline_io_service.cpp
+++ b/host/lib/transport/inline_io_service.cpp
@@ -144,7 +144,7 @@ public:
}
}
- void recv_flow_ctrl(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms)
+ bool recv_flow_ctrl(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms)
{
while (true) {
frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms);
@@ -156,7 +156,7 @@ public:
rcvr_found = true;
if (rcvr == cb) {
assert(!buff);
- return;
+ return true;
} else if (buff) {
/* NOTE: Should not overflow, by construction
* Every queue can hold link->get_num_recv_frames()
@@ -174,7 +174,7 @@ public:
recv_link->release_recv_buff(std::move(buff));
}
} else { /* Timeout */
- return;
+ return false;
}
}
}
@@ -250,12 +250,14 @@ public:
send_callback_t send_cb,
recv_link_if::sptr recv_link,
size_t num_recv_frames,
- recv_callback_t fc_cb)
- : inline_recv_cb(fc_cb, send_link.get())
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb)
+ : inline_recv_cb(recv_cb, send_link.get())
, _io_srv(io_srv)
, _send_link(send_link)
, _send_cb(send_cb)
, _recv_link(recv_link)
+ , _fc_cb(fc_cb)
{
_num_recv_frames = num_recv_frames;
_num_send_frames = num_send_frames;
@@ -269,9 +271,27 @@ public:
}
}
+ bool wait_for_dest_ready(size_t num_bytes, int32_t timeout_ms)
+ {
+ if (!_recv_link) {
+ // If there is no flow control link, then the destination must
+ // always be ready for more data.
+ return true;
+ }
+
+ while (!_fc_cb(num_bytes)) {
+ const bool updated =
+ _io_srv->recv_flow_ctrl(this, _recv_link.get(), timeout_ms);
+
+ if (!updated) {
+ return false;
+ }
+ }
+ return true;
+ }
+
frame_buff::uptr get_send_buff(int32_t timeout_ms)
{
- /* Check initial flow control result */
frame_buff::uptr buff = _send_link->get_send_buff(timeout_ms);
if (buff) {
_num_frames_in_use++;
@@ -283,19 +303,8 @@ public:
void release_send_buff(frame_buff::uptr buff)
{
- while (buff) {
- // Try to send a packet
- _send_cb(buff, _send_link.get());
- if (_recv_link) {
- // If the buffer was not released, use a timeout to receive
- // the flow control packet, to avoid wasting CPU.
- if (!buff) {
- _io_srv->recv_flow_ctrl(this, _recv_link.get(), 0);
- } else {
- _io_srv->recv_flow_ctrl(this, _recv_link.get(), 100);
- }
- }
- }
+ // Send the packet using callback
+ _send_cb(std::move(buff), _send_link.get());
_num_frames_in_use--;
}
@@ -305,6 +314,7 @@ private:
send_callback_t _send_cb;
recv_link_if::sptr _recv_link;
recv_callback_t _recv_cb;
+ fc_callback_t _fc_cb;
size_t _num_frames_in_use = 0;
};
@@ -369,7 +379,8 @@ send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_lin
send_io_if::send_callback_t send_cb,
recv_link_if::sptr recv_link,
size_t num_recv_frames,
- recv_callback_t recv_cb)
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb)
{
UHD_ASSERT_THROW(send_link);
UHD_ASSERT_THROW(num_send_frames > 0);
@@ -377,9 +388,10 @@ send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_lin
connect_sender(send_link.get(), num_send_frames);
sptr io_srv = shared_from_this();
auto send_io = std::make_shared<inline_send_io>(
- io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb);
+ io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb, fc_cb);
if (recv_link) {
UHD_ASSERT_THROW(recv_cb);
+ UHD_ASSERT_THROW(fc_cb);
UHD_ASSERT_THROW(num_recv_frames > 0);
connect_receiver(recv_link.get(), send_io.get(), num_recv_frames);
}
@@ -474,7 +486,7 @@ frame_buff::uptr inline_io_service::recv(
}
}
-void inline_io_service::recv_flow_ctrl(
+bool inline_io_service::recv_flow_ctrl(
inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms)
{
inline_recv_mux* mux;
@@ -483,8 +495,7 @@ void inline_io_service::recv_flow_ctrl(
if (mux) {
/* Defer to mux's recv_flow_ctrl() if present */
- mux->recv_flow_ctrl(recv_io_cb, recv_link, timeout_ms);
- return;
+ return mux->recv_flow_ctrl(recv_io_cb, recv_link, timeout_ms);
} else {
assert(recv_io_cb == rcvr);
}
@@ -495,13 +506,13 @@ void inline_io_service::recv_flow_ctrl(
if (buff) {
if (rcvr->callback(buff, recv_link)) {
assert(!buff);
- return;
+ return true;
} else {
UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver");
recv_link->release_recv_buff(std::move(buff));
}
} else { /* Timeout */
- return;
+ return false;
}
}
}
diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp
index 53c8f017d..8a7895173 100644
--- a/host/lib/transport/offload_io_service.cpp
+++ b/host/lib/transport/offload_io_service.cpp
@@ -48,6 +48,16 @@ public:
_item_sem.notify();
}
+ bool peek(queue_item_t& item)
+ {
+ if (_item_sem.count()) {
+ item = _buffer[_read_index];
+ return true;
+ } else {
+ return false;
+ }
+ }
+
bool pop(queue_item_t& item)
{
if (_item_sem.try_wait()) {
@@ -153,6 +163,13 @@ public:
_from_offload_thread.push(queue_element);
}
+ std::tuple<frame_buff*, bool> offload_thread_peek()
+ {
+ to_offload_thread_t queue_element;
+ _to_offload_thread.peek(queue_element);
+ return std::make_tuple(queue_element.buff, queue_element.disconnect);
+ }
+
std::tuple<frame_buff*, bool> offload_thread_pop()
{
to_offload_thread_t queue_element;
@@ -261,7 +278,8 @@ public:
send_io_if::send_callback_t send_cb,
recv_link_if::sptr recv_link,
size_t num_recv_frames,
- recv_callback_t recv_cb);
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb);
private:
offload_io_service_impl(const offload_io_service_impl&) = delete;
@@ -495,7 +513,8 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se
send_io_if::send_callback_t send_cb,
recv_link_if::sptr recv_link,
size_t num_recv_frames,
- recv_callback_t recv_cb)
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb)
{
UHD_ASSERT_THROW(_offload_thread);
@@ -513,12 +532,13 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se
recv_link,
num_recv_frames,
recv_cb,
+ fc_cb,
port]() {
frame_reservation_t frames = {recv_link, num_recv_frames, send_link, num_send_frames};
_reservation_mgr.reserve_frames(frames);
auto inline_send_io = _io_srv->make_send_client(
- send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb);
+ send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb, fc_cb);
send_client_info_t client_info;
client_info.inline_io = inline_send_io;
@@ -666,10 +686,14 @@ void offload_io_service_impl::_do_work_polling()
for (auto it = _send_clients.begin(); it != _send_clients.end();) {
frame_buff* buff;
bool disconnect;
- std::tie(buff, disconnect) = it->port->offload_thread_pop();
+ std::tie(buff, disconnect) = it->port->offload_thread_peek();
if (buff) {
- _release_send_buff(*it, buff);
+ if (it->inline_io->wait_for_dest_ready(buff->packet_size(), 0)) {
+ _release_send_buff(*it, buff);
+ it->port->offload_thread_pop();
+ }
} else if (disconnect) {
+ it->port->offload_thread_pop();
_disconnect_send_client(*it);
it = _send_clients.erase(it); // increments it
continue;
@@ -735,11 +759,14 @@ void offload_io_service_impl::_do_work_blocking()
if (it->num_frames_in_use > 0) {
frame_buff* buff;
bool disconnect;
- std::tie(buff, disconnect) = it->port->offload_thread_pop(blocking_timeout_ms);
-
+ std::tie(buff, disconnect) = it->port->offload_thread_peek();
if (buff) {
- _release_send_buff(*it, buff);
+ if (it->inline_io->wait_for_dest_ready(buff->packet_size(), blocking_timeout_ms)) {
+ _release_send_buff(*it, buff);
+ it->port->offload_thread_pop();
+ }
} else if (disconnect) {
+ it->port->offload_thread_pop();
_disconnect_send_client(*it);
it = _send_clients.erase(it); // increments it
continue;