From 110527f96b8c83de47d25cdf14474e7eeba5fedb Mon Sep 17 00:00:00 2001 From: Alex Williams Date: Tue, 28 May 2019 14:55:29 -0700 Subject: transport: Implement a single-threaded I/O service The inline_io_service connects transports to links without any worker threads. Send operations go directly to the link, and recv will perform the I/O as part of the get_recv_buffer() call. The inline_io_service also supports muxed links natively. The receive mux is entirely inline. There is no separate thread for the inline_io_service, and that continues here. A queue is created for each client of the mux, and packets are processed as they come in. If a packet is to go up to a different client, the packet is queued up for later. When that client attempts to recv(), the queue is checked first, and the attempts to receive from the link happen ONLY if no packet was found. Also add mock transport to test I/O service APIs. Tests I/O service construction and some basic packet transmision. One case will also uses a single link that is shared between the send and recv transports. That link is muxed between two compatible but different transports. --- .../include/uhdlib/transport/inline_io_service.hpp | 121 ++++++ host/lib/transport/CMakeLists.txt | 1 + host/lib/transport/inline_io_service.cpp | 415 +++++++++++++++++++++ host/tests/CMakeLists.txt | 6 + host/tests/common/mock_transport.hpp | 369 ++++++++++++++++++ host/tests/transport_test.cpp | 188 ++++++++++ 6 files changed, 1100 insertions(+) create mode 100644 host/lib/include/uhdlib/transport/inline_io_service.hpp create mode 100644 host/lib/transport/inline_io_service.cpp create mode 100644 host/tests/common/mock_transport.hpp create mode 100644 host/tests/transport_test.cpp (limited to 'host') diff --git a/host/lib/include/uhdlib/transport/inline_io_service.hpp b/host/lib/include/uhdlib/transport/inline_io_service.hpp new file mode 100644 index 000000000..f10e7018d --- /dev/null +++ b/host/lib/include/uhdlib/transport/inline_io_service.hpp @@ -0,0 +1,121 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_INLINE_IO_SERVICE_HPP +#define INCLUDED_UHDLIB_TRANSPORT_INLINE_IO_SERVICE_HPP + +#include +#include +#include + +namespace uhd { namespace transport { + +class inline_recv_mux; +class inline_recv_cb; + +/*! + * Single-threaded I/O service + * Note this is not an appropriate io_service to use with polling-mode drivers, + * since such drivers require a thread to poll them and not block (i.e. + * timeouts are not allowed at the link interface) + */ +class inline_io_service : public virtual io_service, + public std::enable_shared_from_this +{ +public: + using sptr = std::shared_ptr; + static sptr make(void) + { + return sptr(new inline_io_service()); + } + + ~inline_io_service(); + + void attach_recv_link(recv_link_if::sptr link); + void attach_send_link(send_link_if::sptr link); + + recv_io_if::sptr make_recv_client(recv_link_if::sptr data_link, + size_t num_recv_frames, + recv_callback_t cb, + send_link_if::sptr fc_link, + size_t num_send_frames, + recv_io_if::fc_callback_t fc_cb); + + send_io_if::sptr make_send_client(send_link_if::sptr send_link, + size_t num_send_frames, + send_io_if::send_callback_t send_cb, + recv_link_if::sptr recv_link, + size_t num_recv_frames, + recv_callback_t recv_cb); + +private: + friend class inline_recv_io; + friend class inline_send_io; + + inline_io_service() = default; + inline_io_service(const inline_io_service&) = delete; + + /*! + * Senders are free to mux a send_link, but the total reserved send_frames + * must be less than or equal to the link's capacity + * + * \param link the link used for sending data + * \param num_frames number of frames to reserve for this connection + */ + void connect_sender(send_link_if* link, size_t num_frames); + + /*! + * Disconnect the sender and free resources + * + * \param link the link that was used for sending data + * \param num_frames number of frames to release (same as reservation) + */ + void disconnect_sender(send_link_if* link, size_t num_frames); + + /*! + * Connect a receiver to the link and reserve resources + * \param link the recv link to use for getting data + * \param cb a callback for processing received data + * \param num_frames the number of frames to reserve for this receiver + */ + void connect_receiver(recv_link_if* link, inline_recv_cb* cb, size_t num_frames); + + /*! + * Disconnect the receiver from the provided link and free resources + * \param link the recv link that was used for reception + * \param cb the callback to disassociate + * \param num_frames the number of frames that was reserved for the cb + */ + void disconnect_receiver(recv_link_if* link, inline_recv_cb* cb, size_t num_frames); + + /* + * Function to perform recv operations on a link, which is potentially + * muxed. Packets are forwarded to the appropriate mux or callback. + * + * \param recv_io_cb the callback+interface initiating the operation + * \param recv_link link to perform receive on + * \param timeout_ms timeout to wait for a buffer on the link + * \return a frame_buff uptr with either a buffer with data or no buffer + */ + frame_buff::uptr recv( + inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms); + + /* Track whether link is muxed, the callback, and buffer reservations */ + std::unordered_map> + _recv_tbl; + + /* Track how many send_frames have been reserved for each link */ + std::unordered_map _send_tbl; + + /* Shared ptr kept to avoid untimely release */ + std::list _send_links; + std::list _recv_links; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_INLINE_IO_SERVICE_HPP */ diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index df94f42be..a9663c89a 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -121,6 +121,7 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/chdr.cpp ${CMAKE_CURRENT_SOURCE_DIR}/muxed_zero_copy_if.cpp ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/inline_io_service.cpp ) if(ENABLE_X300) diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp new file mode 100644 index 000000000..72acea738 --- /dev/null +++ b/host/lib/transport/inline_io_service.cpp @@ -0,0 +1,415 @@ +#include +#include +#include +#include +#include +#include + +namespace uhd { namespace transport { + +/*! + * Interface class for unifying callback processing between both inline_send_io + * and inline_recv_io + */ +class inline_recv_cb +{ +public: + /*! + * Function to call the callback method + * + * \param buff buffer received + * \param recv_link pointer to recv link used with the callback + * \return whether the packet was destined for this callback + */ + UHD_FORCE_INLINE bool callback(frame_buff::uptr& buff, recv_link_if* recv_link) + { + return _recv_cb(buff, recv_link, _cb_send_link); + } + +protected: + inline_recv_cb(recv_callback_t cb, send_link_if* send_link) + : _recv_cb(cb), _cb_send_link(send_link) + { + } + + recv_callback_t _recv_cb; + // pointer to send link used with the callback + send_link_if* _cb_send_link; +}; + +/*! + * Mux class that intercepts packets from the link and distributes them to + * queues for each client that is not the caller of the recv() function + */ +class inline_recv_mux +{ +public: + inline_recv_mux(recv_link_if* link) : _link(link){}; + + ~inline_recv_mux(){}; + + /*! + * Connect a new receiver to the recv link + * + * \param cb pointer to the callback for the receiver + */ + void connect(inline_recv_cb* cb) + { + UHD_ASSERT_THROW(_queues.count(cb) == 0); + /* Always create queue of max size, since we don't know when there are + * virtual channels (which share frames) + */ + auto queue = + new boost::circular_buffer(_link->get_num_recv_frames()); + _queues[cb] = queue; + _callbacks.push_back(cb); + } + + /*! + * Disconnect a receiver currently connected to the recv link + * \param cb a pointer to the callback to disconnect + */ + void disconnect(inline_recv_cb* cb) + { + auto queue = _queues.at(cb); + while (!queue->empty()) { + frame_buff* buff = queue->front(); + _link->release_recv_buff(frame_buff::uptr(buff)); + queue->pop_front(); + } + delete queue; + _queues.erase(cb); + _callbacks.remove(cb); + } + + /*! + * Check if there are callbacks registered to this mux + * \return whether there are no callbacks registered + */ + UHD_FORCE_INLINE bool is_empty(void) const + { + return _callbacks.empty(); + } + + /*! + * Do receive processing for the mux + * \param cb the callback that is currently seeking a buffer + * \param recv_link the link to do recv on + * \param timeout_ms the timeout for the recv operation + * \return a frame_buff with data if a packet was received (else empty) + */ + frame_buff::uptr recv(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms) + { + auto queue = _queues.at(cb); + if (!queue->empty()) { + frame_buff* buff = queue->front(); + queue->pop_front(); + return frame_buff::uptr(buff); + } + while (true) { + frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); + /* Process buffer */ + if (buff) { + bool rcvr_found = false; + for (auto& rcvr : _callbacks) { + if (rcvr->callback(buff, recv_link)) { + rcvr_found = true; + if (buff) { + if (rcvr == cb) { + return frame_buff::uptr(std::move(buff)); + } else { + /* NOTE: Should not overflow, by construction + * Every queue can hold link->get_num_recv_frames() + */ + _queues[rcvr]->push_back(buff.release()); + } + } + /* Continue looping if buffer was consumed */ + break; + } + } + if (not rcvr_found) { + UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); + recv_link->release_recv_buff(std::move(buff)); + } + } else { /* Timeout */ + return frame_buff::uptr(); + } + } + } + +private: + recv_link_if* _link; + std::list _callbacks; + std::unordered_map*> _queues; +}; + +class inline_recv_io : public virtual recv_io_if, public virtual inline_recv_cb +{ +public: + using sptr = std::shared_ptr; + + inline_recv_io(inline_io_service::sptr io_srv, + recv_link_if::sptr data_link, + size_t num_recv_frames, + recv_callback_t recv_cb, + send_link_if::sptr fc_link, + size_t num_send_frames, + fc_callback_t fc_cb) + : inline_recv_cb(recv_cb, fc_link.get()) + , _io_srv(io_srv) + , _data_link(data_link) + , _num_recv_frames(num_recv_frames) + , _fc_link(fc_link) + , _num_send_frames(num_send_frames) + , _fc_cb(fc_cb) + { + } + + ~inline_recv_io() + { + _io_srv->disconnect_receiver(_data_link.get(), this, _num_recv_frames); + if (_fc_link) { + _io_srv->disconnect_sender(_fc_link.get(), _num_send_frames); + } + } + + frame_buff::uptr get_recv_buff(int32_t timeout_ms) + { + return _io_srv->recv(this, _data_link.get(), timeout_ms); + } + + void release_recv_buff(frame_buff::uptr buff) + { + _fc_cb(frame_buff::uptr(std::move(buff)), _data_link.get(), _fc_link.get()); + } + +private: + inline_io_service::sptr _io_srv; + recv_link_if::sptr _data_link; + size_t _num_recv_frames; + send_link_if::sptr _fc_link; + size_t _num_send_frames; + fc_callback_t _fc_cb; +}; + +class inline_send_io : public virtual send_io_if, public virtual inline_recv_cb +{ +public: + using sptr = std::shared_ptr; + + inline_send_io(inline_io_service::sptr io_srv, + send_link_if::sptr send_link, + size_t num_send_frames, + send_callback_t send_cb, + recv_link_if::sptr recv_link, + size_t num_recv_frames, + recv_callback_t fc_cb) + : inline_recv_cb(fc_cb, send_link.get()) + , _io_srv(io_srv) + , _send_link(send_link) + , _num_send_frames(num_send_frames) + , _send_cb(send_cb) + , _recv_link(recv_link) + , _num_recv_frames(num_recv_frames) + { + } + + ~inline_send_io() + { + _io_srv->disconnect_sender(_send_link.get(), _num_send_frames); + if (_recv_link) { + _io_srv->disconnect_receiver(_recv_link.get(), this, _num_recv_frames); + } + } + + frame_buff::uptr get_send_buff(int32_t timeout_ms) + { + /* Check initial flow control result */ + frame_buff::uptr buff = _send_link->get_send_buff(timeout_ms); + if (buff) { + return frame_buff::uptr(std::move(buff)); + } + return frame_buff::uptr(); + } + + void release_send_buff(frame_buff::uptr buff) + { + while (buff) { /* TODO: Possibly don't loop indefinitely here */ + if (_recv_link) { + _io_srv->recv(this, _recv_link.get(), 0); + } + _send_cb(buff, _send_link.get()); + } + } + +private: + inline_io_service::sptr _io_srv; + send_link_if::sptr _send_link; + size_t _num_send_frames; + send_callback_t _send_cb; + recv_link_if::sptr _recv_link; + size_t _num_recv_frames; + recv_callback_t _recv_cb; +}; + +inline_io_service::~inline_io_service(){}; + +void inline_io_service::attach_recv_link(recv_link_if::sptr link) +{ + auto link_ptr = link.get(); + UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) == 0); + _recv_tbl[link_ptr] = + std::tuple(nullptr, nullptr, 0); + _recv_links.push_back(link); +}; + +recv_io_if::sptr inline_io_service::make_recv_client(recv_link_if::sptr data_link, + size_t num_recv_frames, + recv_callback_t cb, + send_link_if::sptr fc_link, + size_t num_send_frames, + recv_io_if::fc_callback_t fc_cb) +{ + UHD_ASSERT_THROW(data_link); + UHD_ASSERT_THROW(num_recv_frames > 0); + UHD_ASSERT_THROW(cb); + if (fc_link) { + UHD_ASSERT_THROW(num_send_frames > 0); + UHD_ASSERT_THROW(fc_cb); + connect_sender(fc_link.get(), num_send_frames); + } + sptr io_srv = shared_from_this(); + auto recv_io = std::make_shared( + io_srv, data_link, num_recv_frames, cb, fc_link, num_send_frames, fc_cb); + connect_receiver(data_link.get(), recv_io.get(), num_recv_frames); + return recv_io; +} + +void inline_io_service::attach_send_link(send_link_if::sptr link) +{ + auto link_ptr = link.get(); + UHD_ASSERT_THROW(_send_tbl.count(link_ptr) == 0); + _send_tbl[link_ptr] = 0; + _send_links.push_back(link); +}; + +send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_link, + size_t num_send_frames, + send_io_if::send_callback_t send_cb, + recv_link_if::sptr recv_link, + size_t num_recv_frames, + recv_callback_t recv_cb) +{ + UHD_ASSERT_THROW(send_link); + UHD_ASSERT_THROW(num_send_frames > 0); + UHD_ASSERT_THROW(send_cb); + connect_sender(send_link.get(), num_send_frames); + sptr io_srv = shared_from_this(); + auto send_io = std::make_shared( + io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb); + if (recv_link) { + UHD_ASSERT_THROW(num_recv_frames > 0); + UHD_ASSERT_THROW(recv_cb); + connect_receiver(recv_link.get(), send_io.get(), num_recv_frames); + } + return send_io; +} + +/* + * Senders are free to mux a send_link, but the total reserved send_frames + * must be less than or equal to the link's capacity + */ +void inline_io_service::connect_sender(send_link_if* link, size_t num_frames) +{ + size_t rsvd_frames = _send_tbl.at(link); + size_t frame_capacity = link->get_num_send_frames(); + UHD_ASSERT_THROW(frame_capacity >= rsvd_frames + num_frames); + _send_tbl[link] = rsvd_frames + num_frames; +} + +void inline_io_service::disconnect_sender(send_link_if* link, size_t num_frames) +{ + size_t rsvd_frames = _send_tbl.at(link); + UHD_ASSERT_THROW(rsvd_frames >= num_frames); + _send_tbl[link] = rsvd_frames - num_frames; +} + +void inline_io_service::connect_receiver( + recv_link_if* link, inline_recv_cb* cb, size_t num_frames) +{ + inline_recv_mux* mux; + inline_recv_cb* rcvr; + size_t rsvd_frames; + std::tie(mux, rcvr, rsvd_frames) = _recv_tbl.at(link); + if (mux) { + mux->connect(cb); + } else if (rcvr) { + mux = new inline_recv_mux(link); + mux->connect(rcvr); + mux->connect(cb); + rcvr = nullptr; + } else { + rcvr = cb; + } + size_t capacity = link->get_num_recv_frames(); + UHD_ASSERT_THROW(rsvd_frames + num_frames <= capacity); + _recv_tbl[link] = std::make_tuple(mux, rcvr, rsvd_frames + num_frames); +} + +void inline_io_service::disconnect_receiver( + recv_link_if* link, inline_recv_cb* cb, size_t num_frames) +{ + inline_recv_mux* mux; + inline_recv_cb* rcvr; + size_t rsvd_frames; + std::tie(mux, rcvr, rsvd_frames) = _recv_tbl.at(link); + UHD_ASSERT_THROW(rsvd_frames >= num_frames); + if (mux) { + mux->disconnect(cb); + if (mux->is_empty()) { + delete mux; + mux = nullptr; + } + } else { + rcvr = nullptr; + } + _recv_tbl[link] = std::make_tuple(mux, rcvr, rsvd_frames - num_frames); +} + +frame_buff::uptr inline_io_service::recv( + inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms) +{ + inline_recv_mux* mux; + inline_recv_cb* rcvr; + size_t num_frames; + std::tie(mux, rcvr, num_frames) = _recv_tbl.at(recv_link); + + if (mux) { + /* Defer to mux's recv() if present */ + return mux->recv(recv_io_cb, recv_link, timeout_ms); + } else { + assert(recv_io_cb == rcvr); + } + + while (true) { + frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); + /* Process buffer */ + if (buff) { + if (rcvr->callback(buff, recv_link)) { + if (buff) { + return frame_buff::uptr(std::move(buff)); + } + /* Retry receive if got buffer but it got consumed */ + } else { + UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); + recv_link->release_recv_buff(std::move(buff)); + } + } else { /* Timeout */ + return frame_buff::uptr(); + } + } + return frame_buff::uptr(); +} + +}} // namespace uhd::transport diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt index 431c380c3..0c48ae058 100644 --- a/host/tests/CMakeLists.txt +++ b/host/tests/CMakeLists.txt @@ -252,6 +252,12 @@ UHD_ADD_NONAPI_TEST( ${CMAKE_SOURCE_DIR}/lib/usrp/cores/dsp_core_utils.cpp ) +UHD_ADD_NONAPI_TEST( + TARGET "transport_test.cpp" + EXTRA_SOURCES + ${CMAKE_SOURCE_DIR}/lib/transport/inline_io_service.cpp +) + ######################################################################## # demo of a loadable module ######################################################################## diff --git a/host/tests/common/mock_transport.hpp b/host/tests/common/mock_transport.hpp new file mode 100644 index 000000000..321f22830 --- /dev/null +++ b/host/tests/common/mock_transport.hpp @@ -0,0 +1,369 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_TRANSPORT_IF_HPP +#define INCLUDED_UHDLIB_TRANSPORT_TRANSPORT_IF_HPP + +#include +#include +#include + +namespace uhd { namespace transport { + +namespace { +constexpr size_t ADDR_OFFSET = 0; +constexpr size_t TYPE_OFFSET = 1; /* 0 for data, 1 for FC, 2 for msg */ +constexpr size_t SEQNO_OFFSET = 2; /* For FC, this is last seen seqno */ +constexpr size_t LEN_OFFSET = 3; +constexpr size_t DATA_OFFSET = 4; +constexpr size_t MSG_BUFFS = 8; +}; // namespace +/*! + * Mock transport with following packet format: + * Data: [dst_addr/src_addr, type, seqno, data_len, data...] + * FC: [dst_addr/src_addr, type, ackno] + * Msg: [dst_addr/src_addr, type, null, data] + * All fields are 32-bit words (dst_addr and src_addr are 16 bits each) + */ +class mock_send_transport +{ +public: + using sptr = std::shared_ptr; + + mock_send_transport(io_service::sptr io_srv, + send_link_if::sptr send_link, + recv_link_if::sptr recv_link, + uint16_t dst_addr, + uint16_t src_addr, + uint32_t credits) + : _credits(credits) + { + _send_addr = (dst_addr << 16) | (src_addr << 0); + _recv_addr = (src_addr << 16) | (dst_addr << 0); + + /* Make message client for sending side-band messages */ + send_io_if::send_callback_t msg_send_cb = [this](frame_buff::uptr& buff, + send_link_if* link) { + uint32_t* data = (uint32_t*)buff->data(); + data[ADDR_OFFSET] = this->_send_addr; + data[TYPE_OFFSET] = 2; /* MSG type */ + link->release_send_buff(std::move(buff)); + }; + _msg_if = io_srv->make_send_client( + send_link, MSG_BUFFS, msg_send_cb, recv_link_if::sptr(), 0, nullptr); + + /* Make client for sending streaming data */ + send_io_if::send_callback_t send_cb = [this](frame_buff::uptr& buff, + send_link_if* link) { + this->send_buff(buff, link); + }; + recv_callback_t recv_cb = [this](frame_buff::uptr& buff, + recv_link_if* link, + send_link_if* /*send_link*/) { + return this->recv_buff(buff, link); + }; + /* Pretend get 1 flow control message per sent packet */ + _send_if = io_srv->make_send_client( + send_link, credits, send_cb, recv_link, credits, recv_cb); + } + + ~mock_send_transport() {} + + /*! + * Get a buffer for creating a non-flow-controlled message + */ + bool put_msg(uint32_t msg, int32_t timeout_ms) + { + frame_buff::uptr buff = _msg_if->get_send_buff(timeout_ms); + if (!buff) { + return false; + } + uint32_t* data = (uint32_t*)buff->data(); + data[TYPE_OFFSET] = 2; + data[DATA_OFFSET] = msg; + buff->set_packet_size((1 + DATA_OFFSET) * sizeof(uint32_t)); + _msg_if->release_send_buff(std::move(buff)); + return true; + } + + /*! + * Get an empty frame buffer in which to write packet contents. + * + * \param timeout_ms a positive timeout value specifies the maximum number + of ms to wait, a negative value specifies to block + until successful, and a value of 0 specifies no wait. + * \return a frame buffer, or null uptr if timeout occurs + */ + frame_buff::uptr get_data_buff(int32_t timeout_ms) + { + frame_buff::uptr buff = _send_if->get_send_buff(timeout_ms); + if (!buff) { + return frame_buff::uptr(); + } + uint32_t* data = (uint32_t*)buff->data(); + data[TYPE_OFFSET] = 0; + return frame_buff::uptr(std::move(buff)); + } + + /*! + * Release a frame buffer, allowing the driver to reuse it. + * + * \param buffer frame buffer to release for reuse by the link + */ + void release_data_buff(frame_buff::uptr& buff, size_t len) + { + if (len == 0) { + _send_if->release_send_buff(std::move(buff)); + return; + } + uint32_t* data = (uint32_t*)buff->data(); + data[LEN_OFFSET] = len; + buff->set_packet_size((len + DATA_OFFSET) * sizeof(uint32_t)); + _send_if->release_send_buff(std::move(buff)); + } + + /*! + * Callback for sending the packet. Callback is responsible for calling + * release_send_buff() if it wants to send the packet. This will require + * moving the uptr's reference. If the packet will NOT be sent, the + * callback must NOT release the uptr. + * + * Function should update any internal state needed. For example, flow + * control state could be updated here, and the header could be filled out + * as well, like the packet's sequence number and/or addresses. + * + * Callbacks execute on the I/O thread! Be careful about what state is + * touched. In addition, this callback should NOT sleep. + */ + void send_buff(frame_buff::uptr& buff, send_link_if* send_link) + { + if (_seqno >= _ackno + _credits) { + return; + } + uint32_t* data = (uint32_t*)buff->data(); + data[ADDR_OFFSET] = _send_addr; + data[SEQNO_OFFSET] = _seqno; + send_link->release_send_buff(std::move(buff)); + _seqno++; + } + + /*! + * Callback for when packets are received (for processing). + * Function should make a determination of whether the packet belongs to it + * and return the bool. + * + * Function may consume and release the buffer internally (if packet was + * destined for it). The recv_link_if may be used to release it, and the + * provided frame_buff::uptr must relinquish ownership before returning. + * If the buffer was not destined for the user of this function, buffer must + * NOT be released, and the uptr must remain intact. + * + * Callbacks execute on the I/O thread! Be careful about what state is + * touched. In addition, this callback should NOT sleep. + * + * \param frame_buff the buffer that was received + * \param recv_link_if the link used to retrieve the buffer. Can be used to + * release the buffer back to the link, if buffer is consumed internally. + * \return true if buffer matched this transport, false otherwise + */ + bool recv_buff(frame_buff::uptr& buff, recv_link_if* recv_link) + { + /* Check address and if no match, return false */ + uint32_t* data = (uint32_t*)buff->data(); + if (data[ADDR_OFFSET] != _recv_addr) { + return false; + } + if (data[TYPE_OFFSET] == 1) { /* Flow control message */ + _ackno = data[SEQNO_OFFSET]; + } + if (data[TYPE_OFFSET] != 0) { /* Only data packets go up to user */ + recv_link->release_recv_buff(std::move(buff)); + } else { /* mock_send_transport does not receive data packets */ + return false; + } + return true; + } + + std::pair buff_to_data(frame_buff* buff) + { + uint32_t* data = (uint32_t*)buff->data(); + size_t data_len = buff->packet_size() - DATA_OFFSET * sizeof(uint32_t); + return std::pair(&data[DATA_OFFSET], data_len); + } + +private: + uint32_t _send_addr; + uint32_t _recv_addr; + uint32_t _credits; + send_io_if::sptr _msg_if; + send_io_if::sptr _send_if; + uint32_t _seqno = 0; + uint32_t _ackno = 0; +}; + +/*! + * Mock transport with following packet format: + * Data: [dst_addr/src_addr, type, seqno, data_len, data...] + * FC: [dst_addr/src_addr, type, ackno] + * Msg: [dst_addr/src_addr, type, seqno, data_len, data...] + * All fields are 32-bit words (dst_addr and src_addr are 16 bits each) + */ +class mock_recv_transport +{ +public: + using sptr = std::shared_ptr; + + mock_recv_transport(io_service::sptr io_srv, + recv_link_if::sptr recv_link, + send_link_if::sptr send_link, + uint16_t dst_addr, + uint16_t src_addr, + uint32_t credits) + : _credits(credits) + { + _send_addr = (src_addr << 16) | (dst_addr << 0); + _recv_addr = (dst_addr << 16) | (src_addr << 0); + + /* Make client for sending streaming data */ + recv_io_if::fc_callback_t send_cb = [this](frame_buff::uptr buff, + recv_link_if* recv_link, + send_link_if* send_link) { + this->handle_flow_ctrl(std::move(buff), recv_link, send_link); + }; + recv_callback_t recv_cb = [this](frame_buff::uptr& buff, + recv_link_if* link, + send_link_if* /*send_link*/) { + return this->recv_buff(buff, link); + }; + /* Pretend get 1 flow control message per sent packet */ + _recv_if = io_srv->make_recv_client( + recv_link, credits, recv_cb, send_link, credits, send_cb); + } + + ~mock_recv_transport() {} + + /*! + * Get a buffer for creating a non-flow-controlled message + */ + bool get_msg(uint32_t& msg) + { + if (_msg_queue.read_available()) { + msg = _msg_queue.front(); + _msg_queue.pop(); + return true; + } + return false; + } + + /*! + * Get an empty frame buffer in which to write packet contents. + * + * \param timeout_ms a positive timeout value specifies the maximum number + of ms to wait, a negative value specifies to block + until successful, and a value of 0 specifies no wait. + * \return a frame buffer, or null uptr if timeout occurs + */ + frame_buff::uptr get_data_buff(int32_t timeout_ms) + { + return _recv_if->get_recv_buff(timeout_ms); + } + + /*! + * Release a frame buffer, allowing the driver to reuse it. + * + * \param buffer frame buffer to release for reuse by the link + */ + void release_data_buff(frame_buff::uptr buff) + { + _recv_if->release_recv_buff(std::move(buff)); + } + + /*! + * Callback for producing a flow control response. + * This callback is run whenever a frame_buff is scheduled to be released. + * + * The callback must release the buffer, but it can update internal state + * as well. It can also send a response with the send_link_if, should it + * desire to do so. + * + * Callbacks execute on the I/O thread! Be careful about what state is + * touched. In addition, this callback should NOT sleep. + */ + void handle_flow_ctrl( + frame_buff::uptr buff, recv_link_if* recv_link, send_link_if* send_link) + { + uint32_t* data = (uint32_t*)buff->data(); + if (data[TYPE_OFFSET] == 0) { + frame_buff::uptr fc_buff = send_link->get_send_buff(0); + UHD_ASSERT_THROW(fc_buff); + uint32_t* fc_data = (uint32_t*)fc_buff->data(); + fc_data[SEQNO_OFFSET] = data[SEQNO_OFFSET]; + recv_link->release_recv_buff(std::move(buff)); + UHD_ASSERT_THROW(buff == nullptr); + fc_data[TYPE_OFFSET] = 1; /* FC type */ + fc_data[ADDR_OFFSET] = _send_addr; + send_link->release_send_buff(std::move(fc_buff)); + } else { + recv_link->release_recv_buff(std::move(buff)); + } + } + + /*! + * Callback for when packets are received (for processing). + * Function should make a determination of whether the packet belongs to it + * and return the bool. + * + * Function may consume and release the buffer internally (if packet was + * destined for it). The recv_link_if may be used to release it, and the + * provided frame_buff::uptr must relinquish ownership before returning. + * If the buffer was not destined for the user of this function, buffer must + * NOT be released, and the uptr must remain intact. + * + * Callbacks execute on the I/O thread! Be careful about what state is + * touched. In addition, this callback should NOT sleep. + * + * \param frame_buff the buffer that was received + * \param recv_link_if the link used to retrieve the buffer. Can be used to + * release the buffer back to the link, if buffer is consumed internally. + * \return true if buffer matched this transport, false otherwise + */ + bool recv_buff(frame_buff::uptr& buff, recv_link_if* recv_link) + { + /* Check address and if no match, return false */ + uint32_t* data = (uint32_t*)buff->data(); + if (data[ADDR_OFFSET] != _recv_addr) { + return false; + } + if (data[TYPE_OFFSET] == 1) { /* No FC for mock_recv_transport */ + return false; + } + if (data[TYPE_OFFSET] == 2) { /* Record message */ + _msg_queue.push(data[DATA_OFFSET]); + recv_link->release_recv_buff(std::move(buff)); + } + /* (Data packets will go up to user) */ + return true; + } + + std::pair buff_to_data(frame_buff* buff) + { + uint32_t* data = (uint32_t*)buff->data(); + size_t data_len = data[LEN_OFFSET]; + return std::pair(&data[DATA_OFFSET], data_len); + } + +private: + uint32_t _send_addr; + uint32_t _recv_addr; + uint32_t _credits; + recv_io_if::sptr _recv_if; + boost::lockfree::spsc_queue> _msg_queue; + uint32_t _seqno = 0; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_TRANSPORT_IF_HPP */ diff --git a/host/tests/transport_test.cpp b/host/tests/transport_test.cpp new file mode 100644 index 000000000..3e86da2d8 --- /dev/null +++ b/host/tests/transport_test.cpp @@ -0,0 +1,188 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include "common/mock_link.hpp" +#include "common/mock_transport.hpp" +#include +#include + +using namespace uhd::transport; + +static mock_send_link::sptr make_send_link(size_t num_frames) +{ + const mock_send_link::link_params params = {1000, num_frames}; + return std::make_shared(params); +} + +static mock_recv_link::sptr make_recv_link(size_t num_frames) +{ + const mock_recv_link::link_params params = {1000, num_frames}; + return std::make_shared(params); +} + +static mock_send_transport::sptr make_send_xport(io_service::sptr io_srv, + send_link_if::sptr send_link, + recv_link_if::sptr recv_link, + uint16_t dst_addr, + uint16_t src_addr, + uint32_t credits) +{ + return std::make_shared( + io_srv, send_link, recv_link, dst_addr, src_addr, credits); +} + +static mock_recv_transport::sptr make_recv_xport(io_service::sptr io_srv, + recv_link_if::sptr recv_link, + send_link_if::sptr send_link, + uint16_t dst_addr, + uint16_t src_addr, + uint32_t credits) +{ + return std::make_shared( + io_srv, recv_link, send_link, dst_addr, src_addr, credits); +} + +BOOST_AUTO_TEST_CASE(test_construction) +{ + auto io_srv = inline_io_service::make(); + auto send_link = make_send_link(40); + io_srv->attach_send_link(send_link); + auto recv_link = make_recv_link(40); + io_srv->attach_recv_link(recv_link); + auto send_xport = make_send_xport(io_srv, send_link, recv_link, 1, 2, 32); + + auto send_buff = send_xport->get_data_buff(0); + send_buff->set_packet_size(0); + send_xport->release_data_buff(send_buff, 0); + send_xport.reset(); + + auto recv_xport = make_recv_xport(io_srv, recv_link, send_link, 1, 2, 32); + uint32_t msg; + UHD_ASSERT_THROW(recv_xport->get_msg(msg) == false); +} + +BOOST_AUTO_TEST_CASE(test_io) +{ + auto io_srv = inline_io_service::make(); + auto send_link0 = make_send_link(40); + io_srv->attach_send_link(send_link0); + auto recv_link0 = make_recv_link(40); + io_srv->attach_recv_link(recv_link0); + auto send_xport = make_send_xport(io_srv, send_link0, recv_link0, 1, 2, 32); + + auto send_link1 = make_send_link(40); + io_srv->attach_send_link(send_link1); + auto recv_link1 = make_recv_link(40); + io_srv->attach_recv_link(recv_link1); + auto recv_xport = make_recv_xport(io_srv, recv_link1, send_link1, 1, 2, 32); + + /* FIXME: Testing async messages requires the dummy read -- To not have it, needs recv + * mux + separate recv queue */ + send_xport->put_msg(0xa5d3b33f, 0); + auto packet = send_link0->pop_send_packet(); + recv_link1->push_back_recv_packet(packet.first, packet.second); + auto recv_buff = recv_xport->get_data_buff(0); + if (recv_buff) { + recv_xport->release_data_buff(std::move(recv_buff)); + } + uint32_t msg; + UHD_ASSERT_THROW(recv_xport->get_msg(msg)); + UHD_ASSERT_THROW(msg == 0xa5d3b33f); + + auto send_buff = send_xport->get_data_buff(0); + UHD_ASSERT_THROW(send_buff); + auto buff_data = send_xport->buff_to_data(send_buff.get()); + UHD_ASSERT_THROW(buff_data.second >= 16); + uint32_t* data = buff_data.first; + for (size_t i = 0; i < 16; i++) { + data[i] = (uint32_t)i; + } + + send_xport->release_data_buff(send_buff, 16); + packet = send_link0->pop_send_packet(); + recv_link1->push_back_recv_packet(packet.first, packet.second); + + recv_buff = recv_xport->get_data_buff(0); + UHD_ASSERT_THROW(recv_buff); + auto recv_data = recv_xport->buff_to_data(recv_buff.get()); + UHD_ASSERT_THROW(recv_data.second == 16); + data = recv_data.first; + for (size_t i = 0; i < 16; i++) { + UHD_ASSERT_THROW(data[i] == (uint32_t)i); + } + recv_xport->release_data_buff(std::move(recv_buff)); +} + +BOOST_AUTO_TEST_CASE(test_muxed_io) +{ + auto io_srv = inline_io_service::make(); + auto send_link = make_send_link(80); + io_srv->attach_send_link(send_link); + auto recv_link = make_recv_link(80); + io_srv->attach_recv_link(recv_link); + auto send_xport = make_send_xport(io_srv, send_link, recv_link, 1, 2, 32); + auto recv_xport = make_recv_xport(io_srv, recv_link, send_link, 1, 2, 32); + + /* Send a sideband message */ + send_xport->put_msg(0xa5d3b33f, 0); + + /* Send some normal data */ + auto send_buff = send_xport->get_data_buff(0); + UHD_ASSERT_THROW(send_buff); + auto buff_data = send_xport->buff_to_data(send_buff.get()); + UHD_ASSERT_THROW(buff_data.second >= 16); + uint32_t* data = buff_data.first; + for (size_t i = 0; i < 16; i++) { + data[i] = (uint32_t)i; + } + send_xport->release_data_buff(send_buff, 16); + + /* Move the two packets over */ + auto packet = send_link->pop_send_packet(); + recv_link->push_back_recv_packet(packet.first, packet.second); + packet = send_link->pop_send_packet(); + recv_link->push_back_recv_packet(packet.first, packet.second); + + /* Try to receive the data + * (message won't arrive unless we try to get the data first) + * However, the message should be processed and enqueued here + */ + auto recv_buff = recv_xport->get_data_buff(0); + UHD_ASSERT_THROW(recv_buff); + auto recv_data = recv_xport->buff_to_data(recv_buff.get()); + UHD_ASSERT_THROW(recv_data.second == 16); + data = recv_data.first; + for (size_t i = 0; i < 16; i++) { + UHD_ASSERT_THROW(data[i] == (uint32_t)i); + } + recv_xport->release_data_buff(std::move(recv_buff)); + + /* Now can get the message */ + uint32_t msg; + UHD_ASSERT_THROW(recv_xport->get_msg(msg)); + UHD_ASSERT_THROW(msg == 0xa5d3b33f); +} + +/* +BOOST_AUTO_TEST_CASE(test_oversubscribed) +{ + auto io_srv = inline_io_service::make(); + auto send_link = make_send_link(32); + io_srv->attach_send_link(send_link); + auto recv_link = make_recv_link(32); + io_srv->attach_recv_link(recv_link); + auto send_xport = make_send_xport(io_srv, send_link, recv_link, 1, 2, 32); + + auto send_buff = send_xport->get_data_buff(0); + send_buff->set_packet_size(0); + send_xport->release_data_buff(send_buff, 0); + send_xport.reset(); + + auto recv_xport = make_recv_xport(io_srv, recv_link, send_link, 1, 2, 32); + uint32_t msg; + UHD_ASSERT_THROW(recv_xport->get_msg(msg) == false); +} +*/ -- cgit v1.2.3