diff options
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/inline_io_service.cpp | 81 | ||||
-rw-r--r-- | host/lib/transport/offload_io_service.cpp | 27 |
2 files changed, 99 insertions, 9 deletions
diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp index 93967e09a..6449bbda8 100644 --- a/host/lib/transport/inline_io_service.cpp +++ b/host/lib/transport/inline_io_service.cpp @@ -144,6 +144,41 @@ public: } } + void recv_flow_ctrl(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms) + { + while (true) { + frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); + /* Process buffer */ + if (buff) { + bool rcvr_found = false; + for (auto& rcvr : _callbacks) { + if (rcvr->callback(buff, recv_link)) { + rcvr_found = true; + if (rcvr == cb) { + assert(!buff); + return; + } else if (buff) { + /* NOTE: Should not overflow, by construction + * Every queue can hold link->get_num_recv_frames() + */ + _queues[rcvr]->push_back(buff.release()); + } else { + /* Continue looping if buffer was consumed and + receiver is not the requested one */ + break; + } + } + } + if (not rcvr_found) { + UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); + recv_link->release_recv_buff(std::move(buff)); + } + } else { /* Timeout */ + return; + } + } + } + private: recv_link_if* _link; std::list<inline_recv_cb*> _callbacks; @@ -248,11 +283,18 @@ public: void release_send_buff(frame_buff::uptr buff) { - while (buff) { /* TODO: Possibly don't loop indefinitely here */ + while (buff) { + // Try to send a packet + _send_cb(buff, _send_link.get()); if (_recv_link) { - _io_srv->recv(this, _recv_link.get(), 0); + // If the buffer was not released, use a timeout to receive + // the flow control packet, to avoid wasting CPU. + if (!buff) { + _io_srv->recv_flow_ctrl(this, _recv_link.get(), 0); + } else { + _io_srv->recv_flow_ctrl(this, _recv_link.get(), 100); + } } - _send_cb(buff, _send_link.get()); } _num_frames_in_use--; } @@ -430,7 +472,38 @@ frame_buff::uptr inline_io_service::recv( return frame_buff::uptr(); } } - return frame_buff::uptr(); +} + +void inline_io_service::recv_flow_ctrl( + inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms) +{ + inline_recv_mux* mux; + inline_recv_cb* rcvr; + std::tie(mux, rcvr) = _recv_tbl.at(recv_link); + + if (mux) { + /* Defer to mux's recv_flow_ctrl() if present */ + mux->recv_flow_ctrl(recv_io_cb, recv_link, timeout_ms); + return; + } else { + assert(recv_io_cb == rcvr); + } + + while (true) { + frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); + /* Process buffer */ + if (buff) { + if (rcvr->callback(buff, recv_link)) { + assert(!buff); + return; + } else { + UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); + recv_link->release_recv_buff(std::move(buff)); + } + } else { /* Timeout */ + return; + } + } } }} // namespace uhd::transport diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp index c9b9af344..53c8f017d 100644 --- a/host/lib/transport/offload_io_service.cpp +++ b/host/lib/transport/offload_io_service.cpp @@ -24,7 +24,7 @@ namespace uhd { namespace transport { namespace { -constexpr int32_t blocking_timeout_ms = 100; +constexpr int32_t blocking_timeout_ms = 10; // Fixed-size queue that supports blocking semantics template <typename queue_item_t> @@ -108,6 +108,13 @@ public: return queue_element.buff; } + frame_buff* client_pop(int32_t timeout_ms) + { + from_offload_thread_t queue_element; + _from_offload_thread.pop(queue_element, timeout_ms); + return queue_element.buff; + } + size_t client_read_available() { return _from_offload_thread.read_available(); @@ -474,8 +481,13 @@ recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr re port->client_wait_until_connected(); // Return a new recv client to the caller that just operates on the queues - return std::make_shared<offload_recv_io<offload_io_service_impl>>( - shared_from_this(), num_recv_frames, num_send_frames, port); + if (_offload_thread_params.wait_mode == POLL) { + return std::make_shared<offload_recv_io<offload_io_service_impl, true>>( + shared_from_this(), num_recv_frames, num_send_frames, port); + } else { + return std::make_shared<offload_recv_io<offload_io_service_impl, false>>( + shared_from_this(), num_recv_frames, num_send_frames, port); + } } send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr send_link, @@ -528,8 +540,13 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se } // Return a new recv client to the caller that just operates on the queues - return std::make_shared<offload_send_io<offload_io_service_impl>>( - shared_from_this(), num_recv_frames, num_send_frames, port); + if (_offload_thread_params.wait_mode == POLL) { + return std::make_shared<offload_send_io<offload_io_service_impl, true>>( + shared_from_this(), num_recv_frames, num_send_frames, port); + } else { + return std::make_shared<offload_send_io<offload_io_service_impl, false>>( + shared_from_this(), num_recv_frames, num_send_frames, port); + } } void offload_io_service_impl::_queue_client_req(std::function<void()> fn) |