diff options
Diffstat (limited to 'host/lib/transport/offload_io_service.cpp')
-rw-r--r-- | host/lib/transport/offload_io_service.cpp | 43 |
1 files changed, 35 insertions, 8 deletions
diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp index 53c8f017d..8a7895173 100644 --- a/host/lib/transport/offload_io_service.cpp +++ b/host/lib/transport/offload_io_service.cpp @@ -48,6 +48,16 @@ public: _item_sem.notify(); } + bool peek(queue_item_t& item) + { + if (_item_sem.count()) { + item = _buffer[_read_index]; + return true; + } else { + return false; + } + } + bool pop(queue_item_t& item) { if (_item_sem.try_wait()) { @@ -153,6 +163,13 @@ public: _from_offload_thread.push(queue_element); } + std::tuple<frame_buff*, bool> offload_thread_peek() + { + to_offload_thread_t queue_element; + _to_offload_thread.peek(queue_element); + return std::make_tuple(queue_element.buff, queue_element.disconnect); + } + std::tuple<frame_buff*, bool> offload_thread_pop() { to_offload_thread_t queue_element; @@ -261,7 +278,8 @@ public: send_io_if::send_callback_t send_cb, recv_link_if::sptr recv_link, size_t num_recv_frames, - recv_callback_t recv_cb); + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb); private: offload_io_service_impl(const offload_io_service_impl&) = delete; @@ -495,7 +513,8 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se send_io_if::send_callback_t send_cb, recv_link_if::sptr recv_link, size_t num_recv_frames, - recv_callback_t recv_cb) + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb) { UHD_ASSERT_THROW(_offload_thread); @@ -513,12 +532,13 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se recv_link, num_recv_frames, recv_cb, + fc_cb, port]() { frame_reservation_t frames = {recv_link, num_recv_frames, send_link, num_send_frames}; _reservation_mgr.reserve_frames(frames); auto inline_send_io = _io_srv->make_send_client( - send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb); + send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb, fc_cb); send_client_info_t client_info; client_info.inline_io = inline_send_io; @@ -666,10 +686,14 @@ void offload_io_service_impl::_do_work_polling() for (auto it = _send_clients.begin(); it != _send_clients.end();) { frame_buff* buff; bool disconnect; - std::tie(buff, disconnect) = it->port->offload_thread_pop(); + std::tie(buff, disconnect) = it->port->offload_thread_peek(); if (buff) { - _release_send_buff(*it, buff); + if (it->inline_io->wait_for_dest_ready(buff->packet_size(), 0)) { + _release_send_buff(*it, buff); + it->port->offload_thread_pop(); + } } else if (disconnect) { + it->port->offload_thread_pop(); _disconnect_send_client(*it); it = _send_clients.erase(it); // increments it continue; @@ -735,11 +759,14 @@ void offload_io_service_impl::_do_work_blocking() if (it->num_frames_in_use > 0) { frame_buff* buff; bool disconnect; - std::tie(buff, disconnect) = it->port->offload_thread_pop(blocking_timeout_ms); - + std::tie(buff, disconnect) = it->port->offload_thread_peek(); if (buff) { - _release_send_buff(*it, buff); + if (it->inline_io->wait_for_dest_ready(buff->packet_size(), blocking_timeout_ms)) { + _release_send_buff(*it, buff); + it->port->offload_thread_pop(); + } } else if (disconnect) { + it->port->offload_thread_pop(); _disconnect_send_client(*it); it = _send_clients.erase(it); // increments it continue; |