diff options
author | Ciro Nishiguchi <ciro.nishiguchi@ni.com> | 2019-10-23 09:36:27 -0500 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 12:21:32 -0800 |
commit | 1cd874911db0d4c0463264edd0d46067d072eccb (patch) | |
tree | 24c99f0d0ef05041614c0795e75067b9bae50ead /host/lib/include/uhdlib | |
parent | 3e2cceeaed7c3a2a70318dab695d6adb9ee5602e (diff) | |
download | uhd-1cd874911db0d4c0463264edd0d46067d072eccb.tar.gz uhd-1cd874911db0d4c0463264edd0d46067d072eccb.tar.bz2 uhd-1cd874911db0d4c0463264edd0d46067d072eccb.zip |
rfnoc: Split up offload I/O service into multiple files
Diffstat (limited to 'host/lib/include/uhdlib')
3 files changed, 339 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp b/host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp new file mode 100644 index 000000000..f0dd853a4 --- /dev/null +++ b/host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp @@ -0,0 +1,110 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_FRAME_RESERVATION_MGR_HPP +#define INCLUDED_UHDLIB_TRANSPORT_FRAME_RESERVATION_MGR_HPP + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhdlib/transport/link_if.hpp> +#include <unordered_map> + +namespace uhd { namespace transport { + +/*! + * Helper class to keep track of the number of frames reserved from a pair of links + */ +class frame_reservation_mgr +{ +public: + struct frame_reservation_t + { + recv_link_if::sptr recv_link; + size_t num_recv_frames = 0; + send_link_if::sptr send_link; + size_t num_send_frames = 0; + }; + + void register_link(const recv_link_if::sptr& recv_link) + { + if (_recv_tbl[recv_link.get()] != 0) { + throw uhd::runtime_error("Recv link already attached to I/O service"); + } + _recv_tbl[recv_link.get()] = 0; + } + + void register_link(const send_link_if::sptr& send_link) + { + if (_send_tbl[send_link.get()] != 0) { + throw uhd::runtime_error("Send link already attached to I/O service"); + } + _send_tbl[send_link.get()] = 0; + } + + void unregister_link(const recv_link_if::sptr& recv_link) + { + auto link_ptr = recv_link.get(); + UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) != 0); + _recv_tbl.erase(link_ptr); + } + + void unregister_link(const send_link_if::sptr& send_link) + { + auto link_ptr = send_link.get(); + UHD_ASSERT_THROW(_send_tbl.count(link_ptr) != 0); + _send_tbl.erase(link_ptr); + } + + void reserve_frames(const frame_reservation_t& reservation) + { + if (reservation.recv_link) { + const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get()); + const size_t capacity = reservation.recv_link->get_num_recv_frames(); + if (rsvd_frames + reservation.num_recv_frames > capacity) { + throw uhd::runtime_error( + "Number of frames requested exceeds link recv frame capacity"); + } + + recv_link_if* link_ptr = reservation.recv_link.get(); + _recv_tbl[link_ptr] = rsvd_frames + reservation.num_recv_frames; + } + + if (reservation.send_link) { + const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get()); + const size_t capacity = reservation.send_link->get_num_send_frames(); + if (rsvd_frames + reservation.num_send_frames > capacity) { + throw uhd::runtime_error( + "Number of frames requested exceeds link send frame capacity"); + } + + send_link_if* link_ptr = reservation.send_link.get(); + _send_tbl[link_ptr] = rsvd_frames + reservation.num_send_frames; + } + } + + void unreserve_frames(const frame_reservation_t& reservation) + { + if (reservation.recv_link) { + const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get()); + recv_link_if* link_ptr = reservation.recv_link.get(); + _recv_tbl[link_ptr] = rsvd_frames - reservation.num_recv_frames; + } + + if (reservation.send_link) { + const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get()); + send_link_if* link_ptr = reservation.send_link.get(); + _send_tbl[link_ptr] = rsvd_frames - reservation.num_send_frames; + } + } + +private: + std::unordered_map<recv_link_if*, size_t> _recv_tbl; + std::unordered_map<send_link_if*, size_t> _send_tbl; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_FRAME_RESERVATION_MGR_HPP */ diff --git a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp new file mode 100644 index 000000000..620e796ef --- /dev/null +++ b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp @@ -0,0 +1,158 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_CLIENT_HPP +#define INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_CLIENT_HPP + +#include <uhd/transport/frame_buff.hpp> +#include <chrono> +#include <thread> + +namespace uhd { namespace transport { + +namespace detail { + +// Implementation of get_send_buff used below by send and recv clients +template <typename pop_func_t> +static frame_buff::uptr client_get_buff(pop_func_t pop, const int32_t timeout_ms) +{ + using namespace std::chrono; + + if (timeout_ms == 0) { + return frame_buff::uptr(pop()); + } + + const auto end_time = steady_clock::now() + milliseconds(timeout_ms); + + bool last_check = false; + + while (true) { + if (frame_buff* buff = pop()) { + return frame_buff::uptr(buff); + } + + if (timeout_ms > 0 && steady_clock::now() > end_time) { + if (last_check) { + return nullptr; + } else { + last_check = true; + } + } + std::this_thread::yield(); + } +} + +} // namespace detail + +/*! + * Recv I/O client for offload I/O service + */ +template <typename io_service_t> +class offload_recv_io : public recv_io_if +{ +public: + offload_recv_io(typename io_service_t::sptr io_srv, + size_t num_recv_frames, + size_t num_send_frames, + typename io_service_t::client_port_t::sptr& port) + : _io_srv(io_srv), _port(port) + { + _num_recv_frames = num_recv_frames; + _num_send_frames = num_send_frames; + } + + ~offload_recv_io() + { + assert(_num_frames_in_use == 0); + + if (_io_srv) { + _port->client_disconnect(); + } + } + + frame_buff::uptr get_recv_buff(int32_t timeout_ms) + { + return detail::client_get_buff( + [this]() { + frame_buff* buff = _port->client_pop(); + _num_frames_in_use += buff ? 1 : 0; + return buff; + }, + timeout_ms); + } + + void release_recv_buff(frame_buff::uptr buff) + { + assert(buff); + _port->client_push(buff.release()); + _num_frames_in_use--; + } + +private: + offload_recv_io() = delete; + offload_recv_io(const offload_recv_io&) = delete; + + typename io_service_t::sptr _io_srv; + typename io_service_t::client_port_t::sptr _port; + size_t _num_frames_in_use = 0; +}; + +/*! + * Send I/O client for offload I/O service + */ +template <typename io_service_t> +class offload_send_io : public send_io_if +{ +public: + offload_send_io(typename io_service_t::sptr io_srv, + size_t num_recv_frames, + size_t num_send_frames, + typename io_service_t::client_port_t::sptr& port) + : _io_srv(io_srv), _port(port) + { + _num_recv_frames = num_recv_frames; + _num_send_frames = num_send_frames; + } + + ~offload_send_io() + { + assert(_num_frames_in_use == 0); + + if (_io_srv) { + _port->client_disconnect(); + } + } + + frame_buff::uptr get_send_buff(int32_t timeout_ms) + { + return detail::client_get_buff( + [this]() { + frame_buff* buff = _port->client_pop(); + _num_frames_in_use += buff ? 1 : 0; + return buff; + }, + timeout_ms); + } + + void release_send_buff(frame_buff::uptr buff) + { + assert(buff); + _port->client_push(buff.release()); + _num_frames_in_use--; + } + +private: + offload_send_io() = delete; + offload_send_io(const offload_send_io&) = delete; + + typename io_service_t::sptr _io_srv; + typename io_service_t::client_port_t::sptr _port; + size_t _num_frames_in_use = 0; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_CLIENT_HPP */ diff --git a/host/lib/include/uhdlib/utils/semaphore.hpp b/host/lib/include/uhdlib/utils/semaphore.hpp new file mode 100644 index 000000000..ae77ed102 --- /dev/null +++ b/host/lib/include/uhdlib/utils/semaphore.hpp @@ -0,0 +1,71 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <condition_variable> +#include <chrono> +#include <mutex> + +#ifndef INCLUDED_UHDLIB_UTILS_SEMAPHORE_HPP +#define INCLUDED_UHDLIB_UTILS_SEMAPHORE_HPP + +namespace uhd { + +/*! + * A sempahore built using std::condition_variable + */ +class semaphore +{ +public: + void notify() + { + std::unique_lock<std::mutex> lock(_cv_mutex); + _count++; + _cv.notify_one(); + } + + void wait() + { + std::unique_lock<std::mutex> lock(_cv_mutex); + _cv.wait(lock, [this]() { return this->_count != 0; }); + _count--; + } + + bool try_wait() + { + std::unique_lock<std::mutex> lock(_cv_mutex); + if (_count != 0) { + _count--; + return true; + } + return false; + } + + bool wait_for(size_t timeout_ms) + { + std::chrono::milliseconds timeout(timeout_ms); + std::unique_lock<std::mutex> lock(_cv_mutex); + if (_cv.wait_for(lock, timeout, [this]() { return this->_count != 0; })) { + _count--; + return true; + } + return false; + } + + size_t count() + { + std::unique_lock<std::mutex> lock(_cv_mutex); + return _count; + } + +private: + std::condition_variable _cv; + std::mutex _cv_mutex; + size_t _count = 0; +}; + +} // namespace uhd + +#endif /* INCLUDED_UHDLIB_UTILS_SEMAPHORE_HPP */ |