diff options
Diffstat (limited to 'host/lib/transport/inline_io_service.cpp')
-rw-r--r-- | host/lib/transport/inline_io_service.cpp | 85 |
1 files changed, 47 insertions, 38 deletions
diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp index 942c99d5f..9dd0814ca 100644 --- a/host/lib/transport/inline_io_service.cpp +++ b/host/lib/transport/inline_io_service.cpp @@ -1,3 +1,9 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + #include <uhd/config.hpp> #include <uhd/exception.hpp> #include <uhd/utils/log.hpp> @@ -159,38 +165,43 @@ public: : inline_recv_cb(recv_cb, fc_link.get()) , _io_srv(io_srv) , _data_link(data_link) - , _num_recv_frames(num_recv_frames) , _fc_link(fc_link) - , _num_send_frames(num_send_frames) , _fc_cb(fc_cb) { + _num_recv_frames = num_recv_frames; + _num_send_frames = num_send_frames; } ~inline_recv_io() { - _io_srv->disconnect_receiver(_data_link.get(), this, _num_recv_frames); + _io_srv->disconnect_receiver(_data_link.get(), this); if (_fc_link) { - _io_srv->disconnect_sender(_fc_link.get(), _num_send_frames); + _io_srv->disconnect_sender(_fc_link.get()); } } frame_buff::uptr get_recv_buff(int32_t timeout_ms) { - return _io_srv->recv(this, _data_link.get(), timeout_ms); + auto buff = _io_srv->recv(this, _data_link.get(), timeout_ms); + if (buff) { + _num_frames_in_use++; + assert(_num_frames_in_use <= _num_recv_frames); + } + return buff; } void release_recv_buff(frame_buff::uptr buff) { _fc_cb(frame_buff::uptr(std::move(buff)), _data_link.get(), _fc_link.get()); + _num_frames_in_use--; } private: inline_io_service::sptr _io_srv; recv_link_if::sptr _data_link; - size_t _num_recv_frames; send_link_if::sptr _fc_link; - size_t _num_send_frames; fc_callback_t _fc_cb; + size_t _num_frames_in_use = 0; }; class inline_send_io : public virtual send_io_if, public virtual inline_recv_cb @@ -208,18 +219,18 @@ public: : inline_recv_cb(fc_cb, send_link.get()) , _io_srv(io_srv) , _send_link(send_link) - , _num_send_frames(num_send_frames) , _send_cb(send_cb) , _recv_link(recv_link) - , _num_recv_frames(num_recv_frames) { + _num_recv_frames = num_recv_frames; + _num_send_frames = num_send_frames; } ~inline_send_io() { - _io_srv->disconnect_sender(_send_link.get(), _num_send_frames); + _io_srv->disconnect_sender(_send_link.get()); if (_recv_link) { - _io_srv->disconnect_receiver(_recv_link.get(), this, _num_recv_frames); + _io_srv->disconnect_receiver(_recv_link.get(), this); } } @@ -228,6 +239,8 @@ public: /* Check initial flow control result */ frame_buff::uptr buff = _send_link->get_send_buff(timeout_ms); if (buff) { + _num_frames_in_use++; + assert(_num_frames_in_use <= _num_send_frames); return frame_buff::uptr(std::move(buff)); } return frame_buff::uptr(); @@ -241,16 +254,16 @@ public: } _send_cb(buff, _send_link.get()); } + _num_frames_in_use--; } private: inline_io_service::sptr _io_srv; send_link_if::sptr _send_link; - size_t _num_send_frames; send_callback_t _send_cb; recv_link_if::sptr _recv_link; - size_t _num_recv_frames; recv_callback_t _recv_cb; + size_t _num_frames_in_use = 0; }; inline_io_service::~inline_io_service(){}; @@ -260,7 +273,7 @@ void inline_io_service::attach_recv_link(recv_link_if::sptr link) auto link_ptr = link.get(); UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) == 0); _recv_tbl[link_ptr] = - std::tuple<inline_recv_mux*, inline_recv_cb*, size_t>(nullptr, nullptr, 0); + std::tuple<inline_recv_mux*, inline_recv_cb*>(nullptr, nullptr); _recv_links.push_back(link); }; @@ -272,9 +285,11 @@ recv_io_if::sptr inline_io_service::make_recv_client(recv_link_if::sptr data_lin recv_io_if::fc_callback_t fc_cb) { UHD_ASSERT_THROW(data_link); + UHD_ASSERT_THROW(num_recv_frames > 0); UHD_ASSERT_THROW(cb); if (fc_link) { UHD_ASSERT_THROW(fc_cb); + UHD_ASSERT_THROW(num_send_frames > 0); connect_sender(fc_link.get(), num_send_frames); } sptr io_srv = shared_from_this(); @@ -286,9 +301,7 @@ recv_io_if::sptr inline_io_service::make_recv_client(recv_link_if::sptr data_lin void inline_io_service::attach_send_link(send_link_if::sptr link) { - auto link_ptr = link.get(); - UHD_ASSERT_THROW(_send_tbl.count(link_ptr) == 0); - _send_tbl[link_ptr] = 0; + UHD_ASSERT_THROW(std::find(_send_links.begin(), _send_links.end(), link) == _send_links.end()); _send_links.push_back(link); }; @@ -300,6 +313,7 @@ send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_lin recv_callback_t recv_cb) { UHD_ASSERT_THROW(send_link); + UHD_ASSERT_THROW(num_send_frames > 0); UHD_ASSERT_THROW(send_cb); connect_sender(send_link.get(), num_send_frames); sptr io_srv = shared_from_this(); @@ -307,37 +321,37 @@ send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_lin io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb); if (recv_link) { UHD_ASSERT_THROW(recv_cb); + UHD_ASSERT_THROW(num_recv_frames > 0); connect_receiver(recv_link.get(), send_io.get(), num_recv_frames); } return send_io; } /* - * Senders are free to mux a send_link, but the total reserved send_frames - * must be less than or equal to the link's capacity + * This I/O service does not check frame reservations strictly since frames can + * be shared by multiple clients as long as they are not in use at the same + * time. */ void inline_io_service::connect_sender(send_link_if* link, size_t num_frames) { - size_t rsvd_frames = _send_tbl.at(link); size_t frame_capacity = link->get_num_send_frames(); - UHD_ASSERT_THROW(frame_capacity >= rsvd_frames + num_frames); - _send_tbl[link] = rsvd_frames + num_frames; + UHD_ASSERT_THROW(frame_capacity >= num_frames); } -void inline_io_service::disconnect_sender(send_link_if* link, size_t num_frames) +void inline_io_service::disconnect_sender(send_link_if* /*link*/) { - size_t rsvd_frames = _send_tbl.at(link); - UHD_ASSERT_THROW(rsvd_frames >= num_frames); - _send_tbl[link] = rsvd_frames - num_frames; + // No-op } void inline_io_service::connect_receiver( recv_link_if* link, inline_recv_cb* cb, size_t num_frames) { + size_t capacity = link->get_num_recv_frames(); + UHD_ASSERT_THROW(num_frames <= capacity); + inline_recv_mux* mux; inline_recv_cb* rcvr; - size_t rsvd_frames; - std::tie(mux, rcvr, rsvd_frames) = _recv_tbl.at(link); + std::tie(mux, rcvr) = _recv_tbl.at(link); if (mux) { mux->connect(cb); } else if (rcvr) { @@ -348,19 +362,15 @@ void inline_io_service::connect_receiver( } else { rcvr = cb; } - size_t capacity = link->get_num_recv_frames(); - UHD_ASSERT_THROW(rsvd_frames + num_frames <= capacity); - _recv_tbl[link] = std::make_tuple(mux, rcvr, rsvd_frames + num_frames); + _recv_tbl[link] = std::make_tuple(mux, rcvr); } void inline_io_service::disconnect_receiver( - recv_link_if* link, inline_recv_cb* cb, size_t num_frames) + recv_link_if* link, inline_recv_cb* cb) { inline_recv_mux* mux; inline_recv_cb* rcvr; - size_t rsvd_frames; - std::tie(mux, rcvr, rsvd_frames) = _recv_tbl.at(link); - UHD_ASSERT_THROW(rsvd_frames >= num_frames); + std::tie(mux, rcvr) = _recv_tbl.at(link); if (mux) { mux->disconnect(cb); if (mux->is_empty()) { @@ -370,7 +380,7 @@ void inline_io_service::disconnect_receiver( } else { rcvr = nullptr; } - _recv_tbl[link] = std::make_tuple(mux, rcvr, rsvd_frames - num_frames); + _recv_tbl[link] = std::make_tuple(mux, rcvr); } frame_buff::uptr inline_io_service::recv( @@ -378,8 +388,7 @@ frame_buff::uptr inline_io_service::recv( { inline_recv_mux* mux; inline_recv_cb* rcvr; - size_t num_frames; - std::tie(mux, rcvr, num_frames) = _recv_tbl.at(recv_link); + std::tie(mux, rcvr) = _recv_tbl.at(recv_link); if (mux) { /* Defer to mux's recv() if present */ |