author     Aaron Rossetto <aaron.rossetto@ni.com>  2019-10-17 08:44:11 -0500
committer  Martin Braun <martin.braun@ettus.com>   2019-11-26 12:21:32 -0800
commit     0bd233e64210c6605e8a6ec1424fa81f9ea8a681 (patch)
tree       f97729a7bba21cdfc45ee756bee1ac0489358544 /host/lib
parent     912ed28b3df13b9f9c33f2fa92867ec0ac7445fd (diff)
uhd: Introduce I/O service manager
- Implement I/O service detach link methods
- The I/O service manager instantiates new I/O services or connects links
  to existing I/O services based on options provided by the user in
  stream_args.
- Add a streamer ID parameter to methods to create transports so that the
  I/O service manager can group transports appropriately when using
  offload threads.
- Change X300 and MPMD to use the I/O service manager to connect links to
  I/O services.
- There is now a single I/O service manager per rfnoc_graph (and it is
  also stored in the graph).
- The I/O service manager now also knows the device args for the
  rfnoc_graph it was created with, and can make decisions based upon those
  (e.g., use a specific I/O service for DPDK, share cores between
  streamers, etc.)
- The I/O service manager does not get any decision logic with this
  commit, though.
- The MB ifaces for mpmd and x300 now access this global I/O service
  manager.
- Add configuration of link parameters with overrides.

Co-authored-by: Martin Braun <martin.braun@ettus.com>
Co-authored-by: Aaron Rossetto <aaron.rossetto@ni.com>
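For context, a minimal usage sketch of how these options are meant to reach the I/O service manager once this commit is in place. The argument names (recv_offload, recv_offload_wait_mode, num_poll_offload_threads) come from io_service_args.hpp below; the device address and stream formats are placeholder assumptions:

    // Hypothetical usage sketch -- address and stream formats are assumptions.
    #include <uhd/usrp/multi_usrp.hpp>

    int main()
    {
        auto usrp = uhd::usrp::multi_usrp::make("addr=192.168.10.2");
        uhd::stream_args_t stream_args("fc32", "sc16");
        // These args are read by the I/O service manager when the streamer
        // and its transports are created:
        stream_args.args["recv_offload"]             = "true";
        stream_args.args["recv_offload_wait_mode"]   = "poll";
        stream_args.args["num_poll_offload_threads"] = "2";
        auto rx_stream = usrp->get_rx_stream(stream_args);
        return 0;
    }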
Diffstat (limited to 'host/lib')
-rw-r--r--  host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp  9
-rw-r--r--  host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp  8
-rw-r--r--  host/lib/include/uhdlib/rfnoc/mb_iface.hpp  34
-rw-r--r--  host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp  5
-rw-r--r--  host/lib/include/uhdlib/transport/inline_io_service.hpp  6
-rw-r--r--  host/lib/include/uhdlib/transport/io_service.hpp  13
-rw-r--r--  host/lib/include/uhdlib/transport/links.hpp  3
-rw-r--r--  host/lib/include/uhdlib/transport/offload_io_service.hpp  14
-rw-r--r--  host/lib/include/uhdlib/transport/udp_common.hpp  75
-rw-r--r--  host/lib/include/uhdlib/usrp/common/io_service_args.hpp  93
-rw-r--r--  host/lib/include/uhdlib/usrp/common/io_service_mgr.hpp  104
-rw-r--r--  host/lib/rfnoc/graph_stream_manager.cpp  10
-rw-r--r--  host/lib/rfnoc/link_stream_manager.cpp  12
-rw-r--r--  host/lib/rfnoc/rfnoc_graph.cpp  21
-rw-r--r--  host/lib/transport/inline_io_service.cpp  30
-rw-r--r--  host/lib/transport/offload_io_service.cpp  73
-rw-r--r--  host/lib/usrp/common/CMakeLists.txt  2
-rw-r--r--  host/lib/usrp/common/io_service_args.cpp  101
-rw-r--r--  host/lib/usrp/common/io_service_mgr.cpp  511
-rw-r--r--  host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.cpp  66
-rw-r--r--  host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.hpp  2
-rw-r--r--  host/lib/usrp/mpmd/mpmd_mb_iface.cpp  66
-rw-r--r--  host/lib/usrp/mpmd/mpmd_mb_iface.hpp  16
-rw-r--r--  host/lib/usrp/x300/x300_eth_mgr.cpp  74
-rw-r--r--  host/lib/usrp/x300/x300_impl.hpp  12
-rw-r--r--  host/lib/usrp/x300/x300_mb_iface.cpp  80
26 files changed, 1264 insertions, 176 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp
index 8a721ea26..b9f4205ab 100644
--- a/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp
+++ b/host/lib/include/uhdlib/rfnoc/graph_stream_manager.hpp
@@ -19,6 +19,7 @@
#include <functional>
#include <memory>
#include <set>
+#include <string>
#include <tuple>
namespace uhd { namespace rfnoc {
@@ -114,6 +115,7 @@ public:
* \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
* \param adapter The preference for the adapter to use to get to the destination
* \param xport_args The transport arguments
+ * \param streamer_id A unique identifier for the streamer that will own the transport
* \return A transport instance
*/
virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream(
@@ -121,7 +123,8 @@ public:
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
const uhd::transport::adapter_id_t adapter,
- const device_addr_t& xport_args) = 0;
+ const device_addr_t& xport_args,
+ const std::string& streamer_id) = 0;
/*! \brief Create a data stream going from the host to the device
*
@@ -130,6 +133,7 @@ public:
* \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
* \param adapter The preference for the adapter to use to get to the destination
* \param xport_args The transport arguments
+ * \param streamer_id A unique identifier for the streamer that will own the transport
* \return A transport instance
*/
virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
@@ -137,7 +141,8 @@ public:
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
const uhd::transport::adapter_id_t adapter,
- const device_addr_t& xport_args) = 0;
+ const device_addr_t& xport_args,
+ const std::string& streamer_id) = 0;
/*! \brief Get all the adapters that can reach the specified endpoint
*
diff --git a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
index b1a934891..836a50dcf 100644
--- a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
+++ b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp
@@ -126,13 +126,15 @@ public:
* \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
* \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
* \param xport_args The transport arguments
+ * \param streamer_id A unique identifier for the streamer that will own the transport
* \return A transport instance
*/
virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
const sep_addr_t dst_addr,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
- const device_addr_t& xport_args) = 0;
+ const device_addr_t& xport_args,
+ const std::string& streamer_id) = 0;
/*! \brief Create a data stream going from the device to the host
*
@@ -140,13 +142,15 @@ public:
* \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
* \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
* \param xport_args The transport arguments
+ * \param streamer_id A unique identifier for the streamer that will own the transport
* \return A transport instance
*/
virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream(
const sep_addr_t src_addr,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
- const device_addr_t& xport_args) = 0;
+ const device_addr_t& xport_args,
+ const std::string& streamer_id) = 0;
static uptr make(const chdr::chdr_packet_factory& pkt_factory,
mb_iface& mb_if,
diff --git a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
index abfc9d1c4..53f0897f9 100644
--- a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
+++ b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp
@@ -7,6 +7,7 @@
#ifndef INCLUDED_LIBUHD_MB_IFACE_HPP
#define INCLUDED_LIBUHD_MB_IFACE_HPP
+#include <uhd/exception.hpp>
#include <uhd/transport/adapter_id.hpp>
#include <uhd/types/endianness.hpp>
#include <uhdlib/rfnoc/chdr_ctrl_xport.hpp>
@@ -14,6 +15,7 @@
#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp>
#include <uhdlib/rfnoc/clock_iface.hpp>
#include <uhdlib/rfnoc/rfnoc_common.hpp>
+#include <uhdlib/usrp/common/io_service_mgr.hpp>
#include <memory>
namespace uhd { namespace rfnoc {
@@ -86,6 +88,27 @@ public:
virtual std::shared_ptr<clock_iface> get_clock_iface(
const std::string& clock_name) = 0;
+ /*! Set the I/O service manager
+ *
+ * Must be called before any of the make_*_transport() calls can succeed.
+ */
+ void set_io_srv_mgr(uhd::usrp::io_service_mgr::sptr io_srv_mgr)
+ {
+ _io_srv_mgr = io_srv_mgr;
+ }
+
+ /*! Get the I/O Service Manager
+ *
+ * This function must be called by the implementations of the various
+ * make_*_transport() calls to get access to the global I/O Service Manager
+ */
+ uhd::usrp::io_service_mgr::sptr get_io_srv_mgr()
+ {
+ if (!_io_srv_mgr) {
+ throw uhd::runtime_error("I/O Service Manager not set for mb_iface!");
+ }
+ return _io_srv_mgr;
+ }
+
/*! Create a control transport
*
* This is usually called once per motherboard, since there is only one
@@ -108,6 +131,7 @@ public:
* \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
* \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
* \param xport_args Transport configuration args
+ * \param streamer_id A unique identifier for the streamer that will own the transport
* \return A CHDR RX data transport
*/
virtual chdr_rx_data_xport::uptr make_rx_data_transport(
@@ -116,7 +140,8 @@ public:
const sep_id_pair_t& epids,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
- const device_addr_t& xport_args) = 0;
+ const device_addr_t& xport_args,
+ const std::string& streamer_id) = 0;
/*! Create a TX data transport
*
@@ -127,6 +152,7 @@ public:
* \param pyld_buff_fmt Datatype of SW buffer that holds the data payload
* \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata
* \param xport_args Transport configuration args
+ * \param streamer_id A unique identifier for the streamer that will own the transport
* \return A CHDR TX data transport
*/
virtual chdr_tx_data_xport::uptr make_tx_data_transport(
@@ -135,7 +161,11 @@ public:
const sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const device_addr_t& xport_args) = 0;
+ const device_addr_t& xport_args,
+ const std::string& streamer_id) = 0;
+
+private:
+ uhd::usrp::io_service_mgr::sptr _io_srv_mgr;
};
}} /* namespace uhd::rfnoc */
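A hedged sketch of how an mb_iface implementation is expected to use this accessor inside a transport factory; everything here other than get_io_srv_mgr() and connect_links() (recv_link, send_link, io_srv_args, streamer_id) is a placeholder owned by the implementation:

    // Inside a hypothetical make_rx_data_transport() override:
    auto io_srv = this->get_io_srv_mgr()->connect_links(recv_link,
        send_link,
        uhd::transport::link_type_t::RX_DATA,
        io_srv_args,
        streamer_id);
    // io_srv is then handed to the CHDR RX transport being constructed.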
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
index c2ec4e0e3..226d4f069 100644
--- a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp
@@ -95,6 +95,11 @@ constexpr uint32_t MAX_FC_FREQ_PKTS = (uint32_t(1) << 24) - 1;
constexpr uint64_t MAX_FC_HEADROOM_BYTES = (uint64_t(1) << 16) - 1;
constexpr uint32_t MAX_FC_HEADROOM_PKTS = (uint32_t(1) << 8) - 1;
+// RFNoC devices need a minimum of two frame buffers to be available from the
+// link--one for the data transport and one for the control transport to
+// simultaneously handle MGMT and STRC/STRS initialization packets.
+constexpr size_t MIN_NUM_FRAMES = 2;
+
}} // namespace uhd::rfnoc
#endif /* INCLUDED_RFNOC_RFNOC_COMMON_HPP */
diff --git a/host/lib/include/uhdlib/transport/inline_io_service.hpp b/host/lib/include/uhdlib/transport/inline_io_service.hpp
index f207d15a0..fe41b96b6 100644
--- a/host/lib/include/uhdlib/transport/inline_io_service.hpp
+++ b/host/lib/include/uhdlib/transport/inline_io_service.hpp
@@ -37,6 +37,9 @@ public:
void attach_recv_link(recv_link_if::sptr link);
void attach_send_link(send_link_if::sptr link);
+ void detach_recv_link(recv_link_if::sptr link);
+ void detach_send_link(send_link_if::sptr link);
+
recv_io_if::sptr make_recv_client(recv_link_if::sptr data_link,
size_t num_recv_frames,
recv_callback_t cb,
@@ -102,8 +105,7 @@ private:
inline_recv_cb* recv_io_cb, recv_link_if* recv_link, int32_t timeout_ms);
/* Track whether link is muxed and the callback */
- std::unordered_map<recv_link_if*,
- std::tuple<inline_recv_mux*, inline_recv_cb*>>
+ std::unordered_map<recv_link_if*, std::tuple<inline_recv_mux*, inline_recv_cb*>>
_recv_tbl;
/* Shared ptr kept to avoid untimely release */
diff --git a/host/lib/include/uhdlib/transport/io_service.hpp b/host/lib/include/uhdlib/transport/io_service.hpp
index 69a3a523e..399b693dc 100644
--- a/host/lib/include/uhdlib/transport/io_service.hpp
+++ b/host/lib/include/uhdlib/transport/io_service.hpp
@@ -282,10 +282,19 @@ public:
*/
virtual void attach_send_link(send_link_if::sptr link) = 0;
- /* TODO: Cleanup functions
+ /*!
+ * Detach a recv_link_if previously attached to this I/O service.
+ *
+ * \param link the recv_link_if to detach
+ */
virtual void detach_recv_link(recv_link_if::sptr link) = 0;
+
+ /*!
+ * Detach a send_link_if previously attached to this I/O service.
+ *
+ * \param link the send_link_if to detach
+ */
virtual void detach_send_link(send_link_if::sptr link) = 0;
- */
/*!
* Create a send_io_if so a transport may send packets through the link.
diff --git a/host/lib/include/uhdlib/transport/links.hpp b/host/lib/include/uhdlib/transport/links.hpp
index 64673f02f..09872b145 100644
--- a/host/lib/include/uhdlib/transport/links.hpp
+++ b/host/lib/include/uhdlib/transport/links.hpp
@@ -16,8 +16,7 @@ namespace uhd { namespace transport {
enum class link_type_t { CTRL = 0, ASYNC_MSG, TX_DATA, RX_DATA };
//! Contains all information regarding a link interface
-using both_links_t = std::tuple<uhd::transport::io_service::sptr,
- uhd::transport::send_link_if::sptr,
+using both_links_t = std::tuple<uhd::transport::send_link_if::sptr,
size_t, // num_send_frames
uhd::transport::recv_link_if::sptr,
size_t, // num_recv_frames
diff --git a/host/lib/include/uhdlib/transport/offload_io_service.hpp b/host/lib/include/uhdlib/transport/offload_io_service.hpp
index a7d9d211d..02231c502 100644
--- a/host/lib/include/uhdlib/transport/offload_io_service.hpp
+++ b/host/lib/include/uhdlib/transport/offload_io_service.hpp
@@ -8,6 +8,7 @@
#define INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_HPP
#include <uhdlib/transport/io_service.hpp>
+#include <vector>
namespace uhd { namespace transport {
@@ -21,18 +22,9 @@ namespace uhd { namespace transport {
class offload_io_service : public io_service
{
public:
- enum client_type_t
- {
- RECV_ONLY,
- SEND_ONLY,
- BOTH_SEND_AND_RECV
- };
+ enum client_type_t { RECV_ONLY, SEND_ONLY, BOTH_SEND_AND_RECV };
- enum wait_mode_t
- {
- POLL,
- BLOCK
- };
+ enum wait_mode_t { POLL, BLOCK };
/*!
* Options for configuring offload I/O service
diff --git a/host/lib/include/uhdlib/transport/udp_common.hpp b/host/lib/include/uhdlib/transport/udp_common.hpp
index 5f5a18c27..6c87ef498 100644
--- a/host/lib/include/uhdlib/transport/udp_common.hpp
+++ b/host/lib/include/uhdlib/transport/udp_common.hpp
@@ -10,7 +10,10 @@
#include <uhd/config.hpp>
#include <uhd/exception.hpp>
+#include <uhd/rfnoc/constants.hpp>
+#include <uhd/types/device_addr.hpp>
#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/links.hpp>
#include <boost/asio.hpp>
#include <boost/format.hpp>
#include <thread>
@@ -194,6 +197,78 @@ UHD_INLINE size_t resize_udp_socket_buffer_with_warning(
return actual_size;
}
+/*!
+ * Determines a set of values to use for a UDP CHDR link based on defaults and
+ * any overrides that the user may have provided. In cases where both device
+ * and stream arguments can be used to override a value, note that the stream
+ * argument will always take precedence.
+ *
+ * \param link_type the link type (CTRL, RX, TX) to calculate parameters for
+ * \param send_mtu the MTU of the link for Tx cases
+ * \param recv_mtu the MTU of the link for Rx cases
+ * \param default_link_params default values to use for the link parameters
+ * \param device_args device-level argument dictionary for overrides
+ * \param link_args argument dictionary with stream-level overrides (taken
+ * from the stream args)
+ * \return Parameters to apply
+ */
+inline link_params_t calculate_udp_link_params(
+ const uhd::transport::link_type_t link_type,
+ const size_t send_mtu,
+ const size_t recv_mtu,
+ const link_params_t& default_link_params,
+ const uhd::device_addr_t& device_args,
+ const uhd::device_addr_t& link_args)
+{
+ // Apply any device-level overrides to the default values first.
+ // If the MTU is overridden, it will be capped to the value provided by
+ // the caller.
+ const size_t constrained_send_mtu =
+ std::min(send_mtu, device_args.cast<size_t>("mtu", send_mtu));
+ const size_t constrained_recv_mtu =
+ std::min(recv_mtu, device_args.cast<size_t>("mtu", recv_mtu));
+
+ link_params_t link_params;
+ link_params.num_send_frames =
+ device_args.cast<size_t>("num_send_frames", default_link_params.num_send_frames);
+ link_params.num_recv_frames =
+ device_args.cast<size_t>("num_recv_frames", default_link_params.num_recv_frames);
+ link_params.send_frame_size =
+ device_args.cast<size_t>("send_frame_size", default_link_params.send_frame_size);
+ link_params.recv_frame_size =
+ device_args.cast<size_t>("recv_frame_size", default_link_params.recv_frame_size);
+ link_params.send_buff_size =
+ device_args.cast<size_t>("send_buff_size", default_link_params.send_buff_size);
+ link_params.recv_buff_size =
+ device_args.cast<size_t>("recv_buff_size", default_link_params.recv_buff_size);
+
+ // Now apply stream-level overrides based on the link type.
+ if (link_type == link_type_t::CTRL) {
+ // Control links typically do not allow the number of frames to be
+ // configured.
+ link_params.num_recv_frames =
+ uhd::rfnoc::CMD_FIFO_SIZE / uhd::rfnoc::MAX_CMD_PKT_SIZE;
+ } else if (link_type == link_type_t::TX_DATA) {
+ // Note that the send frame size will be capped to the Tx MTU.
+ link_params.send_frame_size = link_args.cast<size_t>("send_frame_size",
+ std::min(link_params.send_frame_size, constrained_send_mtu));
+ link_params.num_send_frames =
+ link_args.cast<size_t>("num_send_frames", link_params.num_send_frames);
+ link_params.send_buff_size =
+ link_args.cast<size_t>("send_buff_size", link_params.send_buff_size);
+ } else if (link_type == link_type_t::RX_DATA) {
+ // Note that the receive frame size will be capped to the Rx MTU.
+ link_params.recv_frame_size = link_args.cast<size_t>("recv_frame_size",
+ std::min(link_params.recv_frame_size, constrained_recv_mtu));
+ link_params.num_recv_frames =
+ link_args.cast<size_t>("num_recv_frames", link_params.num_recv_frames);
+ link_params.recv_buff_size =
+ link_args.cast<size_t>("recv_buff_size", link_params.recv_buff_size);
+ }
+
+ return link_params;
+}
+
}} // namespace uhd::transport
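A hedged usage sketch of this helper; the defaults and MTUs below are illustrative assumptions, not values any particular device ships with:

    // Illustrative defaults only.
    uhd::transport::link_params_t defaults;
    defaults.num_send_frames = 32;
    defaults.num_recv_frames = 32;
    defaults.send_frame_size = 8000;
    defaults.recv_frame_size = 8000;
    defaults.send_buff_size  = 1 << 20;
    defaults.recv_buff_size  = 1 << 20;

    // A device-level override, then a stream-level override that wins for RX:
    const uhd::device_addr_t device_args("num_recv_frames=64");
    const uhd::device_addr_t link_args("recv_frame_size=4000");

    const auto params = uhd::transport::calculate_udp_link_params(
        uhd::transport::link_type_t::RX_DATA,
        8000, // send_mtu
        8000, // recv_mtu
        defaults,
        device_args,
        link_args);
    // Result: params.num_recv_frames == 64, params.recv_frame_size == 4000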
diff --git a/host/lib/include/uhdlib/usrp/common/io_service_args.hpp b/host/lib/include/uhdlib/usrp/common/io_service_args.hpp
new file mode 100644
index 000000000..a783cc825
--- /dev/null
+++ b/host/lib/include/uhdlib/usrp/common/io_service_args.hpp
@@ -0,0 +1,93 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_IO_SERVICE_ARGS_HPP
+#define INCLUDED_LIBUHD_IO_SERVICE_ARGS_HPP
+
+#include <uhd/types/device_addr.hpp>
+#include <boost/optional.hpp>
+
+namespace uhd { namespace usrp {
+
+/*! Struct containing user options for I/O services
+ *
+ * The I/O service manager supports the following args:
+ *
+ * recv_offload: set to "true" to use an offload thread for RX_DATA links, "false"
+ * to use an inline I/O service.
+ * send_offload: set to "true" to use an offload thread for TX_DATA links, "false"
+ * to use an inline I/O service.
+ * recv_offload_wait_mode: set to "poll" to use a polling strategy in the offload
+ * thread, set to "block" to use a blocking strategy.
+ * send_offload_wait_mode: set to "poll" to use a polling strategy in the offload
+ * thread, set to "block" to use a blocking strategy.
+ * num_poll_offload_threads: set to the total number of offload threads to use for
+ * RX_DATA and TX_DATA in this rfnoc_graph. New connections
+ * always go to the offload thread containing the fewest
+ * connections, with the lowest-numbered thread as a second
+ * criterion. The default is 1.
+ * recv_offload_thread_cpu_<N>: an integer to specify cpu affinity of the offload thread.
+ * N indicates the thread instance, starting with 0 for each
+ * streamer and ending with the number of transport adapters
+ * minus one. Only used if the I/O service is configured to
+ * block.
+ * send_offload_thread_cpu_<N>: an integer to specify cpu affinity of the offload thread.
+ * N indicates the thread instance, starting with 0 for each
+ * streamer and ending with the number of transport adapters
+ * minus one. Only used if the I/O service is configured to
+ * block.
+ * poll_offload_thread_cpu_<N>: an integer to specify cpu affinity of the offload thread.
+ * N indicates the thread instance, starting with 0 and up to
+ * num_poll_offload_threads minus 1. Only used if the I/O
+ * service is configured to poll.
+ */
+struct io_service_args_t
+{
+ enum wait_mode_t { POLL, BLOCK };
+
+ //! Whether to offload streaming I/O to a worker thread
+ bool recv_offload = false;
+
+ //! Whether to offload streaming I/O to a worker thread
+ bool send_offload = false;
+
+ //! Whether the offload thread should poll or block
+ wait_mode_t recv_offload_wait_mode = BLOCK;
+
+ //! Whether the offload thread should poll or block
+ wait_mode_t send_offload_wait_mode = BLOCK;
+
+ //! Number of polling threads to use, if wait_mode is set to POLL
+ size_t num_poll_offload_threads = 1;
+
+ //! CPU affinity of offload threads, if wait_mode is set to BLOCK (one item
+ //! per thread)
+ std::vector<boost::optional<size_t>> recv_offload_thread_cpu;
+
+ //! CPU affinity of offload threads, if wait_mode is set to BLOCK (one item
+ //! per thread)
+ std::vector<boost::optional<size_t>> send_offload_thread_cpu;
+
+ //! CPU affinity of offload threads, if wait_mode is set to POLL (one item
+ //! per thread)
+ std::vector<boost::optional<size_t>> poll_offload_thread_cpu;
+};
+
+/*! Reads I/O service args from provided dictionary
+ *
+ * If an option is not specified in the dictionary, the default value of the
+ * struct above is returned.
+ *
+ * \param args The dictionary from which to read the I/O service args
+ * \param defaults Default values (not including boost::optional values)
+ * \return The I/O service args read
+ */
+io_service_args_t read_io_service_args(
+ const device_addr_t& args, const io_service_args_t& defaults);
+
+}} // namespace uhd::usrp
+
+#endif /* INCLUDED_LIBUHD_IO_SERVICE_ARGS_HPP */
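A short, hedged example of reading these args; the arg string itself is an assumption, while the key names are the documented ones:

    const uhd::device_addr_t args(
        "recv_offload=true,recv_offload_wait_mode=poll,num_poll_offload_threads=2");
    const uhd::usrp::io_service_args_t io_args =
        uhd::usrp::read_io_service_args(args, uhd::usrp::io_service_args_t());
    // io_args.recv_offload             == true
    // io_args.recv_offload_wait_mode   == io_service_args_t::POLL
    // io_args.num_poll_offload_threads == 2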
diff --git a/host/lib/include/uhdlib/usrp/common/io_service_mgr.hpp b/host/lib/include/uhdlib/usrp/common/io_service_mgr.hpp
new file mode 100644
index 000000000..1093f7bec
--- /dev/null
+++ b/host/lib/include/uhdlib/usrp/common/io_service_mgr.hpp
@@ -0,0 +1,104 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_IO_SERVICE_MGR_HPP
+#define INCLUDED_LIBUHD_IO_SERVICE_MGR_HPP
+
+#include <uhd/transport/adapter_id.hpp>
+#include <uhd/types/device_addr.hpp>
+#include <uhdlib/transport/io_service.hpp>
+#include <uhdlib/transport/link_if.hpp>
+#include <uhdlib/transport/links.hpp>
+#include <uhdlib/usrp/common/io_service_args.hpp>
+#include <memory>
+
+namespace uhd { namespace usrp {
+
+/*! Class to create and manage I/O services
+ *
+ * The I/O service manager connects links to I/O services, instantiating new I/O
+ * services as needed. It chooses the I/O service to connect based on options
+ * from the user passed through stream_args, as well as defaults provided by the
+ * caller.
+ *
+ * The I/O service manager supports two types of I/O services: inline I/O service
+ * and offload I/O service. Inline I/O services execute all I/O in the caller
+ * thread. Offload I/O services execute all I/O in an offload thread. The offload
+ * thread can be configured to block or poll. All control links use inline I/O
+ * services; only RX and TX data links currently use offload I/O services.
+ *
+ * If polling I/O services are requested, the I/O service manager instantiates
+ * the number of I/O services specified by the user through args. It chooses
+ * which I/O service to connect a set of links to by selecting the I/O service
+ * with the fewest number of connections.
+ *
+ * If blocking I/O services are requested, the I/O service manager instantiates
+ * one offload I/O service for each transport adapter used by a streamer. When
+ * there are multiple streamers, this manager creates a separate set of I/O
+ * services for each streamer.
+ *
+ * Offload I/O services have a number of restrictions that must be observed:
+ * - Offload I/O services currently do not support links that require frame
+ * buffers to be released in order.
+ * - Blocking I/O services should only be used for groups of RX or TX data
+ * transports in the same streamer. Since the I/O service blocks on each
+ * channel, if two streamers were to be configured to share the I/O service,
+ * one streamer would block the progress of the other. The I/O service
+ * manager ensures this restriction is observed by grouping I/O services
+ * and links appropriately.
+ * - Blocking I/O services do not currently support muxed links, since the I/O
+ * service is specialized to either RX or TX data and the procedure to configure
+ * a data transport requires both RX and TX clients. The I/O service manager
+ * throws an exception if requested to mux a link configured with a blocking
+ * I/O service.
+ */
+class io_service_mgr
+{
+public:
+ using sptr = std::shared_ptr<io_service_mgr>;
+
+ /*! Connects a pair of links to an I/O service
+ *
+ * Call this method to connect a pair of links to an I/O service. For muxed
+ * links, the I/O service manager keeps track of the number of muxed
+ * connections (the number of times this method has been called with the same
+ * links).
+ *
+ * The last two parameters are ignored for control links.
+ *
+ * \param recv_link The recv link to connect to an I/O service
+ * \param send_link The send link to connect to an I/O service
+ * \param link_type The type of transport in which the links will be used
+ * \param io_srv_args The user-requested options for the stream
+ * \param streamer_id A unique ID for the streamer that will use the links
+ * \return The I/O service to which the links are connected
+ */
+ virtual transport::io_service::sptr connect_links(
+ transport::recv_link_if::sptr recv_link,
+ transport::send_link_if::sptr send_link,
+ const transport::link_type_t link_type,
+ const io_service_args_t& io_srv_args = io_service_args_t(),
+ const std::string& streamer_id = "") = 0;
+
+ /*! Disconnects links from their I/O service
+ *
+ * \param recv_link The recv link to disconnect from an I/O service
+ * \param send_link The send link to disconnect from an I/O service
+ */
+ virtual void disconnect_links(transport::recv_link_if::sptr recv_link,
+ transport::send_link_if::sptr send_link) = 0;
+
+ /*! Creates an instance of an I/O service manager
+ *
+ * \param args Device args used to create the UHD session
+ * \return The I/O service manager instance
+ */
+ static sptr make(const uhd::device_addr_t& args);
+};
+
+}} // namespace uhd::usrp
+
+#endif /* INCLUDED_LIBUHD_IO_SERVICE_MGR_HPP */
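A hedged sketch of the intended call pattern for this interface; recv_link, send_link, dev_addr, and stream_args stand in for objects the caller already owns, and the streamer ID format is an assumption:

    auto mgr = uhd::usrp::io_service_mgr::make(dev_addr);
    const auto io_args = uhd::usrp::read_io_service_args(
        stream_args, uhd::usrp::io_service_args_t());
    auto io_srv = mgr->connect_links(recv_link,
        send_link,
        uhd::transport::link_type_t::RX_DATA,
        io_args,
        "RxStreamer#0" /* placeholder streamer ID */);
    // ... create recv clients on io_srv and stream; on teardown:
    mgr->disconnect_links(recv_link, send_link);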
diff --git a/host/lib/rfnoc/graph_stream_manager.cpp b/host/lib/rfnoc/graph_stream_manager.cpp
index dba913998..f8acab9f2 100644
--- a/host/lib/rfnoc/graph_stream_manager.cpp
+++ b/host/lib/rfnoc/graph_stream_manager.cpp
@@ -189,7 +189,8 @@ public:
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
const uhd::transport::adapter_id_t adapter,
- const device_addr_t& xport_args)
+ const device_addr_t& xport_args,
+ const std::string& streamer_id)
{
device_id_t dev = _check_dst_and_find_src(
src_addr, adapter, uhd::transport::link_type_t::RX_DATA);
@@ -198,7 +199,7 @@ public:
allocs.rx++;
_alloc_map[chosen] = allocs;
return _link_mgrs.at(dev)->create_device_to_host_data_stream(
- src_addr, pyld_buff_fmt, mdata_buff_fmt, xport_args);
+ src_addr, pyld_buff_fmt, mdata_buff_fmt, xport_args, streamer_id);
}
virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
@@ -206,7 +207,8 @@ public:
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
const uhd::transport::adapter_id_t adapter,
- const device_addr_t& xport_args)
+ const device_addr_t& xport_args,
+ const std::string& streamer_id)
{
device_id_t dev = _check_dst_and_find_src(
dst_addr, adapter, uhd::transport::link_type_t::TX_DATA);
@@ -215,7 +217,7 @@ public:
allocs.tx++;
_alloc_map[chosen] = allocs;
return _link_mgrs.at(dev)->create_host_to_device_data_stream(
- dst_addr, pyld_buff_fmt, mdata_buff_fmt, xport_args);
+ dst_addr, pyld_buff_fmt, mdata_buff_fmt, xport_args, streamer_id);
}
std::vector<uhd::transport::adapter_id_t> get_adapters(sep_addr_t addr) const
diff --git a/host/lib/rfnoc/link_stream_manager.cpp b/host/lib/rfnoc/link_stream_manager.cpp
index c0d79c519..59b80b59e 100644
--- a/host/lib/rfnoc/link_stream_manager.cpp
+++ b/host/lib/rfnoc/link_stream_manager.cpp
@@ -222,7 +222,8 @@ public:
const sep_addr_t dst_addr,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
- const device_addr_t& xport_args)
+ const device_addr_t& xport_args,
+ const std::string& streamer_id)
{
_ensure_ep_is_reachable(dst_addr);
@@ -244,14 +245,16 @@ public:
{src_epid, dst_epid},
pyld_buff_fmt,
mdata_buff_fmt,
- xport_args);
+ xport_args,
+ streamer_id);
}
virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream(
sep_addr_t src_addr,
const sw_buff_t pyld_buff_fmt,
const sw_buff_t mdata_buff_fmt,
- const device_addr_t& xport_args)
+ const device_addr_t& xport_args,
+ const std::string& streamer_id)
{
_ensure_ep_is_reachable(src_addr);
@@ -273,7 +276,8 @@ public:
{src_epid, dst_epid},
pyld_buff_fmt,
mdata_buff_fmt,
- xport_args);
+ xport_args,
+ streamer_id);
}
private:
diff --git a/host/lib/rfnoc/rfnoc_graph.cpp b/host/lib/rfnoc/rfnoc_graph.cpp
index 929ce518d..6ebfe8612 100644
--- a/host/lib/rfnoc/rfnoc_graph.cpp
+++ b/host/lib/rfnoc/rfnoc_graph.cpp
@@ -18,6 +18,7 @@
#include <uhdlib/rfnoc/rfnoc_device.hpp>
#include <uhdlib/rfnoc/rfnoc_rx_streamer.hpp>
#include <uhdlib/rfnoc/rfnoc_tx_streamer.hpp>
+#include <uhdlib/usrp/common/io_service_mgr.hpp>
#include <uhdlib/utils/narrow.hpp>
#include <memory>
@@ -50,6 +51,7 @@ public:
_graph(std::make_unique<uhd::rfnoc::detail::graph_t>()) {
_mb_controllers.reserve(_num_mboards);
// Now initialize all subsystems:
+ _init_io_srv_mgr(dev_addr); // Global I/O Service Manager
_init_mb_controllers();
_init_gsm(); // Graph Stream Manager
try {
@@ -252,11 +254,12 @@ public:
pyld_fmt,
mdata_fmt,
adapter_id,
- rfnoc_streamer->get_stream_args().args);
+ rfnoc_streamer->get_stream_args().args,
+ rfnoc_streamer->get_unique_id());
rfnoc_streamer->connect_channel(strm_port, std::move(xport));
- //// If this worked, then also connect the streamer in the BGL graph
+ // If this worked, then also connect the streamer in the BGL graph
auto dst = get_block(dst_blk);
graph_edge_t edge_info(strm_port, dst_port, graph_edge_t::TX_STREAM, true);
_graph->connect(rfnoc_streamer.get(), dst.get(), edge_info);
@@ -308,7 +311,8 @@ public:
pyld_fmt,
mdata_fmt,
adapter_id,
- rfnoc_streamer->get_stream_args().args);
+ rfnoc_streamer->get_stream_args().args,
+ rfnoc_streamer->get_unique_id());
rfnoc_streamer->connect_channel(strm_port, std::move(xport));
@@ -457,6 +461,14 @@ private:
/**************************************************************************
* Device Setup
*************************************************************************/
+ void _init_io_srv_mgr(const uhd::device_addr_t& dev_addr)
+ {
+ _io_srv_mgr = usrp::io_service_mgr::make(dev_addr);
+ for (size_t mb_idx = 0; mb_idx < _num_mboards; mb_idx++) {
+ _device->get_mb_iface(mb_idx).set_io_srv_mgr(_io_srv_mgr);
+ }
+ }
+
void _init_mb_controllers()
{
UHD_LOG_TRACE(LOG_ID, "Initializing MB controllers...");
@@ -834,6 +846,9 @@ private:
// easy lookups.
size_t _num_mboards;
+ //! Reference to the global I/O Service Manager
+ uhd::usrp::io_service_mgr::sptr _io_srv_mgr;
+
//! Registry for the blocks (it's a separate class)
std::unique_ptr<detail::block_container_t> _block_registry;
diff --git a/host/lib/transport/inline_io_service.cpp b/host/lib/transport/inline_io_service.cpp
index 9dd0814ca..93967e09a 100644
--- a/host/lib/transport/inline_io_service.cpp
+++ b/host/lib/transport/inline_io_service.cpp
@@ -272,10 +272,19 @@ void inline_io_service::attach_recv_link(recv_link_if::sptr link)
{
auto link_ptr = link.get();
UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) == 0);
- _recv_tbl[link_ptr] =
- std::tuple<inline_recv_mux*, inline_recv_cb*>(nullptr, nullptr);
+ _recv_tbl[link_ptr] = std::tuple<inline_recv_mux*, inline_recv_cb*>(nullptr, nullptr);
_recv_links.push_back(link);
-};
+}
+
+void inline_io_service::detach_recv_link(recv_link_if::sptr link)
+{
+ auto link_ptr = link.get();
+ UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) != 0);
+ _recv_tbl.erase(link_ptr);
+
+ _recv_links.remove_if(
+ [link_ptr](recv_link_if::sptr& item) { return item.get() == link_ptr; });
+}
recv_io_if::sptr inline_io_service::make_recv_client(recv_link_if::sptr data_link,
size_t num_recv_frames,
@@ -301,9 +310,17 @@ recv_io_if::sptr inline_io_service::make_recv_client(recv_link_if::sptr data_lin
void inline_io_service::attach_send_link(send_link_if::sptr link)
{
- UHD_ASSERT_THROW(std::find(_send_links.begin(), _send_links.end(), link) == _send_links.end());
+ UHD_ASSERT_THROW(
+ std::find(_send_links.begin(), _send_links.end(), link) == _send_links.end());
_send_links.push_back(link);
-};
+}
+
+void inline_io_service::detach_send_link(send_link_if::sptr link)
+{
+ auto link_ptr = link.get();
+ _send_links.remove_if(
+ [link_ptr](send_link_if::sptr& item) { return item.get() == link_ptr; });
+}
send_io_if::sptr inline_io_service::make_send_client(send_link_if::sptr send_link,
size_t num_send_frames,
@@ -365,8 +382,7 @@ void inline_io_service::connect_receiver(
_recv_tbl[link] = std::make_tuple(mux, rcvr);
}
-void inline_io_service::disconnect_receiver(
- recv_link_if* link, inline_recv_cb* cb)
+void inline_io_service::disconnect_receiver(recv_link_if* link, inline_recv_cb* cb)
{
inline_recv_mux* mux;
inline_recv_cb* rcvr;
diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp
index ed28a93f9..012c86868 100644
--- a/host/lib/transport/offload_io_service.cpp
+++ b/host/lib/transport/offload_io_service.cpp
@@ -54,6 +54,20 @@ public:
_send_tbl[send_link.get()] = 0;
}
+ void unregister_link(const recv_link_if::sptr& recv_link)
+ {
+ auto link_ptr = recv_link.get();
+ UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) != 0);
+ _recv_tbl.erase(link_ptr);
+ }
+
+ void unregister_link(const send_link_if::sptr& send_link)
+ {
+ auto link_ptr = send_link.get();
+ UHD_ASSERT_THROW(_send_tbl.count(link_ptr) != 0);
+ _send_tbl.erase(link_ptr);
+ }
+
void reserve_frames(const frame_reservation_t& reservation)
{
if (reservation.recv_link) {
@@ -358,6 +372,9 @@ public:
void attach_recv_link(recv_link_if::sptr link);
void attach_send_link(send_link_if::sptr link);
+ void detach_recv_link(recv_link_if::sptr link);
+ void detach_send_link(send_link_if::sptr link);
+
recv_io_if::sptr make_recv_client(recv_link_if::sptr recv_link,
size_t num_recv_frames,
recv_callback_t cb,
@@ -400,6 +417,7 @@ private:
frame_reservation_t frames_reserved;
};
+ void _queue_client_req(std::function<void()> fn);
void _get_recv_buff(recv_client_info_t& info, int32_t timeout_ms);
void _get_send_buff(send_client_info_t& info);
void _release_recv_buff(recv_client_info_t& info, frame_buff* buff);
@@ -661,12 +679,7 @@ void offload_io_service_impl::attach_recv_link(recv_link_if::sptr link)
_io_srv->attach_recv_link(link);
};
- client_req_t queue_element;
- queue_element.req = {new std::function<void()>(req_fn)};
- const bool success = _client_connect_queue.push(queue_element);
- if (!success) {
- throw uhd::runtime_error("Failed to push attach_recv_link request");
- }
+ _queue_client_req(req_fn);
}
void offload_io_service_impl::attach_send_link(send_link_if::sptr link)
@@ -685,6 +698,28 @@ void offload_io_service_impl::attach_send_link(send_link_if::sptr link)
}
}
+void offload_io_service_impl::detach_recv_link(recv_link_if::sptr link)
+{
+ // Create a request to detach link in the offload thread
+ auto req_fn = [this, link]() {
+ _reservation_mgr.unregister_link(link);
+ _io_srv->detach_recv_link(link);
+ };
+
+ _queue_client_req(req_fn);
+}
+
+void offload_io_service_impl::detach_send_link(send_link_if::sptr link)
+{
+ // Create a request to detach link in the offload thread
+ auto req_fn = [this, link]() {
+ _reservation_mgr.unregister_link(link);
+ _io_srv->detach_send_link(link);
+ };
+
+ _queue_client_req(req_fn);
+}
+
recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr recv_link,
size_t num_recv_frames,
recv_callback_t cb,
@@ -720,13 +755,7 @@ recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr re
port->offload_thread_set_connected(true);
};
- client_req_t queue_element;
- queue_element.req = {new std::function<void()>(req_fn)};
- const bool success = _client_connect_queue.push(queue_element);
- if (!success) {
- throw uhd::runtime_error("Failed to push make_recv_client request");
- }
-
+ _queue_client_req(req_fn);
port->client_wait_until_connected();
// Return a new recv client to the caller that just operates on the queues
@@ -775,13 +804,7 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se
port->offload_thread_set_connected(true);
};
- client_req_t queue_element;
- queue_element.req = {new std::function<void()>(req_fn)};
- const bool success = _client_connect_queue.push(queue_element);
- if (!success) {
- throw uhd::runtime_error("Failed to push make_send_client request");
- }
-
+ _queue_client_req(req_fn);
port->client_wait_until_connected();
// Wait for buffer queue to be full
@@ -794,6 +817,16 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se
shared_from_this(), num_recv_frames, num_send_frames, port);
}
+void offload_io_service_impl::_queue_client_req(std::function<void()> fn)
+{
+ client_req_t queue_element;
+ queue_element.req = {new std::function<void()>(fn)};
+ const bool success = _client_connect_queue.push(queue_element);
+ if (!success) {
+ throw uhd::runtime_error("Failed to queue client request");
+ }
+}
+
// Get a single receive buffer if available and update client info
void offload_io_service_impl::_get_recv_buff(recv_client_info_t& info, int32_t timeout_ms)
{
diff --git a/host/lib/usrp/common/CMakeLists.txt b/host/lib/usrp/common/CMakeLists.txt
index bdc8a5977..e4048fdf7 100644
--- a/host/lib/usrp/common/CMakeLists.txt
+++ b/host/lib/usrp/common/CMakeLists.txt
@@ -33,4 +33,6 @@ LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/apply_corrections.cpp
${CMAKE_CURRENT_SOURCE_DIR}/validate_subdev_spec.cpp
${CMAKE_CURRENT_SOURCE_DIR}/recv_packet_demuxer.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/io_service_mgr.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/io_service_args.cpp
)
diff --git a/host/lib/usrp/common/io_service_args.cpp b/host/lib/usrp/common/io_service_args.cpp
new file mode 100644
index 000000000..09af74f36
--- /dev/null
+++ b/host/lib/usrp/common/io_service_args.cpp
@@ -0,0 +1,101 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/utils/log.hpp>
+#include <uhdlib/usrp/common/io_service_args.hpp>
+#include <uhdlib/usrp/constrained_device_args.hpp>
+#include <string>
+
+static const std::string LOG_ID = "IO_SRV";
+static const size_t MAX_NUM_XPORT_ADAPTERS = 2;
+
+namespace uhd { namespace usrp {
+
+namespace {
+
+bool get_bool_arg(const device_addr_t& args, const std::string& key, const bool def)
+{
+ constrained_device_args_t::bool_arg arg(key, def);
+ if (args.has_key(key)) {
+ arg.parse(args[key]);
+ }
+ return arg.get();
+}
+
+io_service_args_t::wait_mode_t get_wait_mode_arg(const device_addr_t& args,
+ const std::string& key,
+ const io_service_args_t::wait_mode_t def)
+{
+ constrained_device_args_t::enum_arg<io_service_args_t::wait_mode_t> arg(key,
+ def,
+ {{"poll", io_service_args_t::POLL}, {"block", io_service_args_t::BLOCK}});
+
+ if (args.has_key(key)) {
+ arg.parse(args[key]);
+ }
+ return arg.get();
+}
+
+} // namespace
+
+io_service_args_t read_io_service_args(
+ const device_addr_t& args, const io_service_args_t& defaults)
+{
+ io_service_args_t io_srv_args;
+ std::string tmp_str, default_str;
+
+ io_srv_args.recv_offload = get_bool_arg(args, "recv_offload", defaults.recv_offload);
+ io_srv_args.send_offload = get_bool_arg(args, "send_offload", defaults.send_offload);
+
+ io_srv_args.recv_offload_wait_mode = get_wait_mode_arg(
+ args, "recv_offload_wait_mode", defaults.recv_offload_wait_mode);
+ io_srv_args.send_offload_wait_mode = get_wait_mode_arg(
+ args, "send_offload_wait_mode", defaults.send_offload_wait_mode);
+
+ io_srv_args.num_poll_offload_threads =
+ args.cast<size_t>("num_poll_offload_threads", defaults.num_poll_offload_threads);
+ if (io_srv_args.num_poll_offload_threads == 0) {
+ UHD_LOG_WARNING(LOG_ID,
+ "Invalid value for num_poll_offload_threads. "
+ "Value must be greater than 0.");
+ io_srv_args.num_poll_offload_threads = 1;
+ }
+
+ auto create_key = [](const std::string& base, size_t index) {
+ return base + "_" + std::to_string(index);
+ };
+
+ for (size_t i = 0; i < MAX_NUM_XPORT_ADAPTERS; i++) {
+ std::string key = create_key("recv_offload_thread_cpu", i);
+ if (args.has_key(key)) {
+ io_srv_args.recv_offload_thread_cpu.push_back(args.cast<size_t>(key, 0));
+ } else {
+ io_srv_args.recv_offload_thread_cpu.push_back({});
+ }
+ }
+
+ for (size_t i = 0; i < MAX_NUM_XPORT_ADAPTERS; i++) {
+ std::string key = create_key("send_offload_thread_cpu", i);
+ if (args.has_key(key)) {
+ io_srv_args.send_offload_thread_cpu.push_back(args.cast<size_t>(key, 0));
+ } else {
+ io_srv_args.send_offload_thread_cpu.push_back({});
+ }
+ }
+
+ for (size_t i = 0; i < io_srv_args.num_poll_offload_threads; i++) {
+ std::string key = create_key("poll_offload_thread_cpu", i);
+ if (args.has_key(key)) {
+ io_srv_args.poll_offload_thread_cpu.push_back(args.cast<size_t>(key, 0));
+ } else {
+ io_srv_args.poll_offload_thread_cpu.push_back({});
+ }
+ }
+
+ return io_srv_args;
+}
+
+}} // namespace uhd::usrp
diff --git a/host/lib/usrp/common/io_service_mgr.cpp b/host/lib/usrp/common/io_service_mgr.cpp
new file mode 100644
index 000000000..bf55ed228
--- /dev/null
+++ b/host/lib/usrp/common/io_service_mgr.cpp
@@ -0,0 +1,511 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/config.hpp>
+#include <uhd/transport/adapter_id.hpp>
+#include <uhd/utils/algorithm.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/inline_io_service.hpp>
+#include <uhdlib/transport/offload_io_service.hpp>
+#include <uhdlib/usrp/common/io_service_mgr.hpp>
+#include <map>
+#include <vector>
+
+using namespace uhd;
+using namespace uhd::transport;
+
+static const std::string LOG_ID = "IO_SRV";
+
+namespace uhd { namespace usrp {
+
+/* Inline I/O service manager
+ *
+ * I/O service manager for inline I/O services. Creates a new inline_io_service
+ * for every new pair of links, unless they are already attached to an I/O
+ * service (muxed links).
+ */
+class inline_io_service_mgr : public io_service_mgr
+{
+public:
+ io_service::sptr connect_links(recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const link_type_t link_type,
+ const io_service_args_t& args,
+ const std::string& streamer_id);
+
+ void disconnect_links(recv_link_if::sptr recv_link, send_link_if::sptr send_link);
+
+private:
+ struct link_info_t
+ {
+ io_service::sptr io_srv;
+ size_t mux_ref_count;
+ };
+
+ using link_pair_t = std::pair<recv_link_if::sptr, send_link_if::sptr>;
+ std::map<link_pair_t, link_info_t> _link_info_map;
+};
+
+io_service::sptr inline_io_service_mgr::connect_links(recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const link_type_t /*link_type*/,
+ const io_service_args_t& /*args*/,
+ const std::string& /*streamer_id*/)
+{
+ // Check if links are already connected
+ const link_pair_t links{recv_link, send_link};
+ auto it = _link_info_map.find(links);
+
+ if (it != _link_info_map.end()) {
+ // Muxing links, add to mux ref count
+ it->second.mux_ref_count++;
+ return it->second.io_srv;
+ }
+
+ // Links are not muxed, create a new inline I/O service
+ auto io_srv = inline_io_service::make();
+
+ if (recv_link) {
+ io_srv->attach_recv_link(recv_link);
+ }
+ if (send_link) {
+ io_srv->attach_send_link(send_link);
+ }
+
+ _link_info_map[links] = {io_srv, 1};
+ return io_srv;
+}
+
+void inline_io_service_mgr::disconnect_links(
+ recv_link_if::sptr recv_link, send_link_if::sptr send_link)
+{
+ const link_pair_t links{recv_link, send_link};
+ auto it = _link_info_map.find(links);
+ UHD_ASSERT_THROW(it != _link_info_map.end());
+
+ it->second.mux_ref_count--;
+ if (it->second.mux_ref_count == 0) {
+ if (recv_link) {
+ it->second.io_srv->detach_recv_link(recv_link);
+ }
+ if (send_link) {
+ it->second.io_srv->detach_send_link(send_link);
+ }
+
+ _link_info_map.erase(it);
+ }
+}
+
+/* Blocking I/O service manager
+ *
+ * I/O service manager for offload I/O services configured to block. This
+ * manager creates one offload I/O service for each transport adapter used by
+ * a streamer. If there are multiple streamers, this manager creates a separate
+ * set of I/O services for each streamer.
+ */
+class blocking_io_service_mgr : public io_service_mgr
+{
+public:
+ io_service::sptr connect_links(recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const link_type_t link_type,
+ const io_service_args_t& args,
+ const std::string& streamer_id);
+
+ void disconnect_links(recv_link_if::sptr recv_link, send_link_if::sptr send_link);
+
+private:
+ struct link_info_t
+ {
+ std::string streamer_id;
+ adapter_id_t adapter_id;
+ };
+ struct streamer_info_t
+ {
+ adapter_id_t adapter_id;
+ io_service::sptr io_srv;
+ size_t connection_count;
+ };
+ using streamer_map_key_t = std::pair<std::string, adapter_id_t>;
+
+ io_service::sptr _create_new_io_service(const io_service_args_t& args,
+ const link_type_t link_type,
+ const size_t thread_index);
+
+ // Map of links to streamer, so we can look up an I/O service from links
+ using link_pair_t = std::pair<recv_link_if::sptr, send_link_if::sptr>;
+ std::map<link_pair_t, link_info_t> _link_info_map;
+
+ // Map of streamer to its I/O services
+ std::map<std::string, std::vector<streamer_info_t>> _streamer_info_map;
+};
+
+io_service::sptr blocking_io_service_mgr::connect_links(recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const link_type_t link_type,
+ const io_service_args_t& args,
+ const std::string& streamer_id)
+{
+ UHD_ASSERT_THROW(
+ link_type == link_type_t::RX_DATA || link_type == link_type_t::TX_DATA);
+
+ auto adapter_id = (link_type == link_type_t::RX_DATA)
+ ? recv_link->get_recv_adapter_id()
+ : send_link->get_send_adapter_id();
+
+ link_pair_t links = {recv_link, send_link};
+ if (_link_info_map.find(links) != _link_info_map.end()) {
+ throw uhd::runtime_error("Block option on offload thread is not "
+ "supported when the transport multiplexes links.");
+ }
+
+ // If this streamer doesn't have an entry, create one
+ if (_streamer_info_map.count(streamer_id) == 0) {
+ _streamer_info_map[streamer_id] = {};
+ _link_info_map[links] = {streamer_id, adapter_id};
+ }
+
+ // Look for whether this streamer already has an I/O service for the same
+ // adapter. If it does, then use it, otherwise create a new one.
+ io_service::sptr io_srv;
+ auto& info_vtr = _streamer_info_map.at(streamer_id);
+ auto it = std::find_if(
+ info_vtr.begin(), info_vtr.end(), [adapter_id](const streamer_info_t& info) {
+ return adapter_id == info.adapter_id;
+ });
+
+ if (it == info_vtr.end()) {
+ const size_t new_thread_index = info_vtr.size();
+ io_srv = _create_new_io_service(args, link_type, new_thread_index);
+ info_vtr.push_back({adapter_id, io_srv, 1 /*connection_count*/});
+ } else {
+ it->connection_count++;
+ io_srv = it->io_srv;
+ }
+
+ if (recv_link) {
+ io_srv->attach_recv_link(recv_link);
+ }
+ if (send_link) {
+ io_srv->attach_send_link(send_link);
+ }
+
+ return io_srv;
+}
+
+void blocking_io_service_mgr::disconnect_links(
+ recv_link_if::sptr recv_link, send_link_if::sptr send_link)
+{
+ const link_pair_t links{recv_link, send_link};
+ auto link_info = _link_info_map.at(links);
+
+ // Find the streamer_info using the streamer_id and adapter_id in link_info
+ auto& info_vtr = _streamer_info_map.at(link_info.streamer_id);
+ auto it = std::find_if(info_vtr.begin(),
+ info_vtr.end(),
+ [adapter_id = link_info.adapter_id](
+ const streamer_info_t& info) { return adapter_id == info.adapter_id; });
+
+ UHD_ASSERT_THROW(it != info_vtr.end());
+
+ // Detach links and decrement the connection count in streamer_info
+ if (recv_link) {
+ it->io_srv->detach_recv_link(recv_link);
+ }
+ if (send_link) {
+ it->io_srv->detach_send_link(send_link);
+ }
+
+ it->connection_count--;
+ if (it->connection_count == 0) {
+ it->io_srv.reset();
+ }
+
+ // If all of the streamer's I/O services are disconnected, clean up its info
+ bool still_in_use = false;
+ for (auto info : info_vtr) {
+ still_in_use |= bool(info.io_srv);
+ }
+
+ if (!still_in_use) {
+ _streamer_info_map.erase(link_info.streamer_id);
+ }
+
+ // These links should no longer be connected to any I/O service
+ _link_info_map.erase(links);
+}
+
+io_service::sptr blocking_io_service_mgr::_create_new_io_service(
+ const io_service_args_t& args, const link_type_t link_type, const size_t thread_index)
+{
+ offload_io_service::params_t params;
+ params.wait_mode = offload_io_service::BLOCK;
+ params.client_type = (link_type == link_type_t::RX_DATA)
+ ? offload_io_service::RECV_ONLY
+ : offload_io_service::SEND_ONLY;
+
+ const auto& cpu_vtr = (link_type == link_type_t::RX_DATA)
+ ? args.recv_offload_thread_cpu
+ : args.send_offload_thread_cpu;
+
+ std::string cpu_affinity_str;
+ if (cpu_vtr.size() > thread_index && cpu_vtr[thread_index]) {
+ const size_t cpu = *cpu_vtr[thread_index];
+ params.cpu_affinity_list = {cpu};
+ cpu_affinity_str = ", cpu affinity: " + std::to_string(cpu);
+ } else {
+ cpu_affinity_str = ", cpu affinity: none";
+ }
+
+ std::string link_type_str = (link_type == link_type_t::RX_DATA) ? "RX data"
+ : "TX data";
+
+ UHD_LOG_INFO(LOG_ID,
+ "Creating new blocking I/O service for " << link_type_str << cpu_affinity_str);
+
+ return offload_io_service::make(inline_io_service::make(), params);
+}
+
+/* Polling I/O service manager
+ *
+ * I/O service manager for offload I/O services configured to poll. Creates the
+ * number of I/O services specified by the user in stream_args, and distributes
+ * links among them. New connections always go to the offload thread containing
+ * the fewest connections, with the lowest-numbered thread as a second criterion.
+ */
+class polling_io_service_mgr : public io_service_mgr
+{
+public:
+ io_service::sptr connect_links(recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const link_type_t link_type,
+ const io_service_args_t& args,
+ const std::string& streamer_id);
+
+ void disconnect_links(recv_link_if::sptr recv_link, send_link_if::sptr send_link);
+
+private:
+ struct link_info_t
+ {
+ io_service::sptr io_srv;
+ size_t mux_ref_count;
+ };
+ struct io_srv_info_t
+ {
+ size_t connection_count;
+ };
+
+ io_service::sptr _create_new_io_service(
+ const io_service_args_t& args, const size_t thread_index);
+
+ // Map of links to I/O service
+ using link_pair_t = std::pair<recv_link_if::sptr, send_link_if::sptr>;
+ std::map<link_pair_t, link_info_t> _link_info_map;
+
+ // For each I/O service, keep track of the number of connections
+ std::map<io_service::sptr, io_srv_info_t> _io_srv_info_map;
+};
+
+io_service::sptr polling_io_service_mgr::connect_links(recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const link_type_t /*link_type*/,
+ const io_service_args_t& args,
+ const std::string& /*streamer_id*/)
+{
+ // Check if links are already connected
+ const link_pair_t links{recv_link, send_link};
+ auto it = _link_info_map.find(links);
+ if (it != _link_info_map.end()) {
+ // Muxing links, add to mux ref count and connection count
+ it->second.mux_ref_count++;
+ _io_srv_info_map[it->second.io_srv].connection_count++;
+ return it->second.io_srv;
+ }
+
+ // Links are not muxed. If there are fewer offload threads than requested in
+ // the args, create a new service and add the links to it. Otherwise, add it
+ // to the service that has the fewest connections.
+ io_service::sptr io_srv;
+ if (_io_srv_info_map.size() < args.num_poll_offload_threads) {
+ const size_t thread_index = _io_srv_info_map.size();
+ io_srv = _create_new_io_service(args, thread_index);
+ _link_info_map[links] = {io_srv, 1 /*mux_ref_count*/};
+ _io_srv_info_map[io_srv] = {1 /*connection_count*/};
+ } else {
+ using map_pair_t = std::pair<io_service::sptr, io_srv_info_t>;
+ auto cmp = [](const map_pair_t& left, const map_pair_t& right) {
+ return left.second.connection_count < right.second.connection_count;
+ };
+
+ auto it = std::min_element(_io_srv_info_map.begin(), _io_srv_info_map.end(), cmp);
+ UHD_ASSERT_THROW(it != _io_srv_info_map.end());
+ io_srv = it->first;
+ _io_srv_info_map[io_srv].connection_count++;
+ }
+
+ if (recv_link) {
+ io_srv->attach_recv_link(recv_link);
+ }
+ if (send_link) {
+ io_srv->attach_send_link(send_link);
+ }
+ return io_srv;
+}
+
+void polling_io_service_mgr::disconnect_links(
+ recv_link_if::sptr recv_link, send_link_if::sptr send_link)
+{
+ const link_pair_t links{recv_link, send_link};
+ auto it = _link_info_map.find(links);
+ UHD_ASSERT_THROW(it != _link_info_map.end());
+
+ auto io_srv = it->second.io_srv;
+ it->second.mux_ref_count--;
+
+ if (it->second.mux_ref_count == 0) {
+ if (recv_link) {
+ io_srv->detach_recv_link(recv_link);
+ }
+ if (send_link) {
+ io_srv->detach_send_link(send_link);
+ }
+
+ _link_info_map.erase(it);
+ _io_srv_info_map.erase(io_srv);
+ }
+}
+
+io_service::sptr polling_io_service_mgr::_create_new_io_service(
+ const io_service_args_t& args, const size_t thread_index)
+{
+ offload_io_service::params_t params;
+ params.client_type = offload_io_service::BOTH_SEND_AND_RECV;
+ params.wait_mode = offload_io_service::POLL;
+
+ const auto& cpu_vtr = args.poll_offload_thread_cpu;
+
+ std::string cpu_affinity_str;
+ if (cpu_vtr.size() > thread_index && cpu_vtr[thread_index]) {
+ const size_t cpu = *cpu_vtr[thread_index];
+ params.cpu_affinity_list = {cpu};
+ cpu_affinity_str = ", cpu affinity: " + std::to_string(cpu);
+ } else {
+ cpu_affinity_str = ", cpu affinity: none";
+ }
+
+ UHD_LOG_INFO(LOG_ID, "Creating new polling I/O service" << cpu_affinity_str);
+
+ return offload_io_service::make(inline_io_service::make(), params);
+}
+
+/* Main I/O service manager implementation class
+ *
+ * Composite I/O service manager that dispatches requests to other managers,
+ * based on transport args and link type.
+ */
+class io_service_mgr_impl : public io_service_mgr
+{
+public:
+ io_service_mgr_impl(const uhd::device_addr_t& args) : _args(args) {}
+
+ io_service::sptr connect_links(recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const link_type_t link_type,
+ const io_service_args_t& args,
+ const std::string& streamer_id);
+
+ void disconnect_links(recv_link_if::sptr recv_link, send_link_if::sptr send_link);
+
+private:
+ struct xport_args_t
+ {
+ bool offload = false;
+ offload_io_service::wait_mode_t wait_mode = offload_io_service::BLOCK;
+ };
+ struct link_info_t
+ {
+ io_service::sptr io_srv;
+ io_service_mgr* mgr = nullptr;
+ };
+ using link_pair_t = std::pair<recv_link_if::sptr, send_link_if::sptr>;
+
+ const uhd::device_addr_t _args;
+
+ inline_io_service_mgr _inline_io_srv_mgr;
+ blocking_io_service_mgr _blocking_io_srv_mgr;
+ polling_io_service_mgr _polling_io_srv_mgr;
+
+ // Map of links to I/O service
+ std::map<link_pair_t, link_info_t> _link_info_map;
+};
+
+io_service_mgr::sptr io_service_mgr::make(const uhd::device_addr_t& args)
+{
+ return std::make_shared<io_service_mgr_impl>(args);
+}
+
+io_service::sptr io_service_mgr_impl::connect_links(recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const link_type_t link_type,
+ const io_service_args_t& args,
+ const std::string& streamer_id)
+{
+ UHD_ASSERT_THROW(link_type != link_type_t::ASYNC_MSG);
+
+ // Check if the links are already attached to an I/O service. If they are,
+ // then use the same manager to connect, since links can only be connected
+ // to one I/O service at any given time.
+ link_pair_t links{recv_link, send_link};
+ auto it = _link_info_map.find(links);
+
+ io_service::sptr io_srv;
+ io_service_mgr* mgr = nullptr;
+
+ if (it != _link_info_map.end()) {
+ io_srv = it->second.io_srv;
+ mgr = it->second.mgr;
+ } else {
+ // Links not already attached, pick an io_service_mgr to connect based
+ // on user parameters and connect them.
+ if (link_type == link_type_t::CTRL) {
+ mgr = &_inline_io_srv_mgr;
+ } else {
+ bool offload = (link_type == link_type_t::RX_DATA) ? args.recv_offload
+ : args.send_offload;
+ auto wait_mode = (link_type == link_type_t::RX_DATA)
+ ? args.recv_offload_wait_mode
+ : args.send_offload_wait_mode;
+
+ if (offload) {
+ if (wait_mode == io_service_args_t::POLL) {
+ mgr = &_polling_io_srv_mgr;
+ } else {
+ mgr = &_blocking_io_srv_mgr;
+ }
+ } else {
+ mgr = &_inline_io_srv_mgr;
+ }
+ }
+ }
+
+ io_srv = mgr->connect_links(recv_link, send_link, link_type, args, streamer_id);
+
+ _link_info_map[links] = {io_srv, mgr};
+ return io_srv;
+}
+
+void io_service_mgr_impl::disconnect_links(
+ recv_link_if::sptr recv_link, send_link_if::sptr send_link)
+{
+ link_pair_t links{recv_link, send_link};
+ auto it = _link_info_map.find(links);
+
+ UHD_ASSERT_THROW(it != _link_info_map.end());
+ it->second.mgr->disconnect_links(recv_link, send_link);
+ _link_info_map.erase(it);
+}
+
+}} // namespace uhd::usrp
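The dispatch policy in connect_links() above reduces to a small decision tree: CTRL links always run inline, and a data link only gets an offload I/O service when the user asked for one, with the wait mode choosing between the polling and blocking managers. Below is a minimal standalone sketch of that tree; the enum and struct names are simplified stand-ins, not the UHD types, included only so the rules can be exercised in isolation.

    // Stand-in types for illustration; not the UHD definitions.
    #include <cassert>

    enum class link_type { ctrl, rx_data, tx_data };
    enum class wait_mode { poll, block };

    struct io_args {
        bool recv_offload = false;
        bool send_offload = false;
        wait_mode recv_offload_wait_mode = wait_mode::block;
        wait_mode send_offload_wait_mode = wait_mode::block;
    };

    enum class mgr_kind { inline_mgr, blocking_mgr, polling_mgr };

    // Same decision tree as io_service_mgr_impl::connect_links(): control
    // links always run inline; data links are offloaded only on request.
    mgr_kind pick_mgr(link_type type, const io_args& args)
    {
        if (type == link_type::ctrl) {
            return mgr_kind::inline_mgr;
        }
        const bool offload = (type == link_type::rx_data) ? args.recv_offload
                                                          : args.send_offload;
        const wait_mode mode = (type == link_type::rx_data)
                                   ? args.recv_offload_wait_mode
                                   : args.send_offload_wait_mode;
        if (!offload) {
            return mgr_kind::inline_mgr;
        }
        return (mode == wait_mode::poll) ? mgr_kind::polling_mgr
                                         : mgr_kind::blocking_mgr;
    }

    int main()
    {
        io_args args;
        args.recv_offload           = true;
        args.recv_offload_wait_mode = wait_mode::poll;
        assert(pick_mgr(link_type::ctrl, args) == mgr_kind::inline_mgr);
        assert(pick_mgr(link_type::rx_data, args) == mgr_kind::polling_mgr);
        assert(pick_mgr(link_type::tx_data, args) == mgr_kind::inline_mgr);
    }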
diff --git a/host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.cpp b/host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.cpp
index 0e651a996..a87a9cada 100644
--- a/host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.cpp
+++ b/host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.cpp
@@ -7,11 +7,13 @@
#include "mpmd_link_if_ctrl_udp.hpp"
#include "mpmd_impl.hpp"
#include "mpmd_link_if_mgr.hpp"
+#include <uhd/rfnoc/constants.hpp>
#include <uhd/transport/udp_constants.hpp>
#include <uhd/transport/udp_simple.hpp>
#include <uhd/transport/udp_zero_copy.hpp>
-#include <uhdlib/transport/inline_io_service.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
#include <uhdlib/transport/udp_boost_asio_link.hpp>
+#include <uhdlib/transport/udp_common.hpp>
#include <uhdlib/utils/narrow.hpp>
#include <string>
@@ -26,9 +28,8 @@ namespace {
//! Maximum CHDR packet size in bytes
const size_t MPMD_10GE_DATA_FRAME_MAX_SIZE = 8000;
-
-//! Maximum CHDR packet size in bytes
-const size_t MPMD_10GE_ASYNCMSG_FRAME_MAX_SIZE = 1472;
+const size_t MPMD_1GE_DATA_FRAME_MAX_SIZE = 1472;
+const size_t MPMD_1GE_ASYNCMSG_FRAME_MAX_SIZE = 1472;
//! Number of send/recv frames
const size_t MPMD_ETH_NUM_FRAMES = 32;
@@ -194,8 +195,6 @@ size_t discover_mtu(const std::string& address,
mpmd_link_if_ctrl_udp::mpmd_link_if_ctrl_udp(const uhd::device_addr_t& mb_args,
const mpmd_link_if_mgr::xport_info_list_t& xport_info)
: _mb_args(mb_args)
- , _recv_args(filter_args(mb_args, "recv"))
- , _send_args(filter_args(mb_args, "send"))
, _udp_info(get_udp_info_from_xport_info(xport_info))
, _mtu(MPMD_10GE_DATA_FRAME_MAX_SIZE)
{
@@ -228,36 +227,52 @@ mpmd_link_if_ctrl_udp::mpmd_link_if_ctrl_udp(const uhd::device_addr_t& mb_args,
* API
*****************************************************************************/
uhd::transport::both_links_t mpmd_link_if_ctrl_udp::get_link(const size_t link_idx,
- const uhd::transport::link_type_t /*link_type*/,
- const uhd::device_addr_t& /*link_args*/)
+ const uhd::transport::link_type_t link_type,
+ const uhd::device_addr_t& link_args)
{
UHD_ASSERT_THROW(link_idx < _available_addrs.size());
const std::string ip_addr = _available_addrs.at(link_idx);
const std::string udp_port = _udp_info.at(ip_addr).udp_port;
- /* FIXME: Should have common infrastructure for creating I/O services */
- auto io_srv = uhd::transport::inline_io_service::make();
- link_params_t link_params;
- link_params.num_recv_frames = MPMD_ETH_NUM_FRAMES; // FIXME
- link_params.num_send_frames = MPMD_ETH_NUM_FRAMES; // FIXME
- link_params.recv_frame_size = get_mtu(uhd::RX_DIRECTION); // FIXME
- link_params.send_frame_size = get_mtu(uhd::TX_DIRECTION); // FIXME
- link_params.recv_buff_size = MPMD_BUFFER_DEPTH * MAX_RATE_10GIGE; // FIXME
- link_params.send_buff_size = MPMD_BUFFER_DEPTH * MAX_RATE_10GIGE; // FIXME
- auto link = uhd::transport::udp_boost_asio_link::make(ip_addr,
+ const size_t link_rate = get_link_rate(link_idx);
+ link_params_t default_link_params;
+ default_link_params.num_send_frames = MPMD_ETH_NUM_FRAMES;
+ default_link_params.num_recv_frames = MPMD_ETH_NUM_FRAMES;
+ default_link_params.send_frame_size = (link_rate == MAX_RATE_10GIGE)
+ ? MPMD_10GE_DATA_FRAME_MAX_SIZE
+ : (link_rate == MAX_RATE_1GIGE)
+ ? MPMD_1GE_DATA_FRAME_MAX_SIZE
+ : get_mtu(uhd::TX_DIRECTION);
+ default_link_params.recv_frame_size = (link_rate == MAX_RATE_10GIGE)
+ ? MPMD_10GE_DATA_FRAME_MAX_SIZE
+ : (link_rate == MAX_RATE_1GIGE)
+ ? MPMD_1GE_DATA_FRAME_MAX_SIZE
+ : get_mtu(uhd::RX_DIRECTION);
+ default_link_params.send_buff_size = get_link_rate(link_idx) * MPMD_BUFFER_DEPTH;
+ default_link_params.recv_buff_size = get_link_rate(link_idx) * MPMD_BUFFER_DEPTH;
+
+ link_params_t link_params = calculate_udp_link_params(link_type,
+ get_mtu(uhd::TX_DIRECTION),
+ get_mtu(uhd::RX_DIRECTION),
+ default_link_params,
+ _mb_args,
+ link_args);
+
+    // Enforce a minimum bound on the number of receive and send frames.
+ link_params.num_send_frames = std::max(uhd::rfnoc::MIN_NUM_FRAMES, link_params.num_send_frames);
+ link_params.num_recv_frames = std::max(uhd::rfnoc::MIN_NUM_FRAMES, link_params.num_recv_frames);
+
+ auto link = uhd::transport::udp_boost_asio_link::make(ip_addr,
udp_port,
link_params,
- link_params.recv_buff_size, // FIXME
- link_params.send_buff_size); // FIXME
- io_srv->attach_send_link(link);
- io_srv->attach_recv_link(link);
- return std::tuple<io_service::sptr,
- send_link_if::sptr,
+ link_params.recv_buff_size,
+ link_params.send_buff_size);
+ return std::tuple<send_link_if::sptr,
size_t,
recv_link_if::sptr,
size_t,
bool>(
- io_srv, link, link_params.send_buff_size, link, link_params.recv_buff_size, true);
+ link, link_params.send_buff_size, link, link_params.recv_buff_size, true);
}
size_t mpmd_link_if_ctrl_udp::get_num_links() const
@@ -277,3 +292,4 @@ mpmd_link_if_ctrl_udp::get_packet_factory() const
{
return _pkt_factory;
}
+
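With calculate_udp_link_params() in place, frame counts, frame sizes, and buffer sizes start from rate-appropriate defaults and can be overridden by the user. Here is a usage sketch, assuming the standard transport-arg keys (the same ones the removed X300 code read via link_args.cast<size_t>()) and a placeholder device address; exact override precedence is defined in udp_common.hpp.

    #include <uhd/types/device_addr.hpp>
    #include <uhd/usrp/multi_usrp.hpp>

    int main()
    {
        // Device-wide override, applied to every link on this session
        // (addr and sizes here are placeholders for illustration):
        auto usrp = uhd::usrp::multi_usrp::make(
            uhd::device_addr_t("addr=192.168.10.2,recv_frame_size=4000"));

        // Per-streamer override, applied only to this streamer's links:
        uhd::stream_args_t stream_args("fc32", "sc16");
        stream_args.args = uhd::device_addr_t("num_recv_frames=64");
        auto rx = usrp->get_rx_stream(stream_args);
        (void)rx;
    }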
diff --git a/host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.hpp b/host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.hpp
index 4c8ecade7..33db83b47 100644
--- a/host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.hpp
+++ b/host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.hpp
@@ -45,8 +45,6 @@ public:
private:
const uhd::device_addr_t _mb_args;
- const uhd::dict<std::string, std::string> _recv_args;
- const uhd::dict<std::string, std::string> _send_args;
//!
udp_link_info_map _udp_info;
//! A list of IP addresses we can connect our CHDR connections to
diff --git a/host/lib/usrp/mpmd/mpmd_mb_iface.cpp b/host/lib/usrp/mpmd/mpmd_mb_iface.cpp
index e713cc7a3..403e53949 100644
--- a/host/lib/usrp/mpmd/mpmd_mb_iface.cpp
+++ b/host/lib/usrp/mpmd/mpmd_mb_iface.cpp
@@ -14,9 +14,21 @@
using namespace uhd::rfnoc;
using namespace uhd::mpmd;
+static uhd::usrp::io_service_args_t get_default_io_srv_args()
+{
+ // TODO: Need better defaults, taking into account the link type and ensuring
+ // that the number of frames is appropriate
+ uhd::usrp::io_service_args_t args;
+ args.recv_offload = false;
+ args.send_offload = false;
+ return args;
+}
+
mpmd_mboard_impl::mpmd_mb_iface::mpmd_mb_iface(
const uhd::device_addr_t& mb_args, uhd::rpc_client::sptr rpc)
- : _mb_args(mb_args), _rpc(rpc), _link_if_mgr(xport::mpmd_link_if_mgr::make(mb_args))
+ : _mb_args(mb_args)
+ , _rpc(rpc)
+ , _link_if_mgr(xport::mpmd_link_if_mgr::make(mb_args))
{
_remote_device_id = allocate_device_id();
UHD_LOG_TRACE("MPMD::MB_IFACE", "Assigning device_id " << _remote_device_id);
@@ -153,16 +165,18 @@ uhd::rfnoc::chdr_ctrl_xport::sptr mpmd_mboard_impl::mpmd_mb_iface::make_ctrl_tra
+ std::to_string(local_device_id));
}
const size_t link_idx = _local_device_id_map.at(local_device_id);
- uhd::transport::io_service::sptr io_srv;
uhd::transport::send_link_if::sptr send_link;
uhd::transport::recv_link_if::sptr recv_link;
- std::tie(io_srv, send_link, std::ignore, recv_link, std::ignore, std::ignore) =
+ std::tie(send_link, std::ignore, recv_link, std::ignore, std::ignore) =
_link_if_mgr->get_link(
link_idx, uhd::transport::link_type_t::CTRL, uhd::device_addr_t());
/* Associate local device ID with the adapter */
_adapter_map[local_device_id] = send_link->get_send_adapter_id();
+ auto io_srv = get_io_srv_mgr()->connect_links(
+ recv_link, send_link, transport::link_type_t::CTRL);
+
auto pkt_factory = _link_if_mgr->get_packet_factory(link_idx);
auto xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv,
send_link,
@@ -181,7 +195,8 @@ mpmd_mboard_impl::mpmd_mb_iface::make_rx_data_transport(
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const uhd::device_addr_t& xport_args)
+ const uhd::device_addr_t& xport_args,
+ const std::string& streamer_id)
{
const uhd::rfnoc::sep_addr_t local_sep_addr = addrs.second;
@@ -192,12 +207,11 @@ mpmd_mboard_impl::mpmd_mb_iface::make_rx_data_transport(
}
const size_t link_idx = _local_device_id_map.at(local_sep_addr.first);
- uhd::transport::io_service::sptr io_srv;
uhd::transport::send_link_if::sptr send_link;
uhd::transport::recv_link_if::sptr recv_link;
bool lossy_xport;
size_t recv_buff_size;
- std::tie(io_srv, send_link, std::ignore, recv_link, recv_buff_size, lossy_xport) =
+ std::tie(send_link, std::ignore, recv_link, recv_buff_size, lossy_xport) =
_link_if_mgr->get_link(
link_idx, uhd::transport::link_type_t::RX_DATA, xport_args);
@@ -217,9 +231,12 @@ mpmd_mboard_impl::mpmd_mb_iface::make_rx_data_transport(
stream_buff_params_t fc_headroom = {0, 0};
+ auto cfg_io_srv = get_io_srv_mgr()->connect_links(
+ recv_link, send_link, transport::link_type_t::CTRL);
+
// Create the data transport
auto pkt_factory = _link_if_mgr->get_packet_factory(link_idx);
- auto fc_params = chdr_rx_data_xport::configure_sep(io_srv,
+ auto fc_params = chdr_rx_data_xport::configure_sep(cfg_io_srv,
recv_link,
send_link,
pkt_factory,
@@ -231,7 +248,18 @@ mpmd_mboard_impl::mpmd_mb_iface::make_rx_data_transport(
fc_freq,
fc_headroom,
lossy_xport);
- auto rx_xport = std::make_unique<chdr_rx_data_xport>(io_srv,
+
+ get_io_srv_mgr()->disconnect_links(recv_link, send_link);
+ cfg_io_srv.reset();
+
+ // Connect the links to an I/O service
+ auto io_srv = get_io_srv_mgr()->connect_links(recv_link,
+ send_link,
+ transport::link_type_t::RX_DATA,
+ usrp::read_io_service_args(xport_args, get_default_io_srv_args()),
+ streamer_id);
+
+ auto rx_xport = std::make_unique<chdr_rx_data_xport>(io_srv,
recv_link,
send_link,
pkt_factory,
@@ -249,7 +277,8 @@ mpmd_mboard_impl::mpmd_mb_iface::make_tx_data_transport(
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const uhd::device_addr_t& xport_args)
+ const uhd::device_addr_t& xport_args,
+ const std::string& streamer_id)
{
const uhd::rfnoc::sep_addr_t local_sep_addr = addrs.first;
@@ -260,11 +289,10 @@ mpmd_mboard_impl::mpmd_mb_iface::make_tx_data_transport(
}
const size_t link_idx = _local_device_id_map.at(local_sep_addr.first);
- uhd::transport::io_service::sptr io_srv;
uhd::transport::send_link_if::sptr send_link;
uhd::transport::recv_link_if::sptr recv_link;
bool lossy_xport;
- std::tie(io_srv, send_link, std::ignore, recv_link, std::ignore, lossy_xport) =
+ std::tie(send_link, std::ignore, recv_link, std::ignore, lossy_xport) =
_link_if_mgr->get_link(
link_idx, uhd::transport::link_type_t::TX_DATA, xport_args);
@@ -275,8 +303,11 @@ mpmd_mboard_impl::mpmd_mb_iface::make_tx_data_transport(
const double fc_freq_ratio = 1.0 / 8;
const double fc_headroom_ratio = 0;
+ auto cfg_io_srv = get_io_srv_mgr()->connect_links(
+ recv_link, send_link, transport::link_type_t::CTRL);
+
auto pkt_factory = _link_if_mgr->get_packet_factory(link_idx);
- const auto buff_capacity = chdr_tx_data_xport::configure_sep(io_srv,
+ const auto buff_capacity = chdr_tx_data_xport::configure_sep(cfg_io_srv,
recv_link,
send_link,
pkt_factory,
@@ -287,6 +318,16 @@ mpmd_mboard_impl::mpmd_mb_iface::make_tx_data_transport(
fc_freq_ratio,
fc_headroom_ratio);
+ get_io_srv_mgr()->disconnect_links(recv_link, send_link);
+ cfg_io_srv.reset();
+
+ // Connect the links to an I/O service
+ auto io_srv = get_io_srv_mgr()->connect_links(recv_link,
+ send_link,
+ transport::link_type_t::TX_DATA,
+ usrp::read_io_service_args(xport_args, get_default_io_srv_args()),
+ streamer_id);
+
// Create the data transport
auto tx_xport = std::make_unique<chdr_tx_data_xport>(io_srv,
recv_link,
@@ -296,6 +337,5 @@ mpmd_mboard_impl::mpmd_mb_iface::make_tx_data_transport(
send_link->get_num_send_frames(),
buff_capacity);
-
return tx_xport;
}
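Both data-transport factories now share one lifecycle: attach the links under link_type_t::CTRL just long enough for configure_sep() to set up the stream endpoint, detach, then reattach under the real data link type with the user's I/O service args and the streamer ID. A standalone mock of that sequence follows; all types are simplified stand-ins for the UHD ones.

    #include <cassert>
    #include <memory>

    enum class link_type { ctrl, rx_data, tx_data };

    struct mock_io_service_mgr {
        int connects    = 0;
        int disconnects = 0;
        std::shared_ptr<int> connect_links(link_type)
        {
            ++connects;
            return std::make_shared<int>(0); // stands in for io_service::sptr
        }
        void disconnect_links() { ++disconnects; }
    };

    int main()
    {
        mock_io_service_mgr mgr;

        // 1) Temporary CTRL attachment: configure_sep() needs a live I/O
        //    service to exchange endpoint setup packets with the device.
        auto cfg_io_srv = mgr.connect_links(link_type::ctrl);
        // ... configure_sep(cfg_io_srv, ...) would run here ...

        // 2) Tear the temporary attachment down first; links may only be
        //    connected to one I/O service at a time.
        mgr.disconnect_links();
        cfg_io_srv.reset();

        // 3) Reattach under the real data link type, now honoring the
        //    user's offload args and grouping by streamer ID.
        auto io_srv = mgr.connect_links(link_type::rx_data);
        (void)io_srv;

        assert(mgr.connects == 2 && mgr.disconnects == 1);
    }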
diff --git a/host/lib/usrp/mpmd/mpmd_mb_iface.hpp b/host/lib/usrp/mpmd/mpmd_mb_iface.hpp
index 4e47dd35a..4e54cfc12 100644
--- a/host/lib/usrp/mpmd/mpmd_mb_iface.hpp
+++ b/host/lib/usrp/mpmd/mpmd_mb_iface.hpp
@@ -10,8 +10,9 @@
#include "mpmd_impl.hpp"
#include "mpmd_link_if_mgr.hpp"
#include <uhdlib/rfnoc/mb_iface.hpp>
-#include <map>
+#include <uhdlib/usrp/common/io_service_mgr.hpp>
#include <unordered_map>
+#include <map>
namespace uhd { namespace mpmd {
@@ -33,7 +34,8 @@ public:
uhd::endianness_t get_endianness(const uhd::rfnoc::device_id_t local_device_id);
uhd::rfnoc::device_id_t get_remote_device_id();
std::vector<uhd::rfnoc::device_id_t> get_local_device_ids();
- uhd::transport::adapter_id_t get_adapter_id(const uhd::rfnoc::device_id_t local_device_id);
+ uhd::transport::adapter_id_t get_adapter_id(
+ const uhd::rfnoc::device_id_t local_device_id);
void reset_network();
uhd::rfnoc::clock_iface::sptr get_clock_iface(const std::string& clock_name);
uhd::rfnoc::chdr_ctrl_xport::sptr make_ctrl_transport(
@@ -44,14 +46,16 @@ public:
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const uhd::device_addr_t& xport_args);
+ const uhd::device_addr_t& xport_args,
+ const std::string& streamer_id);
uhd::rfnoc::chdr_tx_data_xport::uptr make_tx_data_transport(
uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal,
const uhd::rfnoc::sep_addr_pair_t& addrs,
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const uhd::device_addr_t& xport_args);
+ const uhd::device_addr_t& xport_args,
+ const std::string& streamer_id);
private:
uhd::device_addr_t _mb_args;
@@ -59,8 +63,10 @@ private:
xport::mpmd_link_if_mgr::uptr _link_if_mgr;
uhd::rfnoc::device_id_t _remote_device_id;
std::map<uhd::rfnoc::device_id_t, size_t> _local_device_id_map;
- std::unordered_map<uhd::rfnoc::device_id_t, uhd::transport::adapter_id_t> _adapter_map;
+ std::unordered_map<uhd::rfnoc::device_id_t, uhd::transport::adapter_id_t>
+ _adapter_map;
std::map<std::string, uhd::rfnoc::clock_iface::sptr> _clock_ifaces;
+ uhd::usrp::io_service_mgr::sptr _io_srv_mgr;
};
}} /* namespace uhd::mpmd */
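The usrp::read_io_service_args() calls in the transport factories merge user-supplied transport args over the per-device defaults from get_default_io_srv_args(). The sketch below shows that merge with simplified stand-in types; the key names ("recv_offload", "send_offload") and the "1" flag encoding are assumptions for illustration.

    #include <map>
    #include <string>

    struct io_service_args_t {
        bool recv_offload = false;
        bool send_offload = false;
    };

    // Simplified stand-in: user args override the defaults, key by key.
    io_service_args_t read_io_service_args(
        const std::map<std::string, std::string>& xport_args,
        io_service_args_t defaults)
    {
        auto read_flag = [&](const char* key, bool fallback) {
            auto it = xport_args.find(key);
            return (it == xport_args.end()) ? fallback : (it->second == "1");
        };
        defaults.recv_offload = read_flag("recv_offload", defaults.recv_offload);
        defaults.send_offload = read_flag("send_offload", defaults.send_offload);
        return defaults;
    }

    int main()
    {
        io_service_args_t defaults; // both offloads off, as in the factories
        const std::map<std::string, std::string> user{{"recv_offload", "1"}};
        const auto merged = read_io_service_args(user, defaults);
        return (merged.recv_offload && !merged.send_offload) ? 0 : 1;
    }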
diff --git a/host/lib/usrp/x300/x300_eth_mgr.cpp b/host/lib/usrp/x300/x300_eth_mgr.cpp
index 8ff63b050..7177032c6 100644
--- a/host/lib/usrp/x300/x300_eth_mgr.cpp
+++ b/host/lib/usrp/x300/x300_eth_mgr.cpp
@@ -19,8 +19,9 @@
#include <uhd/transport/udp_zero_copy.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhdlib/rfnoc/device_id.hpp>
-#include <uhdlib/transport/inline_io_service.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
#include <uhdlib/transport/udp_boost_asio_link.hpp>
+#include <uhdlib/transport/udp_common.hpp>
#include <uhdlib/usrp/cores/i2c_core_100_wb32.hpp>
//#ifdef HAVE_DPDK
//# include <uhdlib/transport/dpdk_simple.hpp>
@@ -287,53 +288,32 @@ both_links_t eth_manager::get_links(link_type_t link_type,
// Buffering is done in the socket buffers, so size them relative to
// the link rate
- default_buff_args.send_buff_size = conn.link_rate / 50; // 20ms
- default_buff_args.recv_buff_size = std::max(conn.link_rate / 50,
- ETH_MSG_NUM_FRAMES * ETH_MSG_FRAME_SIZE); // enough to hold greater of 20ms or
- // number of msg frames
+ link_params_t default_link_params;
// There is no need for more than 1 send and recv frame since the
// buffering is done in the socket buffers
- default_buff_args.num_send_frames = 1; // or 2?
- default_buff_args.num_recv_frames = 1;
- if (link_type == link_type_t::CTRL) {
- // Increasing number of recv frames here because ctrl_iface uses it
- // to determine how many control packets can be in flight before it
- // must wait for an ACK
- // FIXME this is no longer true, find a good value
- default_buff_args.num_recv_frames = 85; // 256/3
- } else if (link_type == link_type_t::TX_DATA) {
- size_t default_frame_size = conn.link_rate == MAX_RATE_1GIGE
- ? GE_DATA_FRAME_SEND_SIZE
- : XGE_DATA_FRAME_SEND_SIZE;
- default_buff_args.send_frame_size = link_args.cast<size_t>(
- "send_frame_size", std::min(default_frame_size, send_mtu));
- default_buff_args.num_send_frames = link_args.cast<size_t>(
- "num_send_frames", default_buff_args.num_send_frames);
- default_buff_args.send_buff_size = link_args.cast<size_t>(
- "send_buff_size", default_buff_args.send_buff_size);
- } else if (link_type == link_type_t::RX_DATA) {
- size_t default_frame_size = conn.link_rate == MAX_RATE_1GIGE
- ? GE_DATA_FRAME_RECV_SIZE
- : XGE_DATA_FRAME_RECV_SIZE;
- default_buff_args.recv_frame_size = link_args.cast<size_t>(
- "recv_frame_size", std::min(default_frame_size, recv_mtu));
- // set some buffers so the offload thread actually offloads the
- // socket I/O
- default_buff_args.num_recv_frames =
- link_args.cast<size_t>("num_recv_frames", 2);
- default_buff_args.recv_buff_size = link_args.cast<size_t>(
- "recv_buff_size", default_buff_args.recv_buff_size);
- }
+ default_link_params.num_send_frames = 1; // or 2?
+ default_link_params.num_recv_frames = 2;
+ default_link_params.send_frame_size = conn.link_rate == MAX_RATE_1GIGE
+ ? GE_DATA_FRAME_SEND_SIZE
+ : XGE_DATA_FRAME_SEND_SIZE;
+ default_link_params.recv_frame_size = conn.link_rate == MAX_RATE_1GIGE
+ ? GE_DATA_FRAME_RECV_SIZE
+ : XGE_DATA_FRAME_RECV_SIZE;
+ default_link_params.send_buff_size = conn.link_rate / 50;
+ default_link_params.recv_buff_size = std::max(conn.link_rate / 50,
+        ETH_MSG_NUM_FRAMES * ETH_MSG_FRAME_SIZE); // enough to hold the greater
+                                                  // of 20 ms or the msg frames
+
+ link_params_t link_params = calculate_udp_link_params(link_type,
+ get_mtu(uhd::TX_DIRECTION),
+ get_mtu(uhd::RX_DIRECTION),
+ default_link_params,
+ _args.get_orig_args(),
+ link_args);
- /* FIXME: Should have common infrastructure for creating I/O services */
- auto io_srv = uhd::transport::inline_io_service::make();
- link_params_t link_params;
- link_params.num_recv_frames = default_buff_args.num_recv_frames;
- link_params.num_send_frames = default_buff_args.num_send_frames;
- link_params.recv_frame_size = default_buff_args.recv_frame_size;
- link_params.send_frame_size = default_buff_args.send_frame_size;
- link_params.recv_buff_size = default_buff_args.recv_buff_size;
- link_params.send_buff_size = default_buff_args.send_buff_size;
+    // Enforce a minimum bound on the number of receive and send frames.
+ link_params.num_send_frames = std::max(uhd::rfnoc::MIN_NUM_FRAMES, link_params.num_send_frames);
+ link_params.num_recv_frames = std::max(uhd::rfnoc::MIN_NUM_FRAMES, link_params.num_recv_frames);
size_t recv_buff_size, send_buff_size;
auto link = uhd::transport::udp_boost_asio_link::make(conn.addr,
@@ -341,9 +321,7 @@ both_links_t eth_manager::get_links(link_type_t link_type,
link_params,
recv_buff_size,
send_buff_size);
- io_srv->attach_send_link(link);
- io_srv->attach_recv_link(link);
- return std::make_tuple(io_srv, link, send_buff_size, link, recv_buff_size, true);
+ return std::make_tuple(link, send_buff_size, link, recv_buff_size, true);
}
/******************************************************************************
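The conn.link_rate / 50 expression sizes the socket buffer to hold 1/50th of a second, i.e. 20 ms, of line-rate traffic, with ETH_MSG_NUM_FRAMES * ETH_MSG_FRAME_SIZE as the floor on the receive side. A worked example, assuming a nominal 10 GbE payload rate of 1.25e9 bytes/s rather than the exact MAX_RATE_10GIGE constant:

    #include <cstdio>

    int main()
    {
        // Assumed nominal 10 GbE rate in bytes/s; illustration only.
        constexpr unsigned long long link_rate = 1250000000ULL;
        constexpr unsigned long long buff_size = link_rate / 50; // 20 ms
        std::printf("socket buffer: %llu bytes (~%.0f MB at line rate)\n",
            buff_size, buff_size / 1e6);
    }

For the assumed rate this yields 25,000,000 bytes, i.e. roughly 25 MB of kernel socket buffering per direction.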
diff --git a/host/lib/usrp/x300/x300_impl.hpp b/host/lib/usrp/x300/x300_impl.hpp
index 600d224a5..a3276152a 100644
--- a/host/lib/usrp/x300/x300_impl.hpp
+++ b/host/lib/usrp/x300/x300_impl.hpp
@@ -108,7 +108,8 @@ private:
uhd::endianness_t get_endianness(const uhd::rfnoc::device_id_t local_device_id);
uhd::rfnoc::device_id_t get_remote_device_id();
std::vector<uhd::rfnoc::device_id_t> get_local_device_ids();
- uhd::transport::adapter_id_t get_adapter_id(const uhd::rfnoc::device_id_t local_device_id);
+ uhd::transport::adapter_id_t get_adapter_id(
+ const uhd::rfnoc::device_id_t local_device_id);
void reset_network();
uhd::rfnoc::clock_iface::sptr get_clock_iface(const std::string& clock_name);
uhd::rfnoc::chdr_ctrl_xport::sptr make_ctrl_transport(
@@ -120,18 +121,21 @@ private:
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const uhd::device_addr_t& xport_args);
+ const uhd::device_addr_t& xport_args,
+ const std::string& streamer_id);
uhd::rfnoc::chdr_tx_data_xport::uptr make_tx_data_transport(
uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal,
const uhd::rfnoc::sep_addr_pair_t& addrs,
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const uhd::device_addr_t& xport_args);
+ const uhd::device_addr_t& xport_args,
+ const std::string& streamer_id);
private:
const uhd::rfnoc::device_id_t _remote_dev_id;
- std::unordered_map<uhd::rfnoc::device_id_t, uhd::transport::adapter_id_t> _adapter_map;
+ std::unordered_map<uhd::rfnoc::device_id_t, uhd::transport::adapter_id_t>
+ _adapter_map;
uhd::rfnoc::clock_iface::sptr _bus_clk;
uhd::rfnoc::clock_iface::sptr _radio_clk;
uhd::usrp::x300::conn_manager::sptr _conn_mgr;
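The streamer_id parameter threaded through these signatures lets a manager key offload I/O services by streamer, so all transports belonging to one streamer can share an offload thread. The sketch below shows that grouping idea only; the real policy lives in io_service_mgr.cpp, and the ID strings used here are hypothetical.

    #include <cassert>
    #include <map>
    #include <memory>
    #include <string>

    using io_service_ptr = std::shared_ptr<int>; // stand-in for io_service::sptr

    struct per_streamer_mgr {
        std::map<std::string, io_service_ptr> by_streamer;

        io_service_ptr connect(const std::string& streamer_id)
        {
            auto& io_srv = by_streamer[streamer_id];
            if (!io_srv) {
                io_srv = std::make_shared<int>(0); // new service (and thread)
            }
            return io_srv; // known streamer -> reuse its service
        }
    };

    int main()
    {
        per_streamer_mgr mgr;
        auto a0 = mgr.connect("RX:0"); // hypothetical streamer IDs
        auto a1 = mgr.connect("RX:0"); // second channel, same streamer
        auto b0 = mgr.connect("TX:0");
        assert(a0 == a1 && a0 != b0);
    }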
diff --git a/host/lib/usrp/x300/x300_mb_iface.cpp b/host/lib/usrp/x300/x300_mb_iface.cpp
index 5642ffc98..5ba92f52c 100644
--- a/host/lib/usrp/x300/x300_mb_iface.cpp
+++ b/host/lib/usrp/x300/x300_mb_iface.cpp
@@ -10,6 +10,15 @@
using namespace uhd::rfnoc;
using uhd::transport::link_type_t;
+static uhd::usrp::io_service_args_t get_default_io_srv_args()
+{
+ // TODO: Need better defaults, taking into account the link type and ensuring
+ // that the number of frames is appropriate
+ uhd::usrp::io_service_args_t args;
+ args.recv_offload = false;
+ args.send_offload = false;
+ return args;
+}
x300_impl::x300_mb_iface::x300_mb_iface(uhd::usrp::x300::conn_manager::sptr conn_mgr,
const double radio_clk_freq,
@@ -84,10 +93,12 @@ uhd::rfnoc::clock_iface::sptr x300_impl::x300_mb_iface::get_clock_iface(
uhd::rfnoc::chdr_ctrl_xport::sptr x300_impl::x300_mb_iface::make_ctrl_transport(
uhd::rfnoc::device_id_t local_device_id, const uhd::rfnoc::sep_id_t& local_epid)
{
- uhd::transport::io_service::sptr io_srv;
- uhd::transport::send_link_if::sptr send_link;
- uhd::transport::recv_link_if::sptr recv_link;
- std::tie(io_srv, send_link, std::ignore, recv_link, std::ignore, std::ignore) =
+ using namespace uhd::transport;
+
+ send_link_if::sptr send_link;
+ recv_link_if::sptr recv_link;
+ bool lossy_xport;
+ std::tie(send_link, std::ignore, recv_link, std::ignore, lossy_xport) =
_conn_mgr->get_links(link_type_t::CTRL,
local_device_id,
local_epid,
@@ -97,7 +108,10 @@ uhd::rfnoc::chdr_ctrl_xport::sptr x300_impl::x300_mb_iface::make_ctrl_transport(
/* Associate local device ID with the adapter */
_adapter_map[local_device_id] = send_link->get_send_adapter_id();
- auto xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv,
+ auto io_srv =
+ get_io_srv_mgr()->connect_links(recv_link, send_link, link_type_t::CTRL);
+
+ auto xport = chdr_ctrl_xport::make(io_srv,
send_link,
recv_link,
_pkt_factory,
@@ -113,18 +127,20 @@ uhd::rfnoc::chdr_rx_data_xport::uptr x300_impl::x300_mb_iface::make_rx_data_tran
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const uhd::device_addr_t& xport_args)
+ const uhd::device_addr_t& xport_args,
+ const std::string& streamer_id)
{
+ using namespace uhd::transport;
+
const uhd::rfnoc::sep_addr_t local_sep_addr = addrs.second;
const uhd::rfnoc::sep_id_t remote_epid = epids.first;
const uhd::rfnoc::sep_id_t local_epid = epids.second;
- uhd::transport::io_service::sptr io_srv;
- uhd::transport::send_link_if::sptr send_link;
- uhd::transport::recv_link_if::sptr recv_link;
+ send_link_if::sptr send_link;
+ recv_link_if::sptr recv_link;
size_t recv_buff_size;
bool lossy_xport;
- std::tie(io_srv, send_link, std::ignore, recv_link, recv_buff_size, lossy_xport) =
+ std::tie(send_link, std::ignore, recv_link, recv_buff_size, lossy_xport) =
_conn_mgr->get_links(link_type_t::RX_DATA,
local_sep_addr.first,
local_epid,
@@ -147,8 +163,10 @@ uhd::rfnoc::chdr_rx_data_xport::uptr x300_impl::x300_mb_iface::make_rx_data_tran
uhd::rfnoc::stream_buff_params_t fc_headroom = {0, 0};
- // Create the data transport
- auto fc_params = uhd::rfnoc::chdr_rx_data_xport::configure_sep(io_srv,
+ auto cfg_io_srv =
+ get_io_srv_mgr()->connect_links(recv_link, send_link, link_type_t::CTRL);
+
+ auto fc_params = uhd::rfnoc::chdr_rx_data_xport::configure_sep(cfg_io_srv,
recv_link,
send_link,
_pkt_factory,
@@ -161,6 +179,17 @@ uhd::rfnoc::chdr_rx_data_xport::uptr x300_impl::x300_mb_iface::make_rx_data_tran
fc_headroom,
lossy_xport);
+ get_io_srv_mgr()->disconnect_links(recv_link, send_link);
+ cfg_io_srv.reset();
+
+ // Connect the links to an I/O service
+ auto io_srv = get_io_srv_mgr()->connect_links(recv_link,
+ send_link,
+ link_type_t::RX_DATA,
+ uhd::usrp::read_io_service_args(xport_args, get_default_io_srv_args()),
+ streamer_id);
+
+ // Create the data transport
auto rx_xport = std::make_unique<uhd::rfnoc::chdr_rx_data_xport>(io_srv,
recv_link,
send_link,
@@ -178,17 +207,19 @@ uhd::rfnoc::chdr_tx_data_xport::uptr x300_impl::x300_mb_iface::make_tx_data_tran
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const uhd::device_addr_t& xport_args)
+ const uhd::device_addr_t& xport_args,
+ const std::string& streamer_id)
{
+ using namespace uhd::transport;
+
const uhd::rfnoc::sep_addr_t local_sep_addr = addrs.first;
const uhd::rfnoc::sep_id_t remote_epid = epids.second;
const uhd::rfnoc::sep_id_t local_epid = epids.first;
- uhd::transport::io_service::sptr io_srv;
- uhd::transport::send_link_if::sptr send_link;
- uhd::transport::recv_link_if::sptr recv_link;
+ send_link_if::sptr send_link;
+ recv_link_if::sptr recv_link;
bool lossy_xport;
- std::tie(io_srv, send_link, std::ignore, recv_link, std::ignore, lossy_xport) =
+ std::tie(send_link, std::ignore, recv_link, std::ignore, lossy_xport) =
_conn_mgr->get_links(link_type_t::TX_DATA,
local_sep_addr.first,
local_epid,
@@ -202,7 +233,10 @@ uhd::rfnoc::chdr_tx_data_xport::uptr x300_impl::x300_mb_iface::make_tx_data_tran
const double fc_freq_ratio = 1.0 / 8;
const double fc_headroom_ratio = 0;
- const auto buff_capacity = chdr_tx_data_xport::configure_sep(io_srv,
+ auto cfg_io_srv =
+ get_io_srv_mgr()->connect_links(recv_link, send_link, link_type_t::CTRL);
+
+ const auto buff_capacity = chdr_tx_data_xport::configure_sep(cfg_io_srv,
recv_link,
send_link,
_pkt_factory,
@@ -213,6 +247,16 @@ uhd::rfnoc::chdr_tx_data_xport::uptr x300_impl::x300_mb_iface::make_tx_data_tran
fc_freq_ratio,
fc_headroom_ratio);
+ get_io_srv_mgr()->disconnect_links(recv_link, send_link);
+ cfg_io_srv.reset();
+
+ // Connect the links to an I/O service
+ auto io_srv = get_io_srv_mgr()->connect_links(recv_link,
+ send_link,
+ link_type_t::TX_DATA,
+ uhd::usrp::read_io_service_args(xport_args, get_default_io_srv_args()),
+ streamer_id);
+
// Create the data transport
auto tx_xport = std::make_unique<chdr_tx_data_xport>(io_srv,
recv_link,