diff options
author | Ciro Nishiguchi <ciro.nishiguchi@ni.com> | 2019-09-11 14:48:50 -0500 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 12:21:31 -0800 |
commit | 650c07cbcb74b88e1a561a85e25035e553e00f14 (patch) | |
tree | 4085d562123f3ea4e20e70a2b326dcfb3b3674da | |
parent | f3a86a32944ae68047e6f64369e93a6830742601 (diff) | |
download | uhd-650c07cbcb74b88e1a561a85e25035e553e00f14.tar.gz uhd-650c07cbcb74b88e1a561a85e25035e553e00f14.tar.bz2 uhd-650c07cbcb74b88e1a561a85e25035e553e00f14.zip |
transport: Implement an I/O service that uses an offload thread
The offload_io_service executes another I/O service instance within an
offload thread, and provides synchronization mechanisms to communicate
with clients. Frame buffers are passed from the offload thread to the
client and back via single-producer, single-consumer queues.
-rw-r--r-- | host/lib/include/uhdlib/transport/offload_io_service.hpp | 66 | ||||
-rw-r--r-- | host/lib/transport/CMakeLists.txt | 1 | ||||
-rw-r--r-- | host/lib/transport/offload_io_service.cpp | 998 | ||||
-rw-r--r-- | host/tests/CMakeLists.txt | 6 | ||||
-rw-r--r-- | host/tests/offload_io_srv_test.cpp | 278 |
5 files changed, 1349 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/offload_io_service.hpp b/host/lib/include/uhdlib/transport/offload_io_service.hpp new file mode 100644 index 000000000..a7d9d211d --- /dev/null +++ b/host/lib/include/uhdlib/transport/offload_io_service.hpp @@ -0,0 +1,66 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_HPP +#define INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_HPP + +#include <uhdlib/transport/io_service.hpp> + +namespace uhd { namespace transport { + +/*! + * I/O service with offload thread + * + * Note: This I/O service can only be used with transports that allow releasing + * frame buffers out of order, since flow control packets are handled entirely + * within the offload thread. + */ +class offload_io_service : public io_service +{ +public: + enum client_type_t + { + RECV_ONLY, + SEND_ONLY, + BOTH_SEND_AND_RECV + }; + + enum wait_mode_t + { + POLL, + BLOCK + }; + + /*! + * Options for configuring offload I/O service + */ + struct params_t + { + //! Array of CPU numbers to which to affinitize the offload thread. + std::vector<size_t> cpu_affinity_list; + //! The types of client that the I/O service needs to support. + client_type_t client_type = BOTH_SEND_AND_RECV; + //! The thread behavior when waiting for incoming packets If set to + //! BLOCK, the client type must be set to either RECV_ONLY or SEND_ONLY. + wait_mode_t wait_mode = POLL; + }; + + /*! + * Creates an io service that offloads I/O to a worker thread and + * passes configuration parameters to it. + * + * \param io_srv The io service to perform the actual work in the worker + * thread. + * \param params Parameters to pass to the offload I/O service. + * \return A composite I/O service that executes the provided io service + * in its own thread. + */ + static sptr make(io_service::sptr io_srv, const params_t& params); +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_HPP */ diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index d21644f01..d39ca7336 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -123,6 +123,7 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/muxed_zero_copy_if.cpp ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp ${CMAKE_CURRENT_SOURCE_DIR}/inline_io_service.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/offload_io_service.cpp ${CMAKE_CURRENT_SOURCE_DIR}/adapter.cpp ) diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp new file mode 100644 index 000000000..ed28a93f9 --- /dev/null +++ b/host/lib/transport/offload_io_service.cpp @@ -0,0 +1,998 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <uhd/utils/thread.hpp> +#include <uhdlib/transport/offload_io_service.hpp> +#include <condition_variable> +#include <unordered_map> +#include <boost/lockfree/queue.hpp> +#include <atomic> +#include <chrono> +#include <functional> +#include <list> +#include <memory> +#include <thread> + +namespace uhd { namespace transport { + +namespace { + +constexpr int32_t blocking_timeout_ms = 100; + +// Struct to help keep track of frames reserved for each link +struct frame_reservation_t +{ + recv_link_if::sptr recv_link; + size_t num_recv_frames = 0; + send_link_if::sptr send_link; + size_t num_send_frames = 0; +}; + +// Helper class to keep track of frames reserved for each link +class frame_reservation_mgr +{ +public: + void register_link(const recv_link_if::sptr& recv_link) + { + if (_recv_tbl[recv_link.get()] != 0) { + throw uhd::runtime_error("Recv link already attached to I/O service"); + } + _recv_tbl[recv_link.get()] = 0; + } + + void register_link(const send_link_if::sptr& send_link) + { + if (_send_tbl[send_link.get()] != 0) { + throw uhd::runtime_error("Send link already attached to I/O service"); + } + _send_tbl[send_link.get()] = 0; + } + + void reserve_frames(const frame_reservation_t& reservation) + { + if (reservation.recv_link) { + const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get()); + const size_t capacity = reservation.recv_link->get_num_recv_frames(); + if (rsvd_frames + reservation.num_recv_frames > capacity) { + throw uhd::runtime_error("Number of frames requested exceeds link recv frame capacity"); + } + + recv_link_if* link_ptr = reservation.recv_link.get(); + _recv_tbl[link_ptr] = rsvd_frames + reservation.num_recv_frames; + } + + if (reservation.send_link) { + const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get()); + const size_t capacity = reservation.send_link->get_num_send_frames(); + if (rsvd_frames + reservation.num_send_frames > capacity) { + throw uhd::runtime_error("Number of frames requested exceeds link send frame capacity"); + } + + send_link_if* link_ptr = reservation.send_link.get(); + _send_tbl[link_ptr] = rsvd_frames + reservation.num_send_frames; + } + } + + void unreserve_frames(const frame_reservation_t& reservation) + { + if (reservation.recv_link) { + const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get()); + recv_link_if* link_ptr = reservation.recv_link.get(); + _recv_tbl[link_ptr] = rsvd_frames - reservation.num_recv_frames; + } + + if (reservation.send_link) { + const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get()); + send_link_if* link_ptr = reservation.send_link.get(); + _send_tbl[link_ptr] = rsvd_frames - reservation.num_send_frames; + } + } + +private: + std::unordered_map<recv_link_if*, size_t> _recv_tbl; + std::unordered_map<send_link_if*, size_t> _send_tbl; +}; + + +// Semaphore used in blocking I/O for offload thread +class semaphore +{ +public: + void notify() { + std::unique_lock<std::mutex> lock(_cv_mutex); + _count++; + _cv.notify_one(); + } + + void wait() { + std::unique_lock<std::mutex> lock(_cv_mutex); + _cv.wait(lock, [this]() { return this->_count != 0; }); + _count--; + } + + bool try_wait() { + std::unique_lock<std::mutex> lock(_cv_mutex); + if (_count != 0) { + _count--; + return true; + } + return false; + } + + bool wait_for(size_t timeout_ms) { + std::chrono::milliseconds timeout(timeout_ms); + std::unique_lock<std::mutex> lock(_cv_mutex); + if (_cv.wait_for(lock, timeout, [this]() { return this->_count != 0; })) { + _count--; + return true; + } + return false; + } + + size_t count() { + std::unique_lock<std::mutex> lock(_cv_mutex); + return _count; + } + +private: + std::condition_variable _cv; + std::mutex _cv_mutex; + size_t _count = 0; +}; + +// Fixed-size queue that supports blocking semantics +template <typename queue_item_t> +class offload_thread_queue { +public: + offload_thread_queue(size_t size) + : _buffer(new queue_item_t[size]) + , _capacity(size) + { + } + + ~offload_thread_queue() + { + delete [] _buffer; + } + + void push(const queue_item_t& item) + { + _buffer[_write_index++] = item; + _write_index %= _capacity; + _item_sem.notify(); + } + + bool pop(queue_item_t& item) + { + if (_item_sem.try_wait()) { + item = _buffer[_read_index++]; + _read_index %= _capacity; + return true; + } else { + return false; + } + } + + bool pop(queue_item_t& item, int32_t timeout_ms) + { + if (_item_sem.wait_for(timeout_ms)) { + item = _buffer[_read_index++]; + _read_index %= _capacity; + return true; + } else { + return false; + } + } + + size_t read_available() + { + return _item_sem.count(); + } + +private: + queue_item_t* _buffer; + const size_t _capacity; + + size_t _read_index = 0; + size_t _write_index = 0; + + // Semaphore gating number of items available to read + semaphore _item_sem; +}; + +// Object that implements the communication between client and offload thread +struct client_port_t +{ +public: + using sptr = std::shared_ptr<client_port_t>; + + client_port_t(size_t size) + : _from_offload_thread(size) + , _to_offload_thread(size + 1) // add one for disconnect command + { + } + + // + // Client methods + // + frame_buff* client_pop() + { + from_offload_thread_t queue_element; + _from_offload_thread.pop(queue_element); + return queue_element.buff; + } + + size_t client_read_available() + { + return _from_offload_thread.read_available(); + } + + void client_push(frame_buff* buff) + { + to_offload_thread_t queue_element{buff, false}; + _to_offload_thread.push(queue_element); + } + + void client_wait_until_connected() + { + std::unique_lock<std::mutex> lock(_connect_cv_mutex); + _connect_cv.wait(lock, [this]() { return _connected; }); + } + + void client_disconnect() + { + to_offload_thread_t queue_element{nullptr, true}; + _to_offload_thread.push(queue_element); + + // Need to wait for the disconnect to occur before returning, since the + // caller (the xport object) has callbacks installed in the inline I/O + // service. After this method returns, the caller can be deallocated. + std::unique_lock<std::mutex> lock(_connect_cv_mutex); + _connect_cv.wait(lock, [this]() { return !_connected; }); + } + + // + // Offload thread methods + // + void offload_thread_push(frame_buff* buff) + { + from_offload_thread_t queue_element{buff}; + _from_offload_thread.push(queue_element); + } + + std::tuple<frame_buff*, bool> offload_thread_pop() + { + to_offload_thread_t queue_element; + _to_offload_thread.pop(queue_element); + return std::make_tuple(queue_element.buff, queue_element.disconnect); + } + + std::tuple<frame_buff*, bool> offload_thread_pop(int32_t timeout_ms) + { + to_offload_thread_t queue_element; + _to_offload_thread.pop(queue_element, timeout_ms); + return std::make_tuple(queue_element.buff, queue_element.disconnect); + } + + void offload_thread_set_connected(const bool value) + { + { + std::lock_guard<std::mutex> lock(_connect_cv_mutex); + _connected = value; + } + _connect_cv.notify_one(); + } + + // Flush should only be called once the client is no longer accessing the + // queue going from the offload thread to the client, since it drains that + // queue from the offload thread. + template <typename fn_t> + size_t offload_thread_flush(fn_t f) + { + size_t count = 0; + from_offload_thread_t queue_element; + while (_from_offload_thread.pop(queue_element)) { + f(queue_element.buff); + count++; + } + return count; + } + +private: + // Queue for frame buffers coming from the offload thread + struct from_offload_thread_t + { + frame_buff* buff = nullptr; + }; + + using from_offload_thread_queue_t = offload_thread_queue<from_offload_thread_t>; + + // Queue for frame buffers and disconnect requests to offload thread. Disconnect + // requests must be inline with incoming buffers to avoid any race conditions + // between the two. + struct to_offload_thread_t + { + frame_buff* buff = nullptr; + bool disconnect = false; + }; + + using to_offload_thread_queue_t = offload_thread_queue<to_offload_thread_t>; + + // Queues to carry frame buffers in both directions + from_offload_thread_queue_t _from_offload_thread; + to_offload_thread_queue_t _to_offload_thread; + + // Mutex and condition variable to wait for connect and disconnect + std::condition_variable _connect_cv; + std::mutex _connect_cv_mutex; + bool _connected = false; +}; + +} // namespace + +// Implementation of io service that executes an inline io service in an offload +// thread. The offload thread communicates with send and recv clients using a +// pair of spsc queues. One queue carries buffers from the offload thread to the +// client, and the other carries buffers in the opposite direction. +// +// Requests to create new clients are handled using a separate mpsc queue. Client +// requests to disconnect are sent in the same spsc queue as the buffers so that +// they are processed only after all buffer release requestss have been processed. +class offload_io_service_impl + : public offload_io_service, + public std::enable_shared_from_this<offload_io_service_impl> +{ +public: + using sptr = std::shared_ptr<offload_io_service_impl>; + + offload_io_service_impl( + io_service::sptr io_srv, const offload_io_service::params_t& params); + ~offload_io_service_impl(); + + void attach_recv_link(recv_link_if::sptr link); + void attach_send_link(send_link_if::sptr link); + + recv_io_if::sptr make_recv_client(recv_link_if::sptr recv_link, + size_t num_recv_frames, + recv_callback_t cb, + send_link_if::sptr fc_link, + size_t num_send_frames, + recv_io_if::fc_callback_t fc_cb); + + send_io_if::sptr make_send_client(send_link_if::sptr send_link, + size_t num_send_frames, + send_io_if::send_callback_t send_cb, + recv_link_if::sptr recv_link, + size_t num_recv_frames, + recv_callback_t recv_cb); + +private: + offload_io_service_impl(const offload_io_service_impl&) = delete; + + // Queue for new client creation, multiple producers allowed. Requests are + // passed as heap-allocated pointers because boost lockfree queues require + // simple types. + struct client_req_t + { + std::function<void()>* req = nullptr; + }; + using client_req_queue_t = boost::lockfree::queue<client_req_t>; + + // Values used by offload thread for each client + struct recv_client_info_t + { + client_port_t::sptr port; + recv_io_if::sptr inline_io; + size_t num_frames_in_use = 0; + frame_reservation_t frames_reserved; + }; + struct send_client_info_t + { + client_port_t::sptr port; + send_io_if::sptr inline_io; + size_t num_frames_in_use = 0; + frame_reservation_t frames_reserved; + }; + + void _get_recv_buff(recv_client_info_t& info, int32_t timeout_ms); + void _get_send_buff(send_client_info_t& info); + void _release_recv_buff(recv_client_info_t& info, frame_buff* buff); + void _release_send_buff(send_client_info_t& info, frame_buff* buff); + void _disconnect_recv_client(recv_client_info_t& info); + void _disconnect_send_client(send_client_info_t& info); + + template <bool allow_recv, bool allow_send> + void _do_work_polling(); + + template <bool allow_recv, bool allow_send> + void _do_work_blocking(); + + // The I/O service that executes within the offload thread + io_service::sptr _io_srv; + + // Type of clients supported by this I/O service + client_type_t _client_type; + + // Offload thread, its stop flag, and thread-related parameters + std::unique_ptr<std::thread> _offload_thread; + std::atomic<bool> _stop_offload_thread{false}; + offload_io_service::params_t _offload_thread_params; + + // Lists of clients and their respective queues + std::list<recv_client_info_t> _recv_clients; + std::list<send_client_info_t> _send_clients; + + // Queue for connect and disconnect client requests + client_req_queue_t _client_connect_queue; + + // Keep track of frame reservations + frame_reservation_mgr _reservation_mgr; +}; + +class offload_recv_io : public recv_io_if +{ +public: + offload_recv_io(offload_io_service_impl::sptr io_srv, + size_t num_recv_frames, + size_t num_send_frames, + client_port_t::sptr& port); + + ~offload_recv_io(); + + frame_buff::uptr get_recv_buff(int32_t timeout_ms); + void release_recv_buff(frame_buff::uptr buff); + +private: + offload_recv_io() = delete; + offload_recv_io(const offload_recv_io&) = delete; + + offload_io_service_impl::sptr _io_srv; + client_port_t::sptr _port; + size_t _num_frames_in_use = 0; +}; + +class offload_send_io : public send_io_if +{ +public: + offload_send_io(offload_io_service_impl::sptr io_srv, + size_t num_recv_frames, + size_t num_send_frames, + client_port_t::sptr& port); + + ~offload_send_io(); + + frame_buff::uptr get_send_buff(int32_t timeout_ms); + void release_send_buff(frame_buff::uptr buff); + +private: + offload_send_io() = delete; + offload_send_io(const offload_send_io&) = delete; + + offload_io_service_impl::sptr _io_srv; + client_port_t::sptr _port; + size_t _num_frames_in_use = 0; +}; + +// Implementation of get_send_buff used below by send and recv clients +template <typename pop_func_t> +static frame_buff::uptr client_get_buff(pop_func_t pop, const int32_t timeout_ms) +{ + using namespace std::chrono; + + if (timeout_ms == 0) { + return frame_buff::uptr(pop()); + } + + const auto end_time = steady_clock::now() + milliseconds(timeout_ms); + + bool last_check = false; + + while (true) { + if (frame_buff* buff = pop()) { + return frame_buff::uptr(buff); + } + + if (timeout_ms > 0 && steady_clock::now() > end_time) { + if (last_check) { + return nullptr; + } else { + last_check = true; + } + } + std::this_thread::yield(); + } +} + +// +// offload_recv_io methods +// +offload_recv_io::offload_recv_io(offload_io_service_impl::sptr io_srv, + size_t num_recv_frames, + size_t num_send_frames, + client_port_t::sptr& port) + : _io_srv(io_srv), _port(port) +{ + _num_recv_frames = num_recv_frames; + _num_send_frames = num_send_frames; +} + +offload_recv_io::~offload_recv_io() +{ + assert(_num_frames_in_use == 0); + + if (_io_srv) { + _port->client_disconnect(); + } +} + +frame_buff::uptr offload_recv_io::get_recv_buff(int32_t timeout_ms) +{ + return client_get_buff( + [this]() { + frame_buff* buff = _port->client_pop(); + _num_frames_in_use += buff ? 1 : 0; + return buff; + }, + timeout_ms); +} + +void offload_recv_io::release_recv_buff(frame_buff::uptr buff) +{ + assert(buff); + _port->client_push(buff.release()); + _num_frames_in_use--; +} + +// +// offload_send_io methods +// +offload_send_io::offload_send_io(offload_io_service_impl::sptr io_srv, + size_t num_recv_frames, + size_t num_send_frames, + client_port_t::sptr& port) + : _io_srv(io_srv), _port(port) +{ + _num_recv_frames = num_recv_frames; + _num_send_frames = num_send_frames; +} + +offload_send_io::~offload_send_io() +{ + assert(_num_frames_in_use == 0); + + if (_io_srv) { + _port->client_disconnect(); + } +} + +frame_buff::uptr offload_send_io::get_send_buff(int32_t timeout_ms) +{ + return client_get_buff( + [this]() { + frame_buff* buff = _port->client_pop(); + _num_frames_in_use += buff ? 1 : 0; + return buff; + }, + timeout_ms); +} + +void offload_send_io::release_send_buff(frame_buff::uptr buff) +{ + assert(buff); + _port->client_push(buff.release()); + _num_frames_in_use--; +} + +// +// offload_io_service methods +// +offload_io_service::sptr offload_io_service::make( + io_service::sptr io_srv, const offload_io_service::params_t& params) +{ + return std::make_shared<offload_io_service_impl>(io_srv, params); +} + +// +// offload_io_service_impl methods +// +offload_io_service_impl::offload_io_service_impl( + io_service::sptr io_srv, const offload_io_service::params_t& params) + : _io_srv(io_srv) + , _offload_thread_params(params) + , _client_connect_queue(10) // arbitrary initial size +{ + if (params.wait_mode == BLOCK && params.client_type == BOTH_SEND_AND_RECV) { + throw uhd::value_error( + "An I/O service configured to block should only service either " + "send or recv clients to prevent one client type from starving " + "the other"); + } + + std::function<void()> thread_fn; + + if (params.wait_mode == BLOCK) { + if (params.client_type == RECV_ONLY) { + thread_fn = [this]() { _do_work_blocking<true, false>(); }; + } else if (params.client_type == SEND_ONLY) { + thread_fn = [this]() { _do_work_blocking<false, true>(); }; + } else { + UHD_THROW_INVALID_CODE_PATH(); + } + } else if (params.wait_mode == POLL) { + if (params.client_type == RECV_ONLY) { + thread_fn = [this]() { _do_work_polling<true, false>(); }; + } else if (params.client_type == SEND_ONLY) { + thread_fn = [this]() { _do_work_polling<false, true>(); }; + } else if (params.client_type == BOTH_SEND_AND_RECV) { + thread_fn = [this]() { _do_work_polling<true, true>(); }; + } else { + UHD_THROW_INVALID_CODE_PATH(); + } + } else { + UHD_THROW_INVALID_CODE_PATH(); + } + + _offload_thread = std::make_unique<std::thread>(thread_fn); +} + +offload_io_service_impl::~offload_io_service_impl() +{ + _stop_offload_thread = true; + + if (_offload_thread) { + _offload_thread->join(); + } + + assert(_recv_clients.empty()); + assert(_send_clients.empty()); +} + +void offload_io_service_impl::attach_recv_link(recv_link_if::sptr link) +{ + // Create a request to attach link in the offload thread + auto req_fn = [this, link]() { + _reservation_mgr.register_link(link); + _io_srv->attach_recv_link(link); + }; + + client_req_t queue_element; + queue_element.req = {new std::function<void()>(req_fn)}; + const bool success = _client_connect_queue.push(queue_element); + if (!success) { + throw uhd::runtime_error("Failed to push attach_recv_link request"); + } +} + +void offload_io_service_impl::attach_send_link(send_link_if::sptr link) +{ + // Create a request to attach link in the offload thread + auto req_fn = [this, link]() { + _reservation_mgr.register_link(link); + _io_srv->attach_send_link(link); + }; + + client_req_t queue_element; + queue_element.req = {new std::function<void()>(req_fn)}; + const bool success = _client_connect_queue.push(queue_element); + if (!success) { + throw uhd::runtime_error("Failed to push attach_send_link request"); + } +} + +recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr recv_link, + size_t num_recv_frames, + recv_callback_t cb, + send_link_if::sptr fc_link, + size_t num_send_frames, + recv_io_if::fc_callback_t fc_cb) +{ + UHD_ASSERT_THROW(_offload_thread); + + if (_client_type == SEND_ONLY) { + throw uhd::runtime_error("Recv client not supported by this I/O service"); + } + + auto port = std::make_shared<client_port_t>(num_recv_frames); + + // Create a request to create a new receiver in the offload thread + auto req_fn = + [this, recv_link, num_recv_frames, cb, fc_link, num_send_frames, fc_cb, port]() { + frame_reservation_t frames = {recv_link, num_recv_frames, fc_link, num_send_frames}; + _reservation_mgr.reserve_frames(frames); + + auto inline_recv_io = _io_srv->make_recv_client( + recv_link, num_recv_frames, cb, fc_link, num_send_frames, fc_cb); + + recv_client_info_t client_info; + client_info.inline_io = inline_recv_io; + client_info.port = port; + client_info.frames_reserved = frames; + + _recv_clients.push_back(client_info); + + // Notify that the connection is created + port->offload_thread_set_connected(true); + }; + + client_req_t queue_element; + queue_element.req = {new std::function<void()>(req_fn)}; + const bool success = _client_connect_queue.push(queue_element); + if (!success) { + throw uhd::runtime_error("Failed to push make_recv_client request"); + } + + port->client_wait_until_connected(); + + // Return a new recv client to the caller that just operates on the queues + return std::make_shared<offload_recv_io>( + shared_from_this(), num_recv_frames, num_send_frames, port); +} + +send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr send_link, + size_t num_send_frames, + send_io_if::send_callback_t send_cb, + recv_link_if::sptr recv_link, + size_t num_recv_frames, + recv_callback_t recv_cb) +{ + UHD_ASSERT_THROW(_offload_thread); + + if (_client_type == RECV_ONLY) { + throw uhd::runtime_error("Send client not supported by this I/O service"); + } + + auto port = std::make_shared<client_port_t>(num_send_frames); + + // Create a request to create a new receiver in the offload thread + auto req_fn = [this, + send_link, + num_send_frames, + send_cb, + recv_link, + num_recv_frames, + recv_cb, + port]() { + frame_reservation_t frames = {recv_link, num_recv_frames, send_link, num_send_frames}; + _reservation_mgr.reserve_frames(frames); + + auto inline_send_io = _io_srv->make_send_client( + send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb); + + send_client_info_t client_info; + client_info.inline_io = inline_send_io; + client_info.port = port; + client_info.frames_reserved = frames; + + _send_clients.push_back(client_info); + + // Notify that the connection is created + port->offload_thread_set_connected(true); + }; + + client_req_t queue_element; + queue_element.req = {new std::function<void()>(req_fn)}; + const bool success = _client_connect_queue.push(queue_element); + if (!success) { + throw uhd::runtime_error("Failed to push make_send_client request"); + } + + port->client_wait_until_connected(); + + // Wait for buffer queue to be full + while (port->client_read_available() != num_send_frames) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + + // Return a new recv client to the caller that just operates on the queues + return std::make_shared<offload_send_io>( + shared_from_this(), num_recv_frames, num_send_frames, port); +} + +// Get a single receive buffer if available and update client info +void offload_io_service_impl::_get_recv_buff(recv_client_info_t& info, int32_t timeout_ms) +{ + if (info.num_frames_in_use < info.frames_reserved.num_recv_frames) { + if (frame_buff::uptr buff = info.inline_io->get_recv_buff(timeout_ms)) { + info.port->offload_thread_push(buff.release()); + info.num_frames_in_use++; + } + } +} + +// Get a single send buffer if available and update client info +void offload_io_service_impl::_get_send_buff(send_client_info_t& info) +{ + if (info.num_frames_in_use < info.frames_reserved.num_send_frames) { + if (frame_buff::uptr buff = info.inline_io->get_send_buff(0)) { + info.port->offload_thread_push(buff.release()); + info.num_frames_in_use++; + } + } +} + +// Release a single recv buffer and update client info +void offload_io_service_impl::_release_recv_buff(recv_client_info_t& info, frame_buff* buff) +{ + info.inline_io->release_recv_buff(frame_buff::uptr(buff)); + assert(info.num_frames_in_use > 0); + info.num_frames_in_use--; +} + +// Release a single send info +void offload_io_service_impl::_release_send_buff(send_client_info_t& info, frame_buff* buff) +{ + info.inline_io->release_send_buff(frame_buff::uptr(buff)); + assert(info.num_frames_in_use > 0); + info.num_frames_in_use--; +} + +// Flush client queues and unreserve its frames +void offload_io_service_impl::_disconnect_recv_client(recv_client_info_t& info) +{ + auto release_buff = [&info](frame_buff* buff) { + info.inline_io->release_recv_buff(frame_buff::uptr(buff)); + }; + + info.num_frames_in_use -= info.port->offload_thread_flush(release_buff); + assert(info.num_frames_in_use == 0); + _reservation_mgr.unreserve_frames(info.frames_reserved); + + // Client waits for a notification after requesting disconnect, so notify it + info.port->offload_thread_set_connected(false); +} + +// Flush client queues and unreserve its frames +void offload_io_service_impl::_disconnect_send_client(send_client_info_t& info) +{ + auto release_buff = [&info](frame_buff* buff) { + info.inline_io->release_send_buff(frame_buff::uptr(buff)); + }; + info.num_frames_in_use -= info.port->offload_thread_flush(release_buff); + assert(info.num_frames_in_use == 0); + _reservation_mgr.unreserve_frames(info.frames_reserved); + + // Client waits for a notification after requesting disconnect, so notify it + info.port->offload_thread_set_connected(false); +} + +template <bool allow_recv, bool allow_send> +void offload_io_service_impl::_do_work_polling() +{ + uhd::set_thread_affinity(_offload_thread_params.cpu_affinity_list); + + client_req_t client_req; + + while (!_stop_offload_thread) { + if (allow_recv) { + // Get recv buffers + for (auto& recv_info : _recv_clients) { + _get_recv_buff(recv_info, 0); + } + + // Release recv buffers + for (auto it = _recv_clients.begin(); it != _recv_clients.end();) { + frame_buff* buff; + bool disconnect; + std::tie(buff, disconnect) = it->port->offload_thread_pop(); + if (buff) { + _release_recv_buff(*it, buff); + } else if (disconnect) { + _disconnect_recv_client(*it); + it = _recv_clients.erase(it); // increments it + continue; + } + ++it; + } + } + + if (allow_send) { + // Get send buffers + for (auto& send_info : _send_clients) { + _get_send_buff(send_info); + } + + // Release send buffers + for (auto it = _send_clients.begin(); it != _send_clients.end();) { + frame_buff* buff; + bool disconnect; + std::tie(buff, disconnect) = it->port->offload_thread_pop(); + if (buff) { + _release_send_buff(*it, buff); + } else if (disconnect) { + _disconnect_send_client(*it); + it = _send_clients.erase(it); // increments it + continue; + } + ++it; + } + } + + // Execute one client connect command per main loop iteration + if (_client_connect_queue.pop(client_req)) { + (*client_req.req)(); + delete client_req.req; + } + } +} + +template <bool allow_recv, bool allow_send> +void offload_io_service_impl::_do_work_blocking() +{ + uhd::set_thread_affinity(_offload_thread_params.cpu_affinity_list); + + client_req_t client_req; + + while (!_stop_offload_thread) { + if (allow_recv) { + // Get recv buffers + for (auto& recv_info : _recv_clients) { + _get_recv_buff(recv_info, blocking_timeout_ms); + } + + // Release recv buffers + for (auto it = _recv_clients.begin(); it != _recv_clients.end();) { + frame_buff* buff; + bool disconnect; + + if (it->num_frames_in_use == it->frames_reserved.num_recv_frames) { + // If all buffers are in use, block to avoid excessive CPU usage + std::tie(buff, disconnect) = it->port->offload_thread_pop(blocking_timeout_ms); + } else { + // Otherwise, just check current status + std::tie(buff, disconnect) = it->port->offload_thread_pop(); + } + + if (buff) { + _release_recv_buff(*it, buff); + } else if (disconnect) { + _disconnect_recv_client(*it); + it = _recv_clients.erase(it); // increments it + continue; + } + ++it; + } + } + + if (allow_send) { + // Get send buffers + for (auto& send_info : _send_clients) { + _get_send_buff(send_info); + } + + // Release send buffers + for (auto it = _send_clients.begin(); it != _send_clients.end();) { + if (it->num_frames_in_use > 0) { + frame_buff* buff; + bool disconnect; + std::tie(buff, disconnect) = it->port->offload_thread_pop(blocking_timeout_ms); + + if (buff) { + _release_send_buff(*it, buff); + } else if (disconnect) { + _disconnect_send_client(*it); + it = _send_clients.erase(it); // increments it + continue; + } + } + ++it; + } + } + + // Execute one client connect command per main loop iteration + // TODO: In a blocking I/O strategy, the loop can take a long time to + // service these requests. Need to configure all clients up-front, + // before starting the offload thread to avoid this. + if (_client_connect_queue.pop(client_req)) { + (*client_req.req)(); + delete client_req.req; + } + } +} + +}} // namespace uhd::transport diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt index 8a477b181..89d0926fa 100644 --- a/host/tests/CMakeLists.txt +++ b/host/tests/CMakeLists.txt @@ -231,6 +231,12 @@ UHD_ADD_NONAPI_TEST( ${CMAKE_SOURCE_DIR}/lib/transport/inline_io_service.cpp ) +UHD_ADD_NONAPI_TEST( + TARGET "offload_io_srv_test.cpp" + EXTRA_SOURCES + ${CMAKE_SOURCE_DIR}/lib/transport/offload_io_service.cpp +) + ######################################################################## # demo of a loadable module ######################################################################## diff --git a/host/tests/offload_io_srv_test.cpp b/host/tests/offload_io_srv_test.cpp new file mode 100644 index 000000000..99bd6dd53 --- /dev/null +++ b/host/tests/offload_io_srv_test.cpp @@ -0,0 +1,278 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include "common/mock_link.hpp" +#include <uhdlib/transport/offload_io_service.hpp> +#include <boost/test/unit_test.hpp> +#include <iostream> +#include <atomic> + +using namespace uhd::transport; + +class mock_recv_io; +constexpr size_t FRAME_SIZE = 1000; + +static mock_send_link::sptr make_send_link(size_t num_frames) +{ + const mock_send_link::link_params params = {FRAME_SIZE, num_frames}; + return std::make_shared<mock_send_link>(params); +} + +static mock_recv_link::sptr make_recv_link(size_t num_frames) +{ + const mock_recv_link::link_params params = {FRAME_SIZE, num_frames}; + return std::make_shared<mock_recv_link>(params); +} + +class mock_recv_io : public recv_io_if +{ +public: + mock_recv_io(recv_link_if::sptr link) : _link(link) {} + + frame_buff::uptr get_recv_buff(int32_t timeout_ms) + { + if (_frames_allocated > 0) { + _frames_allocated--; + return _link->get_recv_buff(timeout_ms); + } + return nullptr; + } + + void release_recv_buff(frame_buff::uptr buff) + { + _link->release_recv_buff(std::move(buff)); + } + + size_t get_num_send_frames() const + { + return 0; + } + + size_t get_num_recv_frames() const + { + return _link->get_num_recv_frames(); + } + + void allocate_frames(const size_t num_frames) + { + _frames_allocated += num_frames; + } + +private: + std::atomic<size_t> _frames_allocated{0}; + recv_link_if::sptr _link; +}; + +class mock_send_io : public send_io_if +{ +public: + mock_send_io(send_link_if::sptr link) : _link(link) {} + + frame_buff::uptr get_send_buff(int32_t timeout_ms) + { + return _link->get_send_buff(timeout_ms); + } + + void release_send_buff(frame_buff::uptr buff) + { + _link->release_send_buff(std::move(buff)); + } + + size_t get_num_send_frames() const + { + return _link->get_num_send_frames(); + } + + size_t get_num_recv_frames() const + { + return 0; + } + +private: + send_link_if::sptr _link; +}; + +class mock_io_service : public io_service +{ +public: + void attach_recv_link(recv_link_if::sptr /*link*/) {} + + void attach_send_link(send_link_if::sptr /*link*/) {} + + send_io_if::sptr make_send_client(send_link_if::sptr send_link, + size_t /*num_send_frames*/, + send_io_if::send_callback_t /*cb*/, + recv_link_if::sptr /*recv_link*/, + size_t /*num_recv_frames*/, + recv_callback_t /*recv_cb*/) + { + return std::make_shared<mock_send_io>(send_link); + } + + recv_io_if::sptr make_recv_client(recv_link_if::sptr recv_link, + size_t /*num_recv_frames*/, + recv_callback_t /*cb*/, + send_link_if::sptr /*fc_link*/, + size_t /*num_send_frames*/, + recv_io_if::fc_callback_t /*fc_cb*/) + { + auto io = std::make_shared<mock_recv_io>(recv_link); + _recv_io.push_back(io); + return io; + } + + void allocate_recv_frames(const size_t client_idx, const size_t num_frames) + { + assert(client_idx < _recv_io.size()); + _recv_io[client_idx]->allocate_frames(num_frames); + } + +private: + std::vector<std::shared_ptr<mock_recv_io>> _recv_io; +}; + +constexpr auto RECV_ONLY = offload_io_service::RECV_ONLY; +constexpr auto SEND_ONLY = offload_io_service::SEND_ONLY; +constexpr auto BOTH_SEND_AND_RECV = offload_io_service::BOTH_SEND_AND_RECV; + +constexpr auto POLL = offload_io_service::POLL; +constexpr auto BLOCK = offload_io_service::BLOCK; +using params_t = offload_io_service::params_t; + +std::vector<offload_io_service::wait_mode_t> wait_modes({POLL, BLOCK}); + +BOOST_AUTO_TEST_CASE(test_construction) +{ + for (const auto wait_mode : wait_modes) { + params_t params {{}, SEND_ONLY, wait_mode}; + auto mock_io_srv = std::make_shared<mock_io_service>(); + auto io_srv = offload_io_service::make(mock_io_srv, params_t()); + auto send_link = make_send_link(5); + io_srv->attach_send_link(send_link); + auto send_client = + io_srv->make_send_client(send_link, 5, nullptr, nullptr, 0, nullptr); + } + for (const auto wait_mode : wait_modes) { + params_t params {{}, RECV_ONLY, wait_mode}; + auto mock_io_srv = std::make_shared<mock_io_service>(); + auto io_srv = offload_io_service::make(mock_io_srv, params_t()); + auto recv_link = make_recv_link(5); + io_srv->attach_recv_link(recv_link); + auto recv_client = + io_srv->make_recv_client(recv_link, 5, nullptr, nullptr, 0, nullptr); + } +} + +BOOST_AUTO_TEST_CASE(test_construction_with_options) +{ + offload_io_service::params_t params; + params.cpu_affinity_list = {0}; + + auto mock_io_srv = std::make_shared<mock_io_service>(); + auto io_srv = offload_io_service::make(mock_io_srv, params); + auto send_link = make_send_link(5); + io_srv->attach_send_link(send_link); + auto recv_link = make_recv_link(5); + io_srv->attach_recv_link(recv_link); + auto send_client = + io_srv->make_send_client(send_link, 5, nullptr, nullptr, 0, nullptr); + auto recv_client = + io_srv->make_recv_client(recv_link, 5, nullptr, nullptr, 0, nullptr); +} + +BOOST_AUTO_TEST_CASE(test_send) +{ + for (const auto wait_mode : wait_modes) { + params_t params = {{}, SEND_ONLY, wait_mode}; + auto mock_io_srv = std::make_shared<mock_io_service>(); + auto io_srv = offload_io_service::make(mock_io_srv, params); + auto send_link = make_send_link(5); + io_srv->attach_send_link(send_link); + auto send_client = + io_srv->make_send_client(send_link, 1, nullptr, nullptr, 0, nullptr); + + for (size_t i = 0; i < 10; i++) { + auto buff = send_client->get_send_buff(100); + BOOST_CHECK(buff != nullptr); + send_client->release_send_buff(std::move(buff)); + } + send_client.reset(); + } +} + +BOOST_AUTO_TEST_CASE(test_recv) +{ + for (const auto wait_mode : wait_modes) { + params_t params = {{}, RECV_ONLY, wait_mode}; + auto mock_io_srv = std::make_shared<mock_io_service>(); + auto io_srv = offload_io_service::make(mock_io_srv, params); + auto recv_link = make_recv_link(5); + io_srv->attach_recv_link(recv_link); + + auto recv_client = + io_srv->make_recv_client(recv_link, 1, nullptr, nullptr, 0, nullptr); + + for (size_t i = 0; i < 10; i++) { + recv_link->push_back_recv_packet( + boost::shared_array<uint8_t>(new uint8_t[FRAME_SIZE]), FRAME_SIZE); + } + BOOST_CHECK(recv_client); + mock_io_srv->allocate_recv_frames(0, 10); + + for (size_t i = 0; i < 10; i++) { + auto buff = recv_client->get_recv_buff(100); + BOOST_CHECK(buff != nullptr); + recv_client->release_recv_buff(std::move(buff)); + } + recv_client.reset(); + } +} + +BOOST_AUTO_TEST_CASE(test_send_recv) +{ + auto mock_io_srv = std::make_shared<mock_io_service>(); + auto io_srv = offload_io_service::make(mock_io_srv, params_t()); + auto send_link = make_send_link(5); + io_srv->attach_send_link(send_link); + auto recv_link = make_recv_link(5); + io_srv->attach_recv_link(recv_link); + + auto send_client = + io_srv->make_send_client(send_link, 1, nullptr, nullptr, 0, nullptr); + auto recv_client = + io_srv->make_recv_client(recv_link, 1, nullptr, nullptr, 0, nullptr); + + for (size_t i = 0; i < 20; i++) { + recv_link->push_back_recv_packet( + boost::shared_array<uint8_t>(new uint8_t[FRAME_SIZE]), FRAME_SIZE); + } + + for (size_t i = 0; i < 10; i++) { + send_client->release_send_buff(send_client->get_send_buff(100)); + mock_io_srv->allocate_recv_frames(0, 1); + recv_client->release_recv_buff(recv_client->get_recv_buff(100)); + } + + auto recv_client2 = + io_srv->make_recv_client(recv_link, 1, nullptr, nullptr, 0, nullptr); + auto send_client2 = + io_srv->make_send_client(send_link, 1, nullptr, nullptr, 0, nullptr); + for (size_t i = 0; i < 5; i++) { + mock_io_srv->allocate_recv_frames(1, 1); + recv_client2->release_recv_buff(recv_client2->get_recv_buff(100)); + send_client2->release_send_buff(send_client2->get_send_buff(100)); + } + send_client2.reset(); + recv_client2.reset(); + + for (size_t i = 0; i < 5; i++) { + mock_io_srv->allocate_recv_frames(0, 1); + recv_client->release_recv_buff(recv_client->get_recv_buff(100)); + } + + send_client.reset(); + recv_client.reset(); +} |