Diffstat (limited to 'host')
-rw-r--r--   host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp      110
-rw-r--r--   host/lib/include/uhdlib/transport/offload_io_service_client.hpp  158
-rw-r--r--   host/lib/include/uhdlib/utils/semaphore.hpp                       71
-rw-r--r--   host/lib/transport/offload_io_service.cpp                        309
4 files changed, 351 insertions, 297 deletions
diff --git a/host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp b/host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp
new file mode 100644
index 000000000..f0dd853a4
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp
@@ -0,0 +1,110 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_UHDLIB_TRANSPORT_FRAME_RESERVATION_MGR_HPP
+#define INCLUDED_UHDLIB_TRANSPORT_FRAME_RESERVATION_MGR_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/exception.hpp>
+#include <uhdlib/transport/link_if.hpp>
+#include <unordered_map>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Helper class to keep track of the number of frames reserved from a pair of links
+ */
+class frame_reservation_mgr
+{
+public:
+    struct frame_reservation_t
+    {
+        recv_link_if::sptr recv_link;
+        size_t num_recv_frames = 0;
+        send_link_if::sptr send_link;
+        size_t num_send_frames = 0;
+    };
+
+    void register_link(const recv_link_if::sptr& recv_link)
+    {
+        if (_recv_tbl[recv_link.get()] != 0) {
+            throw uhd::runtime_error("Recv link already attached to I/O service");
+        }
+        _recv_tbl[recv_link.get()] = 0;
+    }
+
+    void register_link(const send_link_if::sptr& send_link)
+    {
+        if (_send_tbl[send_link.get()] != 0) {
+            throw uhd::runtime_error("Send link already attached to I/O service");
+        }
+        _send_tbl[send_link.get()] = 0;
+    }
+
+    void unregister_link(const recv_link_if::sptr& recv_link)
+    {
+        auto link_ptr = recv_link.get();
+        UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) != 0);
+        _recv_tbl.erase(link_ptr);
+    }
+
+    void unregister_link(const send_link_if::sptr& send_link)
+    {
+        auto link_ptr = send_link.get();
+        UHD_ASSERT_THROW(_send_tbl.count(link_ptr) != 0);
+        _send_tbl.erase(link_ptr);
+    }
+
+    void reserve_frames(const frame_reservation_t& reservation)
+    {
+        if (reservation.recv_link) {
+            const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get());
+            const size_t capacity = reservation.recv_link->get_num_recv_frames();
+            if (rsvd_frames + reservation.num_recv_frames > capacity) {
+                throw uhd::runtime_error(
+                    "Number of frames requested exceeds link recv frame capacity");
+            }
+
+            recv_link_if* link_ptr = reservation.recv_link.get();
+            _recv_tbl[link_ptr] = rsvd_frames + reservation.num_recv_frames;
+        }
+
+        if (reservation.send_link) {
+            const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get());
+            const size_t capacity = reservation.send_link->get_num_send_frames();
+            if (rsvd_frames + reservation.num_send_frames > capacity) {
+                throw uhd::runtime_error(
+                    "Number of frames requested exceeds link send frame capacity");
+            }
+
+            send_link_if* link_ptr = reservation.send_link.get();
+            _send_tbl[link_ptr] = rsvd_frames + reservation.num_send_frames;
+        }
+    }
+
+    void unreserve_frames(const frame_reservation_t& reservation)
+    {
+        if (reservation.recv_link) {
+            const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get());
+            recv_link_if* link_ptr = reservation.recv_link.get();
+            _recv_tbl[link_ptr] = rsvd_frames - reservation.num_recv_frames;
+        }
+
+        if (reservation.send_link) {
+            const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get());
+            send_link_if* link_ptr = reservation.send_link.get();
+            _send_tbl[link_ptr] = rsvd_frames - reservation.num_send_frames;
+        }
+    }
+
+private:
+    std::unordered_map<recv_link_if*, size_t> _recv_tbl;
+    std::unordered_map<send_link_if*, size_t> _send_tbl;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_UHDLIB_TRANSPORT_FRAME_RESERVATION_MGR_HPP */
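A minimal usage sketch for the new frame_reservation_mgr (illustrative only, not part of the commit; the recv_link/send_link arguments are assumed to be existing link objects owned by the caller):

#include <uhdlib/transport/frame_reservation_mgr.hpp>

using namespace uhd::transport;

// Reserve frames for one client on a recv/send link pair, then roll the
// reservation back when the client detaches.
void attach_and_detach(recv_link_if::sptr recv_link, send_link_if::sptr send_link)
{
    frame_reservation_mgr mgr;
    mgr.register_link(recv_link);
    mgr.register_link(send_link);

    frame_reservation_mgr::frame_reservation_t rsv;
    rsv.recv_link       = recv_link;
    rsv.num_recv_frames = 1;
    rsv.send_link       = send_link;
    rsv.num_send_frames = 1;

    // Throws uhd::runtime_error if either link's frame capacity would be exceeded
    mgr.reserve_frames(rsv);
    // ... the client runs and uses the reserved frames ...
    mgr.unreserve_frames(rsv);

    mgr.unregister_link(recv_link);
    mgr.unregister_link(send_link);
}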
diff --git a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp
new file mode 100644
index 000000000..620e796ef
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp
@@ -0,0 +1,158 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_CLIENT_HPP
+#define INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_CLIENT_HPP
+
+#include <uhd/transport/frame_buff.hpp>
+#include <chrono>
+#include <thread>
+
+namespace uhd { namespace transport {
+
+namespace detail {
+
+// Implementation of get_send_buff used below by send and recv clients
+template <typename pop_func_t>
+static frame_buff::uptr client_get_buff(pop_func_t pop, const int32_t timeout_ms)
+{
+    using namespace std::chrono;
+
+    if (timeout_ms == 0) {
+        return frame_buff::uptr(pop());
+    }
+
+    const auto end_time = steady_clock::now() + milliseconds(timeout_ms);
+
+    bool last_check = false;
+
+    while (true) {
+        if (frame_buff* buff = pop()) {
+            return frame_buff::uptr(buff);
+        }
+
+        if (timeout_ms > 0 && steady_clock::now() > end_time) {
+            if (last_check) {
+                return nullptr;
+            } else {
+                last_check = true;
+            }
+        }
+        std::this_thread::yield();
+    }
+}
+
+} // namespace detail
+
+/*!
+ * Recv I/O client for offload I/O service
+ */
+template <typename io_service_t>
+class offload_recv_io : public recv_io_if
+{
+public:
+    offload_recv_io(typename io_service_t::sptr io_srv,
+        size_t num_recv_frames,
+        size_t num_send_frames,
+        typename io_service_t::client_port_t::sptr& port)
+        : _io_srv(io_srv), _port(port)
+    {
+        _num_recv_frames = num_recv_frames;
+        _num_send_frames = num_send_frames;
+    }
+
+    ~offload_recv_io()
+    {
+        assert(_num_frames_in_use == 0);
+
+        if (_io_srv) {
+            _port->client_disconnect();
+        }
+    }
+
+    frame_buff::uptr get_recv_buff(int32_t timeout_ms)
+    {
+        return detail::client_get_buff(
+            [this]() {
+                frame_buff* buff = _port->client_pop();
+                _num_frames_in_use += buff ? 1 : 0;
+                return buff;
+            },
+            timeout_ms);
+    }
+
+    void release_recv_buff(frame_buff::uptr buff)
+    {
+        assert(buff);
+        _port->client_push(buff.release());
+        _num_frames_in_use--;
+    }
+
+private:
+    offload_recv_io() = delete;
+    offload_recv_io(const offload_recv_io&) = delete;
+
+    typename io_service_t::sptr _io_srv;
+    typename io_service_t::client_port_t::sptr _port;
+    size_t _num_frames_in_use = 0;
+};
+
+/*!
+ * Send I/O client for offload I/O service
+ */
+template <typename io_service_t>
+class offload_send_io : public send_io_if
+{
+public:
+    offload_send_io(typename io_service_t::sptr io_srv,
+        size_t num_recv_frames,
+        size_t num_send_frames,
+        typename io_service_t::client_port_t::sptr& port)
+        : _io_srv(io_srv), _port(port)
+    {
+        _num_recv_frames = num_recv_frames;
+        _num_send_frames = num_send_frames;
+    }
+
+    ~offload_send_io()
+    {
+        assert(_num_frames_in_use == 0);
+
+        if (_io_srv) {
+            _port->client_disconnect();
+        }
+    }
+
+    frame_buff::uptr get_send_buff(int32_t timeout_ms)
+    {
+        return detail::client_get_buff(
+            [this]() {
+                frame_buff* buff = _port->client_pop();
+                _num_frames_in_use += buff ? 1 : 0;
+                return buff;
+            },
+            timeout_ms);
+    }
+
+    void release_send_buff(frame_buff::uptr buff)
+    {
+        assert(buff);
+        _port->client_push(buff.release());
+        _num_frames_in_use--;
+    }
+
+private:
+    offload_send_io() = delete;
+    offload_send_io(const offload_send_io&) = delete;
+
+    typename io_service_t::sptr _io_srv;
+    typename io_service_t::client_port_t::sptr _port;
+    size_t _num_frames_in_use = 0;
+};
+
+}} // namespace uhd::transport

+#endif /* INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_CLIENT_HPP */
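The timeout convention in detail::client_get_buff is shared by both clients: 0 performs a single non-blocking poll, a negative value polls indefinitely, and a positive value polls until the deadline and then makes one final check. A sketch of the resulting caller-side receive loop (illustrative; recv_io is assumed to be a recv_io_if::sptr obtained from an I/O service's make_recv_client, and io_service.hpp is assumed to declare recv_io_if):

#include <uhdlib/transport/io_service.hpp>
#include <utility>

// Drain buffers with a 100 ms timeout per call and hand each one back.
void drain(uhd::transport::recv_io_if::sptr recv_io)
{
    using uhd::transport::frame_buff;
    while (frame_buff::uptr buff = recv_io->get_recv_buff(100)) {
        // ... process the packet held by buff ...
        recv_io->release_recv_buff(std::move(buff));
    }
    // A null return means no buffer became available before the timeout.
}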
diff --git a/host/lib/include/uhdlib/utils/semaphore.hpp b/host/lib/include/uhdlib/utils/semaphore.hpp
new file mode 100644
index 000000000..ae77ed102
--- /dev/null
+++ b/host/lib/include/uhdlib/utils/semaphore.hpp
@@ -0,0 +1,71 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <condition_variable>
+#include <chrono>
+#include <mutex>
+
+#ifndef INCLUDED_UHDLIB_UTILS_SEMAPHORE_HPP
+#define INCLUDED_UHDLIB_UTILS_SEMAPHORE_HPP
+
+namespace uhd {
+
+/*!
+ * A semaphore built using std::condition_variable
+ */
+class semaphore
+{
+public:
+    void notify()
+    {
+        std::unique_lock<std::mutex> lock(_cv_mutex);
+        _count++;
+        _cv.notify_one();
+    }
+
+    void wait()
+    {
+        std::unique_lock<std::mutex> lock(_cv_mutex);
+        _cv.wait(lock, [this]() { return this->_count != 0; });
+        _count--;
+    }
+
+    bool try_wait()
+    {
+        std::unique_lock<std::mutex> lock(_cv_mutex);
+        if (_count != 0) {
+            _count--;
+            return true;
+        }
+        return false;
+    }
+
+    bool wait_for(size_t timeout_ms)
+    {
+        std::chrono::milliseconds timeout(timeout_ms);
+        std::unique_lock<std::mutex> lock(_cv_mutex);
+        if (_cv.wait_for(lock, timeout, [this]() { return this->_count != 0; })) {
+            _count--;
+            return true;
+        }
+        return false;
+    }
+
+    size_t count()
+    {
+        std::unique_lock<std::mutex> lock(_cv_mutex);
+        return _count;
+    }
+
+private:
+    std::condition_variable _cv;
+    std::mutex _cv_mutex;
+    size_t _count = 0;
+};
+
+} // namespace uhd
+
+#endif /* INCLUDED_UHDLIB_UTILS_SEMAPHORE_HPP */
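A short sketch of the blocking pattern this semaphore supports (illustrative producer/consumer, not taken from the commit):

#include <uhdlib/utils/semaphore.hpp>
#include <atomic>
#include <thread>

// One thread signals completed work items; the other waits with a bounded
// timeout so it can periodically check a stop flag.
void producer_consumer_demo()
{
    uhd::semaphore items;
    std::atomic<bool> stop{false};

    std::thread consumer([&]() {
        while (!stop) {
            if (items.wait_for(100)) { // returns false if 100 ms pass first
                // ... handle one queued item ...
            }
        }
    });

    items.notify(); // make one item available to the consumer
    stop = true;
    consumer.join();
}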
diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp
index 012c86868..c9b9af344 100644
--- a/host/lib/transport/offload_io_service.cpp
+++ b/host/lib/transport/offload_io_service.cpp
@@ -6,11 +6,12 @@
 
 #include <uhd/config.hpp>
 #include <uhd/exception.hpp>
-#include <uhd/utils/log.hpp>
 #include <uhd/utils/thread.hpp>
+#include <uhdlib/transport/frame_reservation_mgr.hpp>
 #include <uhdlib/transport/offload_io_service.hpp>
+#include <uhdlib/transport/offload_io_service_client.hpp>
+#include <uhdlib/utils/semaphore.hpp>
 #include <condition_variable>
-#include <unordered_map>
 #include <boost/lockfree/queue.hpp>
 #include <atomic>
 #include <chrono>
@@ -25,141 +26,6 @@ namespace {
 
 constexpr int32_t blocking_timeout_ms = 100;
 
-// Struct to help keep track of frames reserved for each link
-struct frame_reservation_t
-{
-    recv_link_if::sptr recv_link;
-    size_t num_recv_frames = 0;
-    send_link_if::sptr send_link;
-    size_t num_send_frames = 0;
-};
-
-// Helper class to keep track of frames reserved for each link
-class frame_reservation_mgr
-{
-public:
-    void register_link(const recv_link_if::sptr& recv_link)
-    {
-        if (_recv_tbl[recv_link.get()] != 0) {
-            throw uhd::runtime_error("Recv link already attached to I/O service");
-        }
-        _recv_tbl[recv_link.get()] = 0;
-    }
-
-    void register_link(const send_link_if::sptr& send_link)
-    {
-        if (_send_tbl[send_link.get()] != 0) {
-            throw uhd::runtime_error("Send link already attached to I/O service");
-        }
-        _send_tbl[send_link.get()] = 0;
-    }
-
-    void unregister_link(const recv_link_if::sptr& recv_link)
-    {
-        auto link_ptr = recv_link.get();
-        UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) != 0);
-        _recv_tbl.erase(link_ptr);
-    }
-
-    void unregister_link(const send_link_if::sptr& send_link)
-    {
-        auto link_ptr = send_link.get();
-        UHD_ASSERT_THROW(_send_tbl.count(link_ptr) != 0);
-        _send_tbl.erase(link_ptr);
-    }
-
-    void reserve_frames(const frame_reservation_t& reservation)
-    {
-        if (reservation.recv_link) {
-            const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get());
-            const size_t capacity = reservation.recv_link->get_num_recv_frames();
-            if (rsvd_frames + reservation.num_recv_frames > capacity) {
-                throw uhd::runtime_error("Number of frames requested exceeds link recv frame capacity");
-            }
-
-            recv_link_if* link_ptr = reservation.recv_link.get();
-            _recv_tbl[link_ptr] = rsvd_frames + reservation.num_recv_frames;
-        }
-
-        if (reservation.send_link) {
-            const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get());
-            const size_t capacity = reservation.send_link->get_num_send_frames();
-            if (rsvd_frames + reservation.num_send_frames > capacity) {
-                throw uhd::runtime_error("Number of frames requested exceeds link send frame capacity");
-            }
-
-            send_link_if* link_ptr = reservation.send_link.get();
-            _send_tbl[link_ptr] = rsvd_frames + reservation.num_send_frames;
-        }
-    }
-
-    void unreserve_frames(const frame_reservation_t& reservation)
-    {
-        if (reservation.recv_link) {
-            const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get());
-            recv_link_if* link_ptr = reservation.recv_link.get();
-            _recv_tbl[link_ptr] = rsvd_frames - reservation.num_recv_frames;
-        }
-
-        if (reservation.send_link) {
-            const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get());
-            send_link_if* link_ptr = reservation.send_link.get();
-            _send_tbl[link_ptr] = rsvd_frames - reservation.num_send_frames;
-        }
-    }
-
-private:
-    std::unordered_map<recv_link_if*, size_t> _recv_tbl;
-    std::unordered_map<send_link_if*, size_t> _send_tbl;
-};
-
-
-// Semaphore used in blocking I/O for offload thread
-class semaphore
-{
-public:
-    void notify() {
-        std::unique_lock<std::mutex> lock(_cv_mutex);
-        _count++;
-        _cv.notify_one();
-    }
-
-    void wait() {
-        std::unique_lock<std::mutex> lock(_cv_mutex);
-        _cv.wait(lock, [this]() { return this->_count != 0; });
-        _count--;
-    }
-
-    bool try_wait() {
-        std::unique_lock<std::mutex> lock(_cv_mutex);
-        if (_count != 0) {
-            _count--;
-            return true;
-        }
-        return false;
-    }
-
-    bool wait_for(size_t timeout_ms) {
-        std::chrono::milliseconds timeout(timeout_ms);
-        std::unique_lock<std::mutex> lock(_cv_mutex);
-        if (_cv.wait_for(lock, timeout, [this]() { return this->_count != 0; })) {
-            _count--;
-            return true;
-        }
-        return false;
-    }
-
-    size_t count() {
-        std::unique_lock<std::mutex> lock(_cv_mutex);
-        return _count;
-    }
-
-private:
-    std::condition_variable _cv;
-    std::mutex _cv_mutex;
-    size_t _count = 0;
-};
-
 // Fixed-size queue that supports blocking semantics
 template <typename queue_item_t>
 class offload_thread_queue {
@@ -221,12 +87,12 @@ private:
 };
 
 // Object that implements the communication between client and offload thread
-struct client_port_t
+struct client_port_impl_t
 {
 public:
-    using sptr = std::shared_ptr<client_port_t>;
+    using sptr = std::shared_ptr<client_port_impl_t>;
 
-    client_port_t(size_t size)
+    client_port_impl_t(size_t size)
         : _from_offload_thread(size)
         , _to_offload_thread(size + 1) // add one for disconnect command
     {
@@ -363,7 +229,8 @@ class offload_io_service_impl
       public std::enable_shared_from_this<offload_io_service_impl>
 {
 public:
-    using sptr = std::shared_ptr<offload_io_service_impl>;
+    using sptr          = std::shared_ptr<offload_io_service_impl>;
+    using client_port_t = client_port_impl_t;
 
     offload_io_service_impl(
         io_service::sptr io_srv, const offload_io_service::params_t& params);
@@ -392,6 +259,8 @@ public:
 private:
     offload_io_service_impl(const offload_io_service_impl&) = delete;
 
+    using frame_reservation_t = frame_reservation_mgr::frame_reservation_t;
+
     // Queue for new client creation, multiple producers allowed. Requests are
    // passed as heap-allocated pointers because boost lockfree queues require
     // simple types.
@@ -453,160 +322,6 @@ private:
     frame_reservation_mgr _reservation_mgr;
 };
 
-class offload_recv_io : public recv_io_if
-{
-public:
-    offload_recv_io(offload_io_service_impl::sptr io_srv,
-        size_t num_recv_frames,
-        size_t num_send_frames,
-        client_port_t::sptr& port);
-
-    ~offload_recv_io();
-
-    frame_buff::uptr get_recv_buff(int32_t timeout_ms);
-    void release_recv_buff(frame_buff::uptr buff);
-
-private:
-    offload_recv_io() = delete;
-    offload_recv_io(const offload_recv_io&) = delete;
-
-    offload_io_service_impl::sptr _io_srv;
-    client_port_t::sptr _port;
-    size_t _num_frames_in_use = 0;
-};
-
-class offload_send_io : public send_io_if
-{
-public:
-    offload_send_io(offload_io_service_impl::sptr io_srv,
-        size_t num_recv_frames,
-        size_t num_send_frames,
-        client_port_t::sptr& port);
-
-    ~offload_send_io();
-
-    frame_buff::uptr get_send_buff(int32_t timeout_ms);
-    void release_send_buff(frame_buff::uptr buff);
-
-private:
-    offload_send_io() = delete;
-    offload_send_io(const offload_send_io&) = delete;
-
-    offload_io_service_impl::sptr _io_srv;
-    client_port_t::sptr _port;
-    size_t _num_frames_in_use = 0;
-};
-
-// Implementation of get_send_buff used below by send and recv clients
-template <typename pop_func_t>
-static frame_buff::uptr client_get_buff(pop_func_t pop, const int32_t timeout_ms)
-{
-    using namespace std::chrono;
-
-    if (timeout_ms == 0) {
-        return frame_buff::uptr(pop());
-    }
-
-    const auto end_time = steady_clock::now() + milliseconds(timeout_ms);
-
-    bool last_check = false;
-
-    while (true) {
-        if (frame_buff* buff = pop()) {
-            return frame_buff::uptr(buff);
-        }
-
-        if (timeout_ms > 0 && steady_clock::now() > end_time) {
-            if (last_check) {
-                return nullptr;
-            } else {
-                last_check = true;
-            }
-        }
-        std::this_thread::yield();
-    }
-}
-
-//
-// offload_recv_io methods
-//
-offload_recv_io::offload_recv_io(offload_io_service_impl::sptr io_srv,
-    size_t num_recv_frames,
-    size_t num_send_frames,
-    client_port_t::sptr& port)
-    : _io_srv(io_srv), _port(port)
-{
-    _num_recv_frames = num_recv_frames;
-    _num_send_frames = num_send_frames;
-}
-
-offload_recv_io::~offload_recv_io()
-{
-    assert(_num_frames_in_use == 0);
-
-    if (_io_srv) {
-        _port->client_disconnect();
-    }
-}
-
-frame_buff::uptr offload_recv_io::get_recv_buff(int32_t timeout_ms)
-{
-    return client_get_buff(
-        [this]() {
-            frame_buff* buff = _port->client_pop();
-            _num_frames_in_use += buff ? 1 : 0;
-            return buff;
-        },
-        timeout_ms);
-}
-
-void offload_recv_io::release_recv_buff(frame_buff::uptr buff)
-{
-    assert(buff);
-    _port->client_push(buff.release());
-    _num_frames_in_use--;
-}
-
-//
-// offload_send_io methods
-//
-offload_send_io::offload_send_io(offload_io_service_impl::sptr io_srv,
-    size_t num_recv_frames,
-    size_t num_send_frames,
-    client_port_t::sptr& port)
-    : _io_srv(io_srv), _port(port)
-{
-    _num_recv_frames = num_recv_frames;
-    _num_send_frames = num_send_frames;
-}
-
-offload_send_io::~offload_send_io()
-{
-    assert(_num_frames_in_use == 0);
-
-    if (_io_srv) {
-        _port->client_disconnect();
-    }
-}
-
-frame_buff::uptr offload_send_io::get_send_buff(int32_t timeout_ms)
-{
-    return client_get_buff(
-        [this]() {
-            frame_buff* buff = _port->client_pop();
-            _num_frames_in_use += buff ? 1 : 0;
-            return buff;
-        },
-        timeout_ms);
-}
-
-void offload_send_io::release_send_buff(frame_buff::uptr buff)
-{
-    assert(buff);
-    _port->client_push(buff.release());
-    _num_frames_in_use--;
-}
-
 //
 // offload_io_service methods
 //
@@ -759,7 +474,7 @@ recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr re
     port->client_wait_until_connected();
 
     // Return a new recv client to the caller that just operates on the queues
-    return std::make_shared<offload_recv_io>(
+    return std::make_shared<offload_recv_io<offload_io_service_impl>>(
         shared_from_this(), num_recv_frames, num_send_frames, port);
 }
 
@@ -813,7 +528,7 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se
     }
 
     // Return a new recv client to the caller that just operates on the queues
-    return std::make_shared<offload_send_io>(
+    return std::make_shared<offload_send_io<offload_io_service_impl>>(
         shared_from_this(), num_recv_frames, num_send_frames, port);
 }
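For reference, the only thing the templated clients in offload_io_service_client.hpp require of their io_service_t parameter is a pair of nested types, which offload_io_service_impl now provides via client_port_impl_t. A sketch of that implicit interface (hypothetical types, not part of the commit; only the members the clients actually call are shown):

#include <uhd/transport/frame_buff.hpp>
#include <memory>

// Stand-in for the client_port_impl_t defined inside offload_io_service.cpp.
class example_client_port
{
public:
    using sptr = std::shared_ptr<example_client_port>;
    uhd::transport::frame_buff* client_pop();        // next buffer for the client, or nullptr
    void client_push(uhd::transport::frame_buff* b); // return a buffer to the offload thread
    void client_disconnect();                        // called from the client destructor
};

// Any service type used with offload_recv_io<T> / offload_send_io<T> must
// expose these two aliases.
class example_offload_io_service
{
public:
    using sptr          = std::shared_ptr<example_offload_io_service>;
    using client_port_t = example_client_port;
};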