aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/inline_io_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport/inline_io_service.cpp')
-rw-r--r--host/lib/transport/inline_io_service.cpp85
1 files changed, 47 insertions, 38 deletions
diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp
index 942c99d5f..9dd0814ca 100644
--- a/host/lib/transport/inline_io_service.cpp
+++ b/host/lib/transport/inline_io_service.cpp
@@ -1,3 +1,9 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
#include <uhd/config.hpp>
#include <uhd/exception.hpp>
#include <uhd/utils/log.hpp>
@@ -159,38 +165,43 @@ public:
: inline_recv_cb(recv_cb, fc_link.get())
, _io_srv(io_srv)
, _data_link(data_link)
- , _num_recv_frames(num_recv_frames)
, _fc_link(fc_link)
- , _num_send_frames(num_send_frames)
, _fc_cb(fc_cb)
{
+ _num_recv_frames = num_recv_frames;
+ _num_send_frames = num_send_frames;
}
~inline_recv_io()
{
- _io_srv->disconnect_receiver(_data_link.get(), this, _num_recv_frames);
+ _io_srv->disconnect_receiver(_data_link.get(), this);
if (_fc_link) {
- _io_srv->disconnect_sender(_fc_link.get(), _num_send_frames);
+ _io_srv->disconnect_sender(_fc_link.get());
}
}
frame_buff::uptr get_recv_buff(int32_t timeout_ms)
{
- return _io_srv->recv(this, _data_link.get(), timeout_ms);
+ auto buff = _io_srv->recv(this, _data_link.get(), timeout_ms);
+ if (buff) {
+ _num_frames_in_use++;
+ assert(_num_frames_in_use <= _num_recv_frames);
+ }
+ return buff;
}
void release_recv_buff(frame_buff::uptr buff)
{
_fc_cb(frame_buff::uptr(std::move(buff)), _data_link.get(), _fc_link.get());
+ _num_frames_in_use--;
}
private:
inline_io_service::sptr _io_srv;
recv_link_if::sptr _data_link;
- size_t _num_recv_frames;
send_link_if::sptr _fc_link;
- size_t _num_send_frames;
fc_callback_t _fc_cb;
+ size_t _num_frames_in_use = 0;
};
class inline_send_io : public virtual send_io_if, public virtual inline_recv_cb
@@ -208,18 +219,18 @@ public:
: inline_recv_cb(fc_cb, send_link.get())
, _io_srv(io_srv)
, _send_link(send_link)
- , _num_send_frames(num_send_frames)
, _send_cb(send_cb)
, _recv_link(recv_link)
- , _num_recv_frames(num_recv_frames)
{
+ _num_recv_frames = num_recv_frames;
+ _num_send_frames = num_send_frames;
}
~inline_send_io()
{
- _io_srv->disconnect_sender(_send_link.get(), _num_send_frames);
+ _io_srv->disconnect_sender(_send_link.get());
if (_recv_link) {
- _io_srv->disconnect_receiver(_recv_link.get(), this, _num_recv_frames);
+ _io_srv->disconnect_receiver(_recv_link.get(), this);
}
}
@@ -228,6 +239,8 @@ public:
/* Check initial flow control result */
frame_buff::uptr buff = _send_link->get_send_buff(timeout_ms);
if (buff) {
+ _num_frames_in_use++;
+ assert(_num_frames_in_use <= _num_send_frames);
return frame_buff::uptr(std::move(buff));
}
return frame_buff::uptr();
@@ -241,16 +254,16 @@ public:
}
_send_cb(buff, _send_link.get());
}
+ _num_frames_in_use--;
}
private:
inline_io_service::sptr _io_srv;
send_link_if::sptr _send_link;
- size_t _num_send_frames;
send_callback_t _send_cb;
recv_link_if::sptr _recv_link;
- size_t _num_recv_frames;
recv_callback_t _recv_cb;
+ size_t _num_frames_in_use = 0;
};
inline_io_service::~inline_io_service(){};
@@ -260,7 +273,7 @@ void inline_io_service::attach_recv_link(recv_link_if::sptr link)
auto link_ptr = link.get();
UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) == 0);
_recv_tbl[link_ptr] =
- std::tuple<inline_recv_mux*, inline_recv_cb*, size_t>(nullptr, nullptr, 0);
+ std::tuple<inline_recv_mux*, inline_recv_cb*>(nullptr, nullptr);
_recv_links.push_back(link);
};
@@ -272,9 +285,11 @@ recv_io_if::sptr inline_io_service::make_recv_client(recv_link_if::sptr data_lin
recv_io_if::fc_callback_t fc_cb)
{
UHD_ASSERT_THROW(data_link);
+ UHD_ASSERT_THROW(num_recv_frames > 0);
UHD_ASSERT_THROW(cb);
if (fc_link) {
UHD_ASSERT_THROW(fc_cb);
+ UHD_ASSERT_THROW(num_send_frames > 0);
connect_sender(fc_link.get(), num_send_frames);
}
sptr io_srv = shared_from_this();
@@ -286,9 +301,7 @@ recv_io_if::sptr inline_io_service::make_recv_client(recv_link_if::sptr data_lin
void inline_io_service::attach_send_link(send_link_if::sptr link)
{
- auto link_ptr = link.get();
- UHD_ASSERT_THROW(_send_tbl.count(link_ptr) == 0);
- _send_tbl[link_ptr] = 0;
+ UHD_ASSERT_THROW(std::find(_send_links.begin(), _send_links.end(), link) == _send_links.end());
_send_links.push_back(link);
};
@@ -300,6 +313,7 @@ send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_lin
recv_callback_t recv_cb)
{
UHD_ASSERT_THROW(send_link);
+ UHD_ASSERT_THROW(num_send_frames > 0);
UHD_ASSERT_THROW(send_cb);
connect_sender(send_link.get(), num_send_frames);
sptr io_srv = shared_from_this();
@@ -307,37 +321,37 @@ send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_lin
io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb);
if (recv_link) {
UHD_ASSERT_THROW(recv_cb);
+ UHD_ASSERT_THROW(num_recv_frames > 0);
connect_receiver(recv_link.get(), send_io.get(), num_recv_frames);
}
return send_io;
}
/*
- * Senders are free to mux a send_link, but the total reserved send_frames
- * must be less than or equal to the link's capacity
+ * This I/O service does not check frame reservations strictly since frames can
+ * be shared by multiple clients as long as they are not in use at the same
+ * time.
*/
void inline_io_service::connect_sender(send_link_if* link, size_t num_frames)
{
- size_t rsvd_frames = _send_tbl.at(link);
size_t frame_capacity = link->get_num_send_frames();
- UHD_ASSERT_THROW(frame_capacity >= rsvd_frames + num_frames);
- _send_tbl[link] = rsvd_frames + num_frames;
+ UHD_ASSERT_THROW(frame_capacity >= num_frames);
}
-void inline_io_service::disconnect_sender(send_link_if* link, size_t num_frames)
+void inline_io_service::disconnect_sender(send_link_if* /*link*/)
{
- size_t rsvd_frames = _send_tbl.at(link);
- UHD_ASSERT_THROW(rsvd_frames >= num_frames);
- _send_tbl[link] = rsvd_frames - num_frames;
+ // No-op
}
void inline_io_service::connect_receiver(
recv_link_if* link, inline_recv_cb* cb, size_t num_frames)
{
+ size_t capacity = link->get_num_recv_frames();
+ UHD_ASSERT_THROW(num_frames <= capacity);
+
inline_recv_mux* mux;
inline_recv_cb* rcvr;
- size_t rsvd_frames;
- std::tie(mux, rcvr, rsvd_frames) = _recv_tbl.at(link);
+ std::tie(mux, rcvr) = _recv_tbl.at(link);
if (mux) {
mux->connect(cb);
} else if (rcvr) {
@@ -348,19 +362,15 @@ void inline_io_service::connect_receiver(
} else {
rcvr = cb;
}
- size_t capacity = link->get_num_recv_frames();
- UHD_ASSERT_THROW(rsvd_frames + num_frames <= capacity);
- _recv_tbl[link] = std::make_tuple(mux, rcvr, rsvd_frames + num_frames);
+ _recv_tbl[link] = std::make_tuple(mux, rcvr);
}
void inline_io_service::disconnect_receiver(
- recv_link_if* link, inline_recv_cb* cb, size_t num_frames)
+ recv_link_if* link, inline_recv_cb* cb)
{
inline_recv_mux* mux;
inline_recv_cb* rcvr;
- size_t rsvd_frames;
- std::tie(mux, rcvr, rsvd_frames) = _recv_tbl.at(link);
- UHD_ASSERT_THROW(rsvd_frames >= num_frames);
+ std::tie(mux, rcvr) = _recv_tbl.at(link);
if (mux) {
mux->disconnect(cb);
if (mux->is_empty()) {
@@ -370,7 +380,7 @@ void inline_io_service::disconnect_receiver(
} else {
rcvr = nullptr;
}
- _recv_tbl[link] = std::make_tuple(mux, rcvr, rsvd_frames - num_frames);
+ _recv_tbl[link] = std::make_tuple(mux, rcvr);
}
frame_buff::uptr inline_io_service::recv(
@@ -378,8 +388,7 @@ frame_buff::uptr inline_io_service::recv(
{
inline_recv_mux* mux;
inline_recv_cb* rcvr;
- size_t num_frames;
- std::tie(mux, rcvr, num_frames) = _recv_tbl.at(recv_link);
+ std::tie(mux, rcvr) = _recv_tbl.at(recv_link);
if (mux) {
/* Defer to mux's recv() if present */