diff options
-rw-r--r-- | host/lib/include/uhdlib/transport/inline_io_service.hpp | 121 | ||||
-rw-r--r-- | host/lib/transport/CMakeLists.txt | 1 | ||||
-rw-r--r-- | host/lib/transport/inline_io_service.cpp | 415 | ||||
-rw-r--r-- | host/tests/CMakeLists.txt | 6 | ||||
-rw-r--r-- | host/tests/common/mock_transport.hpp | 369 | ||||
-rw-r--r-- | host/tests/transport_test.cpp | 188 |
6 files changed, 1100 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/inline_io_service.hpp b/host/lib/include/uhdlib/transport/inline_io_service.hpp new file mode 100644 index 000000000..f10e7018d --- /dev/null +++ b/host/lib/include/uhdlib/transport/inline_io_service.hpp @@ -0,0 +1,121 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_INLINE_IO_SERVICE_HPP +#define INCLUDED_UHDLIB_TRANSPORT_INLINE_IO_SERVICE_HPP + +#include <uhdlib/transport/io_service.hpp> +#include <unordered_map> +#include <list> + +namespace uhd { namespace transport { + +class inline_recv_mux; +class inline_recv_cb; + +/*! + * Single-threaded I/O service + * Note this is not an appropriate io_service to use with polling-mode drivers, + * since such drivers require a thread to poll them and not block (i.e. + * timeouts are not allowed at the link interface) + */ +class inline_io_service : public virtual io_service, + public std::enable_shared_from_this<inline_io_service> +{ +public: + using sptr = std::shared_ptr<inline_io_service>; + static sptr make(void) + { + return sptr(new inline_io_service()); + } + + ~inline_io_service(); + + void attach_recv_link(recv_link_if::sptr link); + void attach_send_link(send_link_if::sptr link); + + recv_io_if::sptr make_recv_client(recv_link_if::sptr data_link, + size_t num_recv_frames, + recv_callback_t cb, + send_link_if::sptr fc_link, + size_t num_send_frames, + recv_io_if::fc_callback_t fc_cb); + + send_io_if::sptr make_send_client(send_link_if::sptr send_link, + size_t num_send_frames, + send_io_if::send_callback_t send_cb, + recv_link_if::sptr recv_link, + size_t num_recv_frames, + recv_callback_t recv_cb); + +private: + friend class inline_recv_io; + friend class inline_send_io; + + inline_io_service() = default; + inline_io_service(const inline_io_service&) = delete; + + /*! + * Senders are free to mux a send_link, but the total reserved send_frames + * must be less than or equal to the link's capacity + * + * \param link the link used for sending data + * \param num_frames number of frames to reserve for this connection + */ + void connect_sender(send_link_if* link, size_t num_frames); + + /*! + * Disconnect the sender and free resources + * + * \param link the link that was used for sending data + * \param num_frames number of frames to release (same as reservation) + */ + void disconnect_sender(send_link_if* link, size_t num_frames); + + /*! + * Connect a receiver to the link and reserve resources + * \param link the recv link to use for getting data + * \param cb a callback for processing received data + * \param num_frames the number of frames to reserve for this receiver + */ + void connect_receiver(recv_link_if* link, inline_recv_cb* cb, size_t num_frames); + + /*! + * Disconnect the receiver from the provided link and free resources + * \param link the recv link that was used for reception + * \param cb the callback to disassociate + * \param num_frames the number of frames that was reserved for the cb + */ + void disconnect_receiver(recv_link_if* link, inline_recv_cb* cb, size_t num_frames); + + /* + * Function to perform recv operations on a link, which is potentially + * muxed. Packets are forwarded to the appropriate mux or callback. + * + * \param recv_io_cb the callback+interface initiating the operation + * \param recv_link link to perform receive on + * \param timeout_ms timeout to wait for a buffer on the link + * \return a frame_buff uptr with either a buffer with data or no buffer + */ + frame_buff::uptr recv( + inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms); + + /* Track whether link is muxed, the callback, and buffer reservations */ + std::unordered_map<recv_link_if*, + std::tuple<inline_recv_mux*, inline_recv_cb*, size_t>> + _recv_tbl; + + /* Track how many send_frames have been reserved for each link */ + std::unordered_map<send_link_if*, size_t> _send_tbl; + + /* Shared ptr kept to avoid untimely release */ + std::list<send_link_if::sptr> _send_links; + std::list<recv_link_if::sptr> _recv_links; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_INLINE_IO_SERVICE_HPP */ diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index df94f42be..a9663c89a 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -121,6 +121,7 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/chdr.cpp ${CMAKE_CURRENT_SOURCE_DIR}/muxed_zero_copy_if.cpp ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/inline_io_service.cpp ) if(ENABLE_X300) diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp new file mode 100644 index 000000000..72acea738 --- /dev/null +++ b/host/lib/transport/inline_io_service.cpp @@ -0,0 +1,415 @@ +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <uhdlib/transport/inline_io_service.hpp> +#include <boost/circular_buffer.hpp> +#include <cassert> + +namespace uhd { namespace transport { + +/*! + * Interface class for unifying callback processing between both inline_send_io + * and inline_recv_io + */ +class inline_recv_cb +{ +public: + /*! + * Function to call the callback method + * + * \param buff buffer received + * \param recv_link pointer to recv link used with the callback + * \return whether the packet was destined for this callback + */ + UHD_FORCE_INLINE bool callback(frame_buff::uptr& buff, recv_link_if* recv_link) + { + return _recv_cb(buff, recv_link, _cb_send_link); + } + +protected: + inline_recv_cb(recv_callback_t cb, send_link_if* send_link) + : _recv_cb(cb), _cb_send_link(send_link) + { + } + + recv_callback_t _recv_cb; + // pointer to send link used with the callback + send_link_if* _cb_send_link; +}; + +/*! + * Mux class that intercepts packets from the link and distributes them to + * queues for each client that is not the caller of the recv() function + */ +class inline_recv_mux +{ +public: + inline_recv_mux(recv_link_if* link) : _link(link){}; + + ~inline_recv_mux(){}; + + /*! + * Connect a new receiver to the recv link + * + * \param cb pointer to the callback for the receiver + */ + void connect(inline_recv_cb* cb) + { + UHD_ASSERT_THROW(_queues.count(cb) == 0); + /* Always create queue of max size, since we don't know when there are + * virtual channels (which share frames) + */ + auto queue = + new boost::circular_buffer<frame_buff*>(_link->get_num_recv_frames()); + _queues[cb] = queue; + _callbacks.push_back(cb); + } + + /*! + * Disconnect a receiver currently connected to the recv link + * \param cb a pointer to the callback to disconnect + */ + void disconnect(inline_recv_cb* cb) + { + auto queue = _queues.at(cb); + while (!queue->empty()) { + frame_buff* buff = queue->front(); + _link->release_recv_buff(frame_buff::uptr(buff)); + queue->pop_front(); + } + delete queue; + _queues.erase(cb); + _callbacks.remove(cb); + } + + /*! + * Check if there are callbacks registered to this mux + * \return whether there are no callbacks registered + */ + UHD_FORCE_INLINE bool is_empty(void) const + { + return _callbacks.empty(); + } + + /*! + * Do receive processing for the mux + * \param cb the callback that is currently seeking a buffer + * \param recv_link the link to do recv on + * \param timeout_ms the timeout for the recv operation + * \return a frame_buff with data if a packet was received (else empty) + */ + frame_buff::uptr recv(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms) + { + auto queue = _queues.at(cb); + if (!queue->empty()) { + frame_buff* buff = queue->front(); + queue->pop_front(); + return frame_buff::uptr(buff); + } + while (true) { + frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); + /* Process buffer */ + if (buff) { + bool rcvr_found = false; + for (auto& rcvr : _callbacks) { + if (rcvr->callback(buff, recv_link)) { + rcvr_found = true; + if (buff) { + if (rcvr == cb) { + return frame_buff::uptr(std::move(buff)); + } else { + /* NOTE: Should not overflow, by construction + * Every queue can hold link->get_num_recv_frames() + */ + _queues[rcvr]->push_back(buff.release()); + } + } + /* Continue looping if buffer was consumed */ + break; + } + } + if (not rcvr_found) { + UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); + recv_link->release_recv_buff(std::move(buff)); + } + } else { /* Timeout */ + return frame_buff::uptr(); + } + } + } + +private: + recv_link_if* _link; + std::list<inline_recv_cb*> _callbacks; + std::unordered_map<inline_recv_cb*, boost::circular_buffer<frame_buff*>*> _queues; +}; + +class inline_recv_io : public virtual recv_io_if, public virtual inline_recv_cb +{ +public: + using sptr = std::shared_ptr<inline_recv_io>; + + inline_recv_io(inline_io_service::sptr io_srv, + recv_link_if::sptr data_link, + size_t num_recv_frames, + recv_callback_t recv_cb, + send_link_if::sptr fc_link, + size_t num_send_frames, + fc_callback_t fc_cb) + : inline_recv_cb(recv_cb, fc_link.get()) + , _io_srv(io_srv) + , _data_link(data_link) + , _num_recv_frames(num_recv_frames) + , _fc_link(fc_link) + , _num_send_frames(num_send_frames) + , _fc_cb(fc_cb) + { + } + + ~inline_recv_io() + { + _io_srv->disconnect_receiver(_data_link.get(), this, _num_recv_frames); + if (_fc_link) { + _io_srv->disconnect_sender(_fc_link.get(), _num_send_frames); + } + } + + frame_buff::uptr get_recv_buff(int32_t timeout_ms) + { + return _io_srv->recv(this, _data_link.get(), timeout_ms); + } + + void release_recv_buff(frame_buff::uptr buff) + { + _fc_cb(frame_buff::uptr(std::move(buff)), _data_link.get(), _fc_link.get()); + } + +private: + inline_io_service::sptr _io_srv; + recv_link_if::sptr _data_link; + size_t _num_recv_frames; + send_link_if::sptr _fc_link; + size_t _num_send_frames; + fc_callback_t _fc_cb; +}; + +class inline_send_io : public virtual send_io_if, public virtual inline_recv_cb +{ +public: + using sptr = std::shared_ptr<inline_send_io>; + + inline_send_io(inline_io_service::sptr io_srv, + send_link_if::sptr send_link, + size_t num_send_frames, + send_callback_t send_cb, + recv_link_if::sptr recv_link, + size_t num_recv_frames, + recv_callback_t fc_cb) + : inline_recv_cb(fc_cb, send_link.get()) + , _io_srv(io_srv) + , _send_link(send_link) + , _num_send_frames(num_send_frames) + , _send_cb(send_cb) + , _recv_link(recv_link) + , _num_recv_frames(num_recv_frames) + { + } + + ~inline_send_io() + { + _io_srv->disconnect_sender(_send_link.get(), _num_send_frames); + if (_recv_link) { + _io_srv->disconnect_receiver(_recv_link.get(), this, _num_recv_frames); + } + } + + frame_buff::uptr get_send_buff(int32_t timeout_ms) + { + /* Check initial flow control result */ + frame_buff::uptr buff = _send_link->get_send_buff(timeout_ms); + if (buff) { + return frame_buff::uptr(std::move(buff)); + } + return frame_buff::uptr(); + } + + void release_send_buff(frame_buff::uptr buff) + { + while (buff) { /* TODO: Possibly don't loop indefinitely here */ + if (_recv_link) { + _io_srv->recv(this, _recv_link.get(), 0); + } + _send_cb(buff, _send_link.get()); + } + } + +private: + inline_io_service::sptr _io_srv; + send_link_if::sptr _send_link; + size_t _num_send_frames; + send_callback_t _send_cb; + recv_link_if::sptr _recv_link; + size_t _num_recv_frames; + recv_callback_t _recv_cb; +}; + +inline_io_service::~inline_io_service(){}; + +void inline_io_service::attach_recv_link(recv_link_if::sptr link) +{ + auto link_ptr = link.get(); + UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) == 0); + _recv_tbl[link_ptr] = + std::tuple<inline_recv_mux*, inline_recv_cb*, size_t>(nullptr, nullptr, 0); + _recv_links.push_back(link); +}; + +recv_io_if::sptr inline_io_service::make_recv_client(recv_link_if::sptr data_link, + size_t num_recv_frames, + recv_callback_t cb, + send_link_if::sptr fc_link, + size_t num_send_frames, + recv_io_if::fc_callback_t fc_cb) +{ + UHD_ASSERT_THROW(data_link); + UHD_ASSERT_THROW(num_recv_frames > 0); + UHD_ASSERT_THROW(cb); + if (fc_link) { + UHD_ASSERT_THROW(num_send_frames > 0); + UHD_ASSERT_THROW(fc_cb); + connect_sender(fc_link.get(), num_send_frames); + } + sptr io_srv = shared_from_this(); + auto recv_io = std::make_shared<inline_recv_io>( + io_srv, data_link, num_recv_frames, cb, fc_link, num_send_frames, fc_cb); + connect_receiver(data_link.get(), recv_io.get(), num_recv_frames); + return recv_io; +} + +void inline_io_service::attach_send_link(send_link_if::sptr link) +{ + auto link_ptr = link.get(); + UHD_ASSERT_THROW(_send_tbl.count(link_ptr) == 0); + _send_tbl[link_ptr] = 0; + _send_links.push_back(link); +}; + +send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_link, + size_t num_send_frames, + send_io_if::send_callback_t send_cb, + recv_link_if::sptr recv_link, + size_t num_recv_frames, + recv_callback_t recv_cb) +{ + UHD_ASSERT_THROW(send_link); + UHD_ASSERT_THROW(num_send_frames > 0); + UHD_ASSERT_THROW(send_cb); + connect_sender(send_link.get(), num_send_frames); + sptr io_srv = shared_from_this(); + auto send_io = std::make_shared<inline_send_io>( + io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb); + if (recv_link) { + UHD_ASSERT_THROW(num_recv_frames > 0); + UHD_ASSERT_THROW(recv_cb); + connect_receiver(recv_link.get(), send_io.get(), num_recv_frames); + } + return send_io; +} + +/* + * Senders are free to mux a send_link, but the total reserved send_frames + * must be less than or equal to the link's capacity + */ +void inline_io_service::connect_sender(send_link_if* link, size_t num_frames) +{ + size_t rsvd_frames = _send_tbl.at(link); + size_t frame_capacity = link->get_num_send_frames(); + UHD_ASSERT_THROW(frame_capacity >= rsvd_frames + num_frames); + _send_tbl[link] = rsvd_frames + num_frames; +} + +void inline_io_service::disconnect_sender(send_link_if* link, size_t num_frames) +{ + size_t rsvd_frames = _send_tbl.at(link); + UHD_ASSERT_THROW(rsvd_frames >= num_frames); + _send_tbl[link] = rsvd_frames - num_frames; +} + +void inline_io_service::connect_receiver( + recv_link_if* link, inline_recv_cb* cb, size_t num_frames) +{ + inline_recv_mux* mux; + inline_recv_cb* rcvr; + size_t rsvd_frames; + std::tie(mux, rcvr, rsvd_frames) = _recv_tbl.at(link); + if (mux) { + mux->connect(cb); + } else if (rcvr) { + mux = new inline_recv_mux(link); + mux->connect(rcvr); + mux->connect(cb); + rcvr = nullptr; + } else { + rcvr = cb; + } + size_t capacity = link->get_num_recv_frames(); + UHD_ASSERT_THROW(rsvd_frames + num_frames <= capacity); + _recv_tbl[link] = std::make_tuple(mux, rcvr, rsvd_frames + num_frames); +} + +void inline_io_service::disconnect_receiver( + recv_link_if* link, inline_recv_cb* cb, size_t num_frames) +{ + inline_recv_mux* mux; + inline_recv_cb* rcvr; + size_t rsvd_frames; + std::tie(mux, rcvr, rsvd_frames) = _recv_tbl.at(link); + UHD_ASSERT_THROW(rsvd_frames >= num_frames); + if (mux) { + mux->disconnect(cb); + if (mux->is_empty()) { + delete mux; + mux = nullptr; + } + } else { + rcvr = nullptr; + } + _recv_tbl[link] = std::make_tuple(mux, rcvr, rsvd_frames - num_frames); +} + +frame_buff::uptr inline_io_service::recv( + inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms) +{ + inline_recv_mux* mux; + inline_recv_cb* rcvr; + size_t num_frames; + std::tie(mux, rcvr, num_frames) = _recv_tbl.at(recv_link); + + if (mux) { + /* Defer to mux's recv() if present */ + return mux->recv(recv_io_cb, recv_link, timeout_ms); + } else { + assert(recv_io_cb == rcvr); + } + + while (true) { + frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms); + /* Process buffer */ + if (buff) { + if (rcvr->callback(buff, recv_link)) { + if (buff) { + return frame_buff::uptr(std::move(buff)); + } + /* Retry receive if got buffer but it got consumed */ + } else { + UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver"); + recv_link->release_recv_buff(std::move(buff)); + } + } else { /* Timeout */ + return frame_buff::uptr(); + } + } + return frame_buff::uptr(); +} + +}} // namespace uhd::transport diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt index 431c380c3..0c48ae058 100644 --- a/host/tests/CMakeLists.txt +++ b/host/tests/CMakeLists.txt @@ -252,6 +252,12 @@ UHD_ADD_NONAPI_TEST( ${CMAKE_SOURCE_DIR}/lib/usrp/cores/dsp_core_utils.cpp ) +UHD_ADD_NONAPI_TEST( + TARGET "transport_test.cpp" + EXTRA_SOURCES + ${CMAKE_SOURCE_DIR}/lib/transport/inline_io_service.cpp +) + ######################################################################## # demo of a loadable module ######################################################################## diff --git a/host/tests/common/mock_transport.hpp b/host/tests/common/mock_transport.hpp new file mode 100644 index 000000000..321f22830 --- /dev/null +++ b/host/tests/common/mock_transport.hpp @@ -0,0 +1,369 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_TRANSPORT_IF_HPP +#define INCLUDED_UHDLIB_TRANSPORT_TRANSPORT_IF_HPP + +#include <uhdlib/transport/io_service.hpp> +#include <boost/lockfree/spsc_queue.hpp> +#include <utility> + +namespace uhd { namespace transport { + +namespace { +constexpr size_t ADDR_OFFSET = 0; +constexpr size_t TYPE_OFFSET = 1; /* 0 for data, 1 for FC, 2 for msg */ +constexpr size_t SEQNO_OFFSET = 2; /* For FC, this is last seen seqno */ +constexpr size_t LEN_OFFSET = 3; +constexpr size_t DATA_OFFSET = 4; +constexpr size_t MSG_BUFFS = 8; +}; // namespace +/*! + * Mock transport with following packet format: + * Data: [dst_addr/src_addr, type, seqno, data_len, data...] + * FC: [dst_addr/src_addr, type, ackno] + * Msg: [dst_addr/src_addr, type, null, data] + * All fields are 32-bit words (dst_addr and src_addr are 16 bits each) + */ +class mock_send_transport +{ +public: + using sptr = std::shared_ptr<mock_send_transport>; + + mock_send_transport(io_service::sptr io_srv, + send_link_if::sptr send_link, + recv_link_if::sptr recv_link, + uint16_t dst_addr, + uint16_t src_addr, + uint32_t credits) + : _credits(credits) + { + _send_addr = (dst_addr << 16) | (src_addr << 0); + _recv_addr = (src_addr << 16) | (dst_addr << 0); + + /* Make message client for sending side-band messages */ + send_io_if::send_callback_t msg_send_cb = [this](frame_buff::uptr& buff, + send_link_if* link) { + uint32_t* data = (uint32_t*)buff->data(); + data[ADDR_OFFSET] = this->_send_addr; + data[TYPE_OFFSET] = 2; /* MSG type */ + link->release_send_buff(std::move(buff)); + }; + _msg_if = io_srv->make_send_client( + send_link, MSG_BUFFS, msg_send_cb, recv_link_if::sptr(), 0, nullptr); + + /* Make client for sending streaming data */ + send_io_if::send_callback_t send_cb = [this](frame_buff::uptr& buff, + send_link_if* link) { + this->send_buff(buff, link); + }; + recv_callback_t recv_cb = [this](frame_buff::uptr& buff, + recv_link_if* link, + send_link_if* /*send_link*/) { + return this->recv_buff(buff, link); + }; + /* Pretend get 1 flow control message per sent packet */ + _send_if = io_srv->make_send_client( + send_link, credits, send_cb, recv_link, credits, recv_cb); + } + + ~mock_send_transport() {} + + /*! + * Get a buffer for creating a non-flow-controlled message + */ + bool put_msg(uint32_t msg, int32_t timeout_ms) + { + frame_buff::uptr buff = _msg_if->get_send_buff(timeout_ms); + if (!buff) { + return false; + } + uint32_t* data = (uint32_t*)buff->data(); + data[TYPE_OFFSET] = 2; + data[DATA_OFFSET] = msg; + buff->set_packet_size((1 + DATA_OFFSET) * sizeof(uint32_t)); + _msg_if->release_send_buff(std::move(buff)); + return true; + } + + /*! + * Get an empty frame buffer in which to write packet contents. + * + * \param timeout_ms a positive timeout value specifies the maximum number + of ms to wait, a negative value specifies to block + until successful, and a value of 0 specifies no wait. + * \return a frame buffer, or null uptr if timeout occurs + */ + frame_buff::uptr get_data_buff(int32_t timeout_ms) + { + frame_buff::uptr buff = _send_if->get_send_buff(timeout_ms); + if (!buff) { + return frame_buff::uptr(); + } + uint32_t* data = (uint32_t*)buff->data(); + data[TYPE_OFFSET] = 0; + return frame_buff::uptr(std::move(buff)); + } + + /*! + * Release a frame buffer, allowing the driver to reuse it. + * + * \param buffer frame buffer to release for reuse by the link + */ + void release_data_buff(frame_buff::uptr& buff, size_t len) + { + if (len == 0) { + _send_if->release_send_buff(std::move(buff)); + return; + } + uint32_t* data = (uint32_t*)buff->data(); + data[LEN_OFFSET] = len; + buff->set_packet_size((len + DATA_OFFSET) * sizeof(uint32_t)); + _send_if->release_send_buff(std::move(buff)); + } + + /*! + * Callback for sending the packet. Callback is responsible for calling + * release_send_buff() if it wants to send the packet. This will require + * moving the uptr's reference. If the packet will NOT be sent, the + * callback must NOT release the uptr. + * + * Function should update any internal state needed. For example, flow + * control state could be updated here, and the header could be filled out + * as well, like the packet's sequence number and/or addresses. + * + * Callbacks execute on the I/O thread! Be careful about what state is + * touched. In addition, this callback should NOT sleep. + */ + void send_buff(frame_buff::uptr& buff, send_link_if* send_link) + { + if (_seqno >= _ackno + _credits) { + return; + } + uint32_t* data = (uint32_t*)buff->data(); + data[ADDR_OFFSET] = _send_addr; + data[SEQNO_OFFSET] = _seqno; + send_link->release_send_buff(std::move(buff)); + _seqno++; + } + + /*! + * Callback for when packets are received (for processing). + * Function should make a determination of whether the packet belongs to it + * and return the bool. + * + * Function may consume and release the buffer internally (if packet was + * destined for it). The recv_link_if may be used to release it, and the + * provided frame_buff::uptr must relinquish ownership before returning. + * If the buffer was not destined for the user of this function, buffer must + * NOT be released, and the uptr must remain intact. + * + * Callbacks execute on the I/O thread! Be careful about what state is + * touched. In addition, this callback should NOT sleep. + * + * \param frame_buff the buffer that was received + * \param recv_link_if the link used to retrieve the buffer. Can be used to + * release the buffer back to the link, if buffer is consumed internally. + * \return true if buffer matched this transport, false otherwise + */ + bool recv_buff(frame_buff::uptr& buff, recv_link_if* recv_link) + { + /* Check address and if no match, return false */ + uint32_t* data = (uint32_t*)buff->data(); + if (data[ADDR_OFFSET] != _recv_addr) { + return false; + } + if (data[TYPE_OFFSET] == 1) { /* Flow control message */ + _ackno = data[SEQNO_OFFSET]; + } + if (data[TYPE_OFFSET] != 0) { /* Only data packets go up to user */ + recv_link->release_recv_buff(std::move(buff)); + } else { /* mock_send_transport does not receive data packets */ + return false; + } + return true; + } + + std::pair<uint32_t*, size_t> buff_to_data(frame_buff* buff) + { + uint32_t* data = (uint32_t*)buff->data(); + size_t data_len = buff->packet_size() - DATA_OFFSET * sizeof(uint32_t); + return std::pair<uint32_t*, size_t>(&data[DATA_OFFSET], data_len); + } + +private: + uint32_t _send_addr; + uint32_t _recv_addr; + uint32_t _credits; + send_io_if::sptr _msg_if; + send_io_if::sptr _send_if; + uint32_t _seqno = 0; + uint32_t _ackno = 0; +}; + +/*! + * Mock transport with following packet format: + * Data: [dst_addr/src_addr, type, seqno, data_len, data...] + * FC: [dst_addr/src_addr, type, ackno] + * Msg: [dst_addr/src_addr, type, seqno, data_len, data...] + * All fields are 32-bit words (dst_addr and src_addr are 16 bits each) + */ +class mock_recv_transport +{ +public: + using sptr = std::shared_ptr<mock_recv_transport>; + + mock_recv_transport(io_service::sptr io_srv, + recv_link_if::sptr recv_link, + send_link_if::sptr send_link, + uint16_t dst_addr, + uint16_t src_addr, + uint32_t credits) + : _credits(credits) + { + _send_addr = (src_addr << 16) | (dst_addr << 0); + _recv_addr = (dst_addr << 16) | (src_addr << 0); + + /* Make client for sending streaming data */ + recv_io_if::fc_callback_t send_cb = [this](frame_buff::uptr buff, + recv_link_if* recv_link, + send_link_if* send_link) { + this->handle_flow_ctrl(std::move(buff), recv_link, send_link); + }; + recv_callback_t recv_cb = [this](frame_buff::uptr& buff, + recv_link_if* link, + send_link_if* /*send_link*/) { + return this->recv_buff(buff, link); + }; + /* Pretend get 1 flow control message per sent packet */ + _recv_if = io_srv->make_recv_client( + recv_link, credits, recv_cb, send_link, credits, send_cb); + } + + ~mock_recv_transport() {} + + /*! + * Get a buffer for creating a non-flow-controlled message + */ + bool get_msg(uint32_t& msg) + { + if (_msg_queue.read_available()) { + msg = _msg_queue.front(); + _msg_queue.pop(); + return true; + } + return false; + } + + /*! + * Get an empty frame buffer in which to write packet contents. + * + * \param timeout_ms a positive timeout value specifies the maximum number + of ms to wait, a negative value specifies to block + until successful, and a value of 0 specifies no wait. + * \return a frame buffer, or null uptr if timeout occurs + */ + frame_buff::uptr get_data_buff(int32_t timeout_ms) + { + return _recv_if->get_recv_buff(timeout_ms); + } + + /*! + * Release a frame buffer, allowing the driver to reuse it. + * + * \param buffer frame buffer to release for reuse by the link + */ + void release_data_buff(frame_buff::uptr buff) + { + _recv_if->release_recv_buff(std::move(buff)); + } + + /*! + * Callback for producing a flow control response. + * This callback is run whenever a frame_buff is scheduled to be released. + * + * The callback must release the buffer, but it can update internal state + * as well. It can also send a response with the send_link_if, should it + * desire to do so. + * + * Callbacks execute on the I/O thread! Be careful about what state is + * touched. In addition, this callback should NOT sleep. + */ + void handle_flow_ctrl( + frame_buff::uptr buff, recv_link_if* recv_link, send_link_if* send_link) + { + uint32_t* data = (uint32_t*)buff->data(); + if (data[TYPE_OFFSET] == 0) { + frame_buff::uptr fc_buff = send_link->get_send_buff(0); + UHD_ASSERT_THROW(fc_buff); + uint32_t* fc_data = (uint32_t*)fc_buff->data(); + fc_data[SEQNO_OFFSET] = data[SEQNO_OFFSET]; + recv_link->release_recv_buff(std::move(buff)); + UHD_ASSERT_THROW(buff == nullptr); + fc_data[TYPE_OFFSET] = 1; /* FC type */ + fc_data[ADDR_OFFSET] = _send_addr; + send_link->release_send_buff(std::move(fc_buff)); + } else { + recv_link->release_recv_buff(std::move(buff)); + } + } + + /*! + * Callback for when packets are received (for processing). + * Function should make a determination of whether the packet belongs to it + * and return the bool. + * + * Function may consume and release the buffer internally (if packet was + * destined for it). The recv_link_if may be used to release it, and the + * provided frame_buff::uptr must relinquish ownership before returning. + * If the buffer was not destined for the user of this function, buffer must + * NOT be released, and the uptr must remain intact. + * + * Callbacks execute on the I/O thread! Be careful about what state is + * touched. In addition, this callback should NOT sleep. + * + * \param frame_buff the buffer that was received + * \param recv_link_if the link used to retrieve the buffer. Can be used to + * release the buffer back to the link, if buffer is consumed internally. + * \return true if buffer matched this transport, false otherwise + */ + bool recv_buff(frame_buff::uptr& buff, recv_link_if* recv_link) + { + /* Check address and if no match, return false */ + uint32_t* data = (uint32_t*)buff->data(); + if (data[ADDR_OFFSET] != _recv_addr) { + return false; + } + if (data[TYPE_OFFSET] == 1) { /* No FC for mock_recv_transport */ + return false; + } + if (data[TYPE_OFFSET] == 2) { /* Record message */ + _msg_queue.push(data[DATA_OFFSET]); + recv_link->release_recv_buff(std::move(buff)); + } + /* (Data packets will go up to user) */ + return true; + } + + std::pair<uint32_t*, size_t> buff_to_data(frame_buff* buff) + { + uint32_t* data = (uint32_t*)buff->data(); + size_t data_len = data[LEN_OFFSET]; + return std::pair<uint32_t*, size_t>(&data[DATA_OFFSET], data_len); + } + +private: + uint32_t _send_addr; + uint32_t _recv_addr; + uint32_t _credits; + recv_io_if::sptr _recv_if; + boost::lockfree::spsc_queue<uint32_t, boost::lockfree::capacity<8>> _msg_queue; + uint32_t _seqno = 0; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_TRANSPORT_IF_HPP */ diff --git a/host/tests/transport_test.cpp b/host/tests/transport_test.cpp new file mode 100644 index 000000000..3e86da2d8 --- /dev/null +++ b/host/tests/transport_test.cpp @@ -0,0 +1,188 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include "common/mock_link.hpp" +#include "common/mock_transport.hpp" +#include <uhdlib/transport/inline_io_service.hpp> +#include <boost/test/unit_test.hpp> + +using namespace uhd::transport; + +static mock_send_link::sptr make_send_link(size_t num_frames) +{ + const mock_send_link::link_params params = {1000, num_frames}; + return std::make_shared<mock_send_link>(params); +} + +static mock_recv_link::sptr make_recv_link(size_t num_frames) +{ + const mock_recv_link::link_params params = {1000, num_frames}; + return std::make_shared<mock_recv_link>(params); +} + +static mock_send_transport::sptr make_send_xport(io_service::sptr io_srv, + send_link_if::sptr send_link, + recv_link_if::sptr recv_link, + uint16_t dst_addr, + uint16_t src_addr, + uint32_t credits) +{ + return std::make_shared<mock_send_transport>( + io_srv, send_link, recv_link, dst_addr, src_addr, credits); +} + +static mock_recv_transport::sptr make_recv_xport(io_service::sptr io_srv, + recv_link_if::sptr recv_link, + send_link_if::sptr send_link, + uint16_t dst_addr, + uint16_t src_addr, + uint32_t credits) +{ + return std::make_shared<mock_recv_transport>( + io_srv, recv_link, send_link, dst_addr, src_addr, credits); +} + +BOOST_AUTO_TEST_CASE(test_construction) +{ + auto io_srv = inline_io_service::make(); + auto send_link = make_send_link(40); + io_srv->attach_send_link(send_link); + auto recv_link = make_recv_link(40); + io_srv->attach_recv_link(recv_link); + auto send_xport = make_send_xport(io_srv, send_link, recv_link, 1, 2, 32); + + auto send_buff = send_xport->get_data_buff(0); + send_buff->set_packet_size(0); + send_xport->release_data_buff(send_buff, 0); + send_xport.reset(); + + auto recv_xport = make_recv_xport(io_srv, recv_link, send_link, 1, 2, 32); + uint32_t msg; + UHD_ASSERT_THROW(recv_xport->get_msg(msg) == false); +} + +BOOST_AUTO_TEST_CASE(test_io) +{ + auto io_srv = inline_io_service::make(); + auto send_link0 = make_send_link(40); + io_srv->attach_send_link(send_link0); + auto recv_link0 = make_recv_link(40); + io_srv->attach_recv_link(recv_link0); + auto send_xport = make_send_xport(io_srv, send_link0, recv_link0, 1, 2, 32); + + auto send_link1 = make_send_link(40); + io_srv->attach_send_link(send_link1); + auto recv_link1 = make_recv_link(40); + io_srv->attach_recv_link(recv_link1); + auto recv_xport = make_recv_xport(io_srv, recv_link1, send_link1, 1, 2, 32); + + /* FIXME: Testing async messages requires the dummy read -- To not have it, needs recv + * mux + separate recv queue */ + send_xport->put_msg(0xa5d3b33f, 0); + auto packet = send_link0->pop_send_packet(); + recv_link1->push_back_recv_packet(packet.first, packet.second); + auto recv_buff = recv_xport->get_data_buff(0); + if (recv_buff) { + recv_xport->release_data_buff(std::move(recv_buff)); + } + uint32_t msg; + UHD_ASSERT_THROW(recv_xport->get_msg(msg)); + UHD_ASSERT_THROW(msg == 0xa5d3b33f); + + auto send_buff = send_xport->get_data_buff(0); + UHD_ASSERT_THROW(send_buff); + auto buff_data = send_xport->buff_to_data(send_buff.get()); + UHD_ASSERT_THROW(buff_data.second >= 16); + uint32_t* data = buff_data.first; + for (size_t i = 0; i < 16; i++) { + data[i] = (uint32_t)i; + } + + send_xport->release_data_buff(send_buff, 16); + packet = send_link0->pop_send_packet(); + recv_link1->push_back_recv_packet(packet.first, packet.second); + + recv_buff = recv_xport->get_data_buff(0); + UHD_ASSERT_THROW(recv_buff); + auto recv_data = recv_xport->buff_to_data(recv_buff.get()); + UHD_ASSERT_THROW(recv_data.second == 16); + data = recv_data.first; + for (size_t i = 0; i < 16; i++) { + UHD_ASSERT_THROW(data[i] == (uint32_t)i); + } + recv_xport->release_data_buff(std::move(recv_buff)); +} + +BOOST_AUTO_TEST_CASE(test_muxed_io) +{ + auto io_srv = inline_io_service::make(); + auto send_link = make_send_link(80); + io_srv->attach_send_link(send_link); + auto recv_link = make_recv_link(80); + io_srv->attach_recv_link(recv_link); + auto send_xport = make_send_xport(io_srv, send_link, recv_link, 1, 2, 32); + auto recv_xport = make_recv_xport(io_srv, recv_link, send_link, 1, 2, 32); + + /* Send a sideband message */ + send_xport->put_msg(0xa5d3b33f, 0); + + /* Send some normal data */ + auto send_buff = send_xport->get_data_buff(0); + UHD_ASSERT_THROW(send_buff); + auto buff_data = send_xport->buff_to_data(send_buff.get()); + UHD_ASSERT_THROW(buff_data.second >= 16); + uint32_t* data = buff_data.first; + for (size_t i = 0; i < 16; i++) { + data[i] = (uint32_t)i; + } + send_xport->release_data_buff(send_buff, 16); + + /* Move the two packets over */ + auto packet = send_link->pop_send_packet(); + recv_link->push_back_recv_packet(packet.first, packet.second); + packet = send_link->pop_send_packet(); + recv_link->push_back_recv_packet(packet.first, packet.second); + + /* Try to receive the data + * (message won't arrive unless we try to get the data first) + * However, the message should be processed and enqueued here + */ + auto recv_buff = recv_xport->get_data_buff(0); + UHD_ASSERT_THROW(recv_buff); + auto recv_data = recv_xport->buff_to_data(recv_buff.get()); + UHD_ASSERT_THROW(recv_data.second == 16); + data = recv_data.first; + for (size_t i = 0; i < 16; i++) { + UHD_ASSERT_THROW(data[i] == (uint32_t)i); + } + recv_xport->release_data_buff(std::move(recv_buff)); + + /* Now can get the message */ + uint32_t msg; + UHD_ASSERT_THROW(recv_xport->get_msg(msg)); + UHD_ASSERT_THROW(msg == 0xa5d3b33f); +} + +/* +BOOST_AUTO_TEST_CASE(test_oversubscribed) +{ + auto io_srv = inline_io_service::make(); + auto send_link = make_send_link(32); + io_srv->attach_send_link(send_link); + auto recv_link = make_recv_link(32); + io_srv->attach_recv_link(recv_link); + auto send_xport = make_send_xport(io_srv, send_link, recv_link, 1, 2, 32); + + auto send_buff = send_xport->get_data_buff(0); + send_buff->set_packet_size(0); + send_xport->release_data_buff(send_buff, 0); + send_xport.reset(); + + auto recv_xport = make_recv_xport(io_srv, recv_link, send_link, 1, 2, 32); + uint32_t msg; + UHD_ASSERT_THROW(recv_xport->get_msg(msg) == false); +} +*/ |