aboutsummaryrefslogtreecommitdiffstats
path: root/host
diff options
context:
space:
mode:
authorCiro Nishiguchi <ciro.nishiguchi@ni.com>2019-09-11 14:48:50 -0500
committerMartin Braun <martin.braun@ettus.com>2019-11-26 12:21:31 -0800
commit650c07cbcb74b88e1a561a85e25035e553e00f14 (patch)
tree4085d562123f3ea4e20e70a2b326dcfb3b3674da /host
parentf3a86a32944ae68047e6f64369e93a6830742601 (diff)
downloaduhd-650c07cbcb74b88e1a561a85e25035e553e00f14.tar.gz
uhd-650c07cbcb74b88e1a561a85e25035e553e00f14.tar.bz2
uhd-650c07cbcb74b88e1a561a85e25035e553e00f14.zip
transport: Implement an I/O service that uses an offload thread
The offload_io_service executes another I/O service instance within an offload thread, and provides synchronization mechanisms to communicate with clients. Frame buffers are passed from the offload thread to the client and back via single-producer, single-consumer queues.
Diffstat (limited to 'host')
-rw-r--r--host/lib/include/uhdlib/transport/offload_io_service.hpp66
-rw-r--r--host/lib/transport/CMakeLists.txt1
-rw-r--r--host/lib/transport/offload_io_service.cpp998
-rw-r--r--host/tests/CMakeLists.txt6
-rw-r--r--host/tests/offload_io_srv_test.cpp278
5 files changed, 1349 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/offload_io_service.hpp b/host/lib/include/uhdlib/transport/offload_io_service.hpp
new file mode 100644
index 000000000..a7d9d211d
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/offload_io_service.hpp
@@ -0,0 +1,66 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_HPP
+#define INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_HPP
+
+#include <uhdlib/transport/io_service.hpp>
+
+namespace uhd { namespace transport {
+
+/*!
+ * I/O service with offload thread
+ *
+ * Note: This I/O service can only be used with transports that allow releasing
+ * frame buffers out of order, since flow control packets are handled entirely
+ * within the offload thread.
+ */
+class offload_io_service : public io_service
+{
+public:
+ enum client_type_t
+ {
+ RECV_ONLY,
+ SEND_ONLY,
+ BOTH_SEND_AND_RECV
+ };
+
+ enum wait_mode_t
+ {
+ POLL,
+ BLOCK
+ };
+
+ /*!
+ * Options for configuring offload I/O service
+ */
+ struct params_t
+ {
+ //! Array of CPU numbers to which to affinitize the offload thread.
+ std::vector<size_t> cpu_affinity_list;
+ //! The types of client that the I/O service needs to support.
+ client_type_t client_type = BOTH_SEND_AND_RECV;
+ //! The thread behavior when waiting for incoming packets If set to
+ //! BLOCK, the client type must be set to either RECV_ONLY or SEND_ONLY.
+ wait_mode_t wait_mode = POLL;
+ };
+
+ /*!
+ * Creates an io service that offloads I/O to a worker thread and
+ * passes configuration parameters to it.
+ *
+ * \param io_srv The io service to perform the actual work in the worker
+ * thread.
+ * \param params Parameters to pass to the offload I/O service.
+ * \return A composite I/O service that executes the provided io service
+ * in its own thread.
+ */
+ static sptr make(io_service::sptr io_srv, const params_t& params);
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_HPP */
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index d21644f01..d39ca7336 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -123,6 +123,7 @@ LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/muxed_zero_copy_if.cpp
${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp
${CMAKE_CURRENT_SOURCE_DIR}/inline_io_service.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/offload_io_service.cpp
${CMAKE_CURRENT_SOURCE_DIR}/adapter.cpp
)
diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp
new file mode 100644
index 000000000..ed28a93f9
--- /dev/null
+++ b/host/lib/transport/offload_io_service.cpp
@@ -0,0 +1,998 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/config.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhd/utils/thread.hpp>
+#include <uhdlib/transport/offload_io_service.hpp>
+#include <condition_variable>
+#include <unordered_map>
+#include <boost/lockfree/queue.hpp>
+#include <atomic>
+#include <chrono>
+#include <functional>
+#include <list>
+#include <memory>
+#include <thread>
+
+namespace uhd { namespace transport {
+
+namespace {
+
+constexpr int32_t blocking_timeout_ms = 100;
+
+// Struct to help keep track of frames reserved for each link
+struct frame_reservation_t
+{
+ recv_link_if::sptr recv_link;
+ size_t num_recv_frames = 0;
+ send_link_if::sptr send_link;
+ size_t num_send_frames = 0;
+};
+
+// Helper class to keep track of frames reserved for each link
+class frame_reservation_mgr
+{
+public:
+ void register_link(const recv_link_if::sptr& recv_link)
+ {
+ if (_recv_tbl[recv_link.get()] != 0) {
+ throw uhd::runtime_error("Recv link already attached to I/O service");
+ }
+ _recv_tbl[recv_link.get()] = 0;
+ }
+
+ void register_link(const send_link_if::sptr& send_link)
+ {
+ if (_send_tbl[send_link.get()] != 0) {
+ throw uhd::runtime_error("Send link already attached to I/O service");
+ }
+ _send_tbl[send_link.get()] = 0;
+ }
+
+ void reserve_frames(const frame_reservation_t& reservation)
+ {
+ if (reservation.recv_link) {
+ const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get());
+ const size_t capacity = reservation.recv_link->get_num_recv_frames();
+ if (rsvd_frames + reservation.num_recv_frames > capacity) {
+ throw uhd::runtime_error("Number of frames requested exceeds link recv frame capacity");
+ }
+
+ recv_link_if* link_ptr = reservation.recv_link.get();
+ _recv_tbl[link_ptr] = rsvd_frames + reservation.num_recv_frames;
+ }
+
+ if (reservation.send_link) {
+ const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get());
+ const size_t capacity = reservation.send_link->get_num_send_frames();
+ if (rsvd_frames + reservation.num_send_frames > capacity) {
+ throw uhd::runtime_error("Number of frames requested exceeds link send frame capacity");
+ }
+
+ send_link_if* link_ptr = reservation.send_link.get();
+ _send_tbl[link_ptr] = rsvd_frames + reservation.num_send_frames;
+ }
+ }
+
+ void unreserve_frames(const frame_reservation_t& reservation)
+ {
+ if (reservation.recv_link) {
+ const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get());
+ recv_link_if* link_ptr = reservation.recv_link.get();
+ _recv_tbl[link_ptr] = rsvd_frames - reservation.num_recv_frames;
+ }
+
+ if (reservation.send_link) {
+ const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get());
+ send_link_if* link_ptr = reservation.send_link.get();
+ _send_tbl[link_ptr] = rsvd_frames - reservation.num_send_frames;
+ }
+ }
+
+private:
+ std::unordered_map<recv_link_if*, size_t> _recv_tbl;
+ std::unordered_map<send_link_if*, size_t> _send_tbl;
+};
+
+
+// Semaphore used in blocking I/O for offload thread
+class semaphore
+{
+public:
+ void notify() {
+ std::unique_lock<std::mutex> lock(_cv_mutex);
+ _count++;
+ _cv.notify_one();
+ }
+
+ void wait() {
+ std::unique_lock<std::mutex> lock(_cv_mutex);
+ _cv.wait(lock, [this]() { return this->_count != 0; });
+ _count--;
+ }
+
+ bool try_wait() {
+ std::unique_lock<std::mutex> lock(_cv_mutex);
+ if (_count != 0) {
+ _count--;
+ return true;
+ }
+ return false;
+ }
+
+ bool wait_for(size_t timeout_ms) {
+ std::chrono::milliseconds timeout(timeout_ms);
+ std::unique_lock<std::mutex> lock(_cv_mutex);
+ if (_cv.wait_for(lock, timeout, [this]() { return this->_count != 0; })) {
+ _count--;
+ return true;
+ }
+ return false;
+ }
+
+ size_t count() {
+ std::unique_lock<std::mutex> lock(_cv_mutex);
+ return _count;
+ }
+
+private:
+ std::condition_variable _cv;
+ std::mutex _cv_mutex;
+ size_t _count = 0;
+};
+
+// Fixed-size queue that supports blocking semantics
+template <typename queue_item_t>
+class offload_thread_queue {
+public:
+ offload_thread_queue(size_t size)
+ : _buffer(new queue_item_t[size])
+ , _capacity(size)
+ {
+ }
+
+ ~offload_thread_queue()
+ {
+ delete [] _buffer;
+ }
+
+ void push(const queue_item_t& item)
+ {
+ _buffer[_write_index++] = item;
+ _write_index %= _capacity;
+ _item_sem.notify();
+ }
+
+ bool pop(queue_item_t& item)
+ {
+ if (_item_sem.try_wait()) {
+ item = _buffer[_read_index++];
+ _read_index %= _capacity;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ bool pop(queue_item_t& item, int32_t timeout_ms)
+ {
+ if (_item_sem.wait_for(timeout_ms)) {
+ item = _buffer[_read_index++];
+ _read_index %= _capacity;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ size_t read_available()
+ {
+ return _item_sem.count();
+ }
+
+private:
+ queue_item_t* _buffer;
+ const size_t _capacity;
+
+ size_t _read_index = 0;
+ size_t _write_index = 0;
+
+ // Semaphore gating number of items available to read
+ semaphore _item_sem;
+};
+
+// Object that implements the communication between client and offload thread
+struct client_port_t
+{
+public:
+ using sptr = std::shared_ptr<client_port_t>;
+
+ client_port_t(size_t size)
+ : _from_offload_thread(size)
+ , _to_offload_thread(size + 1) // add one for disconnect command
+ {
+ }
+
+ //
+ // Client methods
+ //
+ frame_buff* client_pop()
+ {
+ from_offload_thread_t queue_element;
+ _from_offload_thread.pop(queue_element);
+ return queue_element.buff;
+ }
+
+ size_t client_read_available()
+ {
+ return _from_offload_thread.read_available();
+ }
+
+ void client_push(frame_buff* buff)
+ {
+ to_offload_thread_t queue_element{buff, false};
+ _to_offload_thread.push(queue_element);
+ }
+
+ void client_wait_until_connected()
+ {
+ std::unique_lock<std::mutex> lock(_connect_cv_mutex);
+ _connect_cv.wait(lock, [this]() { return _connected; });
+ }
+
+ void client_disconnect()
+ {
+ to_offload_thread_t queue_element{nullptr, true};
+ _to_offload_thread.push(queue_element);
+
+ // Need to wait for the disconnect to occur before returning, since the
+ // caller (the xport object) has callbacks installed in the inline I/O
+ // service. After this method returns, the caller can be deallocated.
+ std::unique_lock<std::mutex> lock(_connect_cv_mutex);
+ _connect_cv.wait(lock, [this]() { return !_connected; });
+ }
+
+ //
+ // Offload thread methods
+ //
+ void offload_thread_push(frame_buff* buff)
+ {
+ from_offload_thread_t queue_element{buff};
+ _from_offload_thread.push(queue_element);
+ }
+
+ std::tuple<frame_buff*, bool> offload_thread_pop()
+ {
+ to_offload_thread_t queue_element;
+ _to_offload_thread.pop(queue_element);
+ return std::make_tuple(queue_element.buff, queue_element.disconnect);
+ }
+
+ std::tuple<frame_buff*, bool> offload_thread_pop(int32_t timeout_ms)
+ {
+ to_offload_thread_t queue_element;
+ _to_offload_thread.pop(queue_element, timeout_ms);
+ return std::make_tuple(queue_element.buff, queue_element.disconnect);
+ }
+
+ void offload_thread_set_connected(const bool value)
+ {
+ {
+ std::lock_guard<std::mutex> lock(_connect_cv_mutex);
+ _connected = value;
+ }
+ _connect_cv.notify_one();
+ }
+
+ // Flush should only be called once the client is no longer accessing the
+ // queue going from the offload thread to the client, since it drains that
+ // queue from the offload thread.
+ template <typename fn_t>
+ size_t offload_thread_flush(fn_t f)
+ {
+ size_t count = 0;
+ from_offload_thread_t queue_element;
+ while (_from_offload_thread.pop(queue_element)) {
+ f(queue_element.buff);
+ count++;
+ }
+ return count;
+ }
+
+private:
+ // Queue for frame buffers coming from the offload thread
+ struct from_offload_thread_t
+ {
+ frame_buff* buff = nullptr;
+ };
+
+ using from_offload_thread_queue_t = offload_thread_queue<from_offload_thread_t>;
+
+ // Queue for frame buffers and disconnect requests to offload thread. Disconnect
+ // requests must be inline with incoming buffers to avoid any race conditions
+ // between the two.
+ struct to_offload_thread_t
+ {
+ frame_buff* buff = nullptr;
+ bool disconnect = false;
+ };
+
+ using to_offload_thread_queue_t = offload_thread_queue<to_offload_thread_t>;
+
+ // Queues to carry frame buffers in both directions
+ from_offload_thread_queue_t _from_offload_thread;
+ to_offload_thread_queue_t _to_offload_thread;
+
+ // Mutex and condition variable to wait for connect and disconnect
+ std::condition_variable _connect_cv;
+ std::mutex _connect_cv_mutex;
+ bool _connected = false;
+};
+
+} // namespace
+
+// Implementation of io service that executes an inline io service in an offload
+// thread. The offload thread communicates with send and recv clients using a
+// pair of spsc queues. One queue carries buffers from the offload thread to the
+// client, and the other carries buffers in the opposite direction.
+//
+// Requests to create new clients are handled using a separate mpsc queue. Client
+// requests to disconnect are sent in the same spsc queue as the buffers so that
+// they are processed only after all buffer release requestss have been processed.
+class offload_io_service_impl
+ : public offload_io_service,
+ public std::enable_shared_from_this<offload_io_service_impl>
+{
+public:
+ using sptr = std::shared_ptr<offload_io_service_impl>;
+
+ offload_io_service_impl(
+ io_service::sptr io_srv, const offload_io_service::params_t& params);
+ ~offload_io_service_impl();
+
+ void attach_recv_link(recv_link_if::sptr link);
+ void attach_send_link(send_link_if::sptr link);
+
+ recv_io_if::sptr make_recv_client(recv_link_if::sptr recv_link,
+ size_t num_recv_frames,
+ recv_callback_t cb,
+ send_link_if::sptr fc_link,
+ size_t num_send_frames,
+ recv_io_if::fc_callback_t fc_cb);
+
+ send_io_if::sptr make_send_client(send_link_if::sptr send_link,
+ size_t num_send_frames,
+ send_io_if::send_callback_t send_cb,
+ recv_link_if::sptr recv_link,
+ size_t num_recv_frames,
+ recv_callback_t recv_cb);
+
+private:
+ offload_io_service_impl(const offload_io_service_impl&) = delete;
+
+ // Queue for new client creation, multiple producers allowed. Requests are
+ // passed as heap-allocated pointers because boost lockfree queues require
+ // simple types.
+ struct client_req_t
+ {
+ std::function<void()>* req = nullptr;
+ };
+ using client_req_queue_t = boost::lockfree::queue<client_req_t>;
+
+ // Values used by offload thread for each client
+ struct recv_client_info_t
+ {
+ client_port_t::sptr port;
+ recv_io_if::sptr inline_io;
+ size_t num_frames_in_use = 0;
+ frame_reservation_t frames_reserved;
+ };
+ struct send_client_info_t
+ {
+ client_port_t::sptr port;
+ send_io_if::sptr inline_io;
+ size_t num_frames_in_use = 0;
+ frame_reservation_t frames_reserved;
+ };
+
+ void _get_recv_buff(recv_client_info_t& info, int32_t timeout_ms);
+ void _get_send_buff(send_client_info_t& info);
+ void _release_recv_buff(recv_client_info_t& info, frame_buff* buff);
+ void _release_send_buff(send_client_info_t& info, frame_buff* buff);
+ void _disconnect_recv_client(recv_client_info_t& info);
+ void _disconnect_send_client(send_client_info_t& info);
+
+ template <bool allow_recv, bool allow_send>
+ void _do_work_polling();
+
+ template <bool allow_recv, bool allow_send>
+ void _do_work_blocking();
+
+ // The I/O service that executes within the offload thread
+ io_service::sptr _io_srv;
+
+ // Type of clients supported by this I/O service
+ client_type_t _client_type;
+
+ // Offload thread, its stop flag, and thread-related parameters
+ std::unique_ptr<std::thread> _offload_thread;
+ std::atomic<bool> _stop_offload_thread{false};
+ offload_io_service::params_t _offload_thread_params;
+
+ // Lists of clients and their respective queues
+ std::list<recv_client_info_t> _recv_clients;
+ std::list<send_client_info_t> _send_clients;
+
+ // Queue for connect and disconnect client requests
+ client_req_queue_t _client_connect_queue;
+
+ // Keep track of frame reservations
+ frame_reservation_mgr _reservation_mgr;
+};
+
+class offload_recv_io : public recv_io_if
+{
+public:
+ offload_recv_io(offload_io_service_impl::sptr io_srv,
+ size_t num_recv_frames,
+ size_t num_send_frames,
+ client_port_t::sptr& port);
+
+ ~offload_recv_io();
+
+ frame_buff::uptr get_recv_buff(int32_t timeout_ms);
+ void release_recv_buff(frame_buff::uptr buff);
+
+private:
+ offload_recv_io() = delete;
+ offload_recv_io(const offload_recv_io&) = delete;
+
+ offload_io_service_impl::sptr _io_srv;
+ client_port_t::sptr _port;
+ size_t _num_frames_in_use = 0;
+};
+
+class offload_send_io : public send_io_if
+{
+public:
+ offload_send_io(offload_io_service_impl::sptr io_srv,
+ size_t num_recv_frames,
+ size_t num_send_frames,
+ client_port_t::sptr& port);
+
+ ~offload_send_io();
+
+ frame_buff::uptr get_send_buff(int32_t timeout_ms);
+ void release_send_buff(frame_buff::uptr buff);
+
+private:
+ offload_send_io() = delete;
+ offload_send_io(const offload_send_io&) = delete;
+
+ offload_io_service_impl::sptr _io_srv;
+ client_port_t::sptr _port;
+ size_t _num_frames_in_use = 0;
+};
+
+// Implementation of get_send_buff used below by send and recv clients
+template <typename pop_func_t>
+static frame_buff::uptr client_get_buff(pop_func_t pop, const int32_t timeout_ms)
+{
+ using namespace std::chrono;
+
+ if (timeout_ms == 0) {
+ return frame_buff::uptr(pop());
+ }
+
+ const auto end_time = steady_clock::now() + milliseconds(timeout_ms);
+
+ bool last_check = false;
+
+ while (true) {
+ if (frame_buff* buff = pop()) {
+ return frame_buff::uptr(buff);
+ }
+
+ if (timeout_ms > 0 && steady_clock::now() > end_time) {
+ if (last_check) {
+ return nullptr;
+ } else {
+ last_check = true;
+ }
+ }
+ std::this_thread::yield();
+ }
+}
+
+//
+// offload_recv_io methods
+//
+offload_recv_io::offload_recv_io(offload_io_service_impl::sptr io_srv,
+ size_t num_recv_frames,
+ size_t num_send_frames,
+ client_port_t::sptr& port)
+ : _io_srv(io_srv), _port(port)
+{
+ _num_recv_frames = num_recv_frames;
+ _num_send_frames = num_send_frames;
+}
+
+offload_recv_io::~offload_recv_io()
+{
+ assert(_num_frames_in_use == 0);
+
+ if (_io_srv) {
+ _port->client_disconnect();
+ }
+}
+
+frame_buff::uptr offload_recv_io::get_recv_buff(int32_t timeout_ms)
+{
+ return client_get_buff(
+ [this]() {
+ frame_buff* buff = _port->client_pop();
+ _num_frames_in_use += buff ? 1 : 0;
+ return buff;
+ },
+ timeout_ms);
+}
+
+void offload_recv_io::release_recv_buff(frame_buff::uptr buff)
+{
+ assert(buff);
+ _port->client_push(buff.release());
+ _num_frames_in_use--;
+}
+
+//
+// offload_send_io methods
+//
+offload_send_io::offload_send_io(offload_io_service_impl::sptr io_srv,
+ size_t num_recv_frames,
+ size_t num_send_frames,
+ client_port_t::sptr& port)
+ : _io_srv(io_srv), _port(port)
+{
+ _num_recv_frames = num_recv_frames;
+ _num_send_frames = num_send_frames;
+}
+
+offload_send_io::~offload_send_io()
+{
+ assert(_num_frames_in_use == 0);
+
+ if (_io_srv) {
+ _port->client_disconnect();
+ }
+}
+
+frame_buff::uptr offload_send_io::get_send_buff(int32_t timeout_ms)
+{
+ return client_get_buff(
+ [this]() {
+ frame_buff* buff = _port->client_pop();
+ _num_frames_in_use += buff ? 1 : 0;
+ return buff;
+ },
+ timeout_ms);
+}
+
+void offload_send_io::release_send_buff(frame_buff::uptr buff)
+{
+ assert(buff);
+ _port->client_push(buff.release());
+ _num_frames_in_use--;
+}
+
+//
+// offload_io_service methods
+//
+offload_io_service::sptr offload_io_service::make(
+ io_service::sptr io_srv, const offload_io_service::params_t& params)
+{
+ return std::make_shared<offload_io_service_impl>(io_srv, params);
+}
+
+//
+// offload_io_service_impl methods
+//
+offload_io_service_impl::offload_io_service_impl(
+ io_service::sptr io_srv, const offload_io_service::params_t& params)
+ : _io_srv(io_srv)
+ , _offload_thread_params(params)
+ , _client_connect_queue(10) // arbitrary initial size
+{
+ if (params.wait_mode == BLOCK && params.client_type == BOTH_SEND_AND_RECV) {
+ throw uhd::value_error(
+ "An I/O service configured to block should only service either "
+ "send or recv clients to prevent one client type from starving "
+ "the other");
+ }
+
+ std::function<void()> thread_fn;
+
+ if (params.wait_mode == BLOCK) {
+ if (params.client_type == RECV_ONLY) {
+ thread_fn = [this]() { _do_work_blocking<true, false>(); };
+ } else if (params.client_type == SEND_ONLY) {
+ thread_fn = [this]() { _do_work_blocking<false, true>(); };
+ } else {
+ UHD_THROW_INVALID_CODE_PATH();
+ }
+ } else if (params.wait_mode == POLL) {
+ if (params.client_type == RECV_ONLY) {
+ thread_fn = [this]() { _do_work_polling<true, false>(); };
+ } else if (params.client_type == SEND_ONLY) {
+ thread_fn = [this]() { _do_work_polling<false, true>(); };
+ } else if (params.client_type == BOTH_SEND_AND_RECV) {
+ thread_fn = [this]() { _do_work_polling<true, true>(); };
+ } else {
+ UHD_THROW_INVALID_CODE_PATH();
+ }
+ } else {
+ UHD_THROW_INVALID_CODE_PATH();
+ }
+
+ _offload_thread = std::make_unique<std::thread>(thread_fn);
+}
+
+offload_io_service_impl::~offload_io_service_impl()
+{
+ _stop_offload_thread = true;
+
+ if (_offload_thread) {
+ _offload_thread->join();
+ }
+
+ assert(_recv_clients.empty());
+ assert(_send_clients.empty());
+}
+
+void offload_io_service_impl::attach_recv_link(recv_link_if::sptr link)
+{
+ // Create a request to attach link in the offload thread
+ auto req_fn = [this, link]() {
+ _reservation_mgr.register_link(link);
+ _io_srv->attach_recv_link(link);
+ };
+
+ client_req_t queue_element;
+ queue_element.req = {new std::function<void()>(req_fn)};
+ const bool success = _client_connect_queue.push(queue_element);
+ if (!success) {
+ throw uhd::runtime_error("Failed to push attach_recv_link request");
+ }
+}
+
+void offload_io_service_impl::attach_send_link(send_link_if::sptr link)
+{
+ // Create a request to attach link in the offload thread
+ auto req_fn = [this, link]() {
+ _reservation_mgr.register_link(link);
+ _io_srv->attach_send_link(link);
+ };
+
+ client_req_t queue_element;
+ queue_element.req = {new std::function<void()>(req_fn)};
+ const bool success = _client_connect_queue.push(queue_element);
+ if (!success) {
+ throw uhd::runtime_error("Failed to push attach_send_link request");
+ }
+}
+
+recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr recv_link,
+ size_t num_recv_frames,
+ recv_callback_t cb,
+ send_link_if::sptr fc_link,
+ size_t num_send_frames,
+ recv_io_if::fc_callback_t fc_cb)
+{
+ UHD_ASSERT_THROW(_offload_thread);
+
+ if (_client_type == SEND_ONLY) {
+ throw uhd::runtime_error("Recv client not supported by this I/O service");
+ }
+
+ auto port = std::make_shared<client_port_t>(num_recv_frames);
+
+ // Create a request to create a new receiver in the offload thread
+ auto req_fn =
+ [this, recv_link, num_recv_frames, cb, fc_link, num_send_frames, fc_cb, port]() {
+ frame_reservation_t frames = {recv_link, num_recv_frames, fc_link, num_send_frames};
+ _reservation_mgr.reserve_frames(frames);
+
+ auto inline_recv_io = _io_srv->make_recv_client(
+ recv_link, num_recv_frames, cb, fc_link, num_send_frames, fc_cb);
+
+ recv_client_info_t client_info;
+ client_info.inline_io = inline_recv_io;
+ client_info.port = port;
+ client_info.frames_reserved = frames;
+
+ _recv_clients.push_back(client_info);
+
+ // Notify that the connection is created
+ port->offload_thread_set_connected(true);
+ };
+
+ client_req_t queue_element;
+ queue_element.req = {new std::function<void()>(req_fn)};
+ const bool success = _client_connect_queue.push(queue_element);
+ if (!success) {
+ throw uhd::runtime_error("Failed to push make_recv_client request");
+ }
+
+ port->client_wait_until_connected();
+
+ // Return a new recv client to the caller that just operates on the queues
+ return std::make_shared<offload_recv_io>(
+ shared_from_this(), num_recv_frames, num_send_frames, port);
+}
+
+send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr send_link,
+ size_t num_send_frames,
+ send_io_if::send_callback_t send_cb,
+ recv_link_if::sptr recv_link,
+ size_t num_recv_frames,
+ recv_callback_t recv_cb)
+{
+ UHD_ASSERT_THROW(_offload_thread);
+
+ if (_client_type == RECV_ONLY) {
+ throw uhd::runtime_error("Send client not supported by this I/O service");
+ }
+
+ auto port = std::make_shared<client_port_t>(num_send_frames);
+
+ // Create a request to create a new receiver in the offload thread
+ auto req_fn = [this,
+ send_link,
+ num_send_frames,
+ send_cb,
+ recv_link,
+ num_recv_frames,
+ recv_cb,
+ port]() {
+ frame_reservation_t frames = {recv_link, num_recv_frames, send_link, num_send_frames};
+ _reservation_mgr.reserve_frames(frames);
+
+ auto inline_send_io = _io_srv->make_send_client(
+ send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb);
+
+ send_client_info_t client_info;
+ client_info.inline_io = inline_send_io;
+ client_info.port = port;
+ client_info.frames_reserved = frames;
+
+ _send_clients.push_back(client_info);
+
+ // Notify that the connection is created
+ port->offload_thread_set_connected(true);
+ };
+
+ client_req_t queue_element;
+ queue_element.req = {new std::function<void()>(req_fn)};
+ const bool success = _client_connect_queue.push(queue_element);
+ if (!success) {
+ throw uhd::runtime_error("Failed to push make_send_client request");
+ }
+
+ port->client_wait_until_connected();
+
+ // Wait for buffer queue to be full
+ while (port->client_read_available() != num_send_frames) {
+ std::this_thread::sleep_for(std::chrono::microseconds(100));
+ }
+
+ // Return a new recv client to the caller that just operates on the queues
+ return std::make_shared<offload_send_io>(
+ shared_from_this(), num_recv_frames, num_send_frames, port);
+}
+
+// Get a single receive buffer if available and update client info
+void offload_io_service_impl::_get_recv_buff(recv_client_info_t& info, int32_t timeout_ms)
+{
+ if (info.num_frames_in_use < info.frames_reserved.num_recv_frames) {
+ if (frame_buff::uptr buff = info.inline_io->get_recv_buff(timeout_ms)) {
+ info.port->offload_thread_push(buff.release());
+ info.num_frames_in_use++;
+ }
+ }
+}
+
+// Get a single send buffer if available and update client info
+void offload_io_service_impl::_get_send_buff(send_client_info_t& info)
+{
+ if (info.num_frames_in_use < info.frames_reserved.num_send_frames) {
+ if (frame_buff::uptr buff = info.inline_io->get_send_buff(0)) {
+ info.port->offload_thread_push(buff.release());
+ info.num_frames_in_use++;
+ }
+ }
+}
+
+// Release a single recv buffer and update client info
+void offload_io_service_impl::_release_recv_buff(recv_client_info_t& info, frame_buff* buff)
+{
+ info.inline_io->release_recv_buff(frame_buff::uptr(buff));
+ assert(info.num_frames_in_use > 0);
+ info.num_frames_in_use--;
+}
+
+// Release a single send info
+void offload_io_service_impl::_release_send_buff(send_client_info_t& info, frame_buff* buff)
+{
+ info.inline_io->release_send_buff(frame_buff::uptr(buff));
+ assert(info.num_frames_in_use > 0);
+ info.num_frames_in_use--;
+}
+
+// Flush client queues and unreserve its frames
+void offload_io_service_impl::_disconnect_recv_client(recv_client_info_t& info)
+{
+ auto release_buff = [&info](frame_buff* buff) {
+ info.inline_io->release_recv_buff(frame_buff::uptr(buff));
+ };
+
+ info.num_frames_in_use -= info.port->offload_thread_flush(release_buff);
+ assert(info.num_frames_in_use == 0);
+ _reservation_mgr.unreserve_frames(info.frames_reserved);
+
+ // Client waits for a notification after requesting disconnect, so notify it
+ info.port->offload_thread_set_connected(false);
+}
+
+// Flush client queues and unreserve its frames
+void offload_io_service_impl::_disconnect_send_client(send_client_info_t& info)
+{
+ auto release_buff = [&info](frame_buff* buff) {
+ info.inline_io->release_send_buff(frame_buff::uptr(buff));
+ };
+ info.num_frames_in_use -= info.port->offload_thread_flush(release_buff);
+ assert(info.num_frames_in_use == 0);
+ _reservation_mgr.unreserve_frames(info.frames_reserved);
+
+ // Client waits for a notification after requesting disconnect, so notify it
+ info.port->offload_thread_set_connected(false);
+}
+
+template <bool allow_recv, bool allow_send>
+void offload_io_service_impl::_do_work_polling()
+{
+ uhd::set_thread_affinity(_offload_thread_params.cpu_affinity_list);
+
+ client_req_t client_req;
+
+ while (!_stop_offload_thread) {
+ if (allow_recv) {
+ // Get recv buffers
+ for (auto& recv_info : _recv_clients) {
+ _get_recv_buff(recv_info, 0);
+ }
+
+ // Release recv buffers
+ for (auto it = _recv_clients.begin(); it != _recv_clients.end();) {
+ frame_buff* buff;
+ bool disconnect;
+ std::tie(buff, disconnect) = it->port->offload_thread_pop();
+ if (buff) {
+ _release_recv_buff(*it, buff);
+ } else if (disconnect) {
+ _disconnect_recv_client(*it);
+ it = _recv_clients.erase(it); // increments it
+ continue;
+ }
+ ++it;
+ }
+ }
+
+ if (allow_send) {
+ // Get send buffers
+ for (auto& send_info : _send_clients) {
+ _get_send_buff(send_info);
+ }
+
+ // Release send buffers
+ for (auto it = _send_clients.begin(); it != _send_clients.end();) {
+ frame_buff* buff;
+ bool disconnect;
+ std::tie(buff, disconnect) = it->port->offload_thread_pop();
+ if (buff) {
+ _release_send_buff(*it, buff);
+ } else if (disconnect) {
+ _disconnect_send_client(*it);
+ it = _send_clients.erase(it); // increments it
+ continue;
+ }
+ ++it;
+ }
+ }
+
+ // Execute one client connect command per main loop iteration
+ if (_client_connect_queue.pop(client_req)) {
+ (*client_req.req)();
+ delete client_req.req;
+ }
+ }
+}
+
+template <bool allow_recv, bool allow_send>
+void offload_io_service_impl::_do_work_blocking()
+{
+ uhd::set_thread_affinity(_offload_thread_params.cpu_affinity_list);
+
+ client_req_t client_req;
+
+ while (!_stop_offload_thread) {
+ if (allow_recv) {
+ // Get recv buffers
+ for (auto& recv_info : _recv_clients) {
+ _get_recv_buff(recv_info, blocking_timeout_ms);
+ }
+
+ // Release recv buffers
+ for (auto it = _recv_clients.begin(); it != _recv_clients.end();) {
+ frame_buff* buff;
+ bool disconnect;
+
+ if (it->num_frames_in_use == it->frames_reserved.num_recv_frames) {
+ // If all buffers are in use, block to avoid excessive CPU usage
+ std::tie(buff, disconnect) = it->port->offload_thread_pop(blocking_timeout_ms);
+ } else {
+ // Otherwise, just check current status
+ std::tie(buff, disconnect) = it->port->offload_thread_pop();
+ }
+
+ if (buff) {
+ _release_recv_buff(*it, buff);
+ } else if (disconnect) {
+ _disconnect_recv_client(*it);
+ it = _recv_clients.erase(it); // increments it
+ continue;
+ }
+ ++it;
+ }
+ }
+
+ if (allow_send) {
+ // Get send buffers
+ for (auto& send_info : _send_clients) {
+ _get_send_buff(send_info);
+ }
+
+ // Release send buffers
+ for (auto it = _send_clients.begin(); it != _send_clients.end();) {
+ if (it->num_frames_in_use > 0) {
+ frame_buff* buff;
+ bool disconnect;
+ std::tie(buff, disconnect) = it->port->offload_thread_pop(blocking_timeout_ms);
+
+ if (buff) {
+ _release_send_buff(*it, buff);
+ } else if (disconnect) {
+ _disconnect_send_client(*it);
+ it = _send_clients.erase(it); // increments it
+ continue;
+ }
+ }
+ ++it;
+ }
+ }
+
+ // Execute one client connect command per main loop iteration
+ // TODO: In a blocking I/O strategy, the loop can take a long time to
+ // service these requests. Need to configure all clients up-front,
+ // before starting the offload thread to avoid this.
+ if (_client_connect_queue.pop(client_req)) {
+ (*client_req.req)();
+ delete client_req.req;
+ }
+ }
+}
+
+}} // namespace uhd::transport
diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt
index 8a477b181..89d0926fa 100644
--- a/host/tests/CMakeLists.txt
+++ b/host/tests/CMakeLists.txt
@@ -231,6 +231,12 @@ UHD_ADD_NONAPI_TEST(
${CMAKE_SOURCE_DIR}/lib/transport/inline_io_service.cpp
)
+UHD_ADD_NONAPI_TEST(
+ TARGET "offload_io_srv_test.cpp"
+ EXTRA_SOURCES
+ ${CMAKE_SOURCE_DIR}/lib/transport/offload_io_service.cpp
+)
+
########################################################################
# demo of a loadable module
########################################################################
diff --git a/host/tests/offload_io_srv_test.cpp b/host/tests/offload_io_srv_test.cpp
new file mode 100644
index 000000000..99bd6dd53
--- /dev/null
+++ b/host/tests/offload_io_srv_test.cpp
@@ -0,0 +1,278 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include "common/mock_link.hpp"
+#include <uhdlib/transport/offload_io_service.hpp>
+#include <boost/test/unit_test.hpp>
+#include <iostream>
+#include <atomic>
+
+using namespace uhd::transport;
+
+class mock_recv_io;
+constexpr size_t FRAME_SIZE = 1000;
+
+static mock_send_link::sptr make_send_link(size_t num_frames)
+{
+ const mock_send_link::link_params params = {FRAME_SIZE, num_frames};
+ return std::make_shared<mock_send_link>(params);
+}
+
+static mock_recv_link::sptr make_recv_link(size_t num_frames)
+{
+ const mock_recv_link::link_params params = {FRAME_SIZE, num_frames};
+ return std::make_shared<mock_recv_link>(params);
+}
+
+class mock_recv_io : public recv_io_if
+{
+public:
+ mock_recv_io(recv_link_if::sptr link) : _link(link) {}
+
+ frame_buff::uptr get_recv_buff(int32_t timeout_ms)
+ {
+ if (_frames_allocated > 0) {
+ _frames_allocated--;
+ return _link->get_recv_buff(timeout_ms);
+ }
+ return nullptr;
+ }
+
+ void release_recv_buff(frame_buff::uptr buff)
+ {
+ _link->release_recv_buff(std::move(buff));
+ }
+
+ size_t get_num_send_frames() const
+ {
+ return 0;
+ }
+
+ size_t get_num_recv_frames() const
+ {
+ return _link->get_num_recv_frames();
+ }
+
+ void allocate_frames(const size_t num_frames)
+ {
+ _frames_allocated += num_frames;
+ }
+
+private:
+ std::atomic<size_t> _frames_allocated{0};
+ recv_link_if::sptr _link;
+};
+
+class mock_send_io : public send_io_if
+{
+public:
+ mock_send_io(send_link_if::sptr link) : _link(link) {}
+
+ frame_buff::uptr get_send_buff(int32_t timeout_ms)
+ {
+ return _link->get_send_buff(timeout_ms);
+ }
+
+ void release_send_buff(frame_buff::uptr buff)
+ {
+ _link->release_send_buff(std::move(buff));
+ }
+
+ size_t get_num_send_frames() const
+ {
+ return _link->get_num_send_frames();
+ }
+
+ size_t get_num_recv_frames() const
+ {
+ return 0;
+ }
+
+private:
+ send_link_if::sptr _link;
+};
+
+class mock_io_service : public io_service
+{
+public:
+ void attach_recv_link(recv_link_if::sptr /*link*/) {}
+
+ void attach_send_link(send_link_if::sptr /*link*/) {}
+
+ send_io_if::sptr make_send_client(send_link_if::sptr send_link,
+ size_t /*num_send_frames*/,
+ send_io_if::send_callback_t /*cb*/,
+ recv_link_if::sptr /*recv_link*/,
+ size_t /*num_recv_frames*/,
+ recv_callback_t /*recv_cb*/)
+ {
+ return std::make_shared<mock_send_io>(send_link);
+ }
+
+ recv_io_if::sptr make_recv_client(recv_link_if::sptr recv_link,
+ size_t /*num_recv_frames*/,
+ recv_callback_t /*cb*/,
+ send_link_if::sptr /*fc_link*/,
+ size_t /*num_send_frames*/,
+ recv_io_if::fc_callback_t /*fc_cb*/)
+ {
+ auto io = std::make_shared<mock_recv_io>(recv_link);
+ _recv_io.push_back(io);
+ return io;
+ }
+
+ void allocate_recv_frames(const size_t client_idx, const size_t num_frames)
+ {
+ assert(client_idx < _recv_io.size());
+ _recv_io[client_idx]->allocate_frames(num_frames);
+ }
+
+private:
+ std::vector<std::shared_ptr<mock_recv_io>> _recv_io;
+};
+
+constexpr auto RECV_ONLY = offload_io_service::RECV_ONLY;
+constexpr auto SEND_ONLY = offload_io_service::SEND_ONLY;
+constexpr auto BOTH_SEND_AND_RECV = offload_io_service::BOTH_SEND_AND_RECV;
+
+constexpr auto POLL = offload_io_service::POLL;
+constexpr auto BLOCK = offload_io_service::BLOCK;
+using params_t = offload_io_service::params_t;
+
+std::vector<offload_io_service::wait_mode_t> wait_modes({POLL, BLOCK});
+
+BOOST_AUTO_TEST_CASE(test_construction)
+{
+ for (const auto wait_mode : wait_modes) {
+ params_t params {{}, SEND_ONLY, wait_mode};
+ auto mock_io_srv = std::make_shared<mock_io_service>();
+ auto io_srv = offload_io_service::make(mock_io_srv, params_t());
+ auto send_link = make_send_link(5);
+ io_srv->attach_send_link(send_link);
+ auto send_client =
+ io_srv->make_send_client(send_link, 5, nullptr, nullptr, 0, nullptr);
+ }
+ for (const auto wait_mode : wait_modes) {
+ params_t params {{}, RECV_ONLY, wait_mode};
+ auto mock_io_srv = std::make_shared<mock_io_service>();
+ auto io_srv = offload_io_service::make(mock_io_srv, params_t());
+ auto recv_link = make_recv_link(5);
+ io_srv->attach_recv_link(recv_link);
+ auto recv_client =
+ io_srv->make_recv_client(recv_link, 5, nullptr, nullptr, 0, nullptr);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_construction_with_options)
+{
+ offload_io_service::params_t params;
+ params.cpu_affinity_list = {0};
+
+ auto mock_io_srv = std::make_shared<mock_io_service>();
+ auto io_srv = offload_io_service::make(mock_io_srv, params);
+ auto send_link = make_send_link(5);
+ io_srv->attach_send_link(send_link);
+ auto recv_link = make_recv_link(5);
+ io_srv->attach_recv_link(recv_link);
+ auto send_client =
+ io_srv->make_send_client(send_link, 5, nullptr, nullptr, 0, nullptr);
+ auto recv_client =
+ io_srv->make_recv_client(recv_link, 5, nullptr, nullptr, 0, nullptr);
+}
+
+BOOST_AUTO_TEST_CASE(test_send)
+{
+ for (const auto wait_mode : wait_modes) {
+ params_t params = {{}, SEND_ONLY, wait_mode};
+ auto mock_io_srv = std::make_shared<mock_io_service>();
+ auto io_srv = offload_io_service::make(mock_io_srv, params);
+ auto send_link = make_send_link(5);
+ io_srv->attach_send_link(send_link);
+ auto send_client =
+ io_srv->make_send_client(send_link, 1, nullptr, nullptr, 0, nullptr);
+
+ for (size_t i = 0; i < 10; i++) {
+ auto buff = send_client->get_send_buff(100);
+ BOOST_CHECK(buff != nullptr);
+ send_client->release_send_buff(std::move(buff));
+ }
+ send_client.reset();
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv)
+{
+ for (const auto wait_mode : wait_modes) {
+ params_t params = {{}, RECV_ONLY, wait_mode};
+ auto mock_io_srv = std::make_shared<mock_io_service>();
+ auto io_srv = offload_io_service::make(mock_io_srv, params);
+ auto recv_link = make_recv_link(5);
+ io_srv->attach_recv_link(recv_link);
+
+ auto recv_client =
+ io_srv->make_recv_client(recv_link, 1, nullptr, nullptr, 0, nullptr);
+
+ for (size_t i = 0; i < 10; i++) {
+ recv_link->push_back_recv_packet(
+ boost::shared_array<uint8_t>(new uint8_t[FRAME_SIZE]), FRAME_SIZE);
+ }
+ BOOST_CHECK(recv_client);
+ mock_io_srv->allocate_recv_frames(0, 10);
+
+ for (size_t i = 0; i < 10; i++) {
+ auto buff = recv_client->get_recv_buff(100);
+ BOOST_CHECK(buff != nullptr);
+ recv_client->release_recv_buff(std::move(buff));
+ }
+ recv_client.reset();
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_send_recv)
+{
+ auto mock_io_srv = std::make_shared<mock_io_service>();
+ auto io_srv = offload_io_service::make(mock_io_srv, params_t());
+ auto send_link = make_send_link(5);
+ io_srv->attach_send_link(send_link);
+ auto recv_link = make_recv_link(5);
+ io_srv->attach_recv_link(recv_link);
+
+ auto send_client =
+ io_srv->make_send_client(send_link, 1, nullptr, nullptr, 0, nullptr);
+ auto recv_client =
+ io_srv->make_recv_client(recv_link, 1, nullptr, nullptr, 0, nullptr);
+
+ for (size_t i = 0; i < 20; i++) {
+ recv_link->push_back_recv_packet(
+ boost::shared_array<uint8_t>(new uint8_t[FRAME_SIZE]), FRAME_SIZE);
+ }
+
+ for (size_t i = 0; i < 10; i++) {
+ send_client->release_send_buff(send_client->get_send_buff(100));
+ mock_io_srv->allocate_recv_frames(0, 1);
+ recv_client->release_recv_buff(recv_client->get_recv_buff(100));
+ }
+
+ auto recv_client2 =
+ io_srv->make_recv_client(recv_link, 1, nullptr, nullptr, 0, nullptr);
+ auto send_client2 =
+ io_srv->make_send_client(send_link, 1, nullptr, nullptr, 0, nullptr);
+ for (size_t i = 0; i < 5; i++) {
+ mock_io_srv->allocate_recv_frames(1, 1);
+ recv_client2->release_recv_buff(recv_client2->get_recv_buff(100));
+ send_client2->release_send_buff(send_client2->get_send_buff(100));
+ }
+ send_client2.reset();
+ recv_client2.reset();
+
+ for (size_t i = 0; i < 5; i++) {
+ mock_io_srv->allocate_recv_frames(0, 1);
+ recv_client->release_recv_buff(recv_client->get_recv_buff(100));
+ }
+
+ send_client.reset();
+ recv_client.reset();
+}