From b12b9465ed13bf2eb7c0fe379c22dcb3b86c4054 Mon Sep 17 00:00:00 2001 From: Martin Braun Date: Wed, 15 Nov 2017 17:21:27 -0800 Subject: mpm/mpmd: Move to request_xport()/commit_xport() architecture This commit combines code from various branches to finally enable both UDP and Liberio transports. --- host/CMakeLists.txt | 10 +- host/lib/usrp/mpmd/CMakeLists.txt | 6 + host/lib/usrp/mpmd/mpmd_impl.cpp | 113 --------- host/lib/usrp/mpmd/mpmd_impl.hpp | 40 ++- host/lib/usrp/mpmd/mpmd_xport.cpp | 303 ++++++++++++++++++++++ mpm/python/usrp_mpm/liberiotable.py | 18 +- mpm/python/usrp_mpm/mpmtypes.py | 37 ++- mpm/python/usrp_mpm/periph_manager/base.py | 88 +++++-- mpm/python/usrp_mpm/periph_manager/n310.py | 393 +++++++++++++++++++++-------- mpm/python/usrp_mpm/rpc_server.py | 16 +- 10 files changed, 733 insertions(+), 291 deletions(-) create mode 100644 host/lib/usrp/mpmd/mpmd_xport.cpp diff --git a/host/CMakeLists.txt b/host/CMakeLists.txt index 61df25579..18c70af24 100644 --- a/host/CMakeLists.txt +++ b/host/CMakeLists.txt @@ -321,11 +321,11 @@ PYTHON_CHECK_MODULE( HAVE_PYTHON_MODULE_MAKO ) -PYTHON_CHECK_MODULE( - "requests 2.0 or greater" - "requests" "requests.__version__ >= '2.0'" - HAVE_PYTHON_MODULE_REQUESTS -) +#PYTHON_CHECK_MODULE( + #"requests 2.0 or greater" + #"requests" "requests.__version__ >= '2.0'" + #HAVE_PYTHON_MODULE_REQUESTS +#) ######################################################################## # Create Uninstall Target diff --git a/host/lib/usrp/mpmd/CMakeLists.txt b/host/lib/usrp/mpmd/CMakeLists.txt index 0530c9531..5bc8e2d08 100644 --- a/host/lib/usrp/mpmd/CMakeLists.txt +++ b/host/lib/usrp/mpmd/CMakeLists.txt @@ -16,9 +16,15 @@ # IF(ENABLE_MPMD) + IF(ENABLE_LIBERIO) + MESSAGE(STATUS "Compiling MPM with liberio support...") + ADD_DEFINITIONS(-DHAVE_LIBERIO) + ENDIF(ENABLE_LIBERIO) + LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/mpmd_impl.cpp ${CMAKE_CURRENT_SOURCE_DIR}/mpmd_mboard_impl.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/mpmd_xport.cpp ${CMAKE_CURRENT_SOURCE_DIR}/mpmd_image_loader.cpp ) ENDIF(ENABLE_MPMD) diff --git a/host/lib/usrp/mpmd/mpmd_impl.cpp b/host/lib/usrp/mpmd/mpmd_impl.cpp index d2d15e32a..cada95345 100644 --- a/host/lib/usrp/mpmd/mpmd_impl.cpp +++ b/host/lib/usrp/mpmd/mpmd_impl.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include #include #include @@ -433,118 +432,6 @@ size_t mpmd_impl::allocate_xbar_local_addr() return new_local_addr; } -size_t mpmd_impl::identify_mboard_by_sid(const size_t remote_addr) -{ - for (size_t mb_index = 0; mb_index < _mb.size(); mb_index++) { - for (size_t xbar_index = 0; - xbar_index < _mb[mb_index]->num_xbars; - xbar_index++) { - if (_mb[mb_index]->get_xbar_local_addr(xbar_index) == remote_addr) { - return mb_index; - } - } - } - throw uhd::lookup_error(str( - boost::format("Cannot identify mboard for remote address %d") - % remote_addr - )); -} - - -/***************************************************************************** - * API - ****************************************************************************/ -// TODO this does not consider the liberio use case! -uhd::device_addr_t mpmd_impl::get_rx_hints(size_t /* mb_index */) -{ - //device_addr_t rx_hints = _mb[mb_index].recv_args; - device_addr_t rx_hints; // TODO don't ignore what the user tells us - // (default to a large recv buff) - if (not rx_hints.has_key("recv_buff_size")) - { - //For the ethernet transport, the buffer has to be set before creating - //the transport because it is independent of the frame size and # frames - //For nirio, the buffer size is not configurable by the user - #if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) - //limit buffer resize on macos or it will error - rx_hints["recv_buff_size"] = boost::lexical_cast(MPMD_RX_SW_BUFF_SIZE_ETH_MACOS); - #elif defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32) - //set to half-a-second of buffering at max rate - rx_hints["recv_buff_size"] = boost::lexical_cast(MPMD_RX_SW_BUFF_SIZE_ETH); - #endif - } - return rx_hints; -} - - -// frame_size_t determine_max_frame_size(const std::string &addr, -// const frame_size_t &user_frame_size){ -// transport::udp_simple::sptr udp = -// transport::udp_simple::make_connected(addr, -// std::to_string(MPM_DISCOVERY_PORT)); -// std::vector buffer(std::max(user_frame_size.rec)) -// } -// Everything fake below here - -both_xports_t mpmd_impl::make_transport( - const sid_t& address, - usrp::device3_impl::xport_type_t xport_type, - const uhd::device_addr_t& args -) { - const size_t mb_index = identify_mboard_by_sid(address.get_dst_addr()); - - UHD_LOGGER_TRACE("MPMD") - << "Creating new transport of type: " - << (xport_type == CTRL ? "CTRL" : (xport_type == RX_DATA ? "RX" : "TX")) - << " To mboard: " << mb_index - << " Destination address: " << address.to_pp_string_hex().substr(6) - << " User-defined xport args: " << args.to_string() - ; - - both_xports_t xports; - const uhd::device_addr_t& xport_args = (xport_type == CTRL) ? uhd::device_addr_t() : args; - transport::zero_copy_xport_params default_buff_args; - - std::string interface_addr = _mb[mb_index]->mb_args.get("addr"); - UHD_ASSERT_THROW(not interface_addr.empty()); - const uint32_t xbar_src_addr = address.get_src_addr(); - const uint32_t xbar_src_dst = 0; - - default_buff_args.send_frame_size = 8000; - default_buff_args.recv_frame_size = 8000; - default_buff_args.num_recv_frames = 32; - default_buff_args.num_send_frames = 32; - // hardcode frame size for now - - transport::udp_zero_copy::buff_params buff_params; - auto recv = transport::udp_zero_copy::make( - interface_addr, - BOOST_STRINGIZE(49153), - default_buff_args, - buff_params, - xport_args); - uint16_t port = recv->get_local_port(); - - xports.endianness = uhd::ENDIANNESS_BIG; - xports.send_sid = _mb[mb_index]->allocate_sid(port, - address, xbar_src_addr, xbar_src_dst, _sid_framer++ - ); - xports.recv_sid = xports.send_sid.reversed(); - xports.recv_buff_size = buff_params.recv_buff_size; - xports.send_buff_size = buff_params.send_buff_size; - xports.recv = recv; // Note: This is a type cast! - xports.send = xports.recv; - UHD_LOGGER_TRACE("MPMD") - << "xport info: send_sid==" << xports.send_sid.to_pp_string_hex() - << " recv_sid==" << xports.recv_sid.to_pp_string_hex() - << " endianness==" - << (xports.endianness == uhd::ENDIANNESS_BIG ? "BE" : "LE") - << " recv_buff_size==" << xports.recv_buff_size - << " send_buff_size==" << xports.send_buff_size - ; - - return xports; -} /***************************************************************************** * Find, Factory & Registry diff --git a/host/lib/usrp/mpmd/mpmd_impl.hpp b/host/lib/usrp/mpmd/mpmd_impl.hpp index 7eca982a4..3143378dd 100644 --- a/host/lib/usrp/mpmd/mpmd_impl.hpp +++ b/host/lib/usrp/mpmd/mpmd_impl.hpp @@ -23,6 +23,7 @@ #include #include #include +#include #include static const size_t MPMD_RX_SW_BUFF_SIZE_ETH = 0x2000000;//32MiB For an ~8k frame size any size >32MiB is just wasted buffer space @@ -36,13 +37,6 @@ static const char MPM_ECHO_CMD[] = "MPM-ECHO"; static const size_t MPMD_10GE_DATA_FRAME_MAX_SIZE = 8000; // CHDR packet size in bytes -struct frame_size_t -{ - size_t recv_frame_size; - size_t send_frame_size; -}; - - /*! Stores all attributes specific to a single MPM device */ class mpmd_mboard_impl @@ -195,10 +189,42 @@ class mpmd_impl : public uhd::usrp::device3_impl */ size_t identify_mboard_by_sid(const size_t remote_addr); + using xport_info_t = std::map; + using xport_info_list_t = std::vector>; + + uhd::both_xports_t make_transport_udp( + const size_t mb_index, + xport_info_t &xport_info, + const xport_type_t xport_type, + const uhd::device_addr_t& xport_args + ); + +#ifdef HAVE_LIBERIO + /*! Create a muxed liberio transport for control packets */ + uhd::transport::muxed_zero_copy_if::sptr make_muxed_liberio_xport( + const std::string &tx_dev, + const std::string &rx_dev, + const uhd::transport::zero_copy_xport_params &buff_args, + const size_t max_muxed_ports + ); + + uhd::both_xports_t make_transport_liberio( + const size_t mb_index, + xport_info_t &xport_info, + const xport_type_t xport_type, + const uhd::device_addr_t& xport_args + ); +#endif /************************************************************************* * Private attributes ************************************************************************/ + // FIXME move the next two into their own transport manager class + //! Control transport for one liberio connection + uhd::transport::muxed_zero_copy_if::sptr _ctrl_dma_xport; + //! Control transport for one liberio connection + uhd::transport::muxed_zero_copy_if::sptr _async_msg_dma_xport; + uhd::dict recv_args; uhd::dict send_args; diff --git a/host/lib/usrp/mpmd/mpmd_xport.cpp b/host/lib/usrp/mpmd/mpmd_xport.cpp new file mode 100644 index 000000000..5dd29a6f7 --- /dev/null +++ b/host/lib/usrp/mpmd/mpmd_xport.cpp @@ -0,0 +1,303 @@ +// +// Copyright 2017 Ettus Research, National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0 +// + +// make_transport logic for mpmd_impl. + + +#include "mpmd_impl.hpp" +#include "../transport/liberio_zero_copy.hpp" +#include +#include + +using namespace uhd; + + +// TODO this does not consider the liberio use case! +uhd::device_addr_t mpmd_impl::get_rx_hints(size_t /* mb_index */) +{ + //device_addr_t rx_hints = _mb[mb_index].recv_args; + device_addr_t rx_hints; // TODO don't ignore what the user tells us + // (default to a large recv buff) + if (not rx_hints.has_key("recv_buff_size")) + { + //For the ethernet transport, the buffer has to be set before creating + //the transport because it is independent of the frame size and # frames + //For nirio, the buffer size is not configurable by the user + #if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) + //limit buffer resize on macos or it will error + rx_hints["recv_buff_size"] = boost::lexical_cast(MPMD_RX_SW_BUFF_SIZE_ETH_MACOS); + #elif defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32) + //set to half-a-second of buffering at max rate + rx_hints["recv_buff_size"] = boost::lexical_cast(MPMD_RX_SW_BUFF_SIZE_ETH); + #endif + } + return rx_hints; +} + + +/****************************************************************************** + * General make_transport() + helpers + *****************************************************************************/ +size_t mpmd_impl::identify_mboard_by_sid(const size_t remote_addr) +{ + for (size_t mb_index = 0; mb_index < _mb.size(); mb_index++) { + for (size_t xbar_index = 0; + xbar_index < _mb[mb_index]->num_xbars; + xbar_index++) { + if (_mb[mb_index]->get_xbar_local_addr(xbar_index) == remote_addr) { + return mb_index; + } + } + } + throw uhd::lookup_error(str( + boost::format("Cannot identify mboard for remote address %d") + % remote_addr + )); +} + +both_xports_t mpmd_impl::make_transport( + const sid_t& dst_address, + usrp::device3_impl::xport_type_t xport_type, + const uhd::device_addr_t& args +) { + const size_t mb_index = identify_mboard_by_sid(dst_address.get_dst_addr()); + // Could also be a map... + const std::string xport_type_str = [xport_type](){ + switch (xport_type) { + case CTRL: + return "CTRL"; + case ASYNC_MSG: + return "ASYNC_MSG"; + case RX_DATA: + return "RX_DATA"; + case TX_DATA: + return "TX_DATA"; + default: + UHD_THROW_INVALID_CODE_PATH(); + }; + }(); + + UHD_LOGGER_TRACE("MPMD") + << "Creating new transport of type: " << xport_type_str + << " To mboard: " << mb_index + << " Destination address: " << dst_address.to_pp_string_hex().substr(6) + << " User-defined xport args: " << args.to_string() + ; + + const auto xport_info_list = + _mb[mb_index]->rpc->request_with_token( + "request_xport", + dst_address.get_dst(), + _sid_framer++, // FIXME make sure we only increment if actually valid + xport_type_str + ); + UHD_LOGGER_TRACE("MPMD") + << "request_xport() gave us " << xport_info_list.size() + << " option(s)." + ; + + // This is not very elegant, and needs some kind of factory model or + // whatnot. + auto xport_info = xport_info_list.at(0); // In the future, actually pick one + // sensibly. This is what would + // enable dual Eth support. FIXME + both_xports_t xports; + if (xport_info["type"] == "UDP") { + xports = make_transport_udp(mb_index, xport_info, xport_type, args); +#ifdef HAVE_LIBERIO + } else if (xport_info["type"] == "liberio") { + xports = make_transport_liberio(mb_index, xport_info, xport_type, args); +#endif + } else { + UHD_THROW_INVALID_CODE_PATH(); + } + + UHD_LOGGER_TRACE("MPMD") + << "xport info: send_sid==" << xports.send_sid.to_pp_string_hex() + << " recv_sid==" << xports.recv_sid.to_pp_string_hex() + << " endianness==" + << (xports.endianness == uhd::ENDIANNESS_BIG ? "BE" : "LE") + << " recv_buff_size==" << xports.recv_buff_size + << " send_buff_size==" << xports.send_buff_size + ; + + return xports; +} + + +/****************************************************************************** + * UDP Transport + *****************************************************************************/ +both_xports_t mpmd_impl::make_transport_udp( + const size_t mb_index, + xport_info_t &xport_info, + const xport_type_t /*xport_type*/, + const uhd::device_addr_t& xport_args +) { + auto &rpc = _mb[mb_index]->rpc; + + transport::zero_copy_xport_params default_buff_args; + // Create actual UDP transport + // TODO don't hardcode these + default_buff_args.send_frame_size = 8000; + default_buff_args.recv_frame_size = 8000; + default_buff_args.num_recv_frames = 32; + default_buff_args.num_send_frames = 32; + + transport::udp_zero_copy::buff_params buff_params; + auto recv = transport::udp_zero_copy::make( + xport_info["ipv4"], + xport_info["port"], + default_buff_args, + buff_params, + xport_args + ); + const uint16_t port = recv->get_local_port(); + const std::string src_ip_addr = recv->get_local_addr(); + xport_info["src_port"] = std::to_string(port); + xport_info["src_ipv4"] = src_ip_addr; + + // Communicate it all back to MPM + if (not rpc->request_with_token("commit_xport", xport_info)) { + UHD_LOG_ERROR("MPMD", "Failed to create UDP transport!"); + throw uhd::runtime_error("commit_xport() failed!"); + } + + // Create both_xports_t object and finish: + both_xports_t xports; + xports.endianness = uhd::ENDIANNESS_BIG; + xports.send_sid = sid_t(xport_info["send_sid"]); + xports.recv_sid = xports.send_sid.reversed(); + xports.recv_buff_size = buff_params.recv_buff_size; + xports.send_buff_size = buff_params.send_buff_size; + xports.recv = recv; // Note: This is a type cast! + xports.send = recv; // This too + return xports; +} + +/****************************************************************************** + * Liberio Transport + *****************************************************************************/ +#ifdef HAVE_LIBERIO +static uint32_t extract_sid_from_pkt(void* pkt, size_t) { + return uhd::sid_t(uhd::wtohx(static_cast(pkt)[1])) + .get_dst(); +} + +uhd::transport::muxed_zero_copy_if::sptr mpmd_impl::make_muxed_liberio_xport( + const std::string &tx_dev, + const std::string &rx_dev, + const uhd::transport::zero_copy_xport_params &buff_args, + const size_t max_muxed_ports +) { + auto base_xport = transport::liberio_zero_copy::make( + tx_dev, rx_dev, buff_args); + + return uhd::transport::muxed_zero_copy_if::make( + base_xport, extract_sid_from_pkt, max_muxed_ports); +} + +both_xports_t mpmd_impl::make_transport_liberio( + const size_t mb_index, + xport_info_t &xport_info, + const xport_type_t xport_type, + const uhd::device_addr_t& xport_args +) { + auto &rpc = _mb[mb_index]->rpc; + + transport::zero_copy_xport_params default_buff_args; + /* default ones for RX / TX, override below */ + default_buff_args.send_frame_size = 4 * getpagesize(); + default_buff_args.recv_frame_size = 4 * getpagesize(); + default_buff_args.num_recv_frames = 128; + default_buff_args.num_send_frames = 128; + + if (xport_type == CTRL) { + default_buff_args.send_frame_size = 128; + default_buff_args.recv_frame_size = 128; + } else if (xport_type == ASYNC_MSG) { + default_buff_args.send_frame_size = 256; + default_buff_args.recv_frame_size = 256; + } else if (xport_type == RX_DATA) { + default_buff_args.send_frame_size = 64; + } else { + default_buff_args.recv_frame_size = 64; + } + + const std::string tx_dev = xport_info["tx_dev"]; + const std::string rx_dev = xport_info["rx_dev"]; + + both_xports_t xports; + xports.endianness = uhd::ENDIANNESS_LITTLE; + xports.send_sid = sid_t(xport_info["send_sid"]); + xports.recv_sid = xports.send_sid.reversed(); + + // this is kinda ghetto: scale buffer for muxed xports since we share the + // buffer... + float divisor = 1; + if (xport_type == CTRL) + divisor = 16; + else if (xport_type == ASYNC_MSG) + divisor = 4; + + + //if (xport_info["muxed"] == "True") { + //// FIXME tbw + //} + if (xport_type == CTRL) { + UHD_ASSERT_THROW(xport_info["muxed"] == "True"); + if (not _ctrl_dma_xport) { + default_buff_args.send_frame_size = 128; + default_buff_args.recv_frame_size = 128; + _ctrl_dma_xport = make_muxed_liberio_xport(tx_dev, rx_dev, + default_buff_args, int(divisor)); + } + + UHD_LOGGER_TRACE("MPMD") + << "Making (muxed) stream with num " << xports.recv_sid.get_dst(); + xports.recv = _ctrl_dma_xport->make_stream(xports.recv_sid.get_dst()); + } else if (xport_type == ASYNC_MSG) { + UHD_ASSERT_THROW(xport_info["muxed"] == "True"); + if (not _async_msg_dma_xport) { + default_buff_args.send_frame_size = 256; + default_buff_args.recv_frame_size = 256; + _async_msg_dma_xport = make_muxed_liberio_xport( + tx_dev, rx_dev, default_buff_args, int(divisor)); + } + + UHD_LOGGER_TRACE("MPMD") + << "making (muxed) stream with num " << xports.recv_sid.get_dst(); + xports.recv = + _async_msg_dma_xport->make_stream(xports.recv_sid.get_dst()); + } else { + xports.recv = + transport::liberio_zero_copy::make( + tx_dev, rx_dev, default_buff_args); + } + + transport::udp_zero_copy::buff_params buff_params; + buff_params.recv_buff_size = + float(default_buff_args.recv_frame_size) * + float(default_buff_args.num_recv_frames) / divisor; + buff_params.send_buff_size = + float(default_buff_args.send_frame_size) * + float(default_buff_args.num_send_frames) / divisor; + + + // Communicate it all back to MPM + if (not rpc->request_with_token("commit_xport", xport_info)) { + UHD_LOG_ERROR("MPMD", "Failed to create UDP transport!"); + throw uhd::runtime_error("commit_xport() failed!"); + } + + // Finish both_xports_t object and return: + xports.recv_buff_size = buff_params.recv_buff_size; + xports.send_buff_size = buff_params.send_buff_size; + xports.send = xports.recv; + return xports; +} + +#endif /* HAVE_LIBERIO */ diff --git a/mpm/python/usrp_mpm/liberiotable.py b/mpm/python/usrp_mpm/liberiotable.py index 8f70c5ebe..6f2454379 100644 --- a/mpm/python/usrp_mpm/liberiotable.py +++ b/mpm/python/usrp_mpm/liberiotable.py @@ -19,15 +19,13 @@ class LiberioDispatcherTable(object): label -- A label that can be used by udev to find a UIO device """ - MTU_OFFSET = 0x80000 - def __init__(self, label): self.log = get_logger(label) self._regs = UIO(label=label, read_only=False) self.poke32 = self._regs.poke32 self.peek32 = self._regs.peek32 - def set_route(self, sid, dma_channel, mtu): + def set_route(self, sid, dma_channel): """ Sets up routing in the Liberio dispatcher. From sid, only the destination part is important. After this call, any CHDR packet with the @@ -35,14 +33,10 @@ class LiberioDispatcherTable(object): sid -- Full SID, but only destination part matters. dma_channel -- The DMA channel to which these packets should get routed. - mtu -- Max size of bytes per packet. This is important to get right. The - DMA implementation will pad packets smaller than MTU up to the - mtu value, so making MTU extra large is inefficient. Packets - larger than MTU will get chopped up. Even worse. """ self.log.debug( - "Routing SID `{sid}' to DMA channel `{chan}', MTU {mtu} bytes.".format( - sid=str(sid), chan=dma_channel, mtu=mtu + "Routing SID `{sid}' to DMA channel `{chan}'.".format( + sid=str(sid), chan=dma_channel ) ) def poke_and_trace(addr, data): @@ -51,18 +45,12 @@ class LiberioDispatcherTable(object): addr, data )) self.poke32(addr, data) - # Poke reg for destination channel - # Poke reg for MTU try: poke_and_trace( 0 + 4 * sid.dst_ep, dma_channel, ) - poke_and_trace( - self.MTU_OFFSET + 4 * sid.dst_ep, - int(mtu / 8), - ) except Exception as ex: self.log.error( "Unexpected exception while setting route: %s", diff --git a/mpm/python/usrp_mpm/mpmtypes.py b/mpm/python/usrp_mpm/mpmtypes.py index 48f40f47e..a7f94fca8 100644 --- a/mpm/python/usrp_mpm/mpmtypes.py +++ b/mpm/python/usrp_mpm/mpmtypes.py @@ -47,22 +47,46 @@ class SharedState(object): class SID(object): - def __init__(self, sid=0): - self.src_addr = sid >> 24 - self.src_ep = (sid >> 16) & 0xFF - self.dst_addr = (sid >> 8) & 0xFF - self.dst_ep = sid & 0xFF + """ + Python representation of a 32-bit SID. + """ + def __init__(self, sid=None): + sid = sid or 0 + if isinstance(sid, str): + src, dst = sid.split(">") + if src.find(':') != -1: + self.src_addr, self.src_ep = \ + [int(x, 16) for x in src.split(':', 2)] + else: + self.src_addr, self.src_ep = \ + [int(x, 10) for x in src.split('.', 2)] + if dst.find(':') != -1: + self.dst_addr, self.dst_ep = \ + [int(x, 16) for x in dst.split(':', 2)] + else: + self.dst_addr, self.dst_ep = \ + [int(x, 10) for x in dst.split('.', 2)] + else: + print(sid) + self.src_addr = sid >> 24 + self.src_ep = (sid >> 16) & 0xFF + self.dst_addr = (sid >> 8) & 0xFF + self.dst_ep = sid & 0xFF def set_src_addr(self, new_addr): - self.src_addr = new_addr & 0xFF + " Return source address (e.g. 02:30>00:01 -> 2) " + self.src_addr = new_addr & 0xFF def set_dst_addr(self, new_addr): + " Return destination address (e.g. 02:30>00:01 -> 0) " self.dst_addr = new_addr & 0xFF def set_src_ep(self, new_addr): + " Return source endpoint (e.g. 02:30>00:01 -> 0x30) " self.src_ep = new_addr & 0xFF def set_dst_ep(self, new_addr): + " Return destination endpoint (e.g. 02:30>00:01 -> 0) " self.dst_ep = new_addr & 0xFF def reversed(self): @@ -75,6 +99,7 @@ class SID(object): return new_sid def get(self): + " Return SID as 32-bit number " return (self.src_addr << 24) | (self.src_ep << 16) | (self.dst_addr << 8) | self.dst_ep def __repr__(self): diff --git a/mpm/python/usrp_mpm/periph_manager/base.py b/mpm/python/usrp_mpm/periph_manager/base.py index 0a3882102..a619247ad 100644 --- a/mpm/python/usrp_mpm/periph_manager/base.py +++ b/mpm/python/usrp_mpm/periph_manager/base.py @@ -28,7 +28,6 @@ from six import iteritems, itervalues from ..mpmlog import get_logger from .udev import get_eeprom_paths from .udev import get_spidev_nodes -from usrp_mpm import net from usrp_mpm import dtoverlay from usrp_mpm import eeprom from usrp_mpm.rpc_server import no_claim, no_rpc @@ -154,8 +153,6 @@ class PeriphManagerBase(object): self._init_dboards(args.override_db_pids) self._available_endpoints = list(range(256)) self._init_args = {} - self.log.info("Identifying available network interfaces...") - self._chdr_interfaces = self._init_interfaces(self.chdr_interfaces) def _init_mboard_with_eeprom(self): """ @@ -309,23 +306,6 @@ class PeriphManagerBase(object): self.dboards.append(db_class(dboard_idx, **dboard_info)) self.log.info("Found {} daughterboard(s).".format(len(self.dboards))) - def _init_interfaces(self, possible_ifaces): - """ - Initialize the list of network interfaces - """ - self.log.trace("Testing available interfaces out of `{}'".format( - possible_ifaces - )) - valid_ifaces = net.get_valid_interfaces(possible_ifaces) - if len(valid_ifaces): - self.log.debug("Found CHDR interfaces: `{}'".format(valid_ifaces)) - else: - self.log.warning("No CHDR interfaces found!") - return { - x: net.get_iface_info(x) - for x in valid_ifaces - } - def init(self, args): """ Run the mboard initialization. This is typically done at the beginning @@ -626,3 +606,71 @@ class PeriphManagerBase(object): "is not implemented.", dboard_idx) raise NotImplementedError + ####################################################################### + # Transport API + ####################################################################### + def request_xport( + self, + dst_address, + suggested_src_address, + xport_type, + ): + """ + When setting up a CHDR connection, this is the first call to be + made. This function will return a list of dictionaries, each + describing a way to open an CHDR connection. + All transports requested are bidirectional. + + The callee must maintain a lock on the available CHDR xports. After + calling request_xport(), the caller needs to pick one of the + dictionaries, possibly amend data (e.g., if the connection is an + Ethernet connection, then we need to know the source port, but more + details on that in commit_xport()'s documentation). + One way to implement a lock is to simply lock a mutex here and + unlock it in commit_xport(), even though there are probably more + nuanced solutions. + + Arguments: + dst_sid -- The destination part of the connection, i.e., which + RFNoC block are we connecting to. Example: 0x0230 + suggested_src_sid -- The source part of the connection, i.e., + what's the source address of packets going to + the destination at dst_sid. This is a + suggestion, MPM can override this. Example: + 0x0001. + xport_type -- One of the following strings: CTRL, ASYNC_MSG, + TX_DATA, RX_DATA. See also xports_type_t in UHD. + + The return value is a list of dictionaries. Every dictionary has + the following key/value pairs: + - type: Type of transport, e.g., "UDP", "liberio". + - ipv4 (UDP only): IPv4 address to connect to. + - port (UDP only): IP port to connect to. + - rx_mtu: In bytes, the max size RX packets can have (RX means going + from device to UHD) + - tx_mtu: In bytes, the max size TX packets can have (TX means going + from UHD to device) + """ + raise NotImplementedError("request_xport() not implemented.") + + def commit_xport(self, xport_info): + """ + When setting up a CHDR connection, this is the second call to be + made. + + Arguments: + xport_info -- A dictionary (string -> string). The dictionary must + have been originally created by request_xport(), but + additional key/value pairs need to be added. + + All transports need to also provide: + - rx_mtu: In bytes, the max number of bytes going from device to UHD + - tx_mtu: In bytes, the max number of bytes going from UHD to device + + UDP transports need to also provide: + - src_ipv4: IPv4 address the connection is coming from. + - src_port: IP port the connection is coming from. + """ + raise NotImplementedError("commit_xport() not implemented.") + + diff --git a/mpm/python/usrp_mpm/periph_manager/n310.py b/mpm/python/usrp_mpm/periph_manager/n310.py index a011d3559..93772b1ad 100644 --- a/mpm/python/usrp_mpm/periph_manager/n310.py +++ b/mpm/python/usrp_mpm/periph_manager/n310.py @@ -22,7 +22,7 @@ from __future__ import print_function import os import copy import shutil -from six import iteritems +from six import iteritems, itervalues from builtins import object from .base import PeriphManagerBase from ..net import get_iface_addrs @@ -31,6 +31,7 @@ from ..net import get_mac_addr from ..mpmtypes import SID from usrp_mpm.uio import UIO from usrp_mpm.rpc_server import no_claim, no_rpc +from usrp_mpm import net from ..sysfs_gpio import SysFSGPIO from ..ethtable import EthDispatcherTable from ..liberiotable import LiberioDispatcherTable @@ -42,6 +43,9 @@ N3XX_DEFAULT_TIME_SOURCE = 'internal' N3XX_DEFAULT_ENABLE_GPS = True N3XX_DEFAULT_ENABLE_FPGPIO = True +############################################################################### +# Additional peripheral controllers specific to Magnesium +############################################################################### class TCA6424(object): """ Abstraction layer for the port/gpio expander @@ -204,6 +208,224 @@ class FP_GPIO(object): assert index in range(self._gpiosize) return self._gpios.get(self._offset+index) +############################################################################### +# Transport managers +############################################################################### +class XportMgrUDP(object): + """ + Transport manager for UDP connections + """ + # Map Eth devices to UIO labels + eth_tables = {'eth1': 'misc-enet-regs0', 'eth2': 'misc-enet-regs1'} + + def __init__(self, possible_chdr_ifaces, log): + self.log = log + self._possible_chdr_ifaces = possible_chdr_ifaces + self.log.info("Identifying available network interfaces...") + self._chdr_ifaces = \ + self._init_interfaces(self._possible_chdr_ifaces) + self._eth_dispatchers = { + x: EthDispatcherTable(self.eth_tables.get(x)) + for x in list(self._chdr_ifaces.keys()) + } + for ifname, table in iteritems(self._eth_dispatchers): + table.set_ipv4_addr(self._chdr_ifaces[ifname]['ip_addr']) + + def _init_interfaces(self, possible_ifaces): + """ + Initialize the list of network interfaces + """ + self.log.trace("Testing available interfaces out of `{}'".format( + possible_ifaces + )) + valid_ifaces = net.get_valid_interfaces(possible_ifaces) + if len(valid_ifaces): + self.log.debug("Found CHDR interfaces: `{}'".format(valid_ifaces)) + else: + self.log.warning("No CHDR interfaces found!") + return { + x: net.get_iface_info(x) + for x in valid_ifaces + } + + def init(self, args): + """ + Call this when the user calls 'init' on the periph manager + """ + # TODO re-run _init_interfaces, IP addresses could have changed since + # bootup + for _, table in iteritems(self._eth_dispatchers): + if 'forward_eth' in args or 'forward_bcast' in args: + table.set_forward_policy( + args.get('forward_eth', False), + args.get('forward_bcast', False) + ) + if 'preload_ethtables' in args: + self._preload_ethtables( + self._eth_dispatchers, + args['preload_ethtables'] + ) + + def deinit(self): + " Clean up after a session terminates " + pass + + def _preload_ethtables(self, eth_dispatchers, table_file): + """ + Populates the ethernet tables from a JSON file + """ + import json + try: + eth_table_data = json.load(open(table_file)) + except ValueError as ex: + self.log.warning( + "Bad values in preloading table file: %s", + str(ex) + ) + return + self.log.info( + "Preloading Ethernet dispatch tables from JSON file `%s'.", + table_file + ) + for eth_iface, data in iteritems(eth_table_data): + if eth_iface not in eth_dispatchers: + self.log.warning( + "Request to preload eth dispatcher table for " + "iface `{}', but no such interface is " + "registered. Known interfaces: {}".format( + str(eth_iface), + ",".join(eth_dispatchers.keys()) + ) + ) + continue + eth_dispatcher = eth_dispatchers[eth_iface] + self.log.debug("Preloading {} dispatch table".format(eth_iface)) + try: + for dst_ep, udp_data in iteritems(data): + sid = SID() + sid.set_dst_ep(int(dst_ep)) + eth_dispatcher.set_route( + sid, + udp_data['ip_addr'], + udp_data['port'], + udp_data.get('mac_addr', None) + ) + except ValueError as ex: + self.log.warning( + "Bad values in preloading table file: %s", + str(ex) + ) + + def request_xport( + self, + sid, + xport_type, + ): + """ + Return UDP xport info + """ + assert xport_type in ('CTRL', 'ASYNC_MSG', 'TX_DATA', 'RX_DATA') + xport_info = { + 'type': 'UDP', + # TODO what about eth2, huh? + 'ipv4': str(self._chdr_ifaces['eth1']['ip_addr']), + 'port': '49153', # FIXME no hardcoding + 'send_sid': str(sid) + } + return [xport_info] + + def commit_xport(self, sid, xport_info): + """ + fuu + """ + # TODO do error checking on the xport_info + self.log.trace("Committing UDP transport using xport_info `%s'", + str(xport_info)) + sender_addr = xport_info['src_ipv4'] + sender_port = int(xport_info['src_port']) + self.log.trace("Incoming connection is coming from %s:%d", + sender_addr, sender_port) + mac_addr = get_mac_addr(sender_addr) + if mac_addr is None: + raise RuntimeError( + "Could not find MAC address for IP address {}".format( + sender_addr)) + self.log.trace("Incoming connection is coming from %s", + mac_addr) + # Remove hardcodings in the following lines FIXME + my_xbar = lib.xbar.xbar.make("/dev/crossbar0") # TODO + my_xbar.set_route(sid.src_addr, 0) # TODO + eth_dispatcher = self._eth_dispatchers['eth1'] # TODO + eth_dispatcher.set_route(sid.reversed(), sender_addr, sender_port) + self.log.trace("UDP transport successfully committed!") + return True + +class XportMgrLiberio(object): + """ + Transport manager for UDP connections + """ + # udev label for the UIO device that controls the DMA engine + liberio_label = 'liberio' + + def __init__(self, log): + self.log = log + self._dma_dispatcher = LiberioDispatcherTable(self.liberio_label) + self._data_chan_ctr = 0 + self._max_chan = 10 # TODO get this number from somewhere + + def init(self, args): + """ + Call this when the user calls 'init' on the periph manager + """ + pass + + def deinit(self): + " Clean up after a session terminates " + self._data_chan_ctr = 0 + + def request_xport( + self, + sid, + xport_type, + ): + """ + Return liberio xport info + """ + assert xport_type in ('CTRL', 'ASYNC_MSG', 'TX_DATA', 'RX_DATA') + if xport_type == 'CTRL': + chan = 0 + elif xport_type == 'ASYNC_MSG': + chan = 1 + else: + chan = 2 + self._data_chan_ctr + self._data_chan_ctr += 1 + xport_info = { + 'type': 'liberio', + 'send_sid': str(sid), + 'muxed': str(xport_type in ('CTRL', 'ASYNC_MSG')), + 'dma_chan': str(chan), + 'tx_dev': "/dev/tx-dma{}".format(chan), + 'rx_dev': "/dev/rx-dma{}".format(chan), + } + self.log.trace("Liberio: Chan: {} TX Device: {} RX Device: {}".format( + chan, xport_info['tx_dev'], xport_info['rx_dev'])) + self.log.trace("Liberio channel is muxed: %s", + "Yes" if xport_info['muxed'] else "No") + return [xport_info] + + def commit_xport(self, sid, xport_info): + " Commit liberio transport " + chan = int(xport_info['dma_chan']) + my_xbar = lib.xbar.xbar.make("/dev/crossbar0") # TODO + my_xbar.set_route(sid.src_addr, 2) # TODO + self._dma_dispatcher.set_route(sid.reversed(), chan) + self.log.trace("Liberio transport successfully committed!") + return True + + +############################################################################### +# Main Class +############################################################################### class n310(PeriphManagerBase): """ Holds N310 specific attributes and methods @@ -230,7 +452,6 @@ class n310(PeriphManagerBase): dboard_spimaster_addrs = ["e0006000.spi", "e0007000.spi"] chdr_interfaces = ['eth1', 'eth2'] # N310-specific settings - eth_tables = {'eth1': 'misc-enet-regs0', 'eth2': 'misc-enet-regs1'} # Path to N310 FPGA bin file # This file will always contain the current image, regardless of SFP type, # dboard, etc. The host is responsible for providing a compatible image @@ -242,8 +463,6 @@ class n310(PeriphManagerBase): 'callback': "update_fpga", }, } - # udev label for the UIO device that controls the DMA engine - liberio_label = 'liberio' def __init__(self, args): super(n310, self).__init__(args) @@ -273,16 +492,11 @@ class n310(PeriphManagerBase): self._clock_source = None self._time_source = None self._init_ref_clock_and_time(args.default_args) - # Define some attributes so PyLint stays quiet - self._eth_dispatchers = None - self._dma_dispatcher = None - # Init Ethernet - self._eth_dispatchers = { - x: EthDispatcherTable(self.eth_tables.get(x)) - for x in list(self._chdr_interfaces.keys()) + # Init CHDR transports + self._xport_mgrs = { + 'udp': XportMgrUDP(self.chdr_interfaces, self.log), + 'liberio': XportMgrLiberio(self.log), } - for ifname, table in iteritems(self._eth_dispatchers): - table.set_ipv4_addr(self._chdr_interfaces[ifname]['ip_addr']) # Init complete. self.log.info("mboard info: {}".format(self.mboard_info)) @@ -315,112 +529,71 @@ class n310(PeriphManagerBase): dispatchers accordingly. """ result = super(n310, self).init(args) - for _, table in iteritems(self._eth_dispatchers): - if 'forward_eth' in args or 'forward_bcast' in args: - table.set_forward_policy( - args.get('forward_eth', False), - args.get('forward_bcast', False) - ) - if 'preload_ethtables' in args: - self._preload_ethtables( - self._eth_dispatchers, - args['preload_ethtables'] - ) - self._dma_dispatcher = LiberioDispatcherTable(self.liberio_label) + for xport_mgr in itervalues(self._xport_mgrs): + xport_mgr.init(args) return result - def _preload_ethtables(self, eth_dispatchers, table_file): + def deinit(self): """ - Populates the ethernet tables from a JSON file + Clean up after a UHD session terminates. """ - import json - try: - eth_table_data = json.load(open(table_file)) - except ValueError as ex: - self.log.warning( - "Bad values in preloading table file: %s", - str(ex) + super(n310, self).deinit() + for xport_mgr in itervalues(self._xport_mgrs): + xport_mgr.deinit() + + ########################################################################### + # Transport API + ########################################################################### + def request_xport( + self, + dst_address, + suggested_src_address, + xport_type + ): + """ + See PeriphManagerBase.request_xport() for docs. + """ + # For now, we always accept the suggestion if available, or fail + src_address = suggested_src_address + if src_address not in self._available_endpoints: + raise RuntimeError("no more sids yo") + sid = SID(src_address << 16 | dst_address) + self.log.debug( + "request_xport(dst=0x%04X, suggested_src_address=0x%04X, xport_type=%s): " \ + "operating on SID: %s", + dst_address, suggested_src_address, str(xport_type), str(sid)) + # FIXME token! + assert self.mboard_info['rpc_connection'] in ('remote', 'local') + if self.mboard_info['rpc_connection'] == 'remote': + return self._xport_mgrs['udp'].request_xport( + sid, + xport_type, + ) + elif self.mboard_info['rpc_connection'] == 'local': + return self._xport_mgrs['liberio'].request_xport( + sid, + xport_type, ) - return - self.log.info( - "Preloading Ethernet dispatch tables from JSON file `%s'.", - table_file - ) - for eth_iface, data in iteritems(eth_table_data): - if eth_iface not in eth_dispatchers: - self.log.warning( - "Request to preload eth dispatcher table for " - "iface `{}', but no such interface is " - "registered. Known interfaces: {}".format( - str(eth_iface), - ",".join(eth_dispatchers.keys()) - ) - ) - continue - eth_dispatcher = eth_dispatchers[eth_iface] - self.log.debug("Preloading {} dispatch table".format(eth_iface)) - try: - for dst_ep, udp_data in iteritems(data): - sid = SID() - sid.set_dst_ep(int(dst_ep)) - eth_dispatcher.set_route( - sid, - udp_data['ip_addr'], - udp_data['port'], - udp_data.get('mac_addr', None) - ) - except ValueError as ex: - self.log.warning( - "Bad values in preloading table file: %s", - str(ex) - ) - def _allocate_sid(self, sender_addr, port, sid, xbar_src_addr, xbar_src_port, new_ep): # FIXME mtu + def commit_xport(self, xport_info): """ - Get the MAC address of the sender and store it in the FPGA ARP table + See PeriphManagerBase.commit_xport() for docs. + + Reminder: All connections are incoming, i.e. "send" or "TX" means + remote device to local device, and "receive" or "RX" means this local + device to remote device. "Remote device" can be, for example, a UHD + session. """ + ## Go, go, go + assert self.mboard_info['rpc_connection'] in ('remote', 'local') + sid = SID(xport_info['send_sid']) + self._available_endpoints.remove(sid.src_ep) + self.log.debug("Committing transport for SID %s, xport info: %s", + str(sid), str(xport_info)) if self.mboard_info['rpc_connection'] == 'remote': - self.log.debug("Preparing for UDP connection") - mac_addr = get_mac_addr(sender_addr) - if new_ep not in self._available_endpoints: - raise RuntimeError("no more sids yo") - self._available_endpoints.remove(new_ep) - if mac_addr is not None: - if sender_addr not in self.sid_endpoints: - self.sid_endpoints.update({sender_addr: (new_ep,)}) - else: - current_allocation = self.sid_endpoints.get(sender_addr) - new_allocation = current_allocation + (new_ep,) - self.sid_endpoints.update({sender_addr: new_allocation}) - sid = SID(sid) - sid.set_src_addr(xbar_src_addr) - sid.set_src_ep(new_ep) - my_xbar = lib.xbar.xbar.make("/dev/crossbar0") # TODO - my_xbar.set_route(xbar_src_addr, 0) # TODO - eth_dispatcher = self._eth_dispatchers['eth1'] # TODO - eth_dispatcher.set_route(sid.reversed(), sender_addr, port) - return sid.get() - else: - self.log.debug("Preparing for liberio connection") - if new_ep not in self._available_endpoints: - raise RuntimeError("no more sids yo") - self._available_endpoints.remove(new_ep) - if sender_addr not in self.sid_endpoints: - self.sid_endpoints.update({sender_addr: (new_ep,)}) - else: - current_allocation = self.sid_endpoints.get(sender_addr) - new_allocation = current_allocation + (new_ep,) - self.sid_endpoints.update({sender_addr: new_allocation}) - sid = SID(sid) - sid.set_src_addr(xbar_src_addr) - sid.set_src_ep(new_ep) - my_xbar = lib.xbar.xbar.make("/dev/crossbar0") # TODO - my_xbar.set_route(xbar_src_addr, 2) # TODO - mtu = 0 - assert False # This path is not yet done - self._dma_dispatcher.set_route(sid.reversed(), new_ep, mtu) - - return sid.get() + return self._xport_mgrs['udp'].commit_xport(sid, xport_info) + elif self.mboard_info['rpc_connection'] == 'local': + return self._xport_mgrs['liberio'].commit_xport(sid, xport_info) def get_clock_sources(self): " Lists all available clock sources. " diff --git a/mpm/python/usrp_mpm/rpc_server.py b/mpm/python/usrp_mpm/rpc_server.py index 10d37a3d7..2cd44dfe5 100644 --- a/mpm/python/usrp_mpm/rpc_server.py +++ b/mpm/python/usrp_mpm/rpc_server.py @@ -54,7 +54,7 @@ class MPMServer(RPCServer): RPC calls to appropiate calls in the periph_manager and dboard_managers. """ # This is a list of methods in this class which require a claim - default_claimed_methods = ['init', 'reclaim', 'unclaim', 'allocate_sid'] + default_claimed_methods = ['init', 'reclaim', 'unclaim'] def __init__(self, state, mgr, *args, **kwargs): self.log = get_main_logger().getChild('RPCServer') @@ -332,20 +332,6 @@ class MPMServer(RPCServer): """ return self._last_error - def allocate_sid(self, token, *args): - """ - Forwards the call to periph_manager._allocate_sid with the client ip addresss - as argument. Should be used to setup interfaces - """ - if not self._check_token_valid(token): - self.log.warning("Attempt to allocate SID without valid token!") - return None - try: - return self.periph_manager._allocate_sid(self.client_host, *args) - except Exception as ex: - self._last_error = str(ex) - self.log.error("allocate_sid() failed: %s", str(ex)) - raise def _rpc_server_process(shared_state, port, mgr): -- cgit v1.2.3