aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/inline_io_service.cpp81
-rw-r--r--host/lib/transport/offload_io_service.cpp27
2 files changed, 99 insertions, 9 deletions
diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp
index 93967e09a..6449bbda8 100644
--- a/host/lib/transport/inline_io_service.cpp
+++ b/host/lib/transport/inline_io_service.cpp
@@ -144,6 +144,41 @@ public:
}
}
+ void recv_flow_ctrl(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms)
+ {
+ while (true) {
+ frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms);
+ /* Process buffer */
+ if (buff) {
+ bool rcvr_found = false;
+ for (auto& rcvr : _callbacks) {
+ if (rcvr->callback(buff, recv_link)) {
+ rcvr_found = true;
+ if (rcvr == cb) {
+ assert(!buff);
+ return;
+ } else if (buff) {
+ /* NOTE: Should not overflow, by construction
+ * Every queue can hold link->get_num_recv_frames()
+ */
+ _queues[rcvr]->push_back(buff.release());
+ } else {
+ /* Continue looping if buffer was consumed and
+ receiver is not the requested one */
+ break;
+ }
+ }
+ }
+ if (not rcvr_found) {
+ UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver");
+ recv_link->release_recv_buff(std::move(buff));
+ }
+ } else { /* Timeout */
+ return;
+ }
+ }
+ }
+
private:
recv_link_if* _link;
std::list<inline_recv_cb*> _callbacks;
@@ -248,11 +283,18 @@ public:
void release_send_buff(frame_buff::uptr buff)
{
- while (buff) { /* TODO: Possibly don't loop indefinitely here */
+ while (buff) {
+ // Try to send a packet
+ _send_cb(buff, _send_link.get());
if (_recv_link) {
- _io_srv->recv(this, _recv_link.get(), 0);
+ // If the buffer was not released, use a timeout to receive
+ // the flow control packet, to avoid wasting CPU.
+ if (!buff) {
+ _io_srv->recv_flow_ctrl(this, _recv_link.get(), 0);
+ } else {
+ _io_srv->recv_flow_ctrl(this, _recv_link.get(), 100);
+ }
}
- _send_cb(buff, _send_link.get());
}
_num_frames_in_use--;
}
@@ -430,7 +472,38 @@ frame_buff::uptr inline_io_service::recv(
return frame_buff::uptr();
}
}
- return frame_buff::uptr();
+}
+
+void inline_io_service::recv_flow_ctrl(
+ inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms)
+{
+ inline_recv_mux* mux;
+ inline_recv_cb* rcvr;
+ std::tie(mux, rcvr) = _recv_tbl.at(recv_link);
+
+ if (mux) {
+ /* Defer to mux's recv_flow_ctrl() if present */
+ mux->recv_flow_ctrl(recv_io_cb, recv_link, timeout_ms);
+ return;
+ } else {
+ assert(recv_io_cb == rcvr);
+ }
+
+ while (true) {
+ frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms);
+ /* Process buffer */
+ if (buff) {
+ if (rcvr->callback(buff, recv_link)) {
+ assert(!buff);
+ return;
+ } else {
+ UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver");
+ recv_link->release_recv_buff(std::move(buff));
+ }
+ } else { /* Timeout */
+ return;
+ }
+ }
}
}} // namespace uhd::transport
diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp
index c9b9af344..53c8f017d 100644
--- a/host/lib/transport/offload_io_service.cpp
+++ b/host/lib/transport/offload_io_service.cpp
@@ -24,7 +24,7 @@ namespace uhd { namespace transport {
namespace {
-constexpr int32_t blocking_timeout_ms = 100;
+constexpr int32_t blocking_timeout_ms = 10;
// Fixed-size queue that supports blocking semantics
template <typename queue_item_t>
@@ -108,6 +108,13 @@ public:
return queue_element.buff;
}
+ frame_buff* client_pop(int32_t timeout_ms)
+ {
+ from_offload_thread_t queue_element;
+ _from_offload_thread.pop(queue_element, timeout_ms);
+ return queue_element.buff;
+ }
+
size_t client_read_available()
{
return _from_offload_thread.read_available();
@@ -474,8 +481,13 @@ recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr re
port->client_wait_until_connected();
// Return a new recv client to the caller that just operates on the queues
- return std::make_shared<offload_recv_io<offload_io_service_impl>>(
- shared_from_this(), num_recv_frames, num_send_frames, port);
+ if (_offload_thread_params.wait_mode == POLL) {
+ return std::make_shared<offload_recv_io<offload_io_service_impl, true>>(
+ shared_from_this(), num_recv_frames, num_send_frames, port);
+ } else {
+ return std::make_shared<offload_recv_io<offload_io_service_impl, false>>(
+ shared_from_this(), num_recv_frames, num_send_frames, port);
+ }
}
send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr send_link,
@@ -528,8 +540,13 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se
}
// Return a new recv client to the caller that just operates on the queues
- return std::make_shared<offload_send_io<offload_io_service_impl>>(
- shared_from_this(), num_recv_frames, num_send_frames, port);
+ if (_offload_thread_params.wait_mode == POLL) {
+ return std::make_shared<offload_send_io<offload_io_service_impl, true>>(
+ shared_from_this(), num_recv_frames, num_send_frames, port);
+ } else {
+ return std::make_shared<offload_send_io<offload_io_service_impl, false>>(
+ shared_from_this(), num_recv_frames, num_send_frames, port);
+ }
}
void offload_io_service_impl::_queue_client_req(std::function<void()> fn)