From b12b9465ed13bf2eb7c0fe379c22dcb3b86c4054 Mon Sep 17 00:00:00 2001 From: Martin Braun Date: Wed, 15 Nov 2017 17:21:27 -0800 Subject: mpm/mpmd: Move to request_xport()/commit_xport() architecture This commit combines code from various branches to finally enable both UDP and Liberio transports. --- host/CMakeLists.txt | 10 +- host/lib/usrp/mpmd/CMakeLists.txt | 6 + host/lib/usrp/mpmd/mpmd_impl.cpp | 113 -------------- host/lib/usrp/mpmd/mpmd_impl.hpp | 40 ++++- host/lib/usrp/mpmd/mpmd_xport.cpp | 303 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 347 insertions(+), 125 deletions(-) create mode 100644 host/lib/usrp/mpmd/mpmd_xport.cpp (limited to 'host') diff --git a/host/CMakeLists.txt b/host/CMakeLists.txt index 61df25579..18c70af24 100644 --- a/host/CMakeLists.txt +++ b/host/CMakeLists.txt @@ -321,11 +321,11 @@ PYTHON_CHECK_MODULE( HAVE_PYTHON_MODULE_MAKO ) -PYTHON_CHECK_MODULE( - "requests 2.0 or greater" - "requests" "requests.__version__ >= '2.0'" - HAVE_PYTHON_MODULE_REQUESTS -) +#PYTHON_CHECK_MODULE( + #"requests 2.0 or greater" + #"requests" "requests.__version__ >= '2.0'" + #HAVE_PYTHON_MODULE_REQUESTS +#) ######################################################################## # Create Uninstall Target diff --git a/host/lib/usrp/mpmd/CMakeLists.txt b/host/lib/usrp/mpmd/CMakeLists.txt index 0530c9531..5bc8e2d08 100644 --- a/host/lib/usrp/mpmd/CMakeLists.txt +++ b/host/lib/usrp/mpmd/CMakeLists.txt @@ -16,9 +16,15 @@ # IF(ENABLE_MPMD) + IF(ENABLE_LIBERIO) + MESSAGE(STATUS "Compiling MPM with liberio support...") + ADD_DEFINITIONS(-DHAVE_LIBERIO) + ENDIF(ENABLE_LIBERIO) + LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/mpmd_impl.cpp ${CMAKE_CURRENT_SOURCE_DIR}/mpmd_mboard_impl.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/mpmd_xport.cpp ${CMAKE_CURRENT_SOURCE_DIR}/mpmd_image_loader.cpp ) ENDIF(ENABLE_MPMD) diff --git a/host/lib/usrp/mpmd/mpmd_impl.cpp b/host/lib/usrp/mpmd/mpmd_impl.cpp index d2d15e32a..cada95345 100644 --- a/host/lib/usrp/mpmd/mpmd_impl.cpp +++ b/host/lib/usrp/mpmd/mpmd_impl.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include #include #include @@ -433,118 +432,6 @@ size_t mpmd_impl::allocate_xbar_local_addr() return new_local_addr; } -size_t mpmd_impl::identify_mboard_by_sid(const size_t remote_addr) -{ - for (size_t mb_index = 0; mb_index < _mb.size(); mb_index++) { - for (size_t xbar_index = 0; - xbar_index < _mb[mb_index]->num_xbars; - xbar_index++) { - if (_mb[mb_index]->get_xbar_local_addr(xbar_index) == remote_addr) { - return mb_index; - } - } - } - throw uhd::lookup_error(str( - boost::format("Cannot identify mboard for remote address %d") - % remote_addr - )); -} - - -/***************************************************************************** - * API - ****************************************************************************/ -// TODO this does not consider the liberio use case! -uhd::device_addr_t mpmd_impl::get_rx_hints(size_t /* mb_index */) -{ - //device_addr_t rx_hints = _mb[mb_index].recv_args; - device_addr_t rx_hints; // TODO don't ignore what the user tells us - // (default to a large recv buff) - if (not rx_hints.has_key("recv_buff_size")) - { - //For the ethernet transport, the buffer has to be set before creating - //the transport because it is independent of the frame size and # frames - //For nirio, the buffer size is not configurable by the user - #if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) - //limit buffer resize on macos or it will error - rx_hints["recv_buff_size"] = boost::lexical_cast(MPMD_RX_SW_BUFF_SIZE_ETH_MACOS); - #elif defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32) - //set to half-a-second of buffering at max rate - rx_hints["recv_buff_size"] = boost::lexical_cast(MPMD_RX_SW_BUFF_SIZE_ETH); - #endif - } - return rx_hints; -} - - -// frame_size_t determine_max_frame_size(const std::string &addr, -// const frame_size_t &user_frame_size){ -// transport::udp_simple::sptr udp = -// transport::udp_simple::make_connected(addr, -// std::to_string(MPM_DISCOVERY_PORT)); -// std::vector buffer(std::max(user_frame_size.rec)) -// } -// Everything fake below here - -both_xports_t mpmd_impl::make_transport( - const sid_t& address, - usrp::device3_impl::xport_type_t xport_type, - const uhd::device_addr_t& args -) { - const size_t mb_index = identify_mboard_by_sid(address.get_dst_addr()); - - UHD_LOGGER_TRACE("MPMD") - << "Creating new transport of type: " - << (xport_type == CTRL ? "CTRL" : (xport_type == RX_DATA ? "RX" : "TX")) - << " To mboard: " << mb_index - << " Destination address: " << address.to_pp_string_hex().substr(6) - << " User-defined xport args: " << args.to_string() - ; - - both_xports_t xports; - const uhd::device_addr_t& xport_args = (xport_type == CTRL) ? uhd::device_addr_t() : args; - transport::zero_copy_xport_params default_buff_args; - - std::string interface_addr = _mb[mb_index]->mb_args.get("addr"); - UHD_ASSERT_THROW(not interface_addr.empty()); - const uint32_t xbar_src_addr = address.get_src_addr(); - const uint32_t xbar_src_dst = 0; - - default_buff_args.send_frame_size = 8000; - default_buff_args.recv_frame_size = 8000; - default_buff_args.num_recv_frames = 32; - default_buff_args.num_send_frames = 32; - // hardcode frame size for now - - transport::udp_zero_copy::buff_params buff_params; - auto recv = transport::udp_zero_copy::make( - interface_addr, - BOOST_STRINGIZE(49153), - default_buff_args, - buff_params, - xport_args); - uint16_t port = recv->get_local_port(); - - xports.endianness = uhd::ENDIANNESS_BIG; - xports.send_sid = _mb[mb_index]->allocate_sid(port, - address, xbar_src_addr, xbar_src_dst, _sid_framer++ - ); - xports.recv_sid = xports.send_sid.reversed(); - xports.recv_buff_size = buff_params.recv_buff_size; - xports.send_buff_size = buff_params.send_buff_size; - xports.recv = recv; // Note: This is a type cast! - xports.send = xports.recv; - UHD_LOGGER_TRACE("MPMD") - << "xport info: send_sid==" << xports.send_sid.to_pp_string_hex() - << " recv_sid==" << xports.recv_sid.to_pp_string_hex() - << " endianness==" - << (xports.endianness == uhd::ENDIANNESS_BIG ? "BE" : "LE") - << " recv_buff_size==" << xports.recv_buff_size - << " send_buff_size==" << xports.send_buff_size - ; - - return xports; -} /***************************************************************************** * Find, Factory & Registry diff --git a/host/lib/usrp/mpmd/mpmd_impl.hpp b/host/lib/usrp/mpmd/mpmd_impl.hpp index 7eca982a4..3143378dd 100644 --- a/host/lib/usrp/mpmd/mpmd_impl.hpp +++ b/host/lib/usrp/mpmd/mpmd_impl.hpp @@ -23,6 +23,7 @@ #include #include #include +#include #include static const size_t MPMD_RX_SW_BUFF_SIZE_ETH = 0x2000000;//32MiB For an ~8k frame size any size >32MiB is just wasted buffer space @@ -36,13 +37,6 @@ static const char MPM_ECHO_CMD[] = "MPM-ECHO"; static const size_t MPMD_10GE_DATA_FRAME_MAX_SIZE = 8000; // CHDR packet size in bytes -struct frame_size_t -{ - size_t recv_frame_size; - size_t send_frame_size; -}; - - /*! Stores all attributes specific to a single MPM device */ class mpmd_mboard_impl @@ -195,10 +189,42 @@ class mpmd_impl : public uhd::usrp::device3_impl */ size_t identify_mboard_by_sid(const size_t remote_addr); + using xport_info_t = std::map; + using xport_info_list_t = std::vector>; + + uhd::both_xports_t make_transport_udp( + const size_t mb_index, + xport_info_t &xport_info, + const xport_type_t xport_type, + const uhd::device_addr_t& xport_args + ); + +#ifdef HAVE_LIBERIO + /*! Create a muxed liberio transport for control packets */ + uhd::transport::muxed_zero_copy_if::sptr make_muxed_liberio_xport( + const std::string &tx_dev, + const std::string &rx_dev, + const uhd::transport::zero_copy_xport_params &buff_args, + const size_t max_muxed_ports + ); + + uhd::both_xports_t make_transport_liberio( + const size_t mb_index, + xport_info_t &xport_info, + const xport_type_t xport_type, + const uhd::device_addr_t& xport_args + ); +#endif /************************************************************************* * Private attributes ************************************************************************/ + // FIXME move the next two into their own transport manager class + //! Control transport for one liberio connection + uhd::transport::muxed_zero_copy_if::sptr _ctrl_dma_xport; + //! Control transport for one liberio connection + uhd::transport::muxed_zero_copy_if::sptr _async_msg_dma_xport; + uhd::dict recv_args; uhd::dict send_args; diff --git a/host/lib/usrp/mpmd/mpmd_xport.cpp b/host/lib/usrp/mpmd/mpmd_xport.cpp new file mode 100644 index 000000000..5dd29a6f7 --- /dev/null +++ b/host/lib/usrp/mpmd/mpmd_xport.cpp @@ -0,0 +1,303 @@ +// +// Copyright 2017 Ettus Research, National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0 +// + +// make_transport logic for mpmd_impl. + + +#include "mpmd_impl.hpp" +#include "../transport/liberio_zero_copy.hpp" +#include +#include + +using namespace uhd; + + +// TODO this does not consider the liberio use case! +uhd::device_addr_t mpmd_impl::get_rx_hints(size_t /* mb_index */) +{ + //device_addr_t rx_hints = _mb[mb_index].recv_args; + device_addr_t rx_hints; // TODO don't ignore what the user tells us + // (default to a large recv buff) + if (not rx_hints.has_key("recv_buff_size")) + { + //For the ethernet transport, the buffer has to be set before creating + //the transport because it is independent of the frame size and # frames + //For nirio, the buffer size is not configurable by the user + #if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) + //limit buffer resize on macos or it will error + rx_hints["recv_buff_size"] = boost::lexical_cast(MPMD_RX_SW_BUFF_SIZE_ETH_MACOS); + #elif defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32) + //set to half-a-second of buffering at max rate + rx_hints["recv_buff_size"] = boost::lexical_cast(MPMD_RX_SW_BUFF_SIZE_ETH); + #endif + } + return rx_hints; +} + + +/****************************************************************************** + * General make_transport() + helpers + *****************************************************************************/ +size_t mpmd_impl::identify_mboard_by_sid(const size_t remote_addr) +{ + for (size_t mb_index = 0; mb_index < _mb.size(); mb_index++) { + for (size_t xbar_index = 0; + xbar_index < _mb[mb_index]->num_xbars; + xbar_index++) { + if (_mb[mb_index]->get_xbar_local_addr(xbar_index) == remote_addr) { + return mb_index; + } + } + } + throw uhd::lookup_error(str( + boost::format("Cannot identify mboard for remote address %d") + % remote_addr + )); +} + +both_xports_t mpmd_impl::make_transport( + const sid_t& dst_address, + usrp::device3_impl::xport_type_t xport_type, + const uhd::device_addr_t& args +) { + const size_t mb_index = identify_mboard_by_sid(dst_address.get_dst_addr()); + // Could also be a map... + const std::string xport_type_str = [xport_type](){ + switch (xport_type) { + case CTRL: + return "CTRL"; + case ASYNC_MSG: + return "ASYNC_MSG"; + case RX_DATA: + return "RX_DATA"; + case TX_DATA: + return "TX_DATA"; + default: + UHD_THROW_INVALID_CODE_PATH(); + }; + }(); + + UHD_LOGGER_TRACE("MPMD") + << "Creating new transport of type: " << xport_type_str + << " To mboard: " << mb_index + << " Destination address: " << dst_address.to_pp_string_hex().substr(6) + << " User-defined xport args: " << args.to_string() + ; + + const auto xport_info_list = + _mb[mb_index]->rpc->request_with_token( + "request_xport", + dst_address.get_dst(), + _sid_framer++, // FIXME make sure we only increment if actually valid + xport_type_str + ); + UHD_LOGGER_TRACE("MPMD") + << "request_xport() gave us " << xport_info_list.size() + << " option(s)." + ; + + // This is not very elegant, and needs some kind of factory model or + // whatnot. + auto xport_info = xport_info_list.at(0); // In the future, actually pick one + // sensibly. This is what would + // enable dual Eth support. FIXME + both_xports_t xports; + if (xport_info["type"] == "UDP") { + xports = make_transport_udp(mb_index, xport_info, xport_type, args); +#ifdef HAVE_LIBERIO + } else if (xport_info["type"] == "liberio") { + xports = make_transport_liberio(mb_index, xport_info, xport_type, args); +#endif + } else { + UHD_THROW_INVALID_CODE_PATH(); + } + + UHD_LOGGER_TRACE("MPMD") + << "xport info: send_sid==" << xports.send_sid.to_pp_string_hex() + << " recv_sid==" << xports.recv_sid.to_pp_string_hex() + << " endianness==" + << (xports.endianness == uhd::ENDIANNESS_BIG ? "BE" : "LE") + << " recv_buff_size==" << xports.recv_buff_size + << " send_buff_size==" << xports.send_buff_size + ; + + return xports; +} + + +/****************************************************************************** + * UDP Transport + *****************************************************************************/ +both_xports_t mpmd_impl::make_transport_udp( + const size_t mb_index, + xport_info_t &xport_info, + const xport_type_t /*xport_type*/, + const uhd::device_addr_t& xport_args +) { + auto &rpc = _mb[mb_index]->rpc; + + transport::zero_copy_xport_params default_buff_args; + // Create actual UDP transport + // TODO don't hardcode these + default_buff_args.send_frame_size = 8000; + default_buff_args.recv_frame_size = 8000; + default_buff_args.num_recv_frames = 32; + default_buff_args.num_send_frames = 32; + + transport::udp_zero_copy::buff_params buff_params; + auto recv = transport::udp_zero_copy::make( + xport_info["ipv4"], + xport_info["port"], + default_buff_args, + buff_params, + xport_args + ); + const uint16_t port = recv->get_local_port(); + const std::string src_ip_addr = recv->get_local_addr(); + xport_info["src_port"] = std::to_string(port); + xport_info["src_ipv4"] = src_ip_addr; + + // Communicate it all back to MPM + if (not rpc->request_with_token("commit_xport", xport_info)) { + UHD_LOG_ERROR("MPMD", "Failed to create UDP transport!"); + throw uhd::runtime_error("commit_xport() failed!"); + } + + // Create both_xports_t object and finish: + both_xports_t xports; + xports.endianness = uhd::ENDIANNESS_BIG; + xports.send_sid = sid_t(xport_info["send_sid"]); + xports.recv_sid = xports.send_sid.reversed(); + xports.recv_buff_size = buff_params.recv_buff_size; + xports.send_buff_size = buff_params.send_buff_size; + xports.recv = recv; // Note: This is a type cast! + xports.send = recv; // This too + return xports; +} + +/****************************************************************************** + * Liberio Transport + *****************************************************************************/ +#ifdef HAVE_LIBERIO +static uint32_t extract_sid_from_pkt(void* pkt, size_t) { + return uhd::sid_t(uhd::wtohx(static_cast(pkt)[1])) + .get_dst(); +} + +uhd::transport::muxed_zero_copy_if::sptr mpmd_impl::make_muxed_liberio_xport( + const std::string &tx_dev, + const std::string &rx_dev, + const uhd::transport::zero_copy_xport_params &buff_args, + const size_t max_muxed_ports +) { + auto base_xport = transport::liberio_zero_copy::make( + tx_dev, rx_dev, buff_args); + + return uhd::transport::muxed_zero_copy_if::make( + base_xport, extract_sid_from_pkt, max_muxed_ports); +} + +both_xports_t mpmd_impl::make_transport_liberio( + const size_t mb_index, + xport_info_t &xport_info, + const xport_type_t xport_type, + const uhd::device_addr_t& xport_args +) { + auto &rpc = _mb[mb_index]->rpc; + + transport::zero_copy_xport_params default_buff_args; + /* default ones for RX / TX, override below */ + default_buff_args.send_frame_size = 4 * getpagesize(); + default_buff_args.recv_frame_size = 4 * getpagesize(); + default_buff_args.num_recv_frames = 128; + default_buff_args.num_send_frames = 128; + + if (xport_type == CTRL) { + default_buff_args.send_frame_size = 128; + default_buff_args.recv_frame_size = 128; + } else if (xport_type == ASYNC_MSG) { + default_buff_args.send_frame_size = 256; + default_buff_args.recv_frame_size = 256; + } else if (xport_type == RX_DATA) { + default_buff_args.send_frame_size = 64; + } else { + default_buff_args.recv_frame_size = 64; + } + + const std::string tx_dev = xport_info["tx_dev"]; + const std::string rx_dev = xport_info["rx_dev"]; + + both_xports_t xports; + xports.endianness = uhd::ENDIANNESS_LITTLE; + xports.send_sid = sid_t(xport_info["send_sid"]); + xports.recv_sid = xports.send_sid.reversed(); + + // this is kinda ghetto: scale buffer for muxed xports since we share the + // buffer... + float divisor = 1; + if (xport_type == CTRL) + divisor = 16; + else if (xport_type == ASYNC_MSG) + divisor = 4; + + + //if (xport_info["muxed"] == "True") { + //// FIXME tbw + //} + if (xport_type == CTRL) { + UHD_ASSERT_THROW(xport_info["muxed"] == "True"); + if (not _ctrl_dma_xport) { + default_buff_args.send_frame_size = 128; + default_buff_args.recv_frame_size = 128; + _ctrl_dma_xport = make_muxed_liberio_xport(tx_dev, rx_dev, + default_buff_args, int(divisor)); + } + + UHD_LOGGER_TRACE("MPMD") + << "Making (muxed) stream with num " << xports.recv_sid.get_dst(); + xports.recv = _ctrl_dma_xport->make_stream(xports.recv_sid.get_dst()); + } else if (xport_type == ASYNC_MSG) { + UHD_ASSERT_THROW(xport_info["muxed"] == "True"); + if (not _async_msg_dma_xport) { + default_buff_args.send_frame_size = 256; + default_buff_args.recv_frame_size = 256; + _async_msg_dma_xport = make_muxed_liberio_xport( + tx_dev, rx_dev, default_buff_args, int(divisor)); + } + + UHD_LOGGER_TRACE("MPMD") + << "making (muxed) stream with num " << xports.recv_sid.get_dst(); + xports.recv = + _async_msg_dma_xport->make_stream(xports.recv_sid.get_dst()); + } else { + xports.recv = + transport::liberio_zero_copy::make( + tx_dev, rx_dev, default_buff_args); + } + + transport::udp_zero_copy::buff_params buff_params; + buff_params.recv_buff_size = + float(default_buff_args.recv_frame_size) * + float(default_buff_args.num_recv_frames) / divisor; + buff_params.send_buff_size = + float(default_buff_args.send_frame_size) * + float(default_buff_args.num_send_frames) / divisor; + + + // Communicate it all back to MPM + if (not rpc->request_with_token("commit_xport", xport_info)) { + UHD_LOG_ERROR("MPMD", "Failed to create UDP transport!"); + throw uhd::runtime_error("commit_xport() failed!"); + } + + // Finish both_xports_t object and return: + xports.recv_buff_size = buff_params.recv_buff_size; + xports.send_buff_size = buff_params.send_buff_size; + xports.send = xports.recv; + return xports; +} + +#endif /* HAVE_LIBERIO */ -- cgit v1.2.3