diff options
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/inline_io_service.cpp | 63 | ||||
-rw-r--r-- | host/lib/transport/offload_io_service.cpp | 43 |
2 files changed, 72 insertions, 34 deletions
diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp index 6449bbda8..1438699b7 100644 --- a/host/lib/transport/inline_io_service.cpp +++ b/host/lib/transport/inline_io_service.cpp @@ -144,7 +144,7 @@ public: } } - void recv_flow_ctrl(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms) + bool recv_flow_ctrl(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms) { while (true) { frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); @@ -156,7 +156,7 @@ public: rcvr_found = true; if (rcvr == cb) { assert(!buff); - return; + return true; } else if (buff) { /* NOTE: Should not overflow, by construction * Every queue can hold link->get_num_recv_frames() @@ -174,7 +174,7 @@ public: recv_link->release_recv_buff(std::move(buff)); } } else { /* Timeout */ - return; + return false; } } } @@ -250,12 +250,14 @@ public: send_callback_t send_cb, recv_link_if::sptr recv_link, size_t num_recv_frames, - recv_callback_t fc_cb) - : inline_recv_cb(fc_cb, send_link.get()) + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb) + : inline_recv_cb(recv_cb, send_link.get()) , _io_srv(io_srv) , _send_link(send_link) , _send_cb(send_cb) , _recv_link(recv_link) + , _fc_cb(fc_cb) { _num_recv_frames = num_recv_frames; _num_send_frames = num_send_frames; @@ -269,9 +271,27 @@ public: } } + bool wait_for_dest_ready(size_t num_bytes, int32_t timeout_ms) + { + if (!_recv_link) { + // If there is no flow control link, then the destination must + // always be ready for more data. + return true; + } + + while (!_fc_cb(num_bytes)) { + const bool updated = + _io_srv->recv_flow_ctrl(this, _recv_link.get(), timeout_ms); + + if (!updated) { + return false; + } + } + return true; + } + frame_buff::uptr get_send_buff(int32_t timeout_ms) { - /* Check initial flow control result */ frame_buff::uptr buff = _send_link->get_send_buff(timeout_ms); if (buff) { _num_frames_in_use++; @@ -283,19 +303,8 @@ public: void release_send_buff(frame_buff::uptr buff) { - while (buff) { - // Try to send a packet - _send_cb(buff, _send_link.get()); - if (_recv_link) { - // If the buffer was not released, use a timeout to receive - // the flow control packet, to avoid wasting CPU. - if (!buff) { - _io_srv->recv_flow_ctrl(this, _recv_link.get(), 0); - } else { - _io_srv->recv_flow_ctrl(this, _recv_link.get(), 100); - } - } - } + // Send the packet using callback + _send_cb(std::move(buff), _send_link.get()); _num_frames_in_use--; } @@ -305,6 +314,7 @@ private: send_callback_t _send_cb; recv_link_if::sptr _recv_link; recv_callback_t _recv_cb; + fc_callback_t _fc_cb; size_t _num_frames_in_use = 0; }; @@ -369,7 +379,8 @@ send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_lin send_io_if::send_callback_t send_cb, recv_link_if::sptr recv_link, size_t num_recv_frames, - recv_callback_t recv_cb) + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb) { UHD_ASSERT_THROW(send_link); UHD_ASSERT_THROW(num_send_frames > 0); @@ -377,9 +388,10 @@ send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_lin connect_sender(send_link.get(), num_send_frames); sptr io_srv = shared_from_this(); auto send_io = std::make_shared<inline_send_io>( - io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb); + io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb, fc_cb); if (recv_link) { UHD_ASSERT_THROW(recv_cb); + UHD_ASSERT_THROW(fc_cb); UHD_ASSERT_THROW(num_recv_frames > 0); connect_receiver(recv_link.get(), send_io.get(), num_recv_frames); } @@ -474,7 +486,7 @@ frame_buff::uptr inline_io_service::recv( } } -void inline_io_service::recv_flow_ctrl( +bool inline_io_service::recv_flow_ctrl( inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms) { inline_recv_mux* mux; @@ -483,8 +495,7 @@ void inline_io_service::recv_flow_ctrl( if (mux) { /* Defer to mux's recv_flow_ctrl() if present */ - mux->recv_flow_ctrl(recv_io_cb, recv_link, timeout_ms); - return; + return mux->recv_flow_ctrl(recv_io_cb, recv_link, timeout_ms); } else { assert(recv_io_cb == rcvr); } @@ -495,13 +506,13 @@ void inline_io_service::recv_flow_ctrl( if (buff) { if (rcvr->callback(buff, recv_link)) { assert(!buff); - return; + return true; } else { UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); recv_link->release_recv_buff(std::move(buff)); } } else { /* Timeout */ - return; + return false; } } } diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp index 53c8f017d..8a7895173 100644 --- a/host/lib/transport/offload_io_service.cpp +++ b/host/lib/transport/offload_io_service.cpp @@ -48,6 +48,16 @@ public: _item_sem.notify(); } + bool peek(queue_item_t& item) + { + if (_item_sem.count()) { + item = _buffer[_read_index]; + return true; + } else { + return false; + } + } + bool pop(queue_item_t& item) { if (_item_sem.try_wait()) { @@ -153,6 +163,13 @@ public: _from_offload_thread.push(queue_element); } + std::tuple<frame_buff*, bool> offload_thread_peek() + { + to_offload_thread_t queue_element; + _to_offload_thread.peek(queue_element); + return std::make_tuple(queue_element.buff, queue_element.disconnect); + } + std::tuple<frame_buff*, bool> offload_thread_pop() { to_offload_thread_t queue_element; @@ -261,7 +278,8 @@ public: send_io_if::send_callback_t send_cb, recv_link_if::sptr recv_link, size_t num_recv_frames, - recv_callback_t recv_cb); + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb); private: offload_io_service_impl(const offload_io_service_impl&) = delete; @@ -495,7 +513,8 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se send_io_if::send_callback_t send_cb, recv_link_if::sptr recv_link, size_t num_recv_frames, - recv_callback_t recv_cb) + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb) { UHD_ASSERT_THROW(_offload_thread); @@ -513,12 +532,13 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se recv_link, num_recv_frames, recv_cb, + fc_cb, port]() { frame_reservation_t frames = {recv_link, num_recv_frames, send_link, num_send_frames}; _reservation_mgr.reserve_frames(frames); auto inline_send_io = _io_srv->make_send_client( - send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb); + send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb, fc_cb); send_client_info_t client_info; client_info.inline_io = inline_send_io; @@ -666,10 +686,14 @@ void offload_io_service_impl::_do_work_polling() for (auto it = _send_clients.begin(); it != _send_clients.end();) { frame_buff* buff; bool disconnect; - std::tie(buff, disconnect) = it->port->offload_thread_pop(); + std::tie(buff, disconnect) = it->port->offload_thread_peek(); if (buff) { - _release_send_buff(*it, buff); + if (it->inline_io->wait_for_dest_ready(buff->packet_size(), 0)) { + _release_send_buff(*it, buff); + it->port->offload_thread_pop(); + } } else if (disconnect) { + it->port->offload_thread_pop(); _disconnect_send_client(*it); it = _send_clients.erase(it); // increments it continue; @@ -735,11 +759,14 @@ void offload_io_service_impl::_do_work_blocking() if (it->num_frames_in_use > 0) { frame_buff* buff; bool disconnect; - std::tie(buff, disconnect) = it->port->offload_thread_pop(blocking_timeout_ms); - + std::tie(buff, disconnect) = it->port->offload_thread_peek(); if (buff) { - _release_send_buff(*it, buff); + if (it->inline_io->wait_for_dest_ready(buff->packet_size(), blocking_timeout_ms)) { + _release_send_buff(*it, buff); + it->port->offload_thread_pop(); + } } else if (disconnect) { + it->port->offload_thread_pop(); _disconnect_send_client(*it); it = _send_clients.erase(it); // increments it continue; |