aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/lib/include/uhdlib/transport/inline_io_service.hpp121
-rw-r--r--host/lib/transport/CMakeLists.txt1
-rw-r--r--host/lib/transport/inline_io_service.cpp415
-rw-r--r--host/tests/CMakeLists.txt6
-rw-r--r--host/tests/common/mock_transport.hpp369
-rw-r--r--host/tests/transport_test.cpp188
6 files changed, 1100 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/inline_io_service.hpp b/host/lib/include/uhdlib/transport/inline_io_service.hpp
new file mode 100644
index 000000000..f10e7018d
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/inline_io_service.hpp
@@ -0,0 +1,121 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_UHDLIB_TRANSPORT_INLINE_IO_SERVICE_HPP
+#define INCLUDED_UHDLIB_TRANSPORT_INLINE_IO_SERVICE_HPP
+
+#include <uhdlib/transport/io_service.hpp>
+#include <unordered_map>
+#include <list>
+
+namespace uhd { namespace transport {
+
+class inline_recv_mux;
+class inline_recv_cb;
+
+/*!
+ * Single-threaded I/O service
+ * Note this is not an appropriate io_service to use with polling-mode drivers,
+ * since such drivers require a thread to poll them and not block (i.e.
+ * timeouts are not allowed at the link interface)
+ */
+class inline_io_service : public virtual io_service,
+ public std::enable_shared_from_this<inline_io_service>
+{
+public:
+ using sptr = std::shared_ptr<inline_io_service>;
+ static sptr make(void)
+ {
+ return sptr(new inline_io_service());
+ }
+
+ ~inline_io_service();
+
+ void attach_recv_link(recv_link_if::sptr link);
+ void attach_send_link(send_link_if::sptr link);
+
+ recv_io_if::sptr make_recv_client(recv_link_if::sptr data_link,
+ size_t num_recv_frames,
+ recv_callback_t cb,
+ send_link_if::sptr fc_link,
+ size_t num_send_frames,
+ recv_io_if::fc_callback_t fc_cb);
+
+ send_io_if::sptr make_send_client(send_link_if::sptr send_link,
+ size_t num_send_frames,
+ send_io_if::send_callback_t send_cb,
+ recv_link_if::sptr recv_link,
+ size_t num_recv_frames,
+ recv_callback_t recv_cb);
+
+private:
+ friend class inline_recv_io;
+ friend class inline_send_io;
+
+ inline_io_service() = default;
+ inline_io_service(const inline_io_service&) = delete;
+
+ /*!
+ * Senders are free to mux a send_link, but the total reserved send_frames
+ * must be less than or equal to the link's capacity
+ *
+ * \param link the link used for sending data
+ * \param num_frames number of frames to reserve for this connection
+ */
+ void connect_sender(send_link_if* link, size_t num_frames);
+
+ /*!
+ * Disconnect the sender and free resources
+ *
+ * \param link the link that was used for sending data
+ * \param num_frames number of frames to release (same as reservation)
+ */
+ void disconnect_sender(send_link_if* link, size_t num_frames);
+
+ /*!
+ * Connect a receiver to the link and reserve resources
+ * \param link the recv link to use for getting data
+ * \param cb a callback for processing received data
+ * \param num_frames the number of frames to reserve for this receiver
+ */
+ void connect_receiver(recv_link_if* link, inline_recv_cb* cb, size_t num_frames);
+
+ /*!
+ * Disconnect the receiver from the provided link and free resources
+ * \param link the recv link that was used for reception
+ * \param cb the callback to disassociate
+ * \param num_frames the number of frames that was reserved for the cb
+ */
+ void disconnect_receiver(recv_link_if* link, inline_recv_cb* cb, size_t num_frames);
+
+ /*
+ * Function to perform recv operations on a link, which is potentially
+ * muxed. Packets are forwarded to the appropriate mux or callback.
+ *
+ * \param recv_io_cb the callback+interface initiating the operation
+ * \param recv_link link to perform receive on
+ * \param timeout_ms timeout to wait for a buffer on the link
+ * \return a frame_buff uptr with either a buffer with data or no buffer
+ */
+ frame_buff::uptr recv(
+ inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms);
+
+ /* Track whether link is muxed, the callback, and buffer reservations */
+ std::unordered_map<recv_link_if*,
+ std::tuple<inline_recv_mux*, inline_recv_cb*, size_t>>
+ _recv_tbl;
+
+ /* Track how many send_frames have been reserved for each link */
+ std::unordered_map<send_link_if*, size_t> _send_tbl;
+
+ /* Shared ptr kept to avoid untimely release */
+ std::list<send_link_if::sptr> _send_links;
+ std::list<recv_link_if::sptr> _recv_links;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_UHDLIB_TRANSPORT_INLINE_IO_SERVICE_HPP */
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index df94f42be..a9663c89a 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -121,6 +121,7 @@ LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/chdr.cpp
${CMAKE_CURRENT_SOURCE_DIR}/muxed_zero_copy_if.cpp
${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/inline_io_service.cpp
)
if(ENABLE_X300)
diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp
new file mode 100644
index 000000000..72acea738
--- /dev/null
+++ b/host/lib/transport/inline_io_service.cpp
@@ -0,0 +1,415 @@
+#include <uhd/config.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/inline_io_service.hpp>
+#include <boost/circular_buffer.hpp>
+#include <cassert>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Interface class for unifying callback processing between both inline_send_io
+ * and inline_recv_io
+ */
+class inline_recv_cb
+{
+public:
+ /*!
+ * Function to call the callback method
+ *
+ * \param buff buffer received
+ * \param recv_link pointer to recv link used with the callback
+ * \return whether the packet was destined for this callback
+ */
+ UHD_FORCE_INLINE bool callback(frame_buff::uptr& buff, recv_link_if* recv_link)
+ {
+ return _recv_cb(buff, recv_link, _cb_send_link);
+ }
+
+protected:
+ inline_recv_cb(recv_callback_t cb, send_link_if* send_link)
+ : _recv_cb(cb), _cb_send_link(send_link)
+ {
+ }
+
+ recv_callback_t _recv_cb;
+ // pointer to send link used with the callback
+ send_link_if* _cb_send_link;
+};
+
+/*!
+ * Mux class that intercepts packets from the link and distributes them to
+ * queues for each client that is not the caller of the recv() function
+ */
+class inline_recv_mux
+{
+public:
+ inline_recv_mux(recv_link_if* link) : _link(link){};
+
+ ~inline_recv_mux(){};
+
+ /*!
+ * Connect a new receiver to the recv link
+ *
+ * \param cb pointer to the callback for the receiver
+ */
+ void connect(inline_recv_cb* cb)
+ {
+ UHD_ASSERT_THROW(_queues.count(cb) == 0);
+ /* Always create queue of max size, since we don't know when there are
+ * virtual channels (which share frames)
+ */
+ auto queue =
+ new boost::circular_buffer<frame_buff*>(_link->get_num_recv_frames());
+ _queues[cb] = queue;
+ _callbacks.push_back(cb);
+ }
+
+ /*!
+ * Disconnect a receiver currently connected to the recv link
+ * \param cb a pointer to the callback to disconnect
+ */
+ void disconnect(inline_recv_cb* cb)
+ {
+ auto queue = _queues.at(cb);
+ while (!queue->empty()) {
+ frame_buff* buff = queue->front();
+ _link->release_recv_buff(frame_buff::uptr(buff));
+ queue->pop_front();
+ }
+ delete queue;
+ _queues.erase(cb);
+ _callbacks.remove(cb);
+ }
+
+ /*!
+ * Check if there are callbacks registered to this mux
+ * \return whether there are no callbacks registered
+ */
+ UHD_FORCE_INLINE bool is_empty(void) const
+ {
+ return _callbacks.empty();
+ }
+
+ /*!
+ * Do receive processing for the mux
+ * \param cb the callback that is currently seeking a buffer
+ * \param recv_link the link to do recv on
+ * \param timeout_ms the timeout for the recv operation
+ * \return a frame_buff with data if a packet was received (else empty)
+ */
+ frame_buff::uptr recv(inline_recv_cb* cb, recv_link_if* recv_link, int32_t timeout_ms)
+ {
+ auto queue = _queues.at(cb);
+ if (!queue->empty()) {
+ frame_buff* buff = queue->front();
+ queue->pop_front();
+ return frame_buff::uptr(buff);
+ }
+ while (true) {
+ frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms);
+ /* Process buffer */
+ if (buff) {
+ bool rcvr_found = false;
+ for (auto& rcvr : _callbacks) {
+ if (rcvr->callback(buff, recv_link)) {
+ rcvr_found = true;
+ if (buff) {
+ if (rcvr == cb) {
+ return frame_buff::uptr(std::move(buff));
+ } else {
+ /* NOTE: Should not overflow, by construction
+ * Every queue can hold link->get_num_recv_frames()
+ */
+ _queues[rcvr]->push_back(buff.release());
+ }
+ }
+ /* Continue looping if buffer was consumed */
+ break;
+ }
+ }
+ if (not rcvr_found) {
+ UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver");
+ recv_link->release_recv_buff(std::move(buff));
+ }
+ } else { /* Timeout */
+ return frame_buff::uptr();
+ }
+ }
+ }
+
+private:
+ recv_link_if* _link;
+ std::list<inline_recv_cb*> _callbacks;
+ std::unordered_map<inline_recv_cb*, boost::circular_buffer<frame_buff*>*> _queues;
+};
+
+class inline_recv_io : public virtual recv_io_if, public virtual inline_recv_cb
+{
+public:
+ using sptr = std::shared_ptr<inline_recv_io>;
+
+ inline_recv_io(inline_io_service::sptr io_srv,
+ recv_link_if::sptr data_link,
+ size_t num_recv_frames,
+ recv_callback_t recv_cb,
+ send_link_if::sptr fc_link,
+ size_t num_send_frames,
+ fc_callback_t fc_cb)
+ : inline_recv_cb(recv_cb, fc_link.get())
+ , _io_srv(io_srv)
+ , _data_link(data_link)
+ , _num_recv_frames(num_recv_frames)
+ , _fc_link(fc_link)
+ , _num_send_frames(num_send_frames)
+ , _fc_cb(fc_cb)
+ {
+ }
+
+ ~inline_recv_io()
+ {
+ _io_srv->disconnect_receiver(_data_link.get(), this, _num_recv_frames);
+ if (_fc_link) {
+ _io_srv->disconnect_sender(_fc_link.get(), _num_send_frames);
+ }
+ }
+
+ frame_buff::uptr get_recv_buff(int32_t timeout_ms)
+ {
+ return _io_srv->recv(this, _data_link.get(), timeout_ms);
+ }
+
+ void release_recv_buff(frame_buff::uptr buff)
+ {
+ _fc_cb(frame_buff::uptr(std::move(buff)), _data_link.get(), _fc_link.get());
+ }
+
+private:
+ inline_io_service::sptr _io_srv;
+ recv_link_if::sptr _data_link;
+ size_t _num_recv_frames;
+ send_link_if::sptr _fc_link;
+ size_t _num_send_frames;
+ fc_callback_t _fc_cb;
+};
+
+class inline_send_io : public virtual send_io_if, public virtual inline_recv_cb
+{
+public:
+ using sptr = std::shared_ptr<inline_send_io>;
+
+ inline_send_io(inline_io_service::sptr io_srv,
+ send_link_if::sptr send_link,
+ size_t num_send_frames,
+ send_callback_t send_cb,
+ recv_link_if::sptr recv_link,
+ size_t num_recv_frames,
+ recv_callback_t fc_cb)
+ : inline_recv_cb(fc_cb, send_link.get())
+ , _io_srv(io_srv)
+ , _send_link(send_link)
+ , _num_send_frames(num_send_frames)
+ , _send_cb(send_cb)
+ , _recv_link(recv_link)
+ , _num_recv_frames(num_recv_frames)
+ {
+ }
+
+ ~inline_send_io()
+ {
+ _io_srv->disconnect_sender(_send_link.get(), _num_send_frames);
+ if (_recv_link) {
+ _io_srv->disconnect_receiver(_recv_link.get(), this, _num_recv_frames);
+ }
+ }
+
+ frame_buff::uptr get_send_buff(int32_t timeout_ms)
+ {
+ /* Check initial flow control result */
+ frame_buff::uptr buff = _send_link->get_send_buff(timeout_ms);
+ if (buff) {
+ return frame_buff::uptr(std::move(buff));
+ }
+ return frame_buff::uptr();
+ }
+
+ void release_send_buff(frame_buff::uptr buff)
+ {
+ while (buff) { /* TODO: Possibly don't loop indefinitely here */
+ if (_recv_link) {
+ _io_srv->recv(this, _recv_link.get(), 0);
+ }
+ _send_cb(buff, _send_link.get());
+ }
+ }
+
+private:
+ inline_io_service::sptr _io_srv;
+ send_link_if::sptr _send_link;
+ size_t _num_send_frames;
+ send_callback_t _send_cb;
+ recv_link_if::sptr _recv_link;
+ size_t _num_recv_frames;
+ recv_callback_t _recv_cb;
+};
+
+inline_io_service::~inline_io_service(){};
+
+void inline_io_service::attach_recv_link(recv_link_if::sptr link)
+{
+ auto link_ptr = link.get();
+ UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) == 0);
+ _recv_tbl[link_ptr] =
+ std::tuple<inline_recv_mux*, inline_recv_cb*, size_t>(nullptr, nullptr, 0);
+ _recv_links.push_back(link);
+};
+
+recv_io_if::sptr inline_io_service::make_recv_client(recv_link_if::sptr data_link,
+ size_t num_recv_frames,
+ recv_callback_t cb,
+ send_link_if::sptr fc_link,
+ size_t num_send_frames,
+ recv_io_if::fc_callback_t fc_cb)
+{
+ UHD_ASSERT_THROW(data_link);
+ UHD_ASSERT_THROW(num_recv_frames > 0);
+ UHD_ASSERT_THROW(cb);
+ if (fc_link) {
+ UHD_ASSERT_THROW(num_send_frames > 0);
+ UHD_ASSERT_THROW(fc_cb);
+ connect_sender(fc_link.get(), num_send_frames);
+ }
+ sptr io_srv = shared_from_this();
+ auto recv_io = std::make_shared<inline_recv_io>(
+ io_srv, data_link, num_recv_frames, cb, fc_link, num_send_frames, fc_cb);
+ connect_receiver(data_link.get(), recv_io.get(), num_recv_frames);
+ return recv_io;
+}
+
+void inline_io_service::attach_send_link(send_link_if::sptr link)
+{
+ auto link_ptr = link.get();
+ UHD_ASSERT_THROW(_send_tbl.count(link_ptr) == 0);
+ _send_tbl[link_ptr] = 0;
+ _send_links.push_back(link);
+};
+
+send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_link,
+ size_t num_send_frames,
+ send_io_if::send_callback_t send_cb,
+ recv_link_if::sptr recv_link,
+ size_t num_recv_frames,
+ recv_callback_t recv_cb)
+{
+ UHD_ASSERT_THROW(send_link);
+ UHD_ASSERT_THROW(num_send_frames > 0);
+ UHD_ASSERT_THROW(send_cb);
+ connect_sender(send_link.get(), num_send_frames);
+ sptr io_srv = shared_from_this();
+ auto send_io = std::make_shared<inline_send_io>(
+ io_srv, send_link, num_send_frames, send_cb, recv_link, num_recv_frames, recv_cb);
+ if (recv_link) {
+ UHD_ASSERT_THROW(num_recv_frames > 0);
+ UHD_ASSERT_THROW(recv_cb);
+ connect_receiver(recv_link.get(), send_io.get(), num_recv_frames);
+ }
+ return send_io;
+}
+
+/*
+ * Senders are free to mux a send_link, but the total reserved send_frames
+ * must be less than or equal to the link's capacity
+ */
+void inline_io_service::connect_sender(send_link_if* link, size_t num_frames)
+{
+ size_t rsvd_frames = _send_tbl.at(link);
+ size_t frame_capacity = link->get_num_send_frames();
+ UHD_ASSERT_THROW(frame_capacity >= rsvd_frames + num_frames);
+ _send_tbl[link] = rsvd_frames + num_frames;
+}
+
+void inline_io_service::disconnect_sender(send_link_if* link, size_t num_frames)
+{
+ size_t rsvd_frames = _send_tbl.at(link);
+ UHD_ASSERT_THROW(rsvd_frames >= num_frames);
+ _send_tbl[link] = rsvd_frames - num_frames;
+}
+
+void inline_io_service::connect_receiver(
+ recv_link_if* link, inline_recv_cb* cb, size_t num_frames)
+{
+ inline_recv_mux* mux;
+ inline_recv_cb* rcvr;
+ size_t rsvd_frames;
+ std::tie(mux, rcvr, rsvd_frames) = _recv_tbl.at(link);
+ if (mux) {
+ mux->connect(cb);
+ } else if (rcvr) {
+ mux = new inline_recv_mux(link);
+ mux->connect(rcvr);
+ mux->connect(cb);
+ rcvr = nullptr;
+ } else {
+ rcvr = cb;
+ }
+ size_t capacity = link->get_num_recv_frames();
+ UHD_ASSERT_THROW(rsvd_frames + num_frames <= capacity);
+ _recv_tbl[link] = std::make_tuple(mux, rcvr, rsvd_frames + num_frames);
+}
+
+void inline_io_service::disconnect_receiver(
+ recv_link_if* link, inline_recv_cb* cb, size_t num_frames)
+{
+ inline_recv_mux* mux;
+ inline_recv_cb* rcvr;
+ size_t rsvd_frames;
+ std::tie(mux, rcvr, rsvd_frames) = _recv_tbl.at(link);
+ UHD_ASSERT_THROW(rsvd_frames >= num_frames);
+ if (mux) {
+ mux->disconnect(cb);
+ if (mux->is_empty()) {
+ delete mux;
+ mux = nullptr;
+ }
+ } else {
+ rcvr = nullptr;
+ }
+ _recv_tbl[link] = std::make_tuple(mux, rcvr, rsvd_frames - num_frames);
+}
+
+frame_buff::uptr inline_io_service::recv(
+ inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms)
+{
+ inline_recv_mux* mux;
+ inline_recv_cb* rcvr;
+ size_t num_frames;
+ std::tie(mux, rcvr, num_frames) = _recv_tbl.at(recv_link);
+
+ if (mux) {
+ /* Defer to mux's recv() if present */
+ return mux->recv(recv_io_cb, recv_link, timeout_ms);
+ } else {
+ assert(recv_io_cb == rcvr);
+ }
+
+ while (true) {
+ frame_buff::uptr buff = recv_link->get_recv_buff(timeout_ms);
+ /* Process buffer */
+ if (buff) {
+ if (rcvr->callback(buff, recv_link)) {
+ if (buff) {
+ return frame_buff::uptr(std::move(buff));
+ }
+ /* Retry receive if got buffer but it got consumed */
+ } else {
+ UHD_LOG_DEBUG("IO_SRV", "Dropping packet with no receiver");
+ recv_link->release_recv_buff(std::move(buff));
+ }
+ } else { /* Timeout */
+ return frame_buff::uptr();
+ }
+ }
+ return frame_buff::uptr();
+}
+
+}} // namespace uhd::transport
diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt
index 431c380c3..0c48ae058 100644
--- a/host/tests/CMakeLists.txt
+++ b/host/tests/CMakeLists.txt
@@ -252,6 +252,12 @@ UHD_ADD_NONAPI_TEST(
${CMAKE_SOURCE_DIR}/lib/usrp/cores/dsp_core_utils.cpp
)
+UHD_ADD_NONAPI_TEST(
+ TARGET "transport_test.cpp"
+ EXTRA_SOURCES
+ ${CMAKE_SOURCE_DIR}/lib/transport/inline_io_service.cpp
+)
+
########################################################################
# demo of a loadable module
########################################################################
diff --git a/host/tests/common/mock_transport.hpp b/host/tests/common/mock_transport.hpp
new file mode 100644
index 000000000..321f22830
--- /dev/null
+++ b/host/tests/common/mock_transport.hpp
@@ -0,0 +1,369 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_UHDLIB_TRANSPORT_TRANSPORT_IF_HPP
+#define INCLUDED_UHDLIB_TRANSPORT_TRANSPORT_IF_HPP
+
+#include <uhdlib/transport/io_service.hpp>
+#include <boost/lockfree/spsc_queue.hpp>
+#include <utility>
+
+namespace uhd { namespace transport {
+
+namespace {
+constexpr size_t ADDR_OFFSET = 0;
+constexpr size_t TYPE_OFFSET = 1; /* 0 for data, 1 for FC, 2 for msg */
+constexpr size_t SEQNO_OFFSET = 2; /* For FC, this is last seen seqno */
+constexpr size_t LEN_OFFSET = 3;
+constexpr size_t DATA_OFFSET = 4;
+constexpr size_t MSG_BUFFS = 8;
+}; // namespace
+/*!
+ * Mock transport with following packet format:
+ * Data: [dst_addr/src_addr, type, seqno, data_len, data...]
+ * FC: [dst_addr/src_addr, type, ackno]
+ * Msg: [dst_addr/src_addr, type, null, data]
+ * All fields are 32-bit words (dst_addr and src_addr are 16 bits each)
+ */
+class mock_send_transport
+{
+public:
+ using sptr = std::shared_ptr<mock_send_transport>;
+
+ mock_send_transport(io_service::sptr io_srv,
+ send_link_if::sptr send_link,
+ recv_link_if::sptr recv_link,
+ uint16_t dst_addr,
+ uint16_t src_addr,
+ uint32_t credits)
+ : _credits(credits)
+ {
+ _send_addr = (dst_addr << 16) | (src_addr << 0);
+ _recv_addr = (src_addr << 16) | (dst_addr << 0);
+
+ /* Make message client for sending side-band messages */
+ send_io_if::send_callback_t msg_send_cb = [this](frame_buff::uptr& buff,
+ send_link_if* link) {
+ uint32_t* data = (uint32_t*)buff->data();
+ data[ADDR_OFFSET] = this->_send_addr;
+ data[TYPE_OFFSET] = 2; /* MSG type */
+ link->release_send_buff(std::move(buff));
+ };
+ _msg_if = io_srv->make_send_client(
+ send_link, MSG_BUFFS, msg_send_cb, recv_link_if::sptr(), 0, nullptr);
+
+ /* Make client for sending streaming data */
+ send_io_if::send_callback_t send_cb = [this](frame_buff::uptr& buff,
+ send_link_if* link) {
+ this->send_buff(buff, link);
+ };
+ recv_callback_t recv_cb = [this](frame_buff::uptr& buff,
+ recv_link_if* link,
+ send_link_if* /*send_link*/) {
+ return this->recv_buff(buff, link);
+ };
+ /* Pretend get 1 flow control message per sent packet */
+ _send_if = io_srv->make_send_client(
+ send_link, credits, send_cb, recv_link, credits, recv_cb);
+ }
+
+ ~mock_send_transport() {}
+
+ /*!
+ * Get a buffer for creating a non-flow-controlled message
+ */
+ bool put_msg(uint32_t msg, int32_t timeout_ms)
+ {
+ frame_buff::uptr buff = _msg_if->get_send_buff(timeout_ms);
+ if (!buff) {
+ return false;
+ }
+ uint32_t* data = (uint32_t*)buff->data();
+ data[TYPE_OFFSET] = 2;
+ data[DATA_OFFSET] = msg;
+ buff->set_packet_size((1 + DATA_OFFSET) * sizeof(uint32_t));
+ _msg_if->release_send_buff(std::move(buff));
+ return true;
+ }
+
+ /*!
+ * Get an empty frame buffer in which to write packet contents.
+ *
+ * \param timeout_ms a positive timeout value specifies the maximum number
+ of ms to wait, a negative value specifies to block
+ until successful, and a value of 0 specifies no wait.
+ * \return a frame buffer, or null uptr if timeout occurs
+ */
+ frame_buff::uptr get_data_buff(int32_t timeout_ms)
+ {
+ frame_buff::uptr buff = _send_if->get_send_buff(timeout_ms);
+ if (!buff) {
+ return frame_buff::uptr();
+ }
+ uint32_t* data = (uint32_t*)buff->data();
+ data[TYPE_OFFSET] = 0;
+ return frame_buff::uptr(std::move(buff));
+ }
+
+ /*!
+ * Release a frame buffer, allowing the driver to reuse it.
+ *
+ * \param buffer frame buffer to release for reuse by the link
+ */
+ void release_data_buff(frame_buff::uptr& buff, size_t len)
+ {
+ if (len == 0) {
+ _send_if->release_send_buff(std::move(buff));
+ return;
+ }
+ uint32_t* data = (uint32_t*)buff->data();
+ data[LEN_OFFSET] = len;
+ buff->set_packet_size((len + DATA_OFFSET) * sizeof(uint32_t));
+ _send_if->release_send_buff(std::move(buff));
+ }
+
+ /*!
+ * Callback for sending the packet. Callback is responsible for calling
+ * release_send_buff() if it wants to send the packet. This will require
+ * moving the uptr's reference. If the packet will NOT be sent, the
+ * callback must NOT release the uptr.
+ *
+ * Function should update any internal state needed. For example, flow
+ * control state could be updated here, and the header could be filled out
+ * as well, like the packet's sequence number and/or addresses.
+ *
+ * Callbacks execute on the I/O thread! Be careful about what state is
+ * touched. In addition, this callback should NOT sleep.
+ */
+ void send_buff(frame_buff::uptr& buff, send_link_if* send_link)
+ {
+ if (_seqno >= _ackno + _credits) {
+ return;
+ }
+ uint32_t* data = (uint32_t*)buff->data();
+ data[ADDR_OFFSET] = _send_addr;
+ data[SEQNO_OFFSET] = _seqno;
+ send_link->release_send_buff(std::move(buff));
+ _seqno++;
+ }
+
+ /*!
+ * Callback for when packets are received (for processing).
+ * Function should make a determination of whether the packet belongs to it
+ * and return the bool.
+ *
+ * Function may consume and release the buffer internally (if packet was
+ * destined for it). The recv_link_if may be used to release it, and the
+ * provided frame_buff::uptr must relinquish ownership before returning.
+ * If the buffer was not destined for the user of this function, buffer must
+ * NOT be released, and the uptr must remain intact.
+ *
+ * Callbacks execute on the I/O thread! Be careful about what state is
+ * touched. In addition, this callback should NOT sleep.
+ *
+ * \param frame_buff the buffer that was received
+ * \param recv_link_if the link used to retrieve the buffer. Can be used to
+ * release the buffer back to the link, if buffer is consumed internally.
+ * \return true if buffer matched this transport, false otherwise
+ */
+ bool recv_buff(frame_buff::uptr& buff, recv_link_if* recv_link)
+ {
+ /* Check address and if no match, return false */
+ uint32_t* data = (uint32_t*)buff->data();
+ if (data[ADDR_OFFSET] != _recv_addr) {
+ return false;
+ }
+ if (data[TYPE_OFFSET] == 1) { /* Flow control message */
+ _ackno = data[SEQNO_OFFSET];
+ }
+ if (data[TYPE_OFFSET] != 0) { /* Only data packets go up to user */
+ recv_link->release_recv_buff(std::move(buff));
+ } else { /* mock_send_transport does not receive data packets */
+ return false;
+ }
+ return true;
+ }
+
+ std::pair<uint32_t*, size_t> buff_to_data(frame_buff* buff)
+ {
+ uint32_t* data = (uint32_t*)buff->data();
+ size_t data_len = buff->packet_size() - DATA_OFFSET * sizeof(uint32_t);
+ return std::pair<uint32_t*, size_t>(&data[DATA_OFFSET], data_len);
+ }
+
+private:
+ uint32_t _send_addr;
+ uint32_t _recv_addr;
+ uint32_t _credits;
+ send_io_if::sptr _msg_if;
+ send_io_if::sptr _send_if;
+ uint32_t _seqno = 0;
+ uint32_t _ackno = 0;
+};
+
+/*!
+ * Mock transport with following packet format:
+ * Data: [dst_addr/src_addr, type, seqno, data_len, data...]
+ * FC: [dst_addr/src_addr, type, ackno]
+ * Msg: [dst_addr/src_addr, type, seqno, data_len, data...]
+ * All fields are 32-bit words (dst_addr and src_addr are 16 bits each)
+ */
+class mock_recv_transport
+{
+public:
+ using sptr = std::shared_ptr<mock_recv_transport>;
+
+ mock_recv_transport(io_service::sptr io_srv,
+ recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ uint16_t dst_addr,
+ uint16_t src_addr,
+ uint32_t credits)
+ : _credits(credits)
+ {
+ _send_addr = (src_addr << 16) | (dst_addr << 0);
+ _recv_addr = (dst_addr << 16) | (src_addr << 0);
+
+ /* Make client for sending streaming data */
+ recv_io_if::fc_callback_t send_cb = [this](frame_buff::uptr buff,
+ recv_link_if* recv_link,
+ send_link_if* send_link) {
+ this->handle_flow_ctrl(std::move(buff), recv_link, send_link);
+ };
+ recv_callback_t recv_cb = [this](frame_buff::uptr& buff,
+ recv_link_if* link,
+ send_link_if* /*send_link*/) {
+ return this->recv_buff(buff, link);
+ };
+ /* Pretend get 1 flow control message per sent packet */
+ _recv_if = io_srv->make_recv_client(
+ recv_link, credits, recv_cb, send_link, credits, send_cb);
+ }
+
+ ~mock_recv_transport() {}
+
+ /*!
+ * Get a buffer for creating a non-flow-controlled message
+ */
+ bool get_msg(uint32_t& msg)
+ {
+ if (_msg_queue.read_available()) {
+ msg = _msg_queue.front();
+ _msg_queue.pop();
+ return true;
+ }
+ return false;
+ }
+
+ /*!
+ * Get an empty frame buffer in which to write packet contents.
+ *
+ * \param timeout_ms a positive timeout value specifies the maximum number
+ of ms to wait, a negative value specifies to block
+ until successful, and a value of 0 specifies no wait.
+ * \return a frame buffer, or null uptr if timeout occurs
+ */
+ frame_buff::uptr get_data_buff(int32_t timeout_ms)
+ {
+ return _recv_if->get_recv_buff(timeout_ms);
+ }
+
+ /*!
+ * Release a frame buffer, allowing the driver to reuse it.
+ *
+ * \param buffer frame buffer to release for reuse by the link
+ */
+ void release_data_buff(frame_buff::uptr buff)
+ {
+ _recv_if->release_recv_buff(std::move(buff));
+ }
+
+ /*!
+ * Callback for producing a flow control response.
+ * This callback is run whenever a frame_buff is scheduled to be released.
+ *
+ * The callback must release the buffer, but it can update internal state
+ * as well. It can also send a response with the send_link_if, should it
+ * desire to do so.
+ *
+ * Callbacks execute on the I/O thread! Be careful about what state is
+ * touched. In addition, this callback should NOT sleep.
+ */
+ void handle_flow_ctrl(
+ frame_buff::uptr buff, recv_link_if* recv_link, send_link_if* send_link)
+ {
+ uint32_t* data = (uint32_t*)buff->data();
+ if (data[TYPE_OFFSET] == 0) {
+ frame_buff::uptr fc_buff = send_link->get_send_buff(0);
+ UHD_ASSERT_THROW(fc_buff);
+ uint32_t* fc_data = (uint32_t*)fc_buff->data();
+ fc_data[SEQNO_OFFSET] = data[SEQNO_OFFSET];
+ recv_link->release_recv_buff(std::move(buff));
+ UHD_ASSERT_THROW(buff == nullptr);
+ fc_data[TYPE_OFFSET] = 1; /* FC type */
+ fc_data[ADDR_OFFSET] = _send_addr;
+ send_link->release_send_buff(std::move(fc_buff));
+ } else {
+ recv_link->release_recv_buff(std::move(buff));
+ }
+ }
+
+ /*!
+ * Callback for when packets are received (for processing).
+ * Function should make a determination of whether the packet belongs to it
+ * and return the bool.
+ *
+ * Function may consume and release the buffer internally (if packet was
+ * destined for it). The recv_link_if may be used to release it, and the
+ * provided frame_buff::uptr must relinquish ownership before returning.
+ * If the buffer was not destined for the user of this function, buffer must
+ * NOT be released, and the uptr must remain intact.
+ *
+ * Callbacks execute on the I/O thread! Be careful about what state is
+ * touched. In addition, this callback should NOT sleep.
+ *
+ * \param frame_buff the buffer that was received
+ * \param recv_link_if the link used to retrieve the buffer. Can be used to
+ * release the buffer back to the link, if buffer is consumed internally.
+ * \return true if buffer matched this transport, false otherwise
+ */
+ bool recv_buff(frame_buff::uptr& buff, recv_link_if* recv_link)
+ {
+ /* Check address and if no match, return false */
+ uint32_t* data = (uint32_t*)buff->data();
+ if (data[ADDR_OFFSET] != _recv_addr) {
+ return false;
+ }
+ if (data[TYPE_OFFSET] == 1) { /* No FC for mock_recv_transport */
+ return false;
+ }
+ if (data[TYPE_OFFSET] == 2) { /* Record message */
+ _msg_queue.push(data[DATA_OFFSET]);
+ recv_link->release_recv_buff(std::move(buff));
+ }
+ /* (Data packets will go up to user) */
+ return true;
+ }
+
+ std::pair<uint32_t*, size_t> buff_to_data(frame_buff* buff)
+ {
+ uint32_t* data = (uint32_t*)buff->data();
+ size_t data_len = data[LEN_OFFSET];
+ return std::pair<uint32_t*, size_t>(&data[DATA_OFFSET], data_len);
+ }
+
+private:
+ uint32_t _send_addr;
+ uint32_t _recv_addr;
+ uint32_t _credits;
+ recv_io_if::sptr _recv_if;
+ boost::lockfree::spsc_queue<uint32_t, boost::lockfree::capacity<8>> _msg_queue;
+ uint32_t _seqno = 0;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_UHDLIB_TRANSPORT_TRANSPORT_IF_HPP */
diff --git a/host/tests/transport_test.cpp b/host/tests/transport_test.cpp
new file mode 100644
index 000000000..3e86da2d8
--- /dev/null
+++ b/host/tests/transport_test.cpp
@@ -0,0 +1,188 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include "common/mock_link.hpp"
+#include "common/mock_transport.hpp"
+#include <uhdlib/transport/inline_io_service.hpp>
+#include <boost/test/unit_test.hpp>
+
+using namespace uhd::transport;
+
+static mock_send_link::sptr make_send_link(size_t num_frames)
+{
+ const mock_send_link::link_params params = {1000, num_frames};
+ return std::make_shared<mock_send_link>(params);
+}
+
+static mock_recv_link::sptr make_recv_link(size_t num_frames)
+{
+ const mock_recv_link::link_params params = {1000, num_frames};
+ return std::make_shared<mock_recv_link>(params);
+}
+
+static mock_send_transport::sptr make_send_xport(io_service::sptr io_srv,
+ send_link_if::sptr send_link,
+ recv_link_if::sptr recv_link,
+ uint16_t dst_addr,
+ uint16_t src_addr,
+ uint32_t credits)
+{
+ return std::make_shared<mock_send_transport>(
+ io_srv, send_link, recv_link, dst_addr, src_addr, credits);
+}
+
+static mock_recv_transport::sptr make_recv_xport(io_service::sptr io_srv,
+ recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ uint16_t dst_addr,
+ uint16_t src_addr,
+ uint32_t credits)
+{
+ return std::make_shared<mock_recv_transport>(
+ io_srv, recv_link, send_link, dst_addr, src_addr, credits);
+}
+
+BOOST_AUTO_TEST_CASE(test_construction)
+{
+ auto io_srv = inline_io_service::make();
+ auto send_link = make_send_link(40);
+ io_srv->attach_send_link(send_link);
+ auto recv_link = make_recv_link(40);
+ io_srv->attach_recv_link(recv_link);
+ auto send_xport = make_send_xport(io_srv, send_link, recv_link, 1, 2, 32);
+
+ auto send_buff = send_xport->get_data_buff(0);
+ send_buff->set_packet_size(0);
+ send_xport->release_data_buff(send_buff, 0);
+ send_xport.reset();
+
+ auto recv_xport = make_recv_xport(io_srv, recv_link, send_link, 1, 2, 32);
+ uint32_t msg;
+ UHD_ASSERT_THROW(recv_xport->get_msg(msg) == false);
+}
+
+BOOST_AUTO_TEST_CASE(test_io)
+{
+ auto io_srv = inline_io_service::make();
+ auto send_link0 = make_send_link(40);
+ io_srv->attach_send_link(send_link0);
+ auto recv_link0 = make_recv_link(40);
+ io_srv->attach_recv_link(recv_link0);
+ auto send_xport = make_send_xport(io_srv, send_link0, recv_link0, 1, 2, 32);
+
+ auto send_link1 = make_send_link(40);
+ io_srv->attach_send_link(send_link1);
+ auto recv_link1 = make_recv_link(40);
+ io_srv->attach_recv_link(recv_link1);
+ auto recv_xport = make_recv_xport(io_srv, recv_link1, send_link1, 1, 2, 32);
+
+ /* FIXME: Testing async messages requires the dummy read -- To not have it, needs recv
+ * mux + separate recv queue */
+ send_xport->put_msg(0xa5d3b33f, 0);
+ auto packet = send_link0->pop_send_packet();
+ recv_link1->push_back_recv_packet(packet.first, packet.second);
+ auto recv_buff = recv_xport->get_data_buff(0);
+ if (recv_buff) {
+ recv_xport->release_data_buff(std::move(recv_buff));
+ }
+ uint32_t msg;
+ UHD_ASSERT_THROW(recv_xport->get_msg(msg));
+ UHD_ASSERT_THROW(msg == 0xa5d3b33f);
+
+ auto send_buff = send_xport->get_data_buff(0);
+ UHD_ASSERT_THROW(send_buff);
+ auto buff_data = send_xport->buff_to_data(send_buff.get());
+ UHD_ASSERT_THROW(buff_data.second >= 16);
+ uint32_t* data = buff_data.first;
+ for (size_t i = 0; i < 16; i++) {
+ data[i] = (uint32_t)i;
+ }
+
+ send_xport->release_data_buff(send_buff, 16);
+ packet = send_link0->pop_send_packet();
+ recv_link1->push_back_recv_packet(packet.first, packet.second);
+
+ recv_buff = recv_xport->get_data_buff(0);
+ UHD_ASSERT_THROW(recv_buff);
+ auto recv_data = recv_xport->buff_to_data(recv_buff.get());
+ UHD_ASSERT_THROW(recv_data.second == 16);
+ data = recv_data.first;
+ for (size_t i = 0; i < 16; i++) {
+ UHD_ASSERT_THROW(data[i] == (uint32_t)i);
+ }
+ recv_xport->release_data_buff(std::move(recv_buff));
+}
+
+BOOST_AUTO_TEST_CASE(test_muxed_io)
+{
+ auto io_srv = inline_io_service::make();
+ auto send_link = make_send_link(80);
+ io_srv->attach_send_link(send_link);
+ auto recv_link = make_recv_link(80);
+ io_srv->attach_recv_link(recv_link);
+ auto send_xport = make_send_xport(io_srv, send_link, recv_link, 1, 2, 32);
+ auto recv_xport = make_recv_xport(io_srv, recv_link, send_link, 1, 2, 32);
+
+ /* Send a sideband message */
+ send_xport->put_msg(0xa5d3b33f, 0);
+
+ /* Send some normal data */
+ auto send_buff = send_xport->get_data_buff(0);
+ UHD_ASSERT_THROW(send_buff);
+ auto buff_data = send_xport->buff_to_data(send_buff.get());
+ UHD_ASSERT_THROW(buff_data.second >= 16);
+ uint32_t* data = buff_data.first;
+ for (size_t i = 0; i < 16; i++) {
+ data[i] = (uint32_t)i;
+ }
+ send_xport->release_data_buff(send_buff, 16);
+
+ /* Move the two packets over */
+ auto packet = send_link->pop_send_packet();
+ recv_link->push_back_recv_packet(packet.first, packet.second);
+ packet = send_link->pop_send_packet();
+ recv_link->push_back_recv_packet(packet.first, packet.second);
+
+ /* Try to receive the data
+ * (message won't arrive unless we try to get the data first)
+ * However, the message should be processed and enqueued here
+ */
+ auto recv_buff = recv_xport->get_data_buff(0);
+ UHD_ASSERT_THROW(recv_buff);
+ auto recv_data = recv_xport->buff_to_data(recv_buff.get());
+ UHD_ASSERT_THROW(recv_data.second == 16);
+ data = recv_data.first;
+ for (size_t i = 0; i < 16; i++) {
+ UHD_ASSERT_THROW(data[i] == (uint32_t)i);
+ }
+ recv_xport->release_data_buff(std::move(recv_buff));
+
+ /* Now can get the message */
+ uint32_t msg;
+ UHD_ASSERT_THROW(recv_xport->get_msg(msg));
+ UHD_ASSERT_THROW(msg == 0xa5d3b33f);
+}
+
+/*
+BOOST_AUTO_TEST_CASE(test_oversubscribed)
+{
+ auto io_srv = inline_io_service::make();
+ auto send_link = make_send_link(32);
+ io_srv->attach_send_link(send_link);
+ auto recv_link = make_recv_link(32);
+ io_srv->attach_recv_link(recv_link);
+ auto send_xport = make_send_xport(io_srv, send_link, recv_link, 1, 2, 32);
+
+ auto send_buff = send_xport->get_data_buff(0);
+ send_buff->set_packet_size(0);
+ send_xport->release_data_buff(send_buff, 0);
+ send_xport.reset();
+
+ auto recv_xport = make_recv_xport(io_srv, recv_link, send_link, 1, 2, 32);
+ uint32_t msg;
+ UHD_ASSERT_THROW(recv_xport->get_msg(msg) == false);
+}
+*/