diff options
-rw-r--r-- | host/lib/include/uhdlib/usrp/common/io_service_args.hpp | 13 | ||||
-rw-r--r-- | host/lib/include/uhdlib/usrp/common/io_service_mgr.hpp | 8 | ||||
-rw-r--r-- | host/lib/usrp/common/io_service_args.cpp | 50 | ||||
-rw-r--r-- | host/lib/usrp/common/io_service_mgr.cpp | 93 | ||||
-rw-r--r-- | host/lib/usrp/mpmd/mpmd_mb_iface.cpp | 6 | ||||
-rw-r--r-- | host/lib/usrp/x300/x300_mb_iface.cpp | 6 |
6 files changed, 131 insertions, 45 deletions
diff --git a/host/lib/include/uhdlib/usrp/common/io_service_args.hpp b/host/lib/include/uhdlib/usrp/common/io_service_args.hpp index a783cc825..a8e46d8c3 100644 --- a/host/lib/include/uhdlib/usrp/common/io_service_args.hpp +++ b/host/lib/include/uhdlib/usrp/common/io_service_args.hpp @@ -88,6 +88,19 @@ struct io_service_args_t io_service_args_t read_io_service_args( const device_addr_t& args, const io_service_args_t& defaults); +/*! Merges device_args with stream_args + * + * Copies args related to I/O services from device args to stream args, and + * returns the merged result. If the same arg is specified in device_args and + * stream args, the value in stream_args is returned. + * + * \param args The device args provided when the graph is created + * \param args The stream args provided when a streamer is created + * \return The merged device args + */ +device_addr_t merge_io_service_dev_args( + const device_addr_t& dev_args, const device_addr_t& stream_args); + }} // namespace uhd::usrp #endif /* INCLUDED_LIBUHD_IO_SERVICE_ARGS_HPP */ diff --git a/host/lib/include/uhdlib/usrp/common/io_service_mgr.hpp b/host/lib/include/uhdlib/usrp/common/io_service_mgr.hpp index 1093f7bec..2707c564c 100644 --- a/host/lib/include/uhdlib/usrp/common/io_service_mgr.hpp +++ b/host/lib/include/uhdlib/usrp/common/io_service_mgr.hpp @@ -72,7 +72,8 @@ public: * \param recv_link The recv link to connect to an I/O service * \param send_link The send link to connect to an I/O service * \param link_type The type of transport in which the links will be used - * \param io_srv_args The user-requested options for the stream + * \param io_srv_args The default stream args for the device + * \param stream_args The user-provided stream args * \param streamer_id A unique ID for the streamer that will use the links * \return The I/O service to which the links are connected */ @@ -80,8 +81,9 @@ public: transport::recv_link_if::sptr recv_link, transport::send_link_if::sptr send_link, const transport::link_type_t link_type, - const io_service_args_t& io_srv_args = io_service_args_t(), - const std::string& streamer_id = "") = 0; + const io_service_args_t& default_args = io_service_args_t(), + const uhd::device_addr_t& stream_args = uhd::device_addr_t(), + const std::string& streamer_id = "") = 0; /*! Disconnects links from their I/O service * diff --git a/host/lib/usrp/common/io_service_args.cpp b/host/lib/usrp/common/io_service_args.cpp index 09af74f36..04b58b047 100644 --- a/host/lib/usrp/common/io_service_args.cpp +++ b/host/lib/usrp/common/io_service_args.cpp @@ -12,6 +12,15 @@ static const std::string LOG_ID = "IO_SRV"; static const size_t MAX_NUM_XPORT_ADAPTERS = 2; +static const char* recv_offload_str = "recv_offload"; +static const char* send_offload_str = "send_offload"; +static const char* recv_offload_wait_mode_str = "recv_offload_wait_mode"; +static const char* send_offload_wait_mode_str = "send_offload_wait_mode"; +static const char* recv_offload_thread_cpu_str = "recv_offload_thread_cpu"; +static const char* send_offload_thread_cpu_str = "send_offload_thread_cpu"; +static const char* num_poll_offload_threads_str = "num_poll_offload_threads"; +static const char* poll_offload_thread_cpu_str = "poll_offload_thread_cpu_str"; + namespace uhd { namespace usrp { namespace { @@ -47,16 +56,16 @@ io_service_args_t read_io_service_args( io_service_args_t io_srv_args; std::string tmp_str, default_str; - io_srv_args.recv_offload = get_bool_arg(args, "recv_offload", defaults.recv_offload); - io_srv_args.send_offload = get_bool_arg(args, "send_offload", defaults.send_offload); + io_srv_args.recv_offload = get_bool_arg(args, recv_offload_str, defaults.recv_offload); + io_srv_args.send_offload = get_bool_arg(args, send_offload_str, defaults.send_offload); io_srv_args.recv_offload_wait_mode = get_wait_mode_arg( - args, "recv_offload_wait_mode", defaults.recv_offload_wait_mode); + args, recv_offload_wait_mode_str, defaults.recv_offload_wait_mode); io_srv_args.send_offload_wait_mode = get_wait_mode_arg( - args, "send_offload_wait_mode", defaults.send_offload_wait_mode); + args, send_offload_wait_mode_str, defaults.send_offload_wait_mode); io_srv_args.num_poll_offload_threads = - args.cast<size_t>("num_poll_offload_threads", defaults.num_poll_offload_threads); + args.cast<size_t>(num_poll_offload_threads_str, defaults.num_poll_offload_threads); if (io_srv_args.num_poll_offload_threads == 0) { UHD_LOG_WARNING(LOG_ID, "Invalid value for num_poll_offload_threads. " @@ -69,7 +78,7 @@ io_service_args_t read_io_service_args( }; for (size_t i = 0; i < MAX_NUM_XPORT_ADAPTERS; i++) { - std::string key = create_key("recv_offload_thread_cpu", i); + std::string key = create_key(recv_offload_thread_cpu_str, i); if (args.has_key(key)) { io_srv_args.recv_offload_thread_cpu.push_back(args.cast<size_t>(key, 0)); } else { @@ -78,7 +87,7 @@ io_service_args_t read_io_service_args( } for (size_t i = 0; i < MAX_NUM_XPORT_ADAPTERS; i++) { - std::string key = create_key("send_offload_thread_cpu", i); + std::string key = create_key(send_offload_thread_cpu_str, i); if (args.has_key(key)) { io_srv_args.send_offload_thread_cpu.push_back(args.cast<size_t>(key, 0)); } else { @@ -87,7 +96,7 @@ io_service_args_t read_io_service_args( } for (size_t i = 0; i < io_srv_args.num_poll_offload_threads; i++) { - std::string key = create_key("poll_offload_thread_cpu", i); + std::string key = create_key(poll_offload_thread_cpu_str, i); if (args.has_key(key)) { io_srv_args.poll_offload_thread_cpu.push_back(args.cast<size_t>(key, 0)); } else { @@ -98,4 +107,29 @@ io_service_args_t read_io_service_args( return io_srv_args; } +device_addr_t merge_io_service_dev_args( + const device_addr_t& dev_args, const device_addr_t& stream_args) +{ + device_addr_t args = stream_args; + + auto merge_args = [&dev_args, stream_args, &args](const char* key) { + if (!stream_args.has_key(key)) { + if (dev_args.has_key(key)) { + args[key] = dev_args[key]; + } + } + }; + + merge_args(recv_offload_str); + merge_args(send_offload_str); + merge_args(recv_offload_wait_mode_str); + merge_args(send_offload_wait_mode_str); + merge_args(recv_offload_thread_cpu_str); + merge_args(send_offload_thread_cpu_str); + merge_args(num_poll_offload_threads_str); + merge_args(poll_offload_thread_cpu_str); + + return args; +} + }} // namespace uhd::usrp diff --git a/host/lib/usrp/common/io_service_mgr.cpp b/host/lib/usrp/common/io_service_mgr.cpp index bf55ed228..c00f36a25 100644 --- a/host/lib/usrp/common/io_service_mgr.cpp +++ b/host/lib/usrp/common/io_service_mgr.cpp @@ -21,20 +21,23 @@ static const std::string LOG_ID = "IO_SRV"; namespace uhd { namespace usrp { +/* This file defines an I/O service manager implementation, io_service_mgr_impl. + * Its implementation is divided into three other classes, inline_io_service_mgr, + * blocking_io_service_mgr, and polling_io_service_mgr. The io_service_mgr_impl + * object selects which one to invoke based on the provided stream args. + */ + /* Inline I/O service manager * * I/O service manager for inline I/O services. Creates a new inline_io_service * for every new pair of links, unless they are already attached to an I/O * service (muxed links). */ -class inline_io_service_mgr : public io_service_mgr +class inline_io_service_mgr { public: io_service::sptr connect_links(recv_link_if::sptr recv_link, - send_link_if::sptr send_link, - const link_type_t link_type, - const io_service_args_t& args, - const std::string& streamer_id); + send_link_if::sptr send_link); void disconnect_links(recv_link_if::sptr recv_link, send_link_if::sptr send_link); @@ -50,10 +53,7 @@ private: }; io_service::sptr inline_io_service_mgr::connect_links(recv_link_if::sptr recv_link, - send_link_if::sptr send_link, - const link_type_t /*link_type*/, - const io_service_args_t& /*args*/, - const std::string& /*streamer_id*/) + send_link_if::sptr send_link) { // Check if links are already connected const link_pair_t links{recv_link, send_link}; @@ -106,7 +106,7 @@ void inline_io_service_mgr::disconnect_links( * a streamer. If there are multiple streamers, this manager creates a separate * set of I/O services for each streamer. */ -class blocking_io_service_mgr : public io_service_mgr +class blocking_io_service_mgr { public: io_service::sptr connect_links(recv_link_if::sptr recv_link, @@ -276,14 +276,12 @@ io_service::sptr blocking_io_service_mgr::_create_new_io_service( * links among them. New connections always go to the offload thread containing * the fewest connections, with lowest numbered thread as a second criterion. */ -class polling_io_service_mgr : public io_service_mgr +class polling_io_service_mgr { public: io_service::sptr connect_links(recv_link_if::sptr recv_link, send_link_if::sptr send_link, - const link_type_t link_type, - const io_service_args_t& args, - const std::string& streamer_id); + const io_service_args_t& args); void disconnect_links(recv_link_if::sptr recv_link, send_link_if::sptr send_link); @@ -311,9 +309,7 @@ private: io_service::sptr polling_io_service_mgr::connect_links(recv_link_if::sptr recv_link, send_link_if::sptr send_link, - const link_type_t /*link_type*/, - const io_service_args_t& args, - const std::string& /*streamer_id*/) + const io_service_args_t& args) { // Check if links are already connected const link_pair_t links{recv_link, send_link}; @@ -414,12 +410,19 @@ public: io_service::sptr connect_links(recv_link_if::sptr recv_link, send_link_if::sptr send_link, const link_type_t link_type, - const io_service_args_t& args, + const io_service_args_t& default_args, + const uhd::device_addr_t& stream_args, const std::string& streamer_id); void disconnect_links(recv_link_if::sptr recv_link, send_link_if::sptr send_link); private: + enum io_service_type_t + { + INLINE_IO_SRV, + BLOCKING_IO_SRV, + POLLING_IO_SRV + }; struct xport_args_t { bool offload = false; @@ -428,7 +431,7 @@ private: struct link_info_t { io_service::sptr io_srv; - io_service_mgr* mgr = nullptr; + io_service_type_t io_srv_type; }; using link_pair_t = std::pair<recv_link_if::sptr, send_link_if::sptr>; @@ -450,11 +453,15 @@ io_service_mgr::sptr io_service_mgr::make(const uhd::device_addr_t& args) io_service::sptr io_service_mgr_impl::connect_links(recv_link_if::sptr recv_link, send_link_if::sptr send_link, const link_type_t link_type, - const io_service_args_t& args, + const io_service_args_t& default_args, + const uhd::device_addr_t& stream_args, const std::string& streamer_id) { UHD_ASSERT_THROW(link_type != link_type_t::ASYNC_MSG); + const io_service_args_t args = read_io_service_args( + merge_io_service_dev_args(_args, stream_args), default_args); + // Check if the links are already attached to an I/O service. If they are, // then use the same manager to connect, since links can only be connected // to one I/O service at any given a time. @@ -462,16 +469,16 @@ io_service::sptr io_service_mgr_impl::connect_links(recv_link_if::sptr recv_link auto it = _link_info_map.find(links); io_service::sptr io_srv; - io_service_mgr* mgr = nullptr; + io_service_type_t io_srv_type; if (it != _link_info_map.end()) { - io_srv = it->second.io_srv; - mgr = it->second.mgr; + io_srv = it->second.io_srv; + io_srv_type = it->second.io_srv_type; } else { // Links not already attached, pick an io_service_mgr to connect based // on user parameters and connect them. if (link_type == link_type_t::CTRL) { - mgr = &_inline_io_srv_mgr; + io_srv_type = INLINE_IO_SRV; } else { bool offload = (link_type == link_type_t::RX_DATA) ? args.recv_offload : args.send_offload; @@ -481,19 +488,32 @@ io_service::sptr io_service_mgr_impl::connect_links(recv_link_if::sptr recv_link if (offload) { if (wait_mode == io_service_args_t::POLL) { - mgr = &_polling_io_srv_mgr; + io_srv_type = POLLING_IO_SRV; } else { - mgr = &_blocking_io_srv_mgr; + io_srv_type = BLOCKING_IO_SRV; } } else { - mgr = &_inline_io_srv_mgr; + io_srv_type = INLINE_IO_SRV; } } } - io_srv = mgr->connect_links(recv_link, send_link, link_type, args, streamer_id); + switch (io_srv_type) { + case INLINE_IO_SRV: + io_srv = _inline_io_srv_mgr.connect_links(recv_link, send_link); + break; + case BLOCKING_IO_SRV: + io_srv = _blocking_io_srv_mgr.connect_links( + recv_link, send_link, link_type, args, streamer_id); + break; + case POLLING_IO_SRV: + io_srv = _polling_io_srv_mgr.connect_links(recv_link, send_link, args); + break; + default: + UHD_THROW_INVALID_CODE_PATH(); + } - _link_info_map[links] = {io_srv, mgr}; + _link_info_map[links] = {io_srv, io_srv_type}; return io_srv; } @@ -504,7 +524,20 @@ void io_service_mgr_impl::disconnect_links( auto it = _link_info_map.find(links); UHD_ASSERT_THROW(it != _link_info_map.end()); - it->second.mgr->disconnect_links(recv_link, send_link); + switch (it->second.io_srv_type) { + case INLINE_IO_SRV: + _inline_io_srv_mgr.disconnect_links(recv_link, send_link); + break; + case BLOCKING_IO_SRV: + _blocking_io_srv_mgr.disconnect_links(recv_link, send_link); + break; + case POLLING_IO_SRV: + _polling_io_srv_mgr.disconnect_links(recv_link, send_link); + break; + default: + UHD_THROW_INVALID_CODE_PATH(); + } + _link_info_map.erase(it); } diff --git a/host/lib/usrp/mpmd/mpmd_mb_iface.cpp b/host/lib/usrp/mpmd/mpmd_mb_iface.cpp index 403e53949..b113ad596 100644 --- a/host/lib/usrp/mpmd/mpmd_mb_iface.cpp +++ b/host/lib/usrp/mpmd/mpmd_mb_iface.cpp @@ -256,7 +256,8 @@ mpmd_mboard_impl::mpmd_mb_iface::make_rx_data_transport( auto io_srv = get_io_srv_mgr()->connect_links(recv_link, send_link, transport::link_type_t::RX_DATA, - usrp::read_io_service_args(xport_args, get_default_io_srv_args()), + get_default_io_srv_args(), + xport_args, streamer_id); auto rx_xport = std::make_unique<chdr_rx_data_xport>(io_srv, @@ -325,7 +326,8 @@ mpmd_mboard_impl::mpmd_mb_iface::make_tx_data_transport( auto io_srv = get_io_srv_mgr()->connect_links(recv_link, send_link, transport::link_type_t::TX_DATA, - usrp::read_io_service_args(xport_args, get_default_io_srv_args()), + get_default_io_srv_args(), + xport_args, streamer_id); // Create the data transport diff --git a/host/lib/usrp/x300/x300_mb_iface.cpp b/host/lib/usrp/x300/x300_mb_iface.cpp index 5ba92f52c..e0c426138 100644 --- a/host/lib/usrp/x300/x300_mb_iface.cpp +++ b/host/lib/usrp/x300/x300_mb_iface.cpp @@ -186,7 +186,8 @@ uhd::rfnoc::chdr_rx_data_xport::uptr x300_impl::x300_mb_iface::make_rx_data_tran auto io_srv = get_io_srv_mgr()->connect_links(recv_link, send_link, link_type_t::RX_DATA, - uhd::usrp::read_io_service_args(xport_args, get_default_io_srv_args()), + get_default_io_srv_args(), + xport_args, streamer_id); // Create the data transport @@ -254,7 +255,8 @@ uhd::rfnoc::chdr_tx_data_xport::uptr x300_impl::x300_mb_iface::make_tx_data_tran auto io_srv = get_io_srv_mgr()->connect_links(recv_link, send_link, link_type_t::TX_DATA, - uhd::usrp::read_io_service_args(xport_args, get_default_io_srv_args()), + get_default_io_srv_args(), + xport_args, streamer_id); // Create the data transport |