aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/offload_io_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport/offload_io_service.cpp')
-rw-r--r--host/lib/transport/offload_io_service.cpp43
1 files changed, 35 insertions, 8 deletions
diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp
index 53c8f017d..8a7895173 100644
--- a/host/lib/transport/offload_io_service.cpp
+++ b/host/lib/transport/offload_io_service.cpp
@@ -48,6 +48,16 @@ public:
_item_sem.notify();
}
+ bool peek(queue_item_t& item)
+ {
+ if (_item_sem.count()) {
+ item = _buffer[_read_index];
+ return true;
+ } else {
+ return false;
+ }
+ }
+
bool pop(queue_item_t& item)
{
if (_item_sem.try_wait()) {
@@ -153,6 +163,13 @@ public:
_from_offload_thread.push(queue_element);
}
+ std::tuple<frame_buff*, bool> offload_thread_peek()
+ {
+ to_offload_thread_t queue_element;
+ _to_offload_thread.peek(queue_element);
+ return std::make_tuple(queue_element.buff, queue_element.disconnect);
+ }
+
std::tuple<frame_buff*, bool> offload_thread_pop()
{
to_offload_thread_t queue_element;
@@ -261,7 +278,8 @@ public:
send_io_if::send_callback_t send_cb,
recv_link_if::sptr recv_link,
size_t num_recv_frames,
- recv_callback_t recv_cb);
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb);
private:
offload_io_service_impl(const offload_io_service_impl&) = delete;
@@ -495,7 +513,8 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se
send_io_if::send_callback_t send_cb,
recv_link_if::sptr recv_link,
size_t num_recv_frames,
- recv_callback_t recv_cb)
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb)
{
UHD_ASSERT_THROW(_offload_thread);
@@ -513,12 +532,13 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se
recv_link,
num_recv_frames,
recv_cb,
+ fc_cb,
port]() {
frame_reservation_t frames = {recv_link, num_recv_frames, send_link, num_send_frames};
_reservation_mgr.reserve_frames(frames);
auto inline_send_io = _io_srv->make_send_client(
- send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb);
+ send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb, fc_cb);
send_client_info_t client_info;
client_info.inline_io = inline_send_io;
@@ -666,10 +686,14 @@ void offload_io_service_impl::_do_work_polling()
for (auto it = _send_clients.begin(); it != _send_clients.end();) {
frame_buff* buff;
bool disconnect;
- std::tie(buff, disconnect) = it->port->offload_thread_pop();
+ std::tie(buff, disconnect) = it->port->offload_thread_peek();
if (buff) {
- _release_send_buff(*it, buff);
+ if (it->inline_io->wait_for_dest_ready(buff->packet_size(), 0)) {
+ _release_send_buff(*it, buff);
+ it->port->offload_thread_pop();
+ }
} else if (disconnect) {
+ it->port->offload_thread_pop();
_disconnect_send_client(*it);
it = _send_clients.erase(it); // increments it
continue;
@@ -735,11 +759,14 @@ void offload_io_service_impl::_do_work_blocking()
if (it->num_frames_in_use > 0) {
frame_buff* buff;
bool disconnect;
- std::tie(buff, disconnect) = it->port->offload_thread_pop(blocking_timeout_ms);
-
+ std::tie(buff, disconnect) = it->port->offload_thread_peek();
if (buff) {
- _release_send_buff(*it, buff);
+ if (it->inline_io->wait_for_dest_ready(buff->packet_size(), blocking_timeout_ms)) {
+ _release_send_buff(*it, buff);
+ it->port->offload_thread_pop();
+ }
} else if (disconnect) {
+ it->port->offload_thread_pop();
_disconnect_send_client(*it);
it = _send_clients.erase(it); // increments it
continue;