diff options
Diffstat (limited to 'host/lib')
-rw-r--r-- | host/lib/include/uhdlib/transport/inline_io_service.hpp | 13 | ||||
-rw-r--r-- | host/lib/include/uhdlib/transport/offload_io_service_client.hpp | 44 | ||||
-rw-r--r-- | host/lib/transport/inline_io_service.cpp | 81 | ||||
-rw-r--r-- | host/lib/transport/offload_io_service.cpp | 27 |
4 files changed, 140 insertions, 25 deletions
diff --git a/host/lib/include/uhdlib/transport/inline_io_service.hpp b/host/lib/include/uhdlib/transport/inline_io_service.hpp index fe41b96b6..d4a6dbbae 100644 --- a/host/lib/include/uhdlib/transport/inline_io_service.hpp +++ b/host/lib/include/uhdlib/transport/inline_io_service.hpp @@ -104,6 +104,19 @@ private: frame_buff::uptr recv( inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms); + /* + * Function to perform recv operations on a link, which is potentially + * muxed. This function is only called from send_io::release_send_buff, and + * always expects recv_io_cb to release its incoming buffer. Packets are + * forwarded to the appropriate mux or callback. + * + * \param recv_io_cb the callback+interface initiating the operation + * \param recv_link link to perform receive on + * \param timeout_ms timeout to wait for a buffer on the link + */ + void recv_flow_ctrl( + inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms); + /* Track whether link is muxed and the callback */ std::unordered_map<recv_link_if*, std::tuple<inline_recv_mux*, inline_recv_cb*>> _recv_tbl; diff --git a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp index 620e796ef..2f606878c 100644 --- a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp +++ b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp @@ -50,7 +50,7 @@ static frame_buff::uptr client_get_buff(pop_func_t pop, const int32_t timeout_ms /*! * Recv I/O client for offload I/O service */ -template <typename io_service_t> +template <typename io_service_t, bool polling> class offload_recv_io : public recv_io_if { public: @@ -75,13 +75,19 @@ public: frame_buff::uptr get_recv_buff(int32_t timeout_ms) { - return detail::client_get_buff( - [this]() { - frame_buff* buff = _port->client_pop(); - _num_frames_in_use += buff ? 1 : 0; - return buff; - }, - timeout_ms); + if (polling) { + return detail::client_get_buff( + [this]() { + frame_buff* buff = _port->client_pop(); + _num_frames_in_use += buff ? 1 : 0; + return buff; + }, + timeout_ms); + } else { + frame_buff* buff = _port->client_pop(timeout_ms); + _num_frames_in_use += buff ? 1 : 0; + return frame_buff::uptr(buff); + } } void release_recv_buff(frame_buff::uptr buff) @@ -103,7 +109,7 @@ private: /*! * Send I/O client for offload I/O service */ -template <typename io_service_t> +template <typename io_service_t, bool polling> class offload_send_io : public send_io_if { public: @@ -128,13 +134,19 @@ public: frame_buff::uptr get_send_buff(int32_t timeout_ms) { - return detail::client_get_buff( - [this]() { - frame_buff* buff = _port->client_pop(); - _num_frames_in_use += buff ? 1 : 0; - return buff; - }, - timeout_ms); + if (polling) { + return detail::client_get_buff( + [this]() { + frame_buff* buff = _port->client_pop(); + _num_frames_in_use += buff ? 1 : 0; + return buff; + }, + timeout_ms); + } else { + frame_buff* buff = _port->client_pop(timeout_ms); + _num_frames_in_use += buff ? 1 : 0; + return frame_buff::uptr(buff); + } } void release_send_buff(frame_buff::uptr buff) diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp index 93967e09a..6449bbda8 100644 --- a/host/lib/transport/inline_io_service.cpp +++ b/host/lib/transport/inline_io_service.cpp @@ -144,6 +144,41 @@ public: } } + void recv_flow_ctrl(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms) + { + while (true) { + frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); + /* Process buffer */ + if (buff) { + bool rcvr_found = false; + for (auto& rcvr : _callbacks) { + if (rcvr->callback(buff, recv_link)) { + rcvr_found = true; + if (rcvr == cb) { + assert(!buff); + return; + } else if (buff) { + /* NOTE: Should not overflow, by construction + * Every queue can hold link->get_num_recv_frames() + */ + _queues[rcvr]->push_back(buff.release()); + } else { + /* Continue looping if buffer was consumed and + receiver is not the requested one */ + break; + } + } + } + if (not rcvr_found) { + UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); + recv_link->release_recv_buff(std::move(buff)); + } + } else { /* Timeout */ + return; + } + } + } + private: recv_link_if* _link; std::list<inline_recv_cb*> _callbacks; @@ -248,11 +283,18 @@ public: void release_send_buff(frame_buff::uptr buff) { - while (buff) { /* TODO: Possibly don't loop indefinitely here */ + while (buff) { + // Try to send a packet + _send_cb(buff, _send_link.get()); if (_recv_link) { - _io_srv->recv(this, _recv_link.get(), 0); + // If the buffer was not released, use a timeout to receive + // the flow control packet, to avoid wasting CPU. + if (!buff) { + _io_srv->recv_flow_ctrl(this, _recv_link.get(), 0); + } else { + _io_srv->recv_flow_ctrl(this, _recv_link.get(), 100); + } } - _send_cb(buff, _send_link.get()); } _num_frames_in_use--; } @@ -430,7 +472,38 @@ frame_buff::uptr inline_io_service::recv( return frame_buff::uptr(); } } - return frame_buff::uptr(); +} + +void inline_io_service::recv_flow_ctrl( + inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms) +{ + inline_recv_mux* mux; + inline_recv_cb* rcvr; + std::tie(mux, rcvr) = _recv_tbl.at(recv_link); + + if (mux) { + /* Defer to mux's recv_flow_ctrl() if present */ + mux->recv_flow_ctrl(recv_io_cb, recv_link, timeout_ms); + return; + } else { + assert(recv_io_cb == rcvr); + } + + while (true) { + frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); + /* Process buffer */ + if (buff) { + if (rcvr->callback(buff, recv_link)) { + assert(!buff); + return; + } else { + UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); + recv_link->release_recv_buff(std::move(buff)); + } + } else { /* Timeout */ + return; + } + } } }} // namespace uhd::transport diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp index c9b9af344..53c8f017d 100644 --- a/host/lib/transport/offload_io_service.cpp +++ b/host/lib/transport/offload_io_service.cpp @@ -24,7 +24,7 @@ namespace uhd { namespace transport { namespace { -constexpr int32_t blocking_timeout_ms = 100; +constexpr int32_t blocking_timeout_ms = 10; // Fixed-size queue that supports blocking semantics template <typename queue_item_t> @@ -108,6 +108,13 @@ public: return queue_element.buff; } + frame_buff* client_pop(int32_t timeout_ms) + { + from_offload_thread_t queue_element; + _from_offload_thread.pop(queue_element, timeout_ms); + return queue_element.buff; + } + size_t client_read_available() { return _from_offload_thread.read_available(); @@ -474,8 +481,13 @@ recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr re port->client_wait_until_connected(); // Return a new recv client to the caller that just operates on the queues - return std::make_shared<offload_recv_io<offload_io_service_impl>>( - shared_from_this(), num_recv_frames, num_send_frames, port); + if (_offload_thread_params.wait_mode == POLL) { + return std::make_shared<offload_recv_io<offload_io_service_impl, true>>( + shared_from_this(), num_recv_frames, num_send_frames, port); + } else { + return std::make_shared<offload_recv_io<offload_io_service_impl, false>>( + shared_from_this(), num_recv_frames, num_send_frames, port); + } } send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr send_link, @@ -528,8 +540,13 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se } // Return a new recv client to the caller that just operates on the queues - return std::make_shared<offload_send_io<offload_io_service_impl>>( - shared_from_this(), num_recv_frames, num_send_frames, port); + if (_offload_thread_params.wait_mode == POLL) { + return std::make_shared<offload_send_io<offload_io_service_impl, true>>( + shared_from_this(), num_recv_frames, num_send_frames, port); + } else { + return std::make_shared<offload_send_io<offload_io_service_impl, false>>( + shared_from_this(), num_recv_frames, num_send_frames, port); + } } void offload_io_service_impl::_queue_client_req(std::function<void()> fn) |