Diffstat (limited to 'host/lib/usrp')
-rw-r--r--  host/lib/usrp/common/CMakeLists.txt            2
-rw-r--r--  host/lib/usrp/common/io_service_args.cpp     101
-rw-r--r--  host/lib/usrp/common/io_service_mgr.cpp      511
-rw-r--r--  host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.cpp  66
-rw-r--r--  host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.hpp   2
-rw-r--r--  host/lib/usrp/mpmd/mpmd_mb_iface.cpp          66
-rw-r--r--  host/lib/usrp/mpmd/mpmd_mb_iface.hpp          16
-rw-r--r--  host/lib/usrp/x300/x300_eth_mgr.cpp           74
-rw-r--r--  host/lib/usrp/x300/x300_impl.hpp              12
-rw-r--r--  host/lib/usrp/x300/x300_mb_iface.cpp          80
10 files changed, 815 insertions(+), 115 deletions(-)
diff --git a/host/lib/usrp/common/CMakeLists.txt b/host/lib/usrp/common/CMakeLists.txt
index bdc8a5977..e4048fdf7 100644
--- a/host/lib/usrp/common/CMakeLists.txt
+++ b/host/lib/usrp/common/CMakeLists.txt
@@ -33,4 +33,6 @@ LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/apply_corrections.cpp
${CMAKE_CURRENT_SOURCE_DIR}/validate_subdev_spec.cpp
${CMAKE_CURRENT_SOURCE_DIR}/recv_packet_demuxer.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/io_service_mgr.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/io_service_args.cpp
)
diff --git a/host/lib/usrp/common/io_service_args.cpp b/host/lib/usrp/common/io_service_args.cpp
new file mode 100644
index 000000000..09af74f36
--- /dev/null
+++ b/host/lib/usrp/common/io_service_args.cpp
@@ -0,0 +1,101 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/utils/log.hpp>
+#include <uhdlib/usrp/common/io_service_args.hpp>
+#include <uhdlib/usrp/constrained_device_args.hpp>
+#include <string>
+
+static const std::string LOG_ID = "IO_SRV";
+static const size_t MAX_NUM_XPORT_ADAPTERS = 2;
+
+namespace uhd { namespace usrp {
+
+namespace {
+
+bool get_bool_arg(const device_addr_t& args, const std::string& key, const bool def)
+{
+ constrained_device_args_t::bool_arg arg(key, def);
+ if (args.has_key(key)) {
+ arg.parse(args[key]);
+ }
+ return arg.get();
+}
+
+io_service_args_t::wait_mode_t get_wait_mode_arg(const device_addr_t& args,
+ const std::string& key,
+ const io_service_args_t::wait_mode_t def)
+{
+ constrained_device_args_t::enum_arg<io_service_args_t::wait_mode_t> arg(key,
+ def,
+ {{"poll", io_service_args_t::POLL}, {"block", io_service_args_t::BLOCK}});
+
+ if (args.has_key(key)) {
+ arg.parse(args[key]);
+ }
+ return arg.get();
+}
+
+} // namespace
+
+io_service_args_t read_io_service_args(
+ const device_addr_t& args, const io_service_args_t& defaults)
+{
+ io_service_args_t io_srv_args;
+
+ io_srv_args.recv_offload = get_bool_arg(args, "recv_offload", defaults.recv_offload);
+ io_srv_args.send_offload = get_bool_arg(args, "send_offload", defaults.send_offload);
+
+ io_srv_args.recv_offload_wait_mode = get_wait_mode_arg(
+ args, "recv_offload_wait_mode", defaults.recv_offload_wait_mode);
+ io_srv_args.send_offload_wait_mode = get_wait_mode_arg(
+ args, "send_offload_wait_mode", defaults.send_offload_wait_mode);
+
+ io_srv_args.num_poll_offload_threads =
+ args.cast<size_t>("num_poll_offload_threads", defaults.num_poll_offload_threads);
+ if (io_srv_args.num_poll_offload_threads == 0) {
+ UHD_LOG_WARNING(LOG_ID,
+ "Invalid value for num_poll_offload_threads. "
+ "Value must be greater than 0. Defaulting to 1.");
+ io_srv_args.num_poll_offload_threads = 1;
+ }
+
+ auto create_key = [](const std::string& base, size_t index) {
+ return base + "_" + std::to_string(index);
+ };
+
+ for (size_t i = 0; i < MAX_NUM_XPORT_ADAPTERS; i++) {
+ std::string key = create_key("recv_offload_thread_cpu", i);
+ if (args.has_key(key)) {
+ io_srv_args.recv_offload_thread_cpu.push_back(args.cast<size_t>(key, 0));
+ } else {
+ io_srv_args.recv_offload_thread_cpu.push_back({});
+ }
+ }
+
+ for (size_t i = 0; i < MAX_NUM_XPORT_ADAPTERS; i++) {
+ std::string key = create_key("send_offload_thread_cpu", i);
+ if (args.has_key(key)) {
+ io_srv_args.send_offload_thread_cpu.push_back(args.cast<size_t>(key, 0));
+ } else {
+ io_srv_args.send_offload_thread_cpu.push_back({});
+ }
+ }
+
+ for (size_t i = 0; i < io_srv_args.num_poll_offload_threads; i++) {
+ std::string key = create_key("poll_offload_thread_cpu", i);
+ if (args.has_key(key)) {
+ io_srv_args.poll_offload_thread_cpu.push_back(args.cast<size_t>(key, 0));
+ } else {
+ io_srv_args.poll_offload_thread_cpu.push_back({});
+ }
+ }
+
+ return io_srv_args;
+}
+
+}} // namespace uhd::usrp
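For reference, a minimal sketch of how the parser above is driven. The keys match those parsed above; the defaults object and the boolean spelling ("1") are assumptions based on constrained_device_args_t:

    #include <uhd/types/device_addr.hpp>
    #include <uhdlib/usrp/common/io_service_args.hpp>

    // Request a recv offload thread in poll mode and two polling threads,
    // pinning the first polling thread to CPU 2.
    static uhd::usrp::io_service_args_t example_io_srv_args()
    {
        const uhd::device_addr_t args(
            "recv_offload=1,recv_offload_wait_mode=poll,"
            "num_poll_offload_threads=2,poll_offload_thread_cpu_0=2");
        const uhd::usrp::io_service_args_t defaults; // compiled-in defaults
        return uhd::usrp::read_io_service_args(args, defaults);
        // recv_offload == true, num_poll_offload_threads == 2,
        // poll_offload_thread_cpu[0] holds 2, poll_offload_thread_cpu[1] is empty
    }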
diff --git a/host/lib/usrp/common/io_service_mgr.cpp b/host/lib/usrp/common/io_service_mgr.cpp
new file mode 100644
index 000000000..bf55ed228
--- /dev/null
+++ b/host/lib/usrp/common/io_service_mgr.cpp
@@ -0,0 +1,511 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/config.hpp>
+#include <uhd/transport/adapter_id.hpp>
+#include <uhd/utils/algorithm.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/inline_io_service.hpp>
+#include <uhdlib/transport/offload_io_service.hpp>
+#include <uhdlib/usrp/common/io_service_mgr.hpp>
+#include <map>
+#include <vector>
+
+using namespace uhd;
+using namespace uhd::transport;
+
+static const std::string LOG_ID = "IO_SRV";
+
+namespace uhd { namespace usrp {
+
+/* Inline I/O service manager
+ *
+ * I/O service manager for inline I/O services. Creates a new inline_io_service
+ * for every new pair of links, unless they are already attached to an I/O
+ * service (muxed links).
+ */
+class inline_io_service_mgr : public io_service_mgr
+{
+public:
+ io_service::sptr connect_links(recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const link_type_t link_type,
+ const io_service_args_t& args,
+ const std::string& streamer_id);
+
+ void disconnect_links(recv_link_if::sptr recv_link, send_link_if::sptr send_link);
+
+private:
+ struct link_info_t
+ {
+ io_service::sptr io_srv;
+ size_t mux_ref_count;
+ };
+
+ using link_pair_t = std::pair<recv_link_if::sptr, send_link_if::sptr>;
+ std::map<link_pair_t, link_info_t> _link_info_map;
+};
+
+io_service::sptr inline_io_service_mgr::connect_links(recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const link_type_t /*link_type*/,
+ const io_service_args_t& /*args*/,
+ const std::string& /*streamer_id*/)
+{
+ // Check if links are already connected
+ const link_pair_t links{recv_link, send_link};
+ auto it = _link_info_map.find(links);
+
+ if (it != _link_info_map.end()) {
+ // Muxing links, add to mux ref count
+ it->second.mux_ref_count++;
+ return it->second.io_srv;
+ }
+
+ // Links are not muxed, create a new inline I/O service
+ auto io_srv = inline_io_service::make();
+
+ if (recv_link) {
+ io_srv->attach_recv_link(recv_link);
+ }
+ if (send_link) {
+ io_srv->attach_send_link(send_link);
+ }
+
+ _link_info_map[links] = {io_srv, 1};
+ return io_srv;
+}
+
+void inline_io_service_mgr::disconnect_links(
+ recv_link_if::sptr recv_link, send_link_if::sptr send_link)
+{
+ const link_pair_t links{recv_link, send_link};
+ auto it = _link_info_map.find(links);
+ UHD_ASSERT_THROW(it != _link_info_map.end());
+
+ it->second.mux_ref_count--;
+ if (it->second.mux_ref_count == 0) {
+ if (recv_link) {
+ it->second.io_srv->detach_recv_link(recv_link);
+ }
+ if (send_link) {
+ it->second.io_srv->detach_send_link(send_link);
+ }
+
+ _link_info_map.erase(it);
+ }
+}
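A sketch of the mux behavior implemented above: connecting the same link pair twice returns the same I/O service and bumps mux_ref_count, so the links stay attached until the final disconnect. The manager reference and links are assumed to come from elsewhere:

    #include <uhdlib/usrp/common/io_service_mgr.hpp>
    #include <cassert>

    void mux_sketch(uhd::usrp::io_service_mgr& mgr,
        uhd::transport::recv_link_if::sptr recv_link,
        uhd::transport::send_link_if::sptr send_link)
    {
        using namespace uhd::transport;
        const uhd::usrp::io_service_args_t args; // ignored by the inline manager
        auto a = mgr.connect_links(recv_link, send_link, link_type_t::CTRL, args, "s0");
        auto b = mgr.connect_links(recv_link, send_link, link_type_t::CTRL, args, "s0");
        assert(a == b);                             // muxed: same inline_io_service
        mgr.disconnect_links(recv_link, send_link); // mux_ref_count 2 -> 1
        mgr.disconnect_links(recv_link, send_link); // 1 -> 0: links detached
    }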
+
+/* Blocking I/O service manager
+ *
+ * I/O service manager for offload I/O services configured to block. This
+ * manager creates one offload I/O service for each transport adapter used by
+ * a streamer. If there are multiple streamers, this manager creates a separate
+ * set of I/O services for each streamer.
+ */
+class blocking_io_service_mgr : public io_service_mgr
+{
+public:
+ io_service::sptr connect_links(recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const link_type_t link_type,
+ const io_service_args_t& args,
+ const std::string& streamer_id);
+
+ void disconnect_links(recv_link_if::sptr recv_link, send_link_if::sptr send_link);
+
+private:
+ struct link_info_t
+ {
+ std::string streamer_id;
+ adapter_id_t adapter_id;
+ };
+ struct streamer_info_t
+ {
+ adapter_id_t adapter_id;
+ io_service::sptr io_srv;
+ size_t connection_count;
+ };
+ using streamer_map_key_t = std::pair<std::string, adapter_id_t>;
+
+ io_service::sptr _create_new_io_service(const io_service_args_t& args,
+ const link_type_t link_type,
+ const size_t thread_index);
+
+ // Map of links to streamer, so we can look up an I/O service from links
+ using link_pair_t = std::pair<recv_link_if::sptr, send_link_if::sptr>;
+ std::map<link_pair_t, link_info_t> _link_info_map;
+
+ // Map of streamer to its I/O services
+ std::map<std::string, std::vector<streamer_info_t>> _streamer_info_map;
+};
+
+io_service::sptr blocking_io_service_mgr::connect_links(recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const link_type_t link_type,
+ const io_service_args_t& args,
+ const std::string& streamer_id)
+{
+ UHD_ASSERT_THROW(
+ link_type == link_type_t::RX_DATA || link_type == link_type_t::TX_DATA);
+
+ auto adapter_id = (link_type == link_type_t::RX_DATA)
+ ? recv_link->get_recv_adapter_id()
+ : send_link->get_send_adapter_id();
+
+ link_pair_t links = {recv_link, send_link};
+ if (_link_info_map.find(links) != _link_info_map.end()) {
+ throw uhd::runtime_error("Block option on offload thread is not "
+ "supported when the transport multiplexes links.");
+ }
+
+ // If this streamer doesn't have an entry, create one
+ if (_streamer_info_map.count(streamer_id) == 0) {
+ _streamer_info_map[streamer_id] = {};
+ }
+ _link_info_map[links] = {streamer_id, adapter_id};
+
+ // Look for whether this streamer already has an I/O service for the same
+ // adapter. If it does, then use it, otherwise create a new one.
+ io_service::sptr io_srv;
+ auto& info_vtr = _streamer_info_map.at(streamer_id);
+ auto it = std::find_if(
+ info_vtr.begin(), info_vtr.end(), [adapter_id](const streamer_info_t& info) {
+ return adapter_id == info.adapter_id;
+ });
+
+ if (it == info_vtr.end()) {
+ const size_t new_thread_index = info_vtr.size();
+ io_srv = _create_new_io_service(args, link_type, new_thread_index);
+ info_vtr.push_back({adapter_id, io_srv, 1 /*connection_count*/});
+ } else {
+ it->connection_count++;
+ io_srv = it->io_srv;
+ }
+
+ if (recv_link) {
+ io_srv->attach_recv_link(recv_link);
+ }
+ if (send_link) {
+ io_srv->attach_send_link(send_link);
+ }
+
+ return io_srv;
+}
+
+void blocking_io_service_mgr::disconnect_links(
+ recv_link_if::sptr recv_link, send_link_if::sptr send_link)
+{
+ const link_pair_t links{recv_link, send_link};
+ auto link_info = _link_info_map.at(links);
+
+ // Find the streamer_info using the streamer_id and adapter_id in link_info
+ auto& info_vtr = _streamer_info_map.at(link_info.streamer_id);
+ auto it = std::find_if(info_vtr.begin(),
+ info_vtr.end(),
+ [adapter_id = link_info.adapter_id](
+ const streamer_info_t& info) { return adapter_id == info.adapter_id; });
+
+ UHD_ASSERT_THROW(it != info_vtr.end());
+
+ // Detach links and decrement the connection count in streamer_info
+ if (recv_link) {
+ it->io_srv->detach_recv_link(recv_link);
+ }
+ if (send_link) {
+ it->io_srv->detach_send_link(send_link);
+ }
+
+ it->connection_count--;
+ if (it->connection_count == 0) {
+ it->io_srv.reset();
+ }
+
+ // If all of the streamer's I/O services are disconnected, clean up its info
+ bool still_in_use = false;
+ for (const auto& info : info_vtr) {
+ still_in_use |= bool(info.io_srv);
+ }
+
+ if (!still_in_use) {
+ _streamer_info_map.erase(link_info.streamer_id);
+ }
+
+ // These links should no longer be connected to any I/O service
+ _link_info_map.erase(links);
+}
+
+io_service::sptr blocking_io_service_mgr::_create_new_io_service(
+ const io_service_args_t& args, const link_type_t link_type, const size_t thread_index)
+{
+ offload_io_service::params_t params;
+ params.wait_mode = offload_io_service::BLOCK;
+ params.client_type = (link_type == link_type_t::RX_DATA)
+ ? offload_io_service::RECV_ONLY
+ : offload_io_service::SEND_ONLY;
+
+ const auto& cpu_vtr = (link_type == link_type_t::RX_DATA)
+ ? args.recv_offload_thread_cpu
+ : args.send_offload_thread_cpu;
+
+ std::string cpu_affinity_str;
+ if (cpu_vtr.size() > thread_index && cpu_vtr[thread_index]) {
+ const size_t cpu = *cpu_vtr[thread_index];
+ params.cpu_affinity_list = {cpu};
+ cpu_affinity_str = ", cpu affinity: " + std::to_string(cpu);
+ } else {
+ cpu_affinity_str = ", cpu affinity: none";
+ }
+
+ std::string link_type_str = (link_type == link_type_t::RX_DATA) ? "RX data"
+ : "TX data";
+
+ UHD_LOG_INFO(LOG_ID,
+ "Creating new blocking I/O service for " << link_type_str << cpu_affinity_str);
+
+ return offload_io_service::make(inline_io_service::make(), params);
+}
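A sketch of the per-adapter policy above: for a single streamer, RX links on two different transport adapters land on two distinct blocking offload I/O services (and hence two offload threads). Only the recv side is shown; the links are placeholders:

    #include <uhdlib/usrp/common/io_service_mgr.hpp>
    #include <cassert>

    void blocking_sketch(uhd::usrp::io_service_mgr& mgr,
        uhd::transport::recv_link_if::sptr link_on_adapter0,
        uhd::transport::recv_link_if::sptr link_on_adapter1)
    {
        using namespace uhd::transport;
        uhd::usrp::io_service_args_t args;
        args.recv_offload           = true;
        args.recv_offload_wait_mode = uhd::usrp::io_service_args_t::BLOCK;
        auto srv0 = mgr.connect_links(
            link_on_adapter0, nullptr, link_type_t::RX_DATA, args, "streamer_0");
        auto srv1 = mgr.connect_links(
            link_on_adapter1, nullptr, link_type_t::RX_DATA, args, "streamer_0");
        assert(srv0 != srv1); // one blocking offload thread per adapter
    }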
+
+/* Polling I/O service manager
+ *
+ * I/O service manager for offload I/O services configured to poll. Creates the
+ * number of I/O services specified by the user in stream_args, and distributes
+ * links among them. New connections always go to the offload thread containing
+ * the fewest connections, with the lowest-numbered thread as a secondary criterion.
+ */
+class polling_io_service_mgr : public io_service_mgr
+{
+public:
+ io_service::sptr connect_links(recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const link_type_t link_type,
+ const io_service_args_t& args,
+ const std::string& streamer_id);
+
+ void disconnect_links(recv_link_if::sptr recv_link, send_link_if::sptr send_link);
+
+private:
+ struct link_info_t
+ {
+ io_service::sptr io_srv;
+ size_t mux_ref_count;
+ };
+ struct io_srv_info_t
+ {
+ size_t connection_count;
+ };
+
+ io_service::sptr _create_new_io_service(
+ const io_service_args_t& args, const size_t thread_index);
+
+ // Map of links to I/O service
+ using link_pair_t = std::pair<recv_link_if::sptr, send_link_if::sptr>;
+ std::map<link_pair_t, link_info_t> _link_info_map;
+
+ // For each I/O service, keep track of the number of connections
+ std::map<io_service::sptr, io_srv_info_t> _io_srv_info_map;
+};
+
+io_service::sptr polling_io_service_mgr::connect_links(recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const link_type_t /*link_type*/,
+ const io_service_args_t& args,
+ const std::string& /*streamer_id*/)
+{
+ // Check if links are already connected
+ const link_pair_t links{recv_link, send_link};
+ auto it = _link_info_map.find(links);
+ if (it != _link_info_map.end()) {
+ // Muxing links, add to mux ref count and connection count
+ it->second.mux_ref_count++;
+ _io_srv_info_map[it->second.io_srv].connection_count++;
+ return it->second.io_srv;
+ }
+
+ // Links are not muxed. If there are fewer offload threads than requested
+ // in the args, create a new service and add the links to it. Otherwise,
+ // add them to the service that has the fewest connections.
+ io_service::sptr io_srv;
+ if (_io_srv_info_map.size() < args.num_poll_offload_threads) {
+ const size_t thread_index = _io_srv_info_map.size();
+ io_srv = _create_new_io_service(args, thread_index);
+ _link_info_map[links] = {io_srv, 1 /*mux_ref_count*/};
+ _io_srv_info_map[io_srv] = {1 /*connection_count*/};
+ } else {
+ using map_pair_t = std::pair<const io_service::sptr, io_srv_info_t>;
+ auto cmp = [](const map_pair_t& left, const map_pair_t& right) {
+ return left.second.connection_count < right.second.connection_count;
+ };
+
+ auto min_it = std::min_element(_io_srv_info_map.begin(), _io_srv_info_map.end(), cmp);
+ UHD_ASSERT_THROW(min_it != _io_srv_info_map.end());
+ io_srv = min_it->first;
+ _io_srv_info_map[io_srv].connection_count++;
+ }
+
+ if (recv_link) {
+ io_srv->attach_recv_link(recv_link);
+ }
+ if (send_link) {
+ io_srv->attach_send_link(send_link);
+ }
+ return io_srv;
+}
+
+void polling_io_service_mgr::disconnect_links(
+ recv_link_if::sptr recv_link, send_link_if::sptr send_link)
+{
+ const link_pair_t links{recv_link, send_link};
+ auto it = _link_info_map.find(links);
+ UHD_ASSERT_THROW(it != _link_info_map.end());
+
+ auto io_srv = it->second.io_srv;
+ it->second.mux_ref_count--;
+
+ if (it->second.mux_ref_count == 0) {
+ if (recv_link) {
+ io_srv->detach_recv_link(recv_link);
+ }
+ if (send_link) {
+ io_srv->detach_send_link(send_link);
+ }
+
+ _link_info_map.erase(it);
+ _io_srv_info_map.erase(io_srv);
+ }
+}
+
+io_service::sptr polling_io_service_mgr::_create_new_io_service(
+ const io_service_args_t& args, const size_t thread_index)
+{
+ offload_io_service::params_t params;
+ params.client_type = offload_io_service::BOTH_SEND_AND_RECV;
+ params.wait_mode = offload_io_service::POLL;
+
+ const auto& cpu_vtr = args.poll_offload_thread_cpu;
+
+ std::string cpu_affinity_str;
+ if (cpu_vtr.size() > thread_index && cpu_vtr[thread_index]) {
+ const size_t cpu = *cpu_vtr[thread_index];
+ params.cpu_affinity_list = {cpu};
+ cpu_affinity_str = ", cpu affinity: " + std::to_string(cpu);
+ } else {
+ cpu_affinity_str = ", cpu affinity: none";
+ }
+
+ UHD_LOG_INFO(LOG_ID, "Creating new polling I/O service" << cpu_affinity_str);
+
+ return offload_io_service::make(inline_io_service::make(), params);
+}
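Similarly, a sketch of the distribution policy above: with num_poll_offload_threads=2, the first two link pairs each get their own polling I/O service, and each later pair joins whichever service currently has the fewest connections:

    #include <uhdlib/usrp/common/io_service_mgr.hpp>
    #include <utility>
    #include <vector>

    void polling_sketch(uhd::usrp::io_service_mgr& mgr,
        const std::vector<std::pair<uhd::transport::recv_link_if::sptr,
            uhd::transport::send_link_if::sptr>>& link_pairs)
    {
        using namespace uhd::transport;
        uhd::usrp::io_service_args_t args;
        args.recv_offload             = true;
        args.recv_offload_wait_mode   = uhd::usrp::io_service_args_t::POLL;
        args.num_poll_offload_threads = 2;
        std::vector<io_service::sptr> srvs;
        for (const auto& lp : link_pairs) {
            // With three pairs, the third shares the least-loaded service
            srvs.push_back(mgr.connect_links(
                lp.first, lp.second, link_type_t::RX_DATA, args, "streamer_0"));
        }
    }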
+
+/* Main I/O service manager implementation class
+ *
+ * Composite I/O service manager that dispatches requests to other managers,
+ * based on transport args and link type.
+ */
+class io_service_mgr_impl : public io_service_mgr
+{
+public:
+ io_service_mgr_impl(const uhd::device_addr_t& args) : _args(args) {}
+
+ io_service::sptr connect_links(recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const link_type_t link_type,
+ const io_service_args_t& args,
+ const std::string& streamer_id);
+
+ void disconnect_links(recv_link_if::sptr recv_link, send_link_if::sptr send_link);
+
+private:
+ struct xport_args_t
+ {
+ bool offload = false;
+ offload_io_service::wait_mode_t wait_mode = offload_io_service::BLOCK;
+ };
+ struct link_info_t
+ {
+ io_service::sptr io_srv;
+ io_service_mgr* mgr = nullptr;
+ };
+ using link_pair_t = std::pair<recv_link_if::sptr, send_link_if::sptr>;
+
+ const uhd::device_addr_t _args;
+
+ inline_io_service_mgr _inline_io_srv_mgr;
+ blocking_io_service_mgr _blocking_io_srv_mgr;
+ polling_io_service_mgr _polling_io_srv_mgr;
+
+ // Map of links to I/O service
+ std::map<link_pair_t, link_info_t> _link_info_map;
+};
+
+io_service_mgr::sptr io_service_mgr::make(const uhd::device_addr_t& args)
+{
+ return std::make_shared<io_service_mgr_impl>(args);
+}
+
+io_service::sptr io_service_mgr_impl::connect_links(recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const link_type_t link_type,
+ const io_service_args_t& args,
+ const std::string& streamer_id)
+{
+ UHD_ASSERT_THROW(link_type != link_type_t::ASYNC_MSG);
+
+ // Check if the links are already attached to an I/O service. If they are,
+ // then use the same manager to connect, since links can only be connected
+ // to one I/O service at any given time.
+ link_pair_t links{recv_link, send_link};
+ auto it = _link_info_map.find(links);
+
+ io_service::sptr io_srv;
+ io_service_mgr* mgr = nullptr;
+
+ if (it != _link_info_map.end()) {
+ io_srv = it->second.io_srv;
+ mgr = it->second.mgr;
+ } else {
+ // Links are not yet attached; pick an io_service_mgr based on the user
+ // parameters and link type, and connect through it.
+ if (link_type == link_type_t::CTRL) {
+ mgr = &_inline_io_srv_mgr;
+ } else {
+ bool offload = (link_type == link_type_t::RX_DATA) ? args.recv_offload
+ : args.send_offload;
+ auto wait_mode = (link_type == link_type_t::RX_DATA)
+ ? args.recv_offload_wait_mode
+ : args.send_offload_wait_mode;
+
+ if (offload) {
+ if (wait_mode == io_service_args_t::POLL) {
+ mgr = &_polling_io_srv_mgr;
+ } else {
+ mgr = &_blocking_io_srv_mgr;
+ }
+ } else {
+ mgr = &_inline_io_srv_mgr;
+ }
+ }
+ }
+
+ io_srv = mgr->connect_links(recv_link, send_link, link_type, args, streamer_id);
+
+ _link_info_map[links] = {io_srv, mgr};
+ return io_srv;
+}
+
+void io_service_mgr_impl::disconnect_links(
+ recv_link_if::sptr recv_link, send_link_if::sptr send_link)
+{
+ link_pair_t links{recv_link, send_link};
+ auto it = _link_info_map.find(links);
+
+ UHD_ASSERT_THROW(it != _link_info_map.end());
+ it->second.mgr->disconnect_links(recv_link, send_link);
+ _link_info_map.erase(it);
+}
+
+}} // namespace uhd::usrp
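Putting it together, an end-to-end sketch of the composite manager: make() builds it, CTRL links always go to the inline manager, and data links go inline, polling, or blocking depending on the offload flag and wait mode in the args:

    #include <uhdlib/usrp/common/io_service_mgr.hpp>

    void dispatch_sketch(uhd::transport::recv_link_if::sptr recv_link,
        uhd::transport::send_link_if::sptr send_link)
    {
        using namespace uhd::transport;
        auto mgr = uhd::usrp::io_service_mgr::make(uhd::device_addr_t());

        uhd::usrp::io_service_args_t args;
        args.send_offload           = true;
        args.send_offload_wait_mode = uhd::usrp::io_service_args_t::BLOCK;

        // TX data + offload + BLOCK -> routed to the blocking manager
        auto io_srv = mgr->connect_links(
            recv_link, send_link, link_type_t::TX_DATA, args, "streamer_0");
        // ... build a chdr_tx_data_xport on io_srv here ...
        mgr->disconnect_links(recv_link, send_link);
    }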
diff --git a/host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.cpp b/host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.cpp
index 0e651a996..a87a9cada 100644
--- a/host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.cpp
+++ b/host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.cpp
@@ -7,11 +7,13 @@
#include "mpmd_link_if_ctrl_udp.hpp"
#include "mpmd_impl.hpp"
#include "mpmd_link_if_mgr.hpp"
+#include <uhd/rfnoc/constants.hpp>
#include <uhd/transport/udp_constants.hpp>
#include <uhd/transport/udp_simple.hpp>
#include <uhd/transport/udp_zero_copy.hpp>
-#include <uhdlib/transport/inline_io_service.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
#include <uhdlib/transport/udp_boost_asio_link.hpp>
+#include <uhdlib/transport/udp_common.hpp>
#include <uhdlib/utils/narrow.hpp>
#include <string>
@@ -26,9 +28,8 @@ namespace {
//! Maximum CHDR packet size in bytes
const size_t MPMD_10GE_DATA_FRAME_MAX_SIZE = 8000;
-
-//! Maximum CHDR packet size in bytes
-const size_t MPMD_10GE_ASYNCMSG_FRAME_MAX_SIZE = 1472;
+const size_t MPMD_1GE_DATA_FRAME_MAX_SIZE = 1472;
+const size_t MPMD_1GE_ASYNCMSG_FRAME_MAX_SIZE = 1472;
//! Number of send/recv frames
const size_t MPMD_ETH_NUM_FRAMES = 32;
@@ -194,8 +195,6 @@ size_t discover_mtu(const std::string& address,
mpmd_link_if_ctrl_udp::mpmd_link_if_ctrl_udp(const uhd::device_addr_t& mb_args,
const mpmd_link_if_mgr::xport_info_list_t& xport_info)
: _mb_args(mb_args)
- , _recv_args(filter_args(mb_args, "recv"))
- , _send_args(filter_args(mb_args, "send"))
, _udp_info(get_udp_info_from_xport_info(xport_info))
, _mtu(MPMD_10GE_DATA_FRAME_MAX_SIZE)
{
@@ -228,36 +227,52 @@ mpmd_link_if_ctrl_udp::mpmd_link_if_ctrl_udp(const uhd::device_addr_t& mb_args,
* API
*****************************************************************************/
uhd::transport::both_links_t mpmd_link_if_ctrl_udp::get_link(const size_t link_idx,
- const uhd::transport::link_type_t /*link_type*/,
- const uhd::device_addr_t& /*link_args*/)
+ const uhd::transport::link_type_t link_type,
+ const uhd::device_addr_t& link_args)
{
UHD_ASSERT_THROW(link_idx < _available_addrs.size());
const std::string ip_addr = _available_addrs.at(link_idx);
const std::string udp_port = _udp_info.at(ip_addr).udp_port;
- /* FIXME: Should have common infrastructure for creating I/O services */
- auto io_srv = uhd::transport::inline_io_service::make();
- link_params_t link_params;
- link_params.num_recv_frames = MPMD_ETH_NUM_FRAMES; // FIXME
- link_params.num_send_frames = MPMD_ETH_NUM_FRAMES; // FIXME
- link_params.recv_frame_size = get_mtu(uhd::RX_DIRECTION); // FIXME
- link_params.send_frame_size = get_mtu(uhd::TX_DIRECTION); // FIXME
- link_params.recv_buff_size = MPMD_BUFFER_DEPTH * MAX_RATE_10GIGE; // FIXME
- link_params.send_buff_size = MPMD_BUFFER_DEPTH * MAX_RATE_10GIGE; // FIXME
- auto link = uhd::transport::udp_boost_asio_link::make(ip_addr,
+ const size_t link_rate = get_link_rate(link_idx);
+ link_params_t default_link_params;
+ default_link_params.num_send_frames = MPMD_ETH_NUM_FRAMES;
+ default_link_params.num_recv_frames = MPMD_ETH_NUM_FRAMES;
+ default_link_params.send_frame_size = (link_rate == MAX_RATE_10GIGE)
+ ? MPMD_10GE_DATA_FRAME_MAX_SIZE
+ : (link_rate == MAX_RATE_1GIGE)
+ ? MPMD_1GE_DATA_FRAME_MAX_SIZE
+ : get_mtu(uhd::TX_DIRECTION);
+ default_link_params.recv_frame_size = (link_rate == MAX_RATE_10GIGE)
+ ? MPMD_10GE_DATA_FRAME_MAX_SIZE
+ : (link_rate == MAX_RATE_1GIGE)
+ ? MPMD_1GE_DATA_FRAME_MAX_SIZE
+ : get_mtu(uhd::RX_DIRECTION);
+ default_link_params.send_buff_size = link_rate * MPMD_BUFFER_DEPTH;
+ default_link_params.recv_buff_size = link_rate * MPMD_BUFFER_DEPTH;
+
+ link_params_t link_params = calculate_udp_link_params(link_type,
+ get_mtu(uhd::TX_DIRECTION),
+ get_mtu(uhd::RX_DIRECTION),
+ default_link_params,
+ _mb_args,
+ link_args);
+
+ // Enforce a minimum bound on the number of receive and send frames.
+ link_params.num_send_frames = std::max(uhd::rfnoc::MIN_NUM_FRAMES, link_params.num_send_frames);
+ link_params.num_recv_frames = std::max(uhd::rfnoc::MIN_NUM_FRAMES, link_params.num_recv_frames);
+
+ auto link = uhd::transport::udp_boost_asio_link::make(ip_addr,
udp_port,
link_params,
- link_params.recv_buff_size, // FIXME
- link_params.send_buff_size); // FIXME
- io_srv->attach_send_link(link);
- io_srv->attach_recv_link(link);
- return std::tuple<io_service::sptr,
- send_link_if::sptr,
+ link_params.recv_buff_size,
+ link_params.send_buff_size);
+ return std::tuple<send_link_if::sptr,
size_t,
recv_link_if::sptr,
size_t,
bool>(
- io_srv, link, link_params.send_buff_size, link, link_params.recv_buff_size, true);
+ link, link_params.send_buff_size, link, link_params.recv_buff_size, true);
}
size_t mpmd_link_if_ctrl_udp::get_num_links() const
@@ -277,3 +292,4 @@ mpmd_link_if_ctrl_udp::get_packet_factory() const
{
return _pkt_factory;
}
+
diff --git a/host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.hpp b/host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.hpp
index 4c8ecade7..33db83b47 100644
--- a/host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.hpp
+++ b/host/lib/usrp/mpmd/mpmd_link_if_ctrl_udp.hpp
@@ -45,8 +45,6 @@ public:
private:
const uhd::device_addr_t _mb_args;
- const uhd::dict<std::string, std::string> _recv_args;
- const uhd::dict<std::string, std::string> _send_args;
//!
udp_link_info_map _udp_info;
//! A list of IP addresses we can connect our CHDR connections to
diff --git a/host/lib/usrp/mpmd/mpmd_mb_iface.cpp b/host/lib/usrp/mpmd/mpmd_mb_iface.cpp
index e713cc7a3..403e53949 100644
--- a/host/lib/usrp/mpmd/mpmd_mb_iface.cpp
+++ b/host/lib/usrp/mpmd/mpmd_mb_iface.cpp
@@ -14,9 +14,21 @@
using namespace uhd::rfnoc;
using namespace uhd::mpmd;
+static uhd::usrp::io_service_args_t get_default_io_srv_args()
+{
+ // TODO: Need better defaults, taking into account the link type and ensuring
+ // that the number of frames is appropriate
+ uhd::usrp::io_service_args_t args;
+ args.recv_offload = false;
+ args.send_offload = false;
+ return args;
+}
+
mpmd_mboard_impl::mpmd_mb_iface::mpmd_mb_iface(
const uhd::device_addr_t& mb_args, uhd::rpc_client::sptr rpc)
- : _mb_args(mb_args), _rpc(rpc), _link_if_mgr(xport::mpmd_link_if_mgr::make(mb_args))
+ : _mb_args(mb_args)
+ , _rpc(rpc)
+ , _link_if_mgr(xport::mpmd_link_if_mgr::make(mb_args))
{
_remote_device_id = allocate_device_id();
UHD_LOG_TRACE("MPMD::MB_IFACE", "Assigning device_id " << _remote_device_id);
@@ -153,16 +165,18 @@ uhd::rfnoc::chdr_ctrl_xport::sptr mpmd_mboard_impl::mpmd_mb_iface::make_ctrl_tra
+ std::to_string(local_device_id));
}
const size_t link_idx = _local_device_id_map.at(local_device_id);
- uhd::transport::io_service::sptr io_srv;
uhd::transport::send_link_if::sptr send_link;
uhd::transport::recv_link_if::sptr recv_link;
- std::tie(io_srv, send_link, std::ignore, recv_link, std::ignore, std::ignore) =
+ std::tie(send_link, std::ignore, recv_link, std::ignore, std::ignore) =
_link_if_mgr->get_link(
link_idx, uhd::transport::link_type_t::CTRL, uhd::device_addr_t());
/* Associate local device ID with the adapter */
_adapter_map[local_device_id] = send_link->get_send_adapter_id();
+ auto io_srv = get_io_srv_mgr()->connect_links(
+ recv_link, send_link, transport::link_type_t::CTRL);
+
auto pkt_factory = _link_if_mgr->get_packet_factory(link_idx);
auto xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv,
send_link,
@@ -181,7 +195,8 @@ mpmd_mboard_impl::mpmd_mb_iface::make_rx_data_transport(
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const uhd::device_addr_t& xport_args)
+ const uhd::device_addr_t& xport_args,
+ const std::string& streamer_id)
{
const uhd::rfnoc::sep_addr_t local_sep_addr = addrs.second;
@@ -192,12 +207,11 @@ mpmd_mboard_impl::mpmd_mb_iface::make_rx_data_transport(
}
const size_t link_idx = _local_device_id_map.at(local_sep_addr.first);
- uhd::transport::io_service::sptr io_srv;
uhd::transport::send_link_if::sptr send_link;
uhd::transport::recv_link_if::sptr recv_link;
bool lossy_xport;
size_t recv_buff_size;
- std::tie(io_srv, send_link, std::ignore, recv_link, recv_buff_size, lossy_xport) =
+ std::tie(send_link, std::ignore, recv_link, recv_buff_size, lossy_xport) =
_link_if_mgr->get_link(
link_idx, uhd::transport::link_type_t::RX_DATA, xport_args);
@@ -217,9 +231,12 @@ mpmd_mboard_impl::mpmd_mb_iface::make_rx_data_transport(
stream_buff_params_t fc_headroom = {0, 0};
+ auto cfg_io_srv = get_io_srv_mgr()->connect_links(
+ recv_link, send_link, transport::link_type_t::CTRL);
+
// Create the data transport
auto pkt_factory = _link_if_mgr->get_packet_factory(link_idx);
- auto fc_params = chdr_rx_data_xport::configure_sep(io_srv,
+ auto fc_params = chdr_rx_data_xport::configure_sep(cfg_io_srv,
recv_link,
send_link,
pkt_factory,
@@ -231,7 +248,18 @@ mpmd_mboard_impl::mpmd_mb_iface::make_rx_data_transport(
fc_freq,
fc_headroom,
lossy_xport);
- auto rx_xport = std::make_unique<chdr_rx_data_xport>(io_srv,
+
+ get_io_srv_mgr()->disconnect_links(recv_link, send_link);
+ cfg_io_srv.reset();
+
+ // Connect the links to an I/O service
+ auto io_srv = get_io_srv_mgr()->connect_links(recv_link,
+ send_link,
+ transport::link_type_t::RX_DATA,
+ usrp::read_io_service_args(xport_args, get_default_io_srv_args()),
+ streamer_id);
+
+ auto rx_xport = std::make_unique<chdr_rx_data_xport>(io_srv,
recv_link,
send_link,
pkt_factory,
@@ -249,7 +277,8 @@ mpmd_mboard_impl::mpmd_mb_iface::make_tx_data_transport(
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const uhd::device_addr_t& xport_args)
+ const uhd::device_addr_t& xport_args,
+ const std::string& streamer_id)
{
const uhd::rfnoc::sep_addr_t local_sep_addr = addrs.first;
@@ -260,11 +289,10 @@ mpmd_mboard_impl::mpmd_mb_iface::make_tx_data_transport(
}
const size_t link_idx = _local_device_id_map.at(local_sep_addr.first);
- uhd::transport::io_service::sptr io_srv;
uhd::transport::send_link_if::sptr send_link;
uhd::transport::recv_link_if::sptr recv_link;
bool lossy_xport;
- std::tie(io_srv, send_link, std::ignore, recv_link, std::ignore, lossy_xport) =
+ std::tie(send_link, std::ignore, recv_link, std::ignore, lossy_xport) =
_link_if_mgr->get_link(
link_idx, uhd::transport::link_type_t::TX_DATA, xport_args);
@@ -275,8 +303,11 @@ mpmd_mboard_impl::mpmd_mb_iface::make_tx_data_transport(
const double fc_freq_ratio = 1.0 / 8;
const double fc_headroom_ratio = 0;
+ auto cfg_io_srv = get_io_srv_mgr()->connect_links(
+ recv_link, send_link, transport::link_type_t::CTRL);
+
auto pkt_factory = _link_if_mgr->get_packet_factory(link_idx);
- const auto buff_capacity = chdr_tx_data_xport::configure_sep(io_srv,
+ const auto buff_capacity = chdr_tx_data_xport::configure_sep(cfg_io_srv,
recv_link,
send_link,
pkt_factory,
@@ -287,6 +318,16 @@ mpmd_mboard_impl::mpmd_mb_iface::make_tx_data_transport(
fc_freq_ratio,
fc_headroom_ratio);
+ get_io_srv_mgr()->disconnect_links(recv_link, send_link);
+ cfg_io_srv.reset();
+
+ // Connect the links to an I/O service
+ auto io_srv = get_io_srv_mgr()->connect_links(recv_link,
+ send_link,
+ transport::link_type_t::TX_DATA,
+ usrp::read_io_service_args(xport_args, get_default_io_srv_args()),
+ streamer_id);
+
// Create the data transport
auto tx_xport = std::make_unique<chdr_tx_data_xport>(io_srv,
recv_link,
@@ -296,6 +337,5 @@ mpmd_mboard_impl::mpmd_mb_iface::make_tx_data_transport(
send_link->get_num_send_frames(),
buff_capacity);
-
return tx_xport;
}
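Both data-transport factories above follow the same three-step sequence; in outline (a restatement of the code above, with elisions):

    // 1) Temporarily attach the links to a CTRL-type (inline) I/O service
    //    so configure_sep() can run the endpoint setup transaction.
    auto cfg_io_srv = get_io_srv_mgr()->connect_links(
        recv_link, send_link, transport::link_type_t::CTRL);
    // ... chdr_rx_data_xport::configure_sep(cfg_io_srv, ...) ...

    // 2) Release the temporary service so the links can be re-homed.
    get_io_srv_mgr()->disconnect_links(recv_link, send_link);
    cfg_io_srv.reset();

    // 3) Re-attach with the user's I/O service args for streaming.
    auto io_srv = get_io_srv_mgr()->connect_links(recv_link, send_link,
        transport::link_type_t::RX_DATA, // or TX_DATA
        usrp::read_io_service_args(xport_args, get_default_io_srv_args()),
        streamer_id);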
diff --git a/host/lib/usrp/mpmd/mpmd_mb_iface.hpp b/host/lib/usrp/mpmd/mpmd_mb_iface.hpp
index 4e47dd35a..4e54cfc12 100644
--- a/host/lib/usrp/mpmd/mpmd_mb_iface.hpp
+++ b/host/lib/usrp/mpmd/mpmd_mb_iface.hpp
@@ -10,8 +10,9 @@
#include "mpmd_impl.hpp"
#include "mpmd_link_if_mgr.hpp"
#include <uhdlib/rfnoc/mb_iface.hpp>
-#include <map>
+#include <uhdlib/usrp/common/io_service_mgr.hpp>
#include <unordered_map>
+#include <map>
namespace uhd { namespace mpmd {
@@ -33,7 +34,8 @@ public:
uhd::endianness_t get_endianness(const uhd::rfnoc::device_id_t local_device_id);
uhd::rfnoc::device_id_t get_remote_device_id();
std::vector<uhd::rfnoc::device_id_t> get_local_device_ids();
- uhd::transport::adapter_id_t get_adapter_id(const uhd::rfnoc::device_id_t local_device_id);
+ uhd::transport::adapter_id_t get_adapter_id(
+ const uhd::rfnoc::device_id_t local_device_id);
void reset_network();
uhd::rfnoc::clock_iface::sptr get_clock_iface(const std::string& clock_name);
uhd::rfnoc::chdr_ctrl_xport::sptr make_ctrl_transport(
@@ -44,14 +46,16 @@ public:
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const uhd::device_addr_t& xport_args);
+ const uhd::device_addr_t& xport_args,
+ const std::string& streamer_id);
uhd::rfnoc::chdr_tx_data_xport::uptr make_tx_data_transport(
uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal,
const uhd::rfnoc::sep_addr_pair_t& addrs,
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const uhd::device_addr_t& xport_args);
+ const uhd::device_addr_t& xport_args,
+ const std::string& streamer_id);
private:
uhd::device_addr_t _mb_args;
@@ -59,8 +63,10 @@ private:
xport::mpmd_link_if_mgr::uptr _link_if_mgr;
uhd::rfnoc::device_id_t _remote_device_id;
std::map<uhd::rfnoc::device_id_t, size_t> _local_device_id_map;
- std::unordered_map<uhd::rfnoc::device_id_t, uhd::transport::adapter_id_t> _adapter_map;
+ std::unordered_map<uhd::rfnoc::device_id_t, uhd::transport::adapter_id_t>
+ _adapter_map;
std::map<std::string, uhd::rfnoc::clock_iface::sptr> _clock_ifaces;
+ uhd::usrp::io_service_mgr::sptr _io_srv_mgr;
};
}} /* namespace uhd::mpmd */
diff --git a/host/lib/usrp/x300/x300_eth_mgr.cpp b/host/lib/usrp/x300/x300_eth_mgr.cpp
index 8ff63b050..7177032c6 100644
--- a/host/lib/usrp/x300/x300_eth_mgr.cpp
+++ b/host/lib/usrp/x300/x300_eth_mgr.cpp
@@ -19,8 +19,9 @@
#include <uhd/transport/udp_zero_copy.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhdlib/rfnoc/device_id.hpp>
-#include <uhdlib/transport/inline_io_service.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
#include <uhdlib/transport/udp_boost_asio_link.hpp>
+#include <uhdlib/transport/udp_common.hpp>
#include <uhdlib/usrp/cores/i2c_core_100_wb32.hpp>
//#ifdef HAVE_DPDK
//# include <uhdlib/transport/dpdk_simple.hpp>
@@ -287,53 +288,32 @@ both_links_t eth_manager::get_links(link_type_t link_type,
// Buffering is done in the socket buffers, so size them relative to
// the link rate
- default_buff_args.send_buff_size = conn.link_rate / 50; // 20ms
- default_buff_args.recv_buff_size = std::max(conn.link_rate / 50,
- ETH_MSG_NUM_FRAMES * ETH_MSG_FRAME_SIZE); // enough to hold greater of 20ms or
- // number of msg frames
+ link_params_t default_link_params;
// There is no need for more than 1 send and recv frame since the
// buffering is done in the socket buffers
- default_buff_args.num_send_frames = 1; // or 2?
- default_buff_args.num_recv_frames = 1;
- if (link_type == link_type_t::CTRL) {
- // Increasing number of recv frames here because ctrl_iface uses it
- // to determine how many control packets can be in flight before it
- // must wait for an ACK
- // FIXME this is no longer true, find a good value
- default_buff_args.num_recv_frames = 85; // 256/3
- } else if (link_type == link_type_t::TX_DATA) {
- size_t default_frame_size = conn.link_rate == MAX_RATE_1GIGE
- ? GE_DATA_FRAME_SEND_SIZE
- : XGE_DATA_FRAME_SEND_SIZE;
- default_buff_args.send_frame_size = link_args.cast<size_t>(
- "send_frame_size", std::min(default_frame_size, send_mtu));
- default_buff_args.num_send_frames = link_args.cast<size_t>(
- "num_send_frames", default_buff_args.num_send_frames);
- default_buff_args.send_buff_size = link_args.cast<size_t>(
- "send_buff_size", default_buff_args.send_buff_size);
- } else if (link_type == link_type_t::RX_DATA) {
- size_t default_frame_size = conn.link_rate == MAX_RATE_1GIGE
- ? GE_DATA_FRAME_RECV_SIZE
- : XGE_DATA_FRAME_RECV_SIZE;
- default_buff_args.recv_frame_size = link_args.cast<size_t>(
- "recv_frame_size", std::min(default_frame_size, recv_mtu));
- // set some buffers so the offload thread actually offloads the
- // socket I/O
- default_buff_args.num_recv_frames =
- link_args.cast<size_t>("num_recv_frames", 2);
- default_buff_args.recv_buff_size = link_args.cast<size_t>(
- "recv_buff_size", default_buff_args.recv_buff_size);
- }
+ default_link_params.num_send_frames = 1; // or 2?
+ default_link_params.num_recv_frames = 2; // so an offload thread actually offloads the socket I/O
+ default_link_params.send_frame_size = conn.link_rate == MAX_RATE_1GIGE
+ ? GE_DATA_FRAME_SEND_SIZE
+ : XGE_DATA_FRAME_SEND_SIZE;
+ default_link_params.recv_frame_size = conn.link_rate == MAX_RATE_1GIGE
+ ? GE_DATA_FRAME_RECV_SIZE
+ : XGE_DATA_FRAME_RECV_SIZE;
+ default_link_params.send_buff_size = conn.link_rate / 50;
+ default_link_params.recv_buff_size = std::max(conn.link_rate / 50,
+ ETH_MSG_NUM_FRAMES * ETH_MSG_FRAME_SIZE); // enough to hold the greater of
+ // 20 ms of data or the msg frames
+
+ link_params_t link_params = calculate_udp_link_params(link_type,
+ get_mtu(uhd::TX_DIRECTION),
+ get_mtu(uhd::RX_DIRECTION),
+ default_link_params,
+ _args.get_orig_args(),
+ link_args);
- /* FIXME: Should have common infrastructure for creating I/O services */
- auto io_srv = uhd::transport::inline_io_service::make();
- link_params_t link_params;
- link_params.num_recv_frames = default_buff_args.num_recv_frames;
- link_params.num_send_frames = default_buff_args.num_send_frames;
- link_params.recv_frame_size = default_buff_args.recv_frame_size;
- link_params.send_frame_size = default_buff_args.send_frame_size;
- link_params.recv_buff_size = default_buff_args.recv_buff_size;
- link_params.send_buff_size = default_buff_args.send_buff_size;
+ // Enforce a minimum bound on the number of receive and send frames.
+ link_params.num_send_frames = std::max(uhd::rfnoc::MIN_NUM_FRAMES, link_params.num_send_frames);
+ link_params.num_recv_frames = std::max(uhd::rfnoc::MIN_NUM_FRAMES, link_params.num_recv_frames);
size_t recv_buff_size, send_buff_size;
auto link = uhd::transport::udp_boost_asio_link::make(conn.addr,
@@ -341,9 +321,7 @@ both_links_t eth_manager::get_links(link_type_t link_type,
link_params,
recv_buff_size,
send_buff_size);
- io_srv->attach_send_link(link);
- io_srv->attach_recv_link(link);
- return std::make_tuple(io_srv, link, send_buff_size, link, recv_buff_size, true);
+ return std::make_tuple(link, send_buff_size, link, recv_buff_size, true);
}
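As a worked number for the sizing above: at the 10 GbE rate (1.25e9 bytes/s), conn.link_rate / 50 yields a 25 MB socket buffer, i.e. 20 ms at line rate; at the 1 GbE rate it is 2.5 MB, and the std::max keeps the receive buffer at least large enough for the message frames.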
/******************************************************************************
diff --git a/host/lib/usrp/x300/x300_impl.hpp b/host/lib/usrp/x300/x300_impl.hpp
index 600d224a5..a3276152a 100644
--- a/host/lib/usrp/x300/x300_impl.hpp
+++ b/host/lib/usrp/x300/x300_impl.hpp
@@ -108,7 +108,8 @@ private:
uhd::endianness_t get_endianness(const uhd::rfnoc::device_id_t local_device_id);
uhd::rfnoc::device_id_t get_remote_device_id();
std::vector<uhd::rfnoc::device_id_t> get_local_device_ids();
- uhd::transport::adapter_id_t get_adapter_id(const uhd::rfnoc::device_id_t local_device_id);
+ uhd::transport::adapter_id_t get_adapter_id(
+ const uhd::rfnoc::device_id_t local_device_id);
void reset_network();
uhd::rfnoc::clock_iface::sptr get_clock_iface(const std::string& clock_name);
uhd::rfnoc::chdr_ctrl_xport::sptr make_ctrl_transport(
@@ -120,18 +121,21 @@ private:
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const uhd::device_addr_t& xport_args);
+ const uhd::device_addr_t& xport_args,
+ const std::string& streamer_id);
uhd::rfnoc::chdr_tx_data_xport::uptr make_tx_data_transport(
uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal,
const uhd::rfnoc::sep_addr_pair_t& addrs,
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const uhd::device_addr_t& xport_args);
+ const uhd::device_addr_t& xport_args,
+ const std::string& streamer_id);
private:
const uhd::rfnoc::device_id_t _remote_dev_id;
- std::unordered_map<uhd::rfnoc::device_id_t, uhd::transport::adapter_id_t> _adapter_map;
+ std::unordered_map<uhd::rfnoc::device_id_t, uhd::transport::adapter_id_t>
+ _adapter_map;
uhd::rfnoc::clock_iface::sptr _bus_clk;
uhd::rfnoc::clock_iface::sptr _radio_clk;
uhd::usrp::x300::conn_manager::sptr _conn_mgr;
diff --git a/host/lib/usrp/x300/x300_mb_iface.cpp b/host/lib/usrp/x300/x300_mb_iface.cpp
index 5642ffc98..5ba92f52c 100644
--- a/host/lib/usrp/x300/x300_mb_iface.cpp
+++ b/host/lib/usrp/x300/x300_mb_iface.cpp
@@ -10,6 +10,15 @@
using namespace uhd::rfnoc;
using uhd::transport::link_type_t;
+static uhd::usrp::io_service_args_t get_default_io_srv_args()
+{
+ // TODO: Need better defaults, taking into account the link type and ensuring
+ // that the number of frames is appropriate
+ uhd::usrp::io_service_args_t args;
+ args.recv_offload = false;
+ args.send_offload = false;
+ return args;
+}
x300_impl::x300_mb_iface::x300_mb_iface(uhd::usrp::x300::conn_manager::sptr conn_mgr,
const double radio_clk_freq,
@@ -84,10 +93,12 @@ uhd::rfnoc::clock_iface::sptr x300_impl::x300_mb_iface::get_clock_iface(
uhd::rfnoc::chdr_ctrl_xport::sptr x300_impl::x300_mb_iface::make_ctrl_transport(
uhd::rfnoc::device_id_t local_device_id, const uhd::rfnoc::sep_id_t& local_epid)
{
- uhd::transport::io_service::sptr io_srv;
- uhd::transport::send_link_if::sptr send_link;
- uhd::transport::recv_link_if::sptr recv_link;
- std::tie(io_srv, send_link, std::ignore, recv_link, std::ignore, std::ignore) =
+ using namespace uhd::transport;
+
+ send_link_if::sptr send_link;
+ recv_link_if::sptr recv_link;
+ std::tie(send_link, std::ignore, recv_link, std::ignore, std::ignore) =
_conn_mgr->get_links(link_type_t::CTRL,
local_device_id,
local_epid,
@@ -97,7 +108,10 @@ uhd::rfnoc::chdr_ctrl_xport::sptr x300_impl::x300_mb_iface::make_ctrl_transport(
/* Associate local device ID with the adapter */
_adapter_map[local_device_id] = send_link->get_send_adapter_id();
- auto xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv,
+ auto io_srv =
+ get_io_srv_mgr()->connect_links(recv_link, send_link, link_type_t::CTRL);
+
+ auto xport = chdr_ctrl_xport::make(io_srv,
send_link,
recv_link,
_pkt_factory,
@@ -113,18 +127,20 @@ uhd::rfnoc::chdr_rx_data_xport::uptr x300_impl::x300_mb_iface::make_rx_data_tran
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const uhd::device_addr_t& xport_args)
+ const uhd::device_addr_t& xport_args,
+ const std::string& streamer_id)
{
+ using namespace uhd::transport;
+
const uhd::rfnoc::sep_addr_t local_sep_addr = addrs.second;
const uhd::rfnoc::sep_id_t remote_epid = epids.first;
const uhd::rfnoc::sep_id_t local_epid = epids.second;
- uhd::transport::io_service::sptr io_srv;
- uhd::transport::send_link_if::sptr send_link;
- uhd::transport::recv_link_if::sptr recv_link;
+ send_link_if::sptr send_link;
+ recv_link_if::sptr recv_link;
size_t recv_buff_size;
bool lossy_xport;
- std::tie(io_srv, send_link, std::ignore, recv_link, recv_buff_size, lossy_xport) =
+ std::tie(send_link, std::ignore, recv_link, recv_buff_size, lossy_xport) =
_conn_mgr->get_links(link_type_t::RX_DATA,
local_sep_addr.first,
local_epid,
@@ -147,8 +163,10 @@ uhd::rfnoc::chdr_rx_data_xport::uptr x300_impl::x300_mb_iface::make_rx_data_tran
uhd::rfnoc::stream_buff_params_t fc_headroom = {0, 0};
- // Create the data transport
- auto fc_params = uhd::rfnoc::chdr_rx_data_xport::configure_sep(io_srv,
+ auto cfg_io_srv =
+ get_io_srv_mgr()->connect_links(recv_link, send_link, link_type_t::CTRL);
+
+ auto fc_params = uhd::rfnoc::chdr_rx_data_xport::configure_sep(cfg_io_srv,
recv_link,
send_link,
_pkt_factory,
@@ -161,6 +179,17 @@ uhd::rfnoc::chdr_rx_data_xport::uptr x300_impl::x300_mb_iface::make_rx_data_tran
fc_headroom,
lossy_xport);
+ get_io_srv_mgr()->disconnect_links(recv_link, send_link);
+ cfg_io_srv.reset();
+
+ // Connect the links to an I/O service
+ auto io_srv = get_io_srv_mgr()->connect_links(recv_link,
+ send_link,
+ link_type_t::RX_DATA,
+ uhd::usrp::read_io_service_args(xport_args, get_default_io_srv_args()),
+ streamer_id);
+
+ // Create the data transport
auto rx_xport = std::make_unique<uhd::rfnoc::chdr_rx_data_xport>(io_srv,
recv_link,
send_link,
@@ -178,17 +207,19 @@ uhd::rfnoc::chdr_tx_data_xport::uptr x300_impl::x300_mb_iface::make_tx_data_tran
const uhd::rfnoc::sep_id_pair_t& epids,
const uhd::rfnoc::sw_buff_t pyld_buff_fmt,
const uhd::rfnoc::sw_buff_t mdata_buff_fmt,
- const uhd::device_addr_t& xport_args)
+ const uhd::device_addr_t& xport_args,
+ const std::string& streamer_id)
{
+ using namespace uhd::transport;
+
const uhd::rfnoc::sep_addr_t local_sep_addr = addrs.first;
const uhd::rfnoc::sep_id_t remote_epid = epids.second;
const uhd::rfnoc::sep_id_t local_epid = epids.first;
- uhd::transport::io_service::sptr io_srv;
- uhd::transport::send_link_if::sptr send_link;
- uhd::transport::recv_link_if::sptr recv_link;
+ send_link_if::sptr send_link;
+ recv_link_if::sptr recv_link;
bool lossy_xport;
- std::tie(io_srv, send_link, std::ignore, recv_link, std::ignore, lossy_xport) =
+ std::tie(send_link, std::ignore, recv_link, std::ignore, lossy_xport) =
_conn_mgr->get_links(link_type_t::TX_DATA,
local_sep_addr.first,
local_epid,
@@ -202,7 +233,10 @@ uhd::rfnoc::chdr_tx_data_xport::uptr x300_impl::x300_mb_iface::make_tx_data_tran
const double fc_freq_ratio = 1.0 / 8;
const double fc_headroom_ratio = 0;
- const auto buff_capacity = chdr_tx_data_xport::configure_sep(io_srv,
+ auto cfg_io_srv =
+ get_io_srv_mgr()->connect_links(recv_link, send_link, link_type_t::CTRL);
+
+ const auto buff_capacity = chdr_tx_data_xport::configure_sep(cfg_io_srv,
recv_link,
send_link,
_pkt_factory,
@@ -213,6 +247,16 @@ uhd::rfnoc::chdr_tx_data_xport::uptr x300_impl::x300_mb_iface::make_tx_data_tran
fc_freq_ratio,
fc_headroom_ratio);
+ get_io_srv_mgr()->disconnect_links(recv_link, send_link);
+ cfg_io_srv.reset();
+
+ // Connect the links to an I/O service
+ auto io_srv = get_io_srv_mgr()->connect_links(recv_link,
+ send_link,
+ link_type_t::TX_DATA,
+ uhd::usrp::read_io_service_args(xport_args, get_default_io_srv_args()),
+ streamer_id);
+
// Create the data transport
auto tx_xport = std::make_unique<chdr_tx_data_xport>(io_srv,
recv_link,