aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp110
-rw-r--r--host/lib/include/uhdlib/transport/offload_io_service_client.hpp158
-rw-r--r--host/lib/include/uhdlib/utils/semaphore.hpp71
-rw-r--r--host/lib/transport/offload_io_service.cpp309
4 files changed, 351 insertions, 297 deletions
diff --git a/host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp b/host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp
new file mode 100644
index 000000000..f0dd853a4
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp
@@ -0,0 +1,110 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_UHDLIB_TRANSPORT_FRAME_RESERVATION_MGR_HPP
+#define INCLUDED_UHDLIB_TRANSPORT_FRAME_RESERVATION_MGR_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/exception.hpp>
+#include <uhdlib/transport/link_if.hpp>
+#include <unordered_map>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Helper class to keep track of the number of frames reserved from a pair of links
+ */
+class frame_reservation_mgr
+{
+public:
+ struct frame_reservation_t
+ {
+ recv_link_if::sptr recv_link;
+ size_t num_recv_frames = 0;
+ send_link_if::sptr send_link;
+ size_t num_send_frames = 0;
+ };
+
+ void register_link(const recv_link_if::sptr& recv_link)
+ {
+ if (_recv_tbl[recv_link.get()] != 0) {
+ throw uhd::runtime_error("Recv link already attached to I/O service");
+ }
+ _recv_tbl[recv_link.get()] = 0;
+ }
+
+ void register_link(const send_link_if::sptr& send_link)
+ {
+ if (_send_tbl[send_link.get()] != 0) {
+ throw uhd::runtime_error("Send link already attached to I/O service");
+ }
+ _send_tbl[send_link.get()] = 0;
+ }
+
+ void unregister_link(const recv_link_if::sptr& recv_link)
+ {
+ auto link_ptr = recv_link.get();
+ UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) != 0);
+ _recv_tbl.erase(link_ptr);
+ }
+
+ void unregister_link(const send_link_if::sptr& send_link)
+ {
+ auto link_ptr = send_link.get();
+ UHD_ASSERT_THROW(_send_tbl.count(link_ptr) != 0);
+ _send_tbl.erase(link_ptr);
+ }
+
+ void reserve_frames(const frame_reservation_t& reservation)
+ {
+ if (reservation.recv_link) {
+ const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get());
+ const size_t capacity = reservation.recv_link->get_num_recv_frames();
+ if (rsvd_frames + reservation.num_recv_frames > capacity) {
+ throw uhd::runtime_error(
+ "Number of frames requested exceeds link recv frame capacity");
+ }
+
+ recv_link_if* link_ptr = reservation.recv_link.get();
+ _recv_tbl[link_ptr] = rsvd_frames + reservation.num_recv_frames;
+ }
+
+ if (reservation.send_link) {
+ const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get());
+ const size_t capacity = reservation.send_link->get_num_send_frames();
+ if (rsvd_frames + reservation.num_send_frames > capacity) {
+ throw uhd::runtime_error(
+ "Number of frames requested exceeds link send frame capacity");
+ }
+
+ send_link_if* link_ptr = reservation.send_link.get();
+ _send_tbl[link_ptr] = rsvd_frames + reservation.num_send_frames;
+ }
+ }
+
+ void unreserve_frames(const frame_reservation_t& reservation)
+ {
+ if (reservation.recv_link) {
+ const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get());
+ recv_link_if* link_ptr = reservation.recv_link.get();
+ _recv_tbl[link_ptr] = rsvd_frames - reservation.num_recv_frames;
+ }
+
+ if (reservation.send_link) {
+ const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get());
+ send_link_if* link_ptr = reservation.send_link.get();
+ _send_tbl[link_ptr] = rsvd_frames - reservation.num_send_frames;
+ }
+ }
+
+private:
+ std::unordered_map<recv_link_if*, size_t> _recv_tbl;
+ std::unordered_map<send_link_if*, size_t> _send_tbl;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_UHDLIB_TRANSPORT_FRAME_RESERVATION_MGR_HPP */
diff --git a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp
new file mode 100644
index 000000000..620e796ef
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp
@@ -0,0 +1,158 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_CLIENT_HPP
+#define INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_CLIENT_HPP
+
+#include <uhd/transport/frame_buff.hpp>
+#include <chrono>
+#include <thread>
+
+namespace uhd { namespace transport {
+
+namespace detail {
+
+// Implementation of get_send_buff used below by send and recv clients
+template <typename pop_func_t>
+static frame_buff::uptr client_get_buff(pop_func_t pop, const int32_t timeout_ms)
+{
+ using namespace std::chrono;
+
+ if (timeout_ms == 0) {
+ return frame_buff::uptr(pop());
+ }
+
+ const auto end_time = steady_clock::now() + milliseconds(timeout_ms);
+
+ bool last_check = false;
+
+ while (true) {
+ if (frame_buff* buff = pop()) {
+ return frame_buff::uptr(buff);
+ }
+
+ if (timeout_ms > 0 && steady_clock::now() > end_time) {
+ if (last_check) {
+ return nullptr;
+ } else {
+ last_check = true;
+ }
+ }
+ std::this_thread::yield();
+ }
+}
+
+} // namespace detail
+
+/*!
+ * Recv I/O client for offload I/O service
+ */
+template <typename io_service_t>
+class offload_recv_io : public recv_io_if
+{
+public:
+ offload_recv_io(typename io_service_t::sptr io_srv,
+ size_t num_recv_frames,
+ size_t num_send_frames,
+ typename io_service_t::client_port_t::sptr& port)
+ : _io_srv(io_srv), _port(port)
+ {
+ _num_recv_frames = num_recv_frames;
+ _num_send_frames = num_send_frames;
+ }
+
+ ~offload_recv_io()
+ {
+ assert(_num_frames_in_use == 0);
+
+ if (_io_srv) {
+ _port->client_disconnect();
+ }
+ }
+
+ frame_buff::uptr get_recv_buff(int32_t timeout_ms)
+ {
+ return detail::client_get_buff(
+ [this]() {
+ frame_buff* buff = _port->client_pop();
+ _num_frames_in_use += buff ? 1 : 0;
+ return buff;
+ },
+ timeout_ms);
+ }
+
+ void release_recv_buff(frame_buff::uptr buff)
+ {
+ assert(buff);
+ _port->client_push(buff.release());
+ _num_frames_in_use--;
+ }
+
+private:
+ offload_recv_io() = delete;
+ offload_recv_io(const offload_recv_io&) = delete;
+
+ typename io_service_t::sptr _io_srv;
+ typename io_service_t::client_port_t::sptr _port;
+ size_t _num_frames_in_use = 0;
+};
+
+/*!
+ * Send I/O client for offload I/O service
+ */
+template <typename io_service_t>
+class offload_send_io : public send_io_if
+{
+public:
+ offload_send_io(typename io_service_t::sptr io_srv,
+ size_t num_recv_frames,
+ size_t num_send_frames,
+ typename io_service_t::client_port_t::sptr& port)
+ : _io_srv(io_srv), _port(port)
+ {
+ _num_recv_frames = num_recv_frames;
+ _num_send_frames = num_send_frames;
+ }
+
+ ~offload_send_io()
+ {
+ assert(_num_frames_in_use == 0);
+
+ if (_io_srv) {
+ _port->client_disconnect();
+ }
+ }
+
+ frame_buff::uptr get_send_buff(int32_t timeout_ms)
+ {
+ return detail::client_get_buff(
+ [this]() {
+ frame_buff* buff = _port->client_pop();
+ _num_frames_in_use += buff ? 1 : 0;
+ return buff;
+ },
+ timeout_ms);
+ }
+
+ void release_send_buff(frame_buff::uptr buff)
+ {
+ assert(buff);
+ _port->client_push(buff.release());
+ _num_frames_in_use--;
+ }
+
+private:
+ offload_send_io() = delete;
+ offload_send_io(const offload_send_io&) = delete;
+
+ typename io_service_t::sptr _io_srv;
+ typename io_service_t::client_port_t::sptr _port;
+ size_t _num_frames_in_use = 0;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_CLIENT_HPP */
diff --git a/host/lib/include/uhdlib/utils/semaphore.hpp b/host/lib/include/uhdlib/utils/semaphore.hpp
new file mode 100644
index 000000000..ae77ed102
--- /dev/null
+++ b/host/lib/include/uhdlib/utils/semaphore.hpp
@@ -0,0 +1,71 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <condition_variable>
+#include <chrono>
+#include <mutex>
+
+#ifndef INCLUDED_UHDLIB_UTILS_SEMAPHORE_HPP
+#define INCLUDED_UHDLIB_UTILS_SEMAPHORE_HPP
+
+namespace uhd {
+
+/*!
+ * A sempahore built using std::condition_variable
+ */
+class semaphore
+{
+public:
+ void notify()
+ {
+ std::unique_lock<std::mutex> lock(_cv_mutex);
+ _count++;
+ _cv.notify_one();
+ }
+
+ void wait()
+ {
+ std::unique_lock<std::mutex> lock(_cv_mutex);
+ _cv.wait(lock, [this]() { return this->_count != 0; });
+ _count--;
+ }
+
+ bool try_wait()
+ {
+ std::unique_lock<std::mutex> lock(_cv_mutex);
+ if (_count != 0) {
+ _count--;
+ return true;
+ }
+ return false;
+ }
+
+ bool wait_for(size_t timeout_ms)
+ {
+ std::chrono::milliseconds timeout(timeout_ms);
+ std::unique_lock<std::mutex> lock(_cv_mutex);
+ if (_cv.wait_for(lock, timeout, [this]() { return this->_count != 0; })) {
+ _count--;
+ return true;
+ }
+ return false;
+ }
+
+ size_t count()
+ {
+ std::unique_lock<std::mutex> lock(_cv_mutex);
+ return _count;
+ }
+
+private:
+ std::condition_variable _cv;
+ std::mutex _cv_mutex;
+ size_t _count = 0;
+};
+
+} // namespace uhd
+
+#endif /* INCLUDED_UHDLIB_UTILS_SEMAPHORE_HPP */
diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp
index 012c86868..c9b9af344 100644
--- a/host/lib/transport/offload_io_service.cpp
+++ b/host/lib/transport/offload_io_service.cpp
@@ -6,11 +6,12 @@
#include <uhd/config.hpp>
#include <uhd/exception.hpp>
-#include <uhd/utils/log.hpp>
#include <uhd/utils/thread.hpp>
+#include <uhdlib/transport/frame_reservation_mgr.hpp>
#include <uhdlib/transport/offload_io_service.hpp>
+#include <uhdlib/transport/offload_io_service_client.hpp>
+#include <uhdlib/utils/semaphore.hpp>
#include <condition_variable>
-#include <unordered_map>
#include <boost/lockfree/queue.hpp>
#include <atomic>
#include <chrono>
@@ -25,141 +26,6 @@ namespace {
constexpr int32_t blocking_timeout_ms = 100;
-// Struct to help keep track of frames reserved for each link
-struct frame_reservation_t
-{
- recv_link_if::sptr recv_link;
- size_t num_recv_frames = 0;
- send_link_if::sptr send_link;
- size_t num_send_frames = 0;
-};
-
-// Helper class to keep track of frames reserved for each link
-class frame_reservation_mgr
-{
-public:
- void register_link(const recv_link_if::sptr& recv_link)
- {
- if (_recv_tbl[recv_link.get()] != 0) {
- throw uhd::runtime_error("Recv link already attached to I/O service");
- }
- _recv_tbl[recv_link.get()] = 0;
- }
-
- void register_link(const send_link_if::sptr& send_link)
- {
- if (_send_tbl[send_link.get()] != 0) {
- throw uhd::runtime_error("Send link already attached to I/O service");
- }
- _send_tbl[send_link.get()] = 0;
- }
-
- void unregister_link(const recv_link_if::sptr& recv_link)
- {
- auto link_ptr = recv_link.get();
- UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) != 0);
- _recv_tbl.erase(link_ptr);
- }
-
- void unregister_link(const send_link_if::sptr& send_link)
- {
- auto link_ptr = send_link.get();
- UHD_ASSERT_THROW(_send_tbl.count(link_ptr) != 0);
- _send_tbl.erase(link_ptr);
- }
-
- void reserve_frames(const frame_reservation_t& reservation)
- {
- if (reservation.recv_link) {
- const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get());
- const size_t capacity = reservation.recv_link->get_num_recv_frames();
- if (rsvd_frames + reservation.num_recv_frames > capacity) {
- throw uhd::runtime_error("Number of frames requested exceeds link recv frame capacity");
- }
-
- recv_link_if* link_ptr = reservation.recv_link.get();
- _recv_tbl[link_ptr] = rsvd_frames + reservation.num_recv_frames;
- }
-
- if (reservation.send_link) {
- const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get());
- const size_t capacity = reservation.send_link->get_num_send_frames();
- if (rsvd_frames + reservation.num_send_frames > capacity) {
- throw uhd::runtime_error("Number of frames requested exceeds link send frame capacity");
- }
-
- send_link_if* link_ptr = reservation.send_link.get();
- _send_tbl[link_ptr] = rsvd_frames + reservation.num_send_frames;
- }
- }
-
- void unreserve_frames(const frame_reservation_t& reservation)
- {
- if (reservation.recv_link) {
- const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get());
- recv_link_if* link_ptr = reservation.recv_link.get();
- _recv_tbl[link_ptr] = rsvd_frames - reservation.num_recv_frames;
- }
-
- if (reservation.send_link) {
- const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get());
- send_link_if* link_ptr = reservation.send_link.get();
- _send_tbl[link_ptr] = rsvd_frames - reservation.num_send_frames;
- }
- }
-
-private:
- std::unordered_map<recv_link_if*, size_t> _recv_tbl;
- std::unordered_map<send_link_if*, size_t> _send_tbl;
-};
-
-
-// Semaphore used in blocking I/O for offload thread
-class semaphore
-{
-public:
- void notify() {
- std::unique_lock<std::mutex> lock(_cv_mutex);
- _count++;
- _cv.notify_one();
- }
-
- void wait() {
- std::unique_lock<std::mutex> lock(_cv_mutex);
- _cv.wait(lock, [this]() { return this->_count != 0; });
- _count--;
- }
-
- bool try_wait() {
- std::unique_lock<std::mutex> lock(_cv_mutex);
- if (_count != 0) {
- _count--;
- return true;
- }
- return false;
- }
-
- bool wait_for(size_t timeout_ms) {
- std::chrono::milliseconds timeout(timeout_ms);
- std::unique_lock<std::mutex> lock(_cv_mutex);
- if (_cv.wait_for(lock, timeout, [this]() { return this->_count != 0; })) {
- _count--;
- return true;
- }
- return false;
- }
-
- size_t count() {
- std::unique_lock<std::mutex> lock(_cv_mutex);
- return _count;
- }
-
-private:
- std::condition_variable _cv;
- std::mutex _cv_mutex;
- size_t _count = 0;
-};
-
// Fixed-size queue that supports blocking semantics
template <typename queue_item_t>
class offload_thread_queue {
@@ -221,12 +87,12 @@ private:
};
// Object that implements the communication between client and offload thread
-struct client_port_t
+struct client_port_impl_t
{
public:
- using sptr = std::shared_ptr<client_port_t>;
+ using sptr = std::shared_ptr<client_port_impl_t>;
- client_port_t(size_t size)
+ client_port_impl_t(size_t size)
: _from_offload_thread(size)
, _to_offload_thread(size + 1) // add one for disconnect command
{
@@ -363,7 +229,8 @@ class offload_io_service_impl
public std::enable_shared_from_this<offload_io_service_impl>
{
public:
- using sptr = std::shared_ptr<offload_io_service_impl>;
+ using sptr = std::shared_ptr<offload_io_service_impl>;
+ using client_port_t = client_port_impl_t;
offload_io_service_impl(
io_service::sptr io_srv, const offload_io_service::params_t& params);
@@ -392,6 +259,8 @@ public:
private:
offload_io_service_impl(const offload_io_service_impl&) = delete;
+ using frame_reservation_t = frame_reservation_mgr::frame_reservation_t;
+
// Queue for new client creation, multiple producers allowed. Requests are
// passed as heap-allocated pointers because boost lockfree queues require
// simple types.
@@ -453,160 +322,6 @@ private:
frame_reservation_mgr _reservation_mgr;
};
-class offload_recv_io : public recv_io_if
-{
-public:
- offload_recv_io(offload_io_service_impl::sptr io_srv,
- size_t num_recv_frames,
- size_t num_send_frames,
- client_port_t::sptr& port);
-
- ~offload_recv_io();
-
- frame_buff::uptr get_recv_buff(int32_t timeout_ms);
- void release_recv_buff(frame_buff::uptr buff);
-
-private:
- offload_recv_io() = delete;
- offload_recv_io(const offload_recv_io&) = delete;
-
- offload_io_service_impl::sptr _io_srv;
- client_port_t::sptr _port;
- size_t _num_frames_in_use = 0;
-};
-
-class offload_send_io : public send_io_if
-{
-public:
- offload_send_io(offload_io_service_impl::sptr io_srv,
- size_t num_recv_frames,
- size_t num_send_frames,
- client_port_t::sptr& port);
-
- ~offload_send_io();
-
- frame_buff::uptr get_send_buff(int32_t timeout_ms);
- void release_send_buff(frame_buff::uptr buff);
-
-private:
- offload_send_io() = delete;
- offload_send_io(const offload_send_io&) = delete;
-
- offload_io_service_impl::sptr _io_srv;
- client_port_t::sptr _port;
- size_t _num_frames_in_use = 0;
-};
-
-// Implementation of get_send_buff used below by send and recv clients
-template <typename pop_func_t>
-static frame_buff::uptr client_get_buff(pop_func_t pop, const int32_t timeout_ms)
-{
- using namespace std::chrono;
-
- if (timeout_ms == 0) {
- return frame_buff::uptr(pop());
- }
-
- const auto end_time = steady_clock::now() + milliseconds(timeout_ms);
-
- bool last_check = false;
-
- while (true) {
- if (frame_buff* buff = pop()) {
- return frame_buff::uptr(buff);
- }
-
- if (timeout_ms > 0 && steady_clock::now() > end_time) {
- if (last_check) {
- return nullptr;
- } else {
- last_check = true;
- }
- }
- std::this_thread::yield();
- }
-}
-
-//
-// offload_recv_io methods
-//
-offload_recv_io::offload_recv_io(offload_io_service_impl::sptr io_srv,
- size_t num_recv_frames,
- size_t num_send_frames,
- client_port_t::sptr& port)
- : _io_srv(io_srv), _port(port)
-{
- _num_recv_frames = num_recv_frames;
- _num_send_frames = num_send_frames;
-}
-
-offload_recv_io::~offload_recv_io()
-{
- assert(_num_frames_in_use == 0);
-
- if (_io_srv) {
- _port->client_disconnect();
- }
-}
-
-frame_buff::uptr offload_recv_io::get_recv_buff(int32_t timeout_ms)
-{
- return client_get_buff(
- [this]() {
- frame_buff* buff = _port->client_pop();
- _num_frames_in_use += buff ? 1 : 0;
- return buff;
- },
- timeout_ms);
-}
-
-void offload_recv_io::release_recv_buff(frame_buff::uptr buff)
-{
- assert(buff);
- _port->client_push(buff.release());
- _num_frames_in_use--;
-}
-
-//
-// offload_send_io methods
-//
-offload_send_io::offload_send_io(offload_io_service_impl::sptr io_srv,
- size_t num_recv_frames,
- size_t num_send_frames,
- client_port_t::sptr& port)
- : _io_srv(io_srv), _port(port)
-{
- _num_recv_frames = num_recv_frames;
- _num_send_frames = num_send_frames;
-}
-
-offload_send_io::~offload_send_io()
-{
- assert(_num_frames_in_use == 0);
-
- if (_io_srv) {
- _port->client_disconnect();
- }
-}
-
-frame_buff::uptr offload_send_io::get_send_buff(int32_t timeout_ms)
-{
- return client_get_buff(
- [this]() {
- frame_buff* buff = _port->client_pop();
- _num_frames_in_use += buff ? 1 : 0;
- return buff;
- },
- timeout_ms);
-}
-
-void offload_send_io::release_send_buff(frame_buff::uptr buff)
-{
- assert(buff);
- _port->client_push(buff.release());
- _num_frames_in_use--;
-}
-
//
// offload_io_service methods
//
@@ -759,7 +474,7 @@ recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr re
port->client_wait_until_connected();
// Return a new recv client to the caller that just operates on the queues
- return std::make_shared<offload_recv_io>(
+ return std::make_shared<offload_recv_io<offload_io_service_impl>>(
shared_from_this(), num_recv_frames, num_send_frames, port);
}
@@ -813,7 +528,7 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se
}
// Return a new recv client to the caller that just operates on the queues
- return std::make_shared<offload_send_io>(
+ return std::make_shared<offload_send_io<offload_io_service_impl>>(
shared_from_this(), num_recv_frames, num_send_frames, port);
}