aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib')
-rw-r--r--host/lib/include/uhdlib/transport/inline_io_service.hpp13
-rw-r--r--host/lib/include/uhdlib/transport/offload_io_service_client.hpp44
-rw-r--r--host/lib/transport/inline_io_service.cpp81
-rw-r--r--host/lib/transport/offload_io_service.cpp27
4 files changed, 140 insertions, 25 deletions
diff --git a/host/lib/include/uhdlib/transport/inline_io_service.hpp b/host/lib/include/uhdlib/transport/inline_io_service.hpp
index fe41b96b6..d4a6dbbae 100644
--- a/host/lib/include/uhdlib/transport/inline_io_service.hpp
+++ b/host/lib/include/uhdlib/transport/inline_io_service.hpp
@@ -104,6 +104,19 @@ private:
frame_buff::uptr recv(
inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms);
+ /*
+ * Function to perform recv operations on a link, which is potentially
+ * muxed. This function is only called from send_io::release_send_buff, and
+ * always expects recv_io_cb to release its incoming buffer. Packets are
+ * forwarded to the appropriate mux or callback.
+ *
+ * \param recv_io_cb the callback+interface initiating the operation
+ * \param recv_link link to perform receive on
+ * \param timeout_ms timeout to wait for a buffer on the link
+ */
+ void recv_flow_ctrl(
+ inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms);
+
/* Track whether link is muxed and the callback */
std::unordered_map<recv_link_if*, std::tuple<inline_recv_mux*, inline_recv_cb*>>
_recv_tbl;
diff --git a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp
index 620e796ef..2f606878c 100644
--- a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp
+++ b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp
@@ -50,7 +50,7 @@ static frame_buff::uptr client_get_buff(pop_func_t pop, const int32_t timeout_ms
/*!
* Recv I/O client for offload I/O service
*/
-template <typename io_service_t>
+template <typename io_service_t, bool polling>
class offload_recv_io : public recv_io_if
{
public:
@@ -75,13 +75,19 @@ public:
frame_buff::uptr get_recv_buff(int32_t timeout_ms)
{
- return detail::client_get_buff(
- [this]() {
- frame_buff* buff = _port->client_pop();
- _num_frames_in_use += buff ? 1 : 0;
- return buff;
- },
- timeout_ms);
+ if (polling) {
+ return detail::client_get_buff(
+ [this]() {
+ frame_buff* buff = _port->client_pop();
+ _num_frames_in_use += buff ? 1 : 0;
+ return buff;
+ },
+ timeout_ms);
+ } else {
+ frame_buff* buff = _port->client_pop(timeout_ms);
+ _num_frames_in_use += buff ? 1 : 0;
+ return frame_buff::uptr(buff);
+ }
}
void release_recv_buff(frame_buff::uptr buff)
@@ -103,7 +109,7 @@ private:
/*!
* Send I/O client for offload I/O service
*/
-template <typename io_service_t>
+template <typename io_service_t, bool polling>
class offload_send_io : public send_io_if
{
public:
@@ -128,13 +134,19 @@ public:
frame_buff::uptr get_send_buff(int32_t timeout_ms)
{
- return detail::client_get_buff(
- [this]() {
- frame_buff* buff = _port->client_pop();
- _num_frames_in_use += buff ? 1 : 0;
- return buff;
- },
- timeout_ms);
+ if (polling) {
+ return detail::client_get_buff(
+ [this]() {
+ frame_buff* buff = _port->client_pop();
+ _num_frames_in_use += buff ? 1 : 0;
+ return buff;
+ },
+ timeout_ms);
+ } else {
+ frame_buff* buff = _port->client_pop(timeout_ms);
+ _num_frames_in_use += buff ? 1 : 0;
+ return frame_buff::uptr(buff);
+ }
}
void release_send_buff(frame_buff::uptr buff)
diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp
index 93967e09a..6449bbda8 100644
--- a/host/lib/transport/inline_io_service.cpp
+++ b/host/lib/transport/inline_io_service.cpp
@@ -144,6 +144,41 @@ public:
}
}
+ void recv_flow_ctrl(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms)
+ {
+ while (true) {
+ frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms);
+ /* Process buffer */
+ if (buff) {
+ bool rcvr_found = false;
+ for (auto& rcvr : _callbacks) {
+ if (rcvr->callback(buff, recv_link)) {
+ rcvr_found = true;
+ if (rcvr == cb) {
+ assert(!buff);
+ return;
+ } else if (buff) {
+ /* NOTE: Should not overflow, by construction
+ * Every queue can hold link->get_num_recv_frames()
+ */
+ _queues[rcvr]->push_back(buff.release());
+ } else {
+ /* Continue looping if buffer was consumed and
+ receiver is not the requested one */
+ break;
+ }
+ }
+ }
+ if (not rcvr_found) {
+ UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver");
+ recv_link->release_recv_buff(std::move(buff));
+ }
+ } else { /* Timeout */
+ return;
+ }
+ }
+ }
+
private:
recv_link_if* _link;
std::list<inline_recv_cb*> _callbacks;
@@ -248,11 +283,18 @@ public:
void release_send_buff(frame_buff::uptr buff)
{
- while (buff) { /* TODO: Possibly don't loop indefinitely here */
+ while (buff) {
+ // Try to send a packet
+ _send_cb(buff, _send_link.get());
if (_recv_link) {
- _io_srv->recv(this, _recv_link.get(), 0);
+ // If the buffer was not released, use a timeout to receive
+ // the flow control packet, to avoid wasting CPU.
+ if (!buff) {
+ _io_srv->recv_flow_ctrl(this, _recv_link.get(), 0);
+ } else {
+ _io_srv->recv_flow_ctrl(this, _recv_link.get(), 100);
+ }
}
- _send_cb(buff, _send_link.get());
}
_num_frames_in_use--;
}
@@ -430,7 +472,38 @@ frame_buff::uptr inline_io_service::recv(
return frame_buff::uptr();
}
}
- return frame_buff::uptr();
+}
+
+void inline_io_service::recv_flow_ctrl(
+ inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms)
+{
+ inline_recv_mux* mux;
+ inline_recv_cb* rcvr;
+ std::tie(mux, rcvr) = _recv_tbl.at(recv_link);
+
+ if (mux) {
+ /* Defer to mux's recv_flow_ctrl() if present */
+ mux->recv_flow_ctrl(recv_io_cb, recv_link, timeout_ms);
+ return;
+ } else {
+ assert(recv_io_cb == rcvr);
+ }
+
+ while (true) {
+ frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms);
+ /* Process buffer */
+ if (buff) {
+ if (rcvr->callback(buff, recv_link)) {
+ assert(!buff);
+ return;
+ } else {
+ UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver");
+ recv_link->release_recv_buff(std::move(buff));
+ }
+ } else { /* Timeout */
+ return;
+ }
+ }
}
}} // namespace uhd::transport
diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp
index c9b9af344..53c8f017d 100644
--- a/host/lib/transport/offload_io_service.cpp
+++ b/host/lib/transport/offload_io_service.cpp
@@ -24,7 +24,7 @@ namespace uhd { namespace transport {
namespace {
-constexpr int32_t blocking_timeout_ms = 100;
+constexpr int32_t blocking_timeout_ms = 10;
// Fixed-size queue that supports blocking semantics
template <typename queue_item_t>
@@ -108,6 +108,13 @@ public:
return queue_element.buff;
}
+ frame_buff* client_pop(int32_t timeout_ms)
+ {
+ from_offload_thread_t queue_element;
+ _from_offload_thread.pop(queue_element, timeout_ms);
+ return queue_element.buff;
+ }
+
size_t client_read_available()
{
return _from_offload_thread.read_available();
@@ -474,8 +481,13 @@ recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr re
port->client_wait_until_connected();
// Return a new recv client to the caller that just operates on the queues
- return std::make_shared<offload_recv_io<offload_io_service_impl>>(
- shared_from_this(), num_recv_frames, num_send_frames, port);
+ if (_offload_thread_params.wait_mode == POLL) {
+ return std::make_shared<offload_recv_io<offload_io_service_impl, true>>(
+ shared_from_this(), num_recv_frames, num_send_frames, port);
+ } else {
+ return std::make_shared<offload_recv_io<offload_io_service_impl, false>>(
+ shared_from_this(), num_recv_frames, num_send_frames, port);
+ }
}
send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr send_link,
@@ -528,8 +540,13 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se
}
// Return a new recv client to the caller that just operates on the queues
- return std::make_shared<offload_send_io<offload_io_service_impl>>(
- shared_from_this(), num_recv_frames, num_send_frames, port);
+ if (_offload_thread_params.wait_mode == POLL) {
+ return std::make_shared<offload_send_io<offload_io_service_impl, true>>(
+ shared_from_this(), num_recv_frames, num_send_frames, port);
+ } else {
+ return std::make_shared<offload_send_io<offload_io_service_impl, false>>(
+ shared_from_this(), num_recv_frames, num_send_frames, port);
+ }
}
void offload_io_service_impl::_queue_client_req(std::function<void()> fn)