Diffstat (limited to 'host/lib/transport/offload_io_service.cpp')
-rw-r--r--  host/lib/transport/offload_io_service.cpp | 309
1 file changed, 12 insertions(+), 297 deletions(-)
diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp
index 012c86868..c9b9af344 100644
--- a/host/lib/transport/offload_io_service.cpp
+++ b/host/lib/transport/offload_io_service.cpp
@@ -6,11 +6,12 @@
#include <uhd/config.hpp>
#include <uhd/exception.hpp>
-#include <uhd/utils/log.hpp>
#include <uhd/utils/thread.hpp>
+#include <uhdlib/transport/frame_reservation_mgr.hpp>
#include <uhdlib/transport/offload_io_service.hpp>
+#include <uhdlib/transport/offload_io_service_client.hpp>
+#include <uhdlib/utils/semaphore.hpp>
#include <condition_variable>
-#include <unordered_map>
#include <boost/lockfree/queue.hpp>
#include <atomic>
#include <chrono>
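
The two helper classes deleted in the next hunk, frame_reservation_mgr and semaphore, now come in through the new uhdlib headers above instead of living in this translation unit. As a minimal sketch of how the reservation manager is driven, assuming the relocated class keeps the interface visible in the removed lines below (namespace qualifiers omitted; the nested frame_reservation_t alias is taken from the hunk at -392 further down):

    // Sketch only: interface as shown in the removed lines below; the class
    // now lives behind uhdlib/transport/frame_reservation_mgr.hpp.
    void reserve_for_client(frame_reservation_mgr& mgr,
        recv_link_if::sptr recv_link)
    {
        mgr.register_link(recv_link); // throws if the link is already attached

        frame_reservation_mgr::frame_reservation_t reservation;
        reservation.recv_link       = recv_link;
        reservation.num_recv_frames = 16; // must fit get_num_recv_frames()
        mgr.reserve_frames(reservation);  // throws when capacity is exceeded

        // ... client lifetime ...

        mgr.unreserve_frames(reservation); // release on client teardown
        mgr.unregister_link(recv_link);
    }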
@@ -25,141 +26,6 @@ namespace {
constexpr int32_t blocking_timeout_ms = 100;
-// Struct to help keep track of frames reserved for each link
-struct frame_reservation_t
-{
- recv_link_if::sptr recv_link;
- size_t num_recv_frames = 0;
- send_link_if::sptr send_link;
- size_t num_send_frames = 0;
-};
-
-// Helper class to keep track of frames reserved for each link
-class frame_reservation_mgr
-{
-public:
- void register_link(const recv_link_if::sptr& recv_link)
- {
- if (_recv_tbl.count(recv_link.get()) != 0) {
- throw uhd::runtime_error("Recv link already attached to I/O service");
- }
- _recv_tbl[recv_link.get()] = 0;
- }
-
- void register_link(const send_link_if::sptr& send_link)
- {
- if (_send_tbl.count(send_link.get()) != 0) {
- throw uhd::runtime_error("Send link already attached to I/O service");
- }
- _send_tbl[send_link.get()] = 0;
- }
-
- void unregister_link(const recv_link_if::sptr& recv_link)
- {
- auto link_ptr = recv_link.get();
- UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) != 0);
- _recv_tbl.erase(link_ptr);
- }
-
- void unregister_link(const send_link_if::sptr& send_link)
- {
- auto link_ptr = send_link.get();
- UHD_ASSERT_THROW(_send_tbl.count(link_ptr) != 0);
- _send_tbl.erase(link_ptr);
- }
-
- void reserve_frames(const frame_reservation_t& reservation)
- {
- if (reservation.recv_link) {
- const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get());
- const size_t capacity = reservation.recv_link->get_num_recv_frames();
- if (rsvd_frames + reservation.num_recv_frames > capacity) {
- throw uhd::runtime_error("Number of frames requested exceeds link recv frame capacity");
- }
-
- recv_link_if* link_ptr = reservation.recv_link.get();
- _recv_tbl[link_ptr] = rsvd_frames + reservation.num_recv_frames;
- }
-
- if (reservation.send_link) {
- const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get());
- const size_t capacity = reservation.send_link->get_num_send_frames();
- if (rsvd_frames + reservation.num_send_frames > capacity) {
- throw uhd::runtime_error("Number of frames requested exceeds link send frame capacity");
- }
-
- send_link_if* link_ptr = reservation.send_link.get();
- _send_tbl[link_ptr] = rsvd_frames + reservation.num_send_frames;
- }
- }
-
- void unreserve_frames(const frame_reservation_t& reservation)
- {
- if (reservation.recv_link) {
- const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get());
- recv_link_if* link_ptr = reservation.recv_link.get();
- _recv_tbl[link_ptr] = rsvd_frames - reservation.num_recv_frames;
- }
-
- if (reservation.send_link) {
- const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get());
- send_link_if* link_ptr = reservation.send_link.get();
- _send_tbl[link_ptr] = rsvd_frames - reservation.num_send_frames;
- }
- }
-
-private:
- std::unordered_map<recv_link_if*, size_t> _recv_tbl;
- std::unordered_map<send_link_if*, size_t> _send_tbl;
-};
-
-
-// Semaphore used in blocking I/O for offload thread
-class semaphore
-{
-public:
- void notify() {
- std::unique_lock<std::mutex> lock(_cv_mutex);
- _count++;
- _cv.notify_one();
- }
-
- void wait() {
- std::unique_lock<std::mutex> lock(_cv_mutex);
- _cv.wait(lock, [this]() { return this->_count != 0; });
- _count--;
- }
-
- bool try_wait() {
- std::unique_lock<std::mutex> lock(_cv_mutex);
- if (_count != 0) {
- _count--;
- return true;
- }
- return false;
- }
-
- bool wait_for(size_t timeout_ms) {
- std::chrono::milliseconds timeout(timeout_ms);
- std::unique_lock<std::mutex> lock(_cv_mutex);
- if (_cv.wait_for(lock, timeout, [this]() { return this->_count != 0; })) {
- _count--;
- return true;
- }
- return false;
- }
-
- size_t count() {
- std::unique_lock<std::mutex> lock(_cv_mutex);
- return _count;
- }
-
-private:
- std::condition_variable _cv;
- std::mutex _cv_mutex;
- size_t _count = 0;
-};
-
// Fixed-size queue that supports blocking semantics
template <typename queue_item_t>
class offload_thread_queue {
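
The semaphore removed above is a plain counting semaphore built on std::condition_variable; it is what lets the offload thread block instead of spinning when a client uses blocking I/O. A sketch of the hand-off it supports, assuming the relocated class keeps this interface behind uhdlib/utils/semaphore.hpp (namespace qualifiers omitted):

    #include <uhdlib/utils/semaphore.hpp>

    // Sketch only: the client publishes work, the offload thread blocks on it.
    void client_side(semaphore& sem)
    {
        sem.notify(); // increment the count and wake one waiter
    }

    bool offload_side(semaphore& sem)
    {
        // Bounded wait (cf. blocking_timeout_ms = 100 above) so the offload
        // thread can still service its other queues when nothing arrives.
        return sem.wait_for(100);
    }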
@@ -221,12 +87,12 @@ private:
};
// Object that implements the communication between client and offload thread
-struct client_port_t
+struct client_port_impl_t
{
public:
- using sptr = std::shared_ptr<client_port_t>;
+ using sptr = std::shared_ptr<client_port_impl_t>;
- client_port_t(size_t size)
+ client_port_impl_t(size_t size)
: _from_offload_thread(size)
, _to_offload_thread(size + 1) // add one for disconnect command
{
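
The rename to client_port_impl_t makes room for the client_port_t alias introduced inside the service (see the hunk at -363 below). The port itself pairs one fixed-size queue per direction; the size + 1 on _to_offload_thread reserves space for the final disconnect command even when the client already has every frame in flight. A hedged sketch of the client half of that protocol, using only the port calls that appear later in this diff:

    // Sketch only: the client side of the port, per the calls visible in
    // the removed offload_recv_io/offload_send_io methods further down.
    void run_client(client_port_impl_t::sptr port)
    {
        port->client_wait_until_connected(); // offload thread acks the connect

        if (frame_buff* buff = port->client_pop()) { // frame from offload thread
            // ... fill or consume the frame ...
            port->client_push(buff); // hand it back for I/O
        }
        port->client_disconnect(); // always fits: the size + 1 slot above
    }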
@@ -363,7 +229,8 @@ class offload_io_service_impl
public std::enable_shared_from_this<offload_io_service_impl>
{
public:
- using sptr = std::shared_ptr<offload_io_service_impl>;
+ using sptr = std::shared_ptr<offload_io_service_impl>;
+ using client_port_t = client_port_impl_t;
offload_io_service_impl(
io_service::sptr io_srv, const offload_io_service::params_t& params);
@@ -392,6 +259,8 @@ public:
private:
offload_io_service_impl(const offload_io_service_impl&) = delete;
+ using frame_reservation_t = frame_reservation_mgr::frame_reservation_t;
+
// Queue for new client creation, multiple producers allowed. Requests are
// passed as heap-allocated pointers because boost lockfree queues require
// simple types.
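
boost::lockfree::queue only accepts element types with trivial copy and destruction, which rules out queuing the request objects by value; the service therefore queues owning raw pointers. A self-contained sketch of that pattern (request_t is a hypothetical stand-in, not the service's actual request type):

    #include <boost/lockfree/queue.hpp>
    #include <cassert>

    struct request_t
    {
        int id; // hypothetical payload; the real requests carry link info
    };

    int main()
    {
        // Element type must be trivially copyable/destructible, so the
        // queue carries owning raw pointers, not the requests themselves.
        boost::lockfree::queue<request_t*> queue{8};

        // Producer (any client thread): push can fail when the queue is full.
        bool ok = queue.push(new request_t{42});
        assert(ok);

        // Consumer (the offload thread): take ownership back and free.
        request_t* req = nullptr;
        while (queue.pop(req)) {
            assert(req->id == 42);
            delete req;
        }
        return 0;
    }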
@@ -453,160 +322,6 @@ private:
frame_reservation_mgr _reservation_mgr;
};
-class offload_recv_io : public recv_io_if
-{
-public:
- offload_recv_io(offload_io_service_impl::sptr io_srv,
- size_t num_recv_frames,
- size_t num_send_frames,
- client_port_t::sptr& port);
-
- ~offload_recv_io();
-
- frame_buff::uptr get_recv_buff(int32_t timeout_ms);
- void release_recv_buff(frame_buff::uptr buff);
-
-private:
- offload_recv_io() = delete;
- offload_recv_io(const offload_recv_io&) = delete;
-
- offload_io_service_impl::sptr _io_srv;
- client_port_t::sptr _port;
- size_t _num_frames_in_use = 0;
-};
-
-class offload_send_io : public send_io_if
-{
-public:
- offload_send_io(offload_io_service_impl::sptr io_srv,
- size_t num_recv_frames,
- size_t num_send_frames,
- client_port_t::sptr& port);
-
- ~offload_send_io();
-
- frame_buff::uptr get_send_buff(int32_t timeout_ms);
- void release_send_buff(frame_buff::uptr buff);
-
-private:
- offload_send_io() = delete;
- offload_send_io(const offload_send_io&) = delete;
-
- offload_io_service_impl::sptr _io_srv;
- client_port_t::sptr _port;
- size_t _num_frames_in_use = 0;
-};
-
-// Implementation of get_send_buff used below by send and recv clients
-template <typename pop_func_t>
-static frame_buff::uptr client_get_buff(pop_func_t pop, const int32_t timeout_ms)
-{
- using namespace std::chrono;
-
- if (timeout_ms == 0) {
- return frame_buff::uptr(pop());
- }
-
- const auto end_time = steady_clock::now() + milliseconds(timeout_ms);
-
- bool last_check = false;
-
- while (true) {
- if (frame_buff* buff = pop()) {
- return frame_buff::uptr(buff);
- }
-
- if (timeout_ms > 0 && steady_clock::now() > end_time) {
- if (last_check) {
- return nullptr;
- } else {
- last_check = true;
- }
- }
- std::this_thread::yield();
- }
-}
-
-//
-// offload_recv_io methods
-//
-offload_recv_io::offload_recv_io(offload_io_service_impl::sptr io_srv,
- size_t num_recv_frames,
- size_t num_send_frames,
- client_port_t::sptr& port)
- : _io_srv(io_srv), _port(port)
-{
- _num_recv_frames = num_recv_frames;
- _num_send_frames = num_send_frames;
-}
-
-offload_recv_io::~offload_recv_io()
-{
- assert(_num_frames_in_use == 0);
-
- if (_io_srv) {
- _port->client_disconnect();
- }
-}
-
-frame_buff::uptr offload_recv_io::get_recv_buff(int32_t timeout_ms)
-{
- return client_get_buff(
- [this]() {
- frame_buff* buff = _port->client_pop();
- _num_frames_in_use += buff ? 1 : 0;
- return buff;
- },
- timeout_ms);
-}
-
-void offload_recv_io::release_recv_buff(frame_buff::uptr buff)
-{
- assert(buff);
- _port->client_push(buff.release());
- _num_frames_in_use--;
-}
-
-//
-// offload_send_io methods
-//
-offload_send_io::offload_send_io(offload_io_service_impl::sptr io_srv,
- size_t num_recv_frames,
- size_t num_send_frames,
- client_port_t::sptr& port)
- : _io_srv(io_srv), _port(port)
-{
- _num_recv_frames = num_recv_frames;
- _num_send_frames = num_send_frames;
-}
-
-offload_send_io::~offload_send_io()
-{
- assert(_num_frames_in_use == 0);
-
- if (_io_srv) {
- _port->client_disconnect();
- }
-}
-
-frame_buff::uptr offload_send_io::get_send_buff(int32_t timeout_ms)
-{
- return client_get_buff(
- [this]() {
- frame_buff* buff = _port->client_pop();
- _num_frames_in_use += buff ? 1 : 0;
- return buff;
- },
- timeout_ms);
-}
-
-void offload_send_io::release_send_buff(frame_buff::uptr buff)
-{
- assert(buff);
- _port->client_push(buff.release());
- _num_frames_in_use--;
-}
-
//
// offload_io_service methods
//
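
The clients deleted above survive as templates in offload_io_service_client.hpp, parameterized on the io service type, which is why the make_* functions below now spell out offload_recv_io<offload_io_service_impl>. One subtlety worth preserving from the removed client_get_buff: after the deadline expires, the loop grants exactly one more poll before returning nullptr, so a buffer that arrives right at the deadline is still picked up. The same loop rewritten in isolation:

    #include <chrono>
    #include <cstdint>
    #include <thread>

    // The removed client_get_buff() timeout loop on its own. last_check
    // grants one final poll past the deadline so a just-arrived item is
    // not dropped; a negative timeout_ms polls forever.
    template <typename item_t, typename pop_func_t>
    item_t* poll_with_grace(pop_func_t pop, const int32_t timeout_ms)
    {
        using namespace std::chrono;

        if (timeout_ms == 0) {
            return pop(); // non-blocking: single attempt
        }
        const auto end_time = steady_clock::now() + milliseconds(timeout_ms);
        bool last_check     = false;

        while (true) {
            if (item_t* item = pop()) {
                return item;
            }
            if (timeout_ms > 0 && steady_clock::now() > end_time) {
                if (last_check) {
                    return nullptr; // second pass past the deadline: give up
                }
                last_check = true; // grant exactly one more poll
            }
            std::this_thread::yield();
        }
    }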
@@ -759,7 +474,7 @@ recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr re
port->client_wait_until_connected();
// Return a new recv client to the caller that just operates on the queues
- return std::make_shared<offload_recv_io>(
+ return std::make_shared<offload_recv_io<offload_io_service_impl>>(
shared_from_this(), num_recv_frames, num_send_frames, port);
}
@@ -813,7 +528,7 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se
}
// Return a new send client to the caller that just operates on the queues
- return std::make_shared<offload_send_io>(
+ return std::make_shared<offload_send_io<offload_io_service_impl>>(
shared_from_this(), num_recv_frames, num_send_frames, port);
}
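
End to end, the refactor leaves the caller-facing contract untouched. A hedged sketch of the recv path against the refactored service, with make_recv_client's remaining parameters elided since only the first is visible in the truncated hunk header above:

    // Sketch only: caller-side recv flow. recv_io_if is the contract the
    // removed offload_recv_io implemented and the new template still does.
    void recv_one_frame(io_service::sptr io_srv, recv_link_if::sptr recv_link)
    {
        recv_io_if::sptr client =
            io_srv->make_recv_client(recv_link /*, ... */);

        if (frame_buff::uptr buff = client->get_recv_buff(100 /* timeout_ms */)) {
            // ... process the received frame ...
            client->release_recv_buff(std::move(buff));
        }
        client.reset(); // the client destructor issues the disconnect command
    }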