aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/usrp/mpmd/mpmd_xport.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/usrp/mpmd/mpmd_xport.cpp')
-rw-r--r--host/lib/usrp/mpmd/mpmd_xport.cpp283
1 files changed, 29 insertions, 254 deletions
diff --git a/host/lib/usrp/mpmd/mpmd_xport.cpp b/host/lib/usrp/mpmd/mpmd_xport.cpp
index 5dd29a6f7..c21da3839 100644
--- a/host/lib/usrp/mpmd/mpmd_xport.cpp
+++ b/host/lib/usrp/mpmd/mpmd_xport.cpp
@@ -4,57 +4,39 @@
// SPDX-License-Identifier: GPL-3.0
//
-// make_transport logic for mpmd_impl.
-
+// make_transport logic for mpmd_impl. Note that mpmd_xport_mgr.* has most of
+// the actual transport logic, this for transport-related APIs.
#include "mpmd_impl.hpp"
-#include "../transport/liberio_zero_copy.hpp"
-#include <uhd/transport/udp_zero_copy.hpp>
-#include <uhd/utils/byteswap.hpp>
+#include "mpmd_xport_mgr.hpp"
using namespace uhd;
+using namespace uhd::mpmd;
-
-// TODO this does not consider the liberio use case!
-uhd::device_addr_t mpmd_impl::get_rx_hints(size_t /* mb_index */)
+uhd::device_addr_t mpmd_impl::get_rx_hints(size_t mb_index)
{
- //device_addr_t rx_hints = _mb[mb_index].recv_args;
- device_addr_t rx_hints; // TODO don't ignore what the user tells us
- // (default to a large recv buff)
- if (not rx_hints.has_key("recv_buff_size"))
- {
- //For the ethernet transport, the buffer has to be set before creating
- //the transport because it is independent of the frame size and # frames
- //For nirio, the buffer size is not configurable by the user
- #if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD)
- //limit buffer resize on macos or it will error
- rx_hints["recv_buff_size"] = boost::lexical_cast<std::string>(MPMD_RX_SW_BUFF_SIZE_ETH_MACOS);
- #elif defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32)
- //set to half-a-second of buffering at max rate
- rx_hints["recv_buff_size"] = boost::lexical_cast<std::string>(MPMD_RX_SW_BUFF_SIZE_ETH);
- #endif
- }
- return rx_hints;
+ return _mb.at(mb_index)->get_rx_hints();
}
+uhd::device_addr_t mpmd_impl::get_tx_hints(size_t mb_index)
+{
+ return _mb.at(mb_index)->get_tx_hints();
+}
-/******************************************************************************
- * General make_transport() + helpers
- *****************************************************************************/
-size_t mpmd_impl::identify_mboard_by_sid(const size_t remote_addr)
+size_t mpmd_impl::identify_mboard_by_xbar_addr(const size_t xbar_addr) const
{
for (size_t mb_index = 0; mb_index < _mb.size(); mb_index++) {
for (size_t xbar_index = 0;
xbar_index < _mb[mb_index]->num_xbars;
xbar_index++) {
- if (_mb[mb_index]->get_xbar_local_addr(xbar_index) == remote_addr) {
+ if (_mb.at(mb_index)->get_xbar_local_addr(xbar_index) == xbar_addr) {
return mb_index;
}
}
}
throw uhd::lookup_error(str(
- boost::format("Cannot identify mboard for remote address %d")
- % remote_addr
+ boost::format("Cannot identify mboard for crossbar address %d")
+ % xbar_addr
));
}
@@ -63,59 +45,27 @@ both_xports_t mpmd_impl::make_transport(
usrp::device3_impl::xport_type_t xport_type,
const uhd::device_addr_t& args
) {
- const size_t mb_index = identify_mboard_by_sid(dst_address.get_dst_addr());
- // Could also be a map...
- const std::string xport_type_str = [xport_type](){
- switch (xport_type) {
- case CTRL:
- return "CTRL";
- case ASYNC_MSG:
- return "ASYNC_MSG";
- case RX_DATA:
- return "RX_DATA";
- case TX_DATA:
- return "TX_DATA";
- default:
- UHD_THROW_INVALID_CODE_PATH();
- };
- }();
-
+ const size_t mb_index =
+ identify_mboard_by_xbar_addr(dst_address.get_dst_addr());
+
+ const sid_t sid(
+ 0,
+ _sid_framer++, // FIXME make sure we only increment if actually valid
+ dst_address.get_dst_addr(),
+ dst_address.get_dst_endpoint()
+ );
UHD_LOGGER_TRACE("MPMD")
- << "Creating new transport of type: " << xport_type_str
- << " To mboard: " << mb_index
- << " Destination address: " << dst_address.to_pp_string_hex().substr(6)
+ << "Creating new transport to mboard: " << mb_index
+ << " SID: " << sid.to_pp_string_hex()
<< " User-defined xport args: " << args.to_string()
;
- const auto xport_info_list =
- _mb[mb_index]->rpc->request_with_token<xport_info_list_t>(
- "request_xport",
- dst_address.get_dst(),
- _sid_framer++, // FIXME make sure we only increment if actually valid
- xport_type_str
+ both_xports_t xports = _mb[mb_index]->make_transport(
+ sid,
+ xport_type,
+ args
);
UHD_LOGGER_TRACE("MPMD")
- << "request_xport() gave us " << xport_info_list.size()
- << " option(s)."
- ;
-
- // This is not very elegant, and needs some kind of factory model or
- // whatnot.
- auto xport_info = xport_info_list.at(0); // In the future, actually pick one
- // sensibly. This is what would
- // enable dual Eth support. FIXME
- both_xports_t xports;
- if (xport_info["type"] == "UDP") {
- xports = make_transport_udp(mb_index, xport_info, xport_type, args);
-#ifdef HAVE_LIBERIO
- } else if (xport_info["type"] == "liberio") {
- xports = make_transport_liberio(mb_index, xport_info, xport_type, args);
-#endif
- } else {
- UHD_THROW_INVALID_CODE_PATH();
- }
-
- UHD_LOGGER_TRACE("MPMD")
<< "xport info: send_sid==" << xports.send_sid.to_pp_string_hex()
<< " recv_sid==" << xports.recv_sid.to_pp_string_hex()
<< " endianness=="
@@ -123,181 +73,6 @@ both_xports_t mpmd_impl::make_transport(
<< " recv_buff_size==" << xports.recv_buff_size
<< " send_buff_size==" << xports.send_buff_size
;
-
- return xports;
-}
-
-
-/******************************************************************************
- * UDP Transport
- *****************************************************************************/
-both_xports_t mpmd_impl::make_transport_udp(
- const size_t mb_index,
- xport_info_t &xport_info,
- const xport_type_t /*xport_type*/,
- const uhd::device_addr_t& xport_args
-) {
- auto &rpc = _mb[mb_index]->rpc;
-
- transport::zero_copy_xport_params default_buff_args;
- // Create actual UDP transport
- // TODO don't hardcode these
- default_buff_args.send_frame_size = 8000;
- default_buff_args.recv_frame_size = 8000;
- default_buff_args.num_recv_frames = 32;
- default_buff_args.num_send_frames = 32;
-
- transport::udp_zero_copy::buff_params buff_params;
- auto recv = transport::udp_zero_copy::make(
- xport_info["ipv4"],
- xport_info["port"],
- default_buff_args,
- buff_params,
- xport_args
- );
- const uint16_t port = recv->get_local_port();
- const std::string src_ip_addr = recv->get_local_addr();
- xport_info["src_port"] = std::to_string(port);
- xport_info["src_ipv4"] = src_ip_addr;
-
- // Communicate it all back to MPM
- if (not rpc->request_with_token<bool>("commit_xport", xport_info)) {
- UHD_LOG_ERROR("MPMD", "Failed to create UDP transport!");
- throw uhd::runtime_error("commit_xport() failed!");
- }
-
- // Create both_xports_t object and finish:
- both_xports_t xports;
- xports.endianness = uhd::ENDIANNESS_BIG;
- xports.send_sid = sid_t(xport_info["send_sid"]);
- xports.recv_sid = xports.send_sid.reversed();
- xports.recv_buff_size = buff_params.recv_buff_size;
- xports.send_buff_size = buff_params.send_buff_size;
- xports.recv = recv; // Note: This is a type cast!
- xports.send = recv; // This too
- return xports;
-}
-
-/******************************************************************************
- * Liberio Transport
- *****************************************************************************/
-#ifdef HAVE_LIBERIO
-static uint32_t extract_sid_from_pkt(void* pkt, size_t) {
- return uhd::sid_t(uhd::wtohx(static_cast<const uint32_t*>(pkt)[1]))
- .get_dst();
-}
-
-uhd::transport::muxed_zero_copy_if::sptr mpmd_impl::make_muxed_liberio_xport(
- const std::string &tx_dev,
- const std::string &rx_dev,
- const uhd::transport::zero_copy_xport_params &buff_args,
- const size_t max_muxed_ports
-) {
- auto base_xport = transport::liberio_zero_copy::make(
- tx_dev, rx_dev, buff_args);
-
- return uhd::transport::muxed_zero_copy_if::make(
- base_xport, extract_sid_from_pkt, max_muxed_ports);
-}
-
-both_xports_t mpmd_impl::make_transport_liberio(
- const size_t mb_index,
- xport_info_t &xport_info,
- const xport_type_t xport_type,
- const uhd::device_addr_t& xport_args
-) {
- auto &rpc = _mb[mb_index]->rpc;
-
- transport::zero_copy_xport_params default_buff_args;
- /* default ones for RX / TX, override below */
- default_buff_args.send_frame_size = 4 * getpagesize();
- default_buff_args.recv_frame_size = 4 * getpagesize();
- default_buff_args.num_recv_frames = 128;
- default_buff_args.num_send_frames = 128;
-
- if (xport_type == CTRL) {
- default_buff_args.send_frame_size = 128;
- default_buff_args.recv_frame_size = 128;
- } else if (xport_type == ASYNC_MSG) {
- default_buff_args.send_frame_size = 256;
- default_buff_args.recv_frame_size = 256;
- } else if (xport_type == RX_DATA) {
- default_buff_args.send_frame_size = 64;
- } else {
- default_buff_args.recv_frame_size = 64;
- }
-
- const std::string tx_dev = xport_info["tx_dev"];
- const std::string rx_dev = xport_info["rx_dev"];
-
- both_xports_t xports;
- xports.endianness = uhd::ENDIANNESS_LITTLE;
- xports.send_sid = sid_t(xport_info["send_sid"]);
- xports.recv_sid = xports.send_sid.reversed();
-
- // this is kinda ghetto: scale buffer for muxed xports since we share the
- // buffer...
- float divisor = 1;
- if (xport_type == CTRL)
- divisor = 16;
- else if (xport_type == ASYNC_MSG)
- divisor = 4;
-
-
- //if (xport_info["muxed"] == "True") {
- //// FIXME tbw
- //}
- if (xport_type == CTRL) {
- UHD_ASSERT_THROW(xport_info["muxed"] == "True");
- if (not _ctrl_dma_xport) {
- default_buff_args.send_frame_size = 128;
- default_buff_args.recv_frame_size = 128;
- _ctrl_dma_xport = make_muxed_liberio_xport(tx_dev, rx_dev,
- default_buff_args, int(divisor));
- }
-
- UHD_LOGGER_TRACE("MPMD")
- << "Making (muxed) stream with num " << xports.recv_sid.get_dst();
- xports.recv = _ctrl_dma_xport->make_stream(xports.recv_sid.get_dst());
- } else if (xport_type == ASYNC_MSG) {
- UHD_ASSERT_THROW(xport_info["muxed"] == "True");
- if (not _async_msg_dma_xport) {
- default_buff_args.send_frame_size = 256;
- default_buff_args.recv_frame_size = 256;
- _async_msg_dma_xport = make_muxed_liberio_xport(
- tx_dev, rx_dev, default_buff_args, int(divisor));
- }
-
- UHD_LOGGER_TRACE("MPMD")
- << "making (muxed) stream with num " << xports.recv_sid.get_dst();
- xports.recv =
- _async_msg_dma_xport->make_stream(xports.recv_sid.get_dst());
- } else {
- xports.recv =
- transport::liberio_zero_copy::make(
- tx_dev, rx_dev, default_buff_args);
- }
-
- transport::udp_zero_copy::buff_params buff_params;
- buff_params.recv_buff_size =
- float(default_buff_args.recv_frame_size) *
- float(default_buff_args.num_recv_frames) / divisor;
- buff_params.send_buff_size =
- float(default_buff_args.send_frame_size) *
- float(default_buff_args.num_send_frames) / divisor;
-
-
- // Communicate it all back to MPM
- if (not rpc->request_with_token<bool>("commit_xport", xport_info)) {
- UHD_LOG_ERROR("MPMD", "Failed to create UDP transport!");
- throw uhd::runtime_error("commit_xport() failed!");
- }
-
- // Finish both_xports_t object and return:
- xports.recv_buff_size = buff_params.recv_buff_size;
- xports.send_buff_size = buff_params.send_buff_size;
- xports.send = xports.recv;
return xports;
}
-#endif /* HAVE_LIBERIO */