From 110527f96b8c83de47d25cdf14474e7eeba5fedb Mon Sep 17 00:00:00 2001 From: Alex Williams Date: Tue, 28 May 2019 14:55:29 -0700 Subject: transport: Implement a single-threaded I/O service The inline_io_service connects transports to links without any worker threads. Send operations go directly to the link, and recv will perform the I/O as part of the get_recv_buffer() call. The inline_io_service also supports muxed links natively. The receive mux is entirely inline. There is no separate thread for the inline_io_service, and that continues here. A queue is created for each client of the mux, and packets are processed as they come in. If a packet is to go up to a different client, the packet is queued up for later. When that client attempts to recv(), the queue is checked first, and the attempts to receive from the link happen ONLY if no packet was found. Also add mock transport to test I/O service APIs. Tests I/O service construction and some basic packet transmision. One case will also uses a single link that is shared between the send and recv transports. That link is muxed between two compatible but different transports. --- host/lib/transport/CMakeLists.txt | 1 + host/lib/transport/inline_io_service.cpp | 415 +++++++++++++++++++++++++++++++ 2 files changed, 416 insertions(+) create mode 100644 host/lib/transport/inline_io_service.cpp (limited to 'host/lib/transport') diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index df94f42be..a9663c89a 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -121,6 +121,7 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/chdr.cpp ${CMAKE_CURRENT_SOURCE_DIR}/muxed_zero_copy_if.cpp ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/inline_io_service.cpp ) if(ENABLE_X300) diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp new file mode 100644 index 000000000..72acea738 --- /dev/null +++ b/host/lib/transport/inline_io_service.cpp @@ -0,0 +1,415 @@ +#include +#include +#include +#include +#include +#include + +namespace uhd { namespace transport { + +/*! + * Interface class for unifying callback processing between both inline_send_io + * and inline_recv_io + */ +class inline_recv_cb +{ +public: + /*! + * Function to call the callback method + * + * \param buff buffer received + * \param recv_link pointer to recv link used with the callback + * \return whether the packet was destined for this callback + */ + UHD_FORCE_INLINE bool callback(frame_buff::uptr& buff, recv_link_if* recv_link) + { + return _recv_cb(buff, recv_link, _cb_send_link); + } + +protected: + inline_recv_cb(recv_callback_t cb, send_link_if* send_link) + : _recv_cb(cb), _cb_send_link(send_link) + { + } + + recv_callback_t _recv_cb; + // pointer to send link used with the callback + send_link_if* _cb_send_link; +}; + +/*! + * Mux class that intercepts packets from the link and distributes them to + * queues for each client that is not the caller of the recv() function + */ +class inline_recv_mux +{ +public: + inline_recv_mux(recv_link_if* link) : _link(link){}; + + ~inline_recv_mux(){}; + + /*! + * Connect a new receiver to the recv link + * + * \param cb pointer to the callback for the receiver + */ + void connect(inline_recv_cb* cb) + { + UHD_ASSERT_THROW(_queues.count(cb) == 0); + /* Always create queue of max size, since we don't know when there are + * virtual channels (which share frames) + */ + auto queue = + new boost::circular_buffer(_link->get_num_recv_frames()); + _queues[cb] = queue; + _callbacks.push_back(cb); + } + + /*! + * Disconnect a receiver currently connected to the recv link + * \param cb a pointer to the callback to disconnect + */ + void disconnect(inline_recv_cb* cb) + { + auto queue = _queues.at(cb); + while (!queue->empty()) { + frame_buff* buff = queue->front(); + _link->release_recv_buff(frame_buff::uptr(buff)); + queue->pop_front(); + } + delete queue; + _queues.erase(cb); + _callbacks.remove(cb); + } + + /*! + * Check if there are callbacks registered to this mux + * \return whether there are no callbacks registered + */ + UHD_FORCE_INLINE bool is_empty(void) const + { + return _callbacks.empty(); + } + + /*! + * Do receive processing for the mux + * \param cb the callback that is currently seeking a buffer + * \param recv_link the link to do recv on + * \param timeout_ms the timeout for the recv operation + * \return a frame_buff with data if a packet was received (else empty) + */ + frame_buff::uptr recv(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms) + { + auto queue = _queues.at(cb); + if (!queue->empty()) { + frame_buff* buff = queue->front(); + queue->pop_front(); + return frame_buff::uptr(buff); + } + while (true) { + frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); + /* Process buffer */ + if (buff) { + bool rcvr_found = false; + for (auto& rcvr : _callbacks) { + if (rcvr->callback(buff, recv_link)) { + rcvr_found = true; + if (buff) { + if (rcvr == cb) { + return frame_buff::uptr(std::move(buff)); + } else { + /* NOTE: Should not overflow, by construction + * Every queue can hold link->get_num_recv_frames() + */ + _queues[rcvr]->push_back(buff.release()); + } + } + /* Continue looping if buffer was consumed */ + break; + } + } + if (not rcvr_found) { + UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); + recv_link->release_recv_buff(std::move(buff)); + } + } else { /* Timeout */ + return frame_buff::uptr(); + } + } + } + +private: + recv_link_if* _link; + std::list _callbacks; + std::unordered_map*> _queues; +}; + +class inline_recv_io : public virtual recv_io_if, public virtual inline_recv_cb +{ +public: + using sptr = std::shared_ptr; + + inline_recv_io(inline_io_service::sptr io_srv, + recv_link_if::sptr data_link, + size_t num_recv_frames, + recv_callback_t recv_cb, + send_link_if::sptr fc_link, + size_t num_send_frames, + fc_callback_t fc_cb) + : inline_recv_cb(recv_cb, fc_link.get()) + , _io_srv(io_srv) + , _data_link(data_link) + , _num_recv_frames(num_recv_frames) + , _fc_link(fc_link) + , _num_send_frames(num_send_frames) + , _fc_cb(fc_cb) + { + } + + ~inline_recv_io() + { + _io_srv->disconnect_receiver(_data_link.get(), this, _num_recv_frames); + if (_fc_link) { + _io_srv->disconnect_sender(_fc_link.get(), _num_send_frames); + } + } + + frame_buff::uptr get_recv_buff(int32_t timeout_ms) + { + return _io_srv->recv(this, _data_link.get(), timeout_ms); + } + + void release_recv_buff(frame_buff::uptr buff) + { + _fc_cb(frame_buff::uptr(std::move(buff)), _data_link.get(), _fc_link.get()); + } + +private: + inline_io_service::sptr _io_srv; + recv_link_if::sptr _data_link; + size_t _num_recv_frames; + send_link_if::sptr _fc_link; + size_t _num_send_frames; + fc_callback_t _fc_cb; +}; + +class inline_send_io : public virtual send_io_if, public virtual inline_recv_cb +{ +public: + using sptr = std::shared_ptr; + + inline_send_io(inline_io_service::sptr io_srv, + send_link_if::sptr send_link, + size_t num_send_frames, + send_callback_t send_cb, + recv_link_if::sptr recv_link, + size_t num_recv_frames, + recv_callback_t fc_cb) + : inline_recv_cb(fc_cb, send_link.get()) + , _io_srv(io_srv) + , _send_link(send_link) + , _num_send_frames(num_send_frames) + , _send_cb(send_cb) + , _recv_link(recv_link) + , _num_recv_frames(num_recv_frames) + { + } + + ~inline_send_io() + { + _io_srv->disconnect_sender(_send_link.get(), _num_send_frames); + if (_recv_link) { + _io_srv->disconnect_receiver(_recv_link.get(), this, _num_recv_frames); + } + } + + frame_buff::uptr get_send_buff(int32_t timeout_ms) + { + /* Check initial flow control result */ + frame_buff::uptr buff = _send_link->get_send_buff(timeout_ms); + if (buff) { + return frame_buff::uptr(std::move(buff)); + } + return frame_buff::uptr(); + } + + void release_send_buff(frame_buff::uptr buff) + { + while (buff) { /* TODO: Possibly don't loop indefinitely here */ + if (_recv_link) { + _io_srv->recv(this, _recv_link.get(), 0); + } + _send_cb(buff, _send_link.get()); + } + } + +private: + inline_io_service::sptr _io_srv; + send_link_if::sptr _send_link; + size_t _num_send_frames; + send_callback_t _send_cb; + recv_link_if::sptr _recv_link; + size_t _num_recv_frames; + recv_callback_t _recv_cb; +}; + +inline_io_service::~inline_io_service(){}; + +void inline_io_service::attach_recv_link(recv_link_if::sptr link) +{ + auto link_ptr = link.get(); + UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) == 0); + _recv_tbl[link_ptr] = + std::tuple(nullptr, nullptr, 0); + _recv_links.push_back(link); +}; + +recv_io_if::sptr inline_io_service::make_recv_client(recv_link_if::sptr data_link, + size_t num_recv_frames, + recv_callback_t cb, + send_link_if::sptr fc_link, + size_t num_send_frames, + recv_io_if::fc_callback_t fc_cb) +{ + UHD_ASSERT_THROW(data_link); + UHD_ASSERT_THROW(num_recv_frames > 0); + UHD_ASSERT_THROW(cb); + if (fc_link) { + UHD_ASSERT_THROW(num_send_frames > 0); + UHD_ASSERT_THROW(fc_cb); + connect_sender(fc_link.get(), num_send_frames); + } + sptr io_srv = shared_from_this(); + auto recv_io = std::make_shared( + io_srv, data_link, num_recv_frames, cb, fc_link, num_send_frames, fc_cb); + connect_receiver(data_link.get(), recv_io.get(), num_recv_frames); + return recv_io; +} + +void inline_io_service::attach_send_link(send_link_if::sptr link) +{ + auto link_ptr = link.get(); + UHD_ASSERT_THROW(_send_tbl.count(link_ptr) == 0); + _send_tbl[link_ptr] = 0; + _send_links.push_back(link); +}; + +send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_link, + size_t num_send_frames, + send_io_if::send_callback_t send_cb, + recv_link_if::sptr recv_link, + size_t num_recv_frames, + recv_callback_t recv_cb) +{ + UHD_ASSERT_THROW(send_link); + UHD_ASSERT_THROW(num_send_frames > 0); + UHD_ASSERT_THROW(send_cb); + connect_sender(send_link.get(), num_send_frames); + sptr io_srv = shared_from_this(); + auto send_io = std::make_shared( + io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb); + if (recv_link) { + UHD_ASSERT_THROW(num_recv_frames > 0); + UHD_ASSERT_THROW(recv_cb); + connect_receiver(recv_link.get(), send_io.get(), num_recv_frames); + } + return send_io; +} + +/* + * Senders are free to mux a send_link, but the total reserved send_frames + * must be less than or equal to the link's capacity + */ +void inline_io_service::connect_sender(send_link_if* link, size_t num_frames) +{ + size_t rsvd_frames = _send_tbl.at(link); + size_t frame_capacity = link->get_num_send_frames(); + UHD_ASSERT_THROW(frame_capacity >= rsvd_frames + num_frames); + _send_tbl[link] = rsvd_frames + num_frames; +} + +void inline_io_service::disconnect_sender(send_link_if* link, size_t num_frames) +{ + size_t rsvd_frames = _send_tbl.at(link); + UHD_ASSERT_THROW(rsvd_frames >= num_frames); + _send_tbl[link] = rsvd_frames - num_frames; +} + +void inline_io_service::connect_receiver( + recv_link_if* link, inline_recv_cb* cb, size_t num_frames) +{ + inline_recv_mux* mux; + inline_recv_cb* rcvr; + size_t rsvd_frames; + std::tie(mux, rcvr, rsvd_frames) = _recv_tbl.at(link); + if (mux) { + mux->connect(cb); + } else if (rcvr) { + mux = new inline_recv_mux(link); + mux->connect(rcvr); + mux->connect(cb); + rcvr = nullptr; + } else { + rcvr = cb; + } + size_t capacity = link->get_num_recv_frames(); + UHD_ASSERT_THROW(rsvd_frames + num_frames <= capacity); + _recv_tbl[link] = std::make_tuple(mux, rcvr, rsvd_frames + num_frames); +} + +void inline_io_service::disconnect_receiver( + recv_link_if* link, inline_recv_cb* cb, size_t num_frames) +{ + inline_recv_mux* mux; + inline_recv_cb* rcvr; + size_t rsvd_frames; + std::tie(mux, rcvr, rsvd_frames) = _recv_tbl.at(link); + UHD_ASSERT_THROW(rsvd_frames >= num_frames); + if (mux) { + mux->disconnect(cb); + if (mux->is_empty()) { + delete mux; + mux = nullptr; + } + } else { + rcvr = nullptr; + } + _recv_tbl[link] = std::make_tuple(mux, rcvr, rsvd_frames - num_frames); +} + +frame_buff::uptr inline_io_service::recv( + inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms) +{ + inline_recv_mux* mux; + inline_recv_cb* rcvr; + size_t num_frames; + std::tie(mux, rcvr, num_frames) = _recv_tbl.at(recv_link); + + if (mux) { + /* Defer to mux's recv() if present */ + return mux->recv(recv_io_cb, recv_link, timeout_ms); + } else { + assert(recv_io_cb == rcvr); + } + + while (true) { + frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); + /* Process buffer */ + if (buff) { + if (rcvr->callback(buff, recv_link)) { + if (buff) { + return frame_buff::uptr(std::move(buff)); + } + /* Retry receive if got buffer but it got consumed */ + } else { + UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); + recv_link->release_recv_buff(std::move(buff)); + } + } else { /* Timeout */ + return frame_buff::uptr(); + } + } + return frame_buff::uptr(); +} + +}} // namespace uhd::transport -- cgit v1.2.3