diff options
Diffstat (limited to 'host/lib/transport/offload_io_service.cpp')
-rw-r--r-- | host/lib/transport/offload_io_service.cpp | 309 |
1 files changed, 12 insertions, 297 deletions
diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp index 012c86868..c9b9af344 100644 --- a/host/lib/transport/offload_io_service.cpp +++ b/host/lib/transport/offload_io_service.cpp @@ -6,11 +6,12 @@ #include <uhd/config.hpp> #include <uhd/exception.hpp> -#include <uhd/utils/log.hpp> #include <uhd/utils/thread.hpp> +#include <uhdlib/transport/frame_reservation_mgr.hpp> #include <uhdlib/transport/offload_io_service.hpp> +#include <uhdlib/transport/offload_io_service_client.hpp> +#include <uhdlib/utils/semaphore.hpp> #include <condition_variable> -#include <unordered_map> #include <boost/lockfree/queue.hpp> #include <atomic> #include <chrono> @@ -25,141 +26,6 @@ namespace { constexpr int32_t blocking_timeout_ms = 100; -// Struct to help keep track of frames reserved for each link -struct frame_reservation_t -{ - recv_link_if::sptr recv_link; - size_t num_recv_frames = 0; - send_link_if::sptr send_link; - size_t num_send_frames = 0; -}; - -// Helper class to keep track of frames reserved for each link -class frame_reservation_mgr -{ -public: - void register_link(const recv_link_if::sptr& recv_link) - { - if (_recv_tbl[recv_link.get()] != 0) { - throw uhd::runtime_error("Recv link already attached to I/O service"); - } - _recv_tbl[recv_link.get()] = 0; - } - - void register_link(const send_link_if::sptr& send_link) - { - if (_send_tbl[send_link.get()] != 0) { - throw uhd::runtime_error("Send link already attached to I/O service"); - } - _send_tbl[send_link.get()] = 0; - } - - void unregister_link(const recv_link_if::sptr& recv_link) - { - auto link_ptr = recv_link.get(); - UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) != 0); - _recv_tbl.erase(link_ptr); - } - - void unregister_link(const send_link_if::sptr& send_link) - { - auto link_ptr = send_link.get(); - UHD_ASSERT_THROW(_send_tbl.count(link_ptr) != 0); - _send_tbl.erase(link_ptr); - } - - void reserve_frames(const frame_reservation_t& reservation) - { - if (reservation.recv_link) { - const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get()); - const size_t capacity = reservation.recv_link->get_num_recv_frames(); - if (rsvd_frames + reservation.num_recv_frames > capacity) { - throw uhd::runtime_error("Number of frames requested exceeds link recv frame capacity"); - } - - recv_link_if* link_ptr = reservation.recv_link.get(); - _recv_tbl[link_ptr] = rsvd_frames + reservation.num_recv_frames; - } - - if (reservation.send_link) { - const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get()); - const size_t capacity = reservation.send_link->get_num_send_frames(); - if (rsvd_frames + reservation.num_send_frames > capacity) { - throw uhd::runtime_error("Number of frames requested exceeds link send frame capacity"); - } - - send_link_if* link_ptr = reservation.send_link.get(); - _send_tbl[link_ptr] = rsvd_frames + reservation.num_send_frames; - } - } - - void unreserve_frames(const frame_reservation_t& reservation) - { - if (reservation.recv_link) { - const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get()); - recv_link_if* link_ptr = reservation.recv_link.get(); - _recv_tbl[link_ptr] = rsvd_frames - reservation.num_recv_frames; - } - - if (reservation.send_link) { - const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get()); - send_link_if* link_ptr = reservation.send_link.get(); - _send_tbl[link_ptr] = rsvd_frames - reservation.num_send_frames; - } - } - -private: - std::unordered_map<recv_link_if*, size_t> _recv_tbl; - std::unordered_map<send_link_if*, size_t> _send_tbl; -}; - - -// Semaphore used in blocking I/O for offload thread -class semaphore -{ -public: - void notify() { - std::unique_lock<std::mutex> lock(_cv_mutex); - _count++; - _cv.notify_one(); - } - - void wait() { - std::unique_lock<std::mutex> lock(_cv_mutex); - _cv.wait(lock, [this]() { return this->_count != 0; }); - _count--; - } - - bool try_wait() { - std::unique_lock<std::mutex> lock(_cv_mutex); - if (_count != 0) { - _count--; - return true; - } - return false; - } - - bool wait_for(size_t timeout_ms) { - std::chrono::milliseconds timeout(timeout_ms); - std::unique_lock<std::mutex> lock(_cv_mutex); - if (_cv.wait_for(lock, timeout, [this]() { return this->_count != 0; })) { - _count--; - return true; - } - return false; - } - - size_t count() { - std::unique_lock<std::mutex> lock(_cv_mutex); - return _count; - } - -private: - std::condition_variable _cv; - std::mutex _cv_mutex; - size_t _count = 0; -}; - // Fixed-size queue that supports blocking semantics template <typename queue_item_t> class offload_thread_queue { @@ -221,12 +87,12 @@ private: }; // Object that implements the communication between client and offload thread -struct client_port_t +struct client_port_impl_t { public: - using sptr = std::shared_ptr<client_port_t>; + using sptr = std::shared_ptr<client_port_impl_t>; - client_port_t(size_t size) + client_port_impl_t(size_t size) : _from_offload_thread(size) , _to_offload_thread(size + 1) // add one for disconnect command { @@ -363,7 +229,8 @@ class offload_io_service_impl public std::enable_shared_from_this<offload_io_service_impl> { public: - using sptr = std::shared_ptr<offload_io_service_impl>; + using sptr = std::shared_ptr<offload_io_service_impl>; + using client_port_t = client_port_impl_t; offload_io_service_impl( io_service::sptr io_srv, const offload_io_service::params_t& params); @@ -392,6 +259,8 @@ public: private: offload_io_service_impl(const offload_io_service_impl&) = delete; + using frame_reservation_t = frame_reservation_mgr::frame_reservation_t; + // Queue for new client creation, multiple producers allowed. Requests are // passed as heap-allocated pointers because boost lockfree queues require // simple types. @@ -453,160 +322,6 @@ private: frame_reservation_mgr _reservation_mgr; }; -class offload_recv_io : public recv_io_if -{ -public: - offload_recv_io(offload_io_service_impl::sptr io_srv, - size_t num_recv_frames, - size_t num_send_frames, - client_port_t::sptr& port); - - ~offload_recv_io(); - - frame_buff::uptr get_recv_buff(int32_t timeout_ms); - void release_recv_buff(frame_buff::uptr buff); - -private: - offload_recv_io() = delete; - offload_recv_io(const offload_recv_io&) = delete; - - offload_io_service_impl::sptr _io_srv; - client_port_t::sptr _port; - size_t _num_frames_in_use = 0; -}; - -class offload_send_io : public send_io_if -{ -public: - offload_send_io(offload_io_service_impl::sptr io_srv, - size_t num_recv_frames, - size_t num_send_frames, - client_port_t::sptr& port); - - ~offload_send_io(); - - frame_buff::uptr get_send_buff(int32_t timeout_ms); - void release_send_buff(frame_buff::uptr buff); - -private: - offload_send_io() = delete; - offload_send_io(const offload_send_io&) = delete; - - offload_io_service_impl::sptr _io_srv; - client_port_t::sptr _port; - size_t _num_frames_in_use = 0; -}; - -// Implementation of get_send_buff used below by send and recv clients -template <typename pop_func_t> -static frame_buff::uptr client_get_buff(pop_func_t pop, const int32_t timeout_ms) -{ - using namespace std::chrono; - - if (timeout_ms == 0) { - return frame_buff::uptr(pop()); - } - - const auto end_time = steady_clock::now() + milliseconds(timeout_ms); - - bool last_check = false; - - while (true) { - if (frame_buff* buff = pop()) { - return frame_buff::uptr(buff); - } - - if (timeout_ms > 0 && steady_clock::now() > end_time) { - if (last_check) { - return nullptr; - } else { - last_check = true; - } - } - std::this_thread::yield(); - } -} - -// -// offload_recv_io methods -// -offload_recv_io::offload_recv_io(offload_io_service_impl::sptr io_srv, - size_t num_recv_frames, - size_t num_send_frames, - client_port_t::sptr& port) - : _io_srv(io_srv), _port(port) -{ - _num_recv_frames = num_recv_frames; - _num_send_frames = num_send_frames; -} - -offload_recv_io::~offload_recv_io() -{ - assert(_num_frames_in_use == 0); - - if (_io_srv) { - _port->client_disconnect(); - } -} - -frame_buff::uptr offload_recv_io::get_recv_buff(int32_t timeout_ms) -{ - return client_get_buff( - [this]() { - frame_buff* buff = _port->client_pop(); - _num_frames_in_use += buff ? 1 : 0; - return buff; - }, - timeout_ms); -} - -void offload_recv_io::release_recv_buff(frame_buff::uptr buff) -{ - assert(buff); - _port->client_push(buff.release()); - _num_frames_in_use--; -} - -// -// offload_send_io methods -// -offload_send_io::offload_send_io(offload_io_service_impl::sptr io_srv, - size_t num_recv_frames, - size_t num_send_frames, - client_port_t::sptr& port) - : _io_srv(io_srv), _port(port) -{ - _num_recv_frames = num_recv_frames; - _num_send_frames = num_send_frames; -} - -offload_send_io::~offload_send_io() -{ - assert(_num_frames_in_use == 0); - - if (_io_srv) { - _port->client_disconnect(); - } -} - -frame_buff::uptr offload_send_io::get_send_buff(int32_t timeout_ms) -{ - return client_get_buff( - [this]() { - frame_buff* buff = _port->client_pop(); - _num_frames_in_use += buff ? 1 : 0; - return buff; - }, - timeout_ms); -} - -void offload_send_io::release_send_buff(frame_buff::uptr buff) -{ - assert(buff); - _port->client_push(buff.release()); - _num_frames_in_use--; -} - // // offload_io_service methods // @@ -759,7 +474,7 @@ recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr re port->client_wait_until_connected(); // Return a new recv client to the caller that just operates on the queues - return std::make_shared<offload_recv_io>( + return std::make_shared<offload_recv_io<offload_io_service_impl>>( shared_from_this(), num_recv_frames, num_send_frames, port); } @@ -813,7 +528,7 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se } // Return a new recv client to the caller that just operates on the queues - return std::make_shared<offload_send_io>( + return std::make_shared<offload_send_io<offload_io_service_impl>>( shared_from_this(), num_recv_frames, num_send_frames, port); } |