diff options
author | Alex Williams <alex.williams@ni.com> | 2018-09-25 14:49:45 -0700 |
---|---|---|
committer | Ashish Chaudhari <ashish.chaudhari@ettus.com> | 2019-01-25 13:30:22 -0800 |
commit | ad2720d7188aece35e8aa4c65118a33b8b9ae690 (patch) | |
tree | 2d834deb728e0d59a60b8c23fa85da9db59df8af | |
parent | 3c821adfedf859ffb689136eea2ac6fa6b48916a (diff) | |
download | uhd-ad2720d7188aece35e8aa4c65118a33b8b9ae690.tar.gz uhd-ad2720d7188aece35e8aa4c65118a33b8b9ae690.tar.bz2 uhd-ad2720d7188aece35e8aa4c65118a33b8b9ae690.zip |
mpmd,transport,prefs: Add xport_mgr for dpdk_zero_copy
Add configuration sections to the UHD config file for NIC entries. Keys
are based on MAC addresses, and the entries beneath the section describe
which CPU and I/O thread to use for the NIC and its IPv4 address.
Make ring sizes configurable for uhd-dpdk. Ring size is now an argument
for packet buffers. Note that the maximum number of available buffers
is still determined at init!
Add ability to receive broadcasts to uhd-dpdk. This is controllable by
a boolean in the sockarg during socket creation. dpdk_zero_copy will
filter broadcast packets out.
Add dpdk_simple transport (to mirror udp_simple). This transport allows
receiving from broadcast addresses, but it only permits one outstanding
buffer at a time.
Fix IP checksum handling in UHD-DPDK.
TX checksums were not being calculated in the NIC, and in RX, the check
for IP checksums allowed values of zero (reported as none). Now packets
with bad IP checksums will be dropped.
22 files changed, 1220 insertions, 344 deletions
diff --git a/host/lib/include/uhdlib/transport/dpdk_common.hpp b/host/lib/include/uhdlib/transport/dpdk_common.hpp new file mode 100644 index 000000000..2f320e79e --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk_common.hpp @@ -0,0 +1,80 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_DPDK_COMMON_HPP +#define INCLUDED_DPDK_COMMON_HPP + +#include <uhd/config.hpp> +#include <uhd/types/device_addr.hpp> +#include <uhd/utils/static.hpp> +#include <array> +#include <atomic> +#include <mutex> +#include <string> + +namespace uhd { namespace transport { + +class uhd_dpdk_ctx : boost::noncopyable { +public: + UHD_SINGLETON_FCN(uhd_dpdk_ctx, get); + + ~uhd_dpdk_ctx(void); + + /*! + * Initialize uhd-dpdk (and do only once) + * \param user_args User args passed in to override config files + */ + void init(const device_addr_t &user_args); + + /*! + * Get MTU of NICs used by DPDK + * + * \return Number of Bytes in MTU + */ + size_t get_mtu(void) const; + + /*! + * Get port ID from provided MAC address + * \param mac_addr MAC address + * \param port_id Int to write ID of port corresponding to MAC address + * \return 0 if match found, else no match + */ + int get_port_id(std::array<uint8_t, 6> mac_addr, unsigned int &port_id) const; + + /*! + * Get port ID for routing packet destined for given address + * \param addr Destination address + * \return port ID from routing table + */ + int get_route(const std::string &addr) const; + + /*! + * Set IPv4 address and subnet mask of given NIC port + * Not thread-safe. Should only be written before ports are in use. + * \param port_id NIC port ID + * \param ipv4_addr IPv4 address to write + * \param netmask Subnet mask identifying network number in ipv4_addr + * \return 0 if successful, else error + */ + int set_ipv4_addr(unsigned int port_id, uint32_t ipv4_addr, uint32_t netmask); + + /*! + * \return whether init() has been called + */ + bool is_init_done(void) const; + +private: + uhd_dpdk_ctx(void); + + size_t _mtu; + std::mutex _init_mutex; + std::atomic<bool> _init_done; + uhd::dict<uint32_t, unsigned int> _routes; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_DPDK_COMMON_HPP */ diff --git a/host/lib/include/uhdlib/transport/dpdk_simple.hpp b/host/lib/include/uhdlib/transport/dpdk_simple.hpp new file mode 100644 index 000000000..62728b38d --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk_simple.hpp @@ -0,0 +1,95 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_DPDK_SIMPLE_HPP +#define INCLUDED_DPDK_SIMPLE_HPP + +#include <uhdlib/transport/dpdk_common.hpp> + +namespace uhd { namespace transport { + +class dpdk_simple : boost::noncopyable +{ +public: + typedef boost::shared_ptr<dpdk_simple> sptr; + + virtual ~dpdk_simple(void) = 0; + + /*! + * Make a new connected dpdk transport: + * This transport is for sending and receiving + * between this host and a single endpoint. + * The primary usage for this transport will be control transactions. + * + * The address must be an ipv4 address. + * The port must be a number. + * + * \param addr a string representing the destination address + * \param port a string representing the destination port + */ + static sptr make_connected(struct uhd_dpdk_ctx &ctx, + const std::string &addr, const std::string &port); + + /*! + * Make a new broadcasting dpdk transport: + * This transport can send broadcast datagrams + * and receive datagrams from multiple sources. + * The primary usage for this transport will be to discover devices. + * + * The address must be an ipv4 address. + * The port must be a number. + * + * \param addr a string representing the destination address + * \param port a string representing the destination port + */ + static sptr make_broadcast(struct uhd_dpdk_ctx &ctx, + const std::string &addr, const std::string &port); + + /*! + * Request a single send buffer of specified size. + * + * \param buf a pointer to place to write buffer location + * \return the maximum length of the buffer in Bytes + */ + virtual size_t get_tx_buf(void** buf) = 0; + + /*! + * Send and release outstanding buffer + * + * \param number of bytes sent (releases buffer if sent) + */ + virtual size_t send(size_t length) = 0; + + /*! + * Receive a single packet. + * Buffer provided by transport (must be freed). + * + * \param buf a pointer to place to write buffer location + * \param timeout the timeout in seconds + * \return the number of bytes received or zero on timeout + */ + virtual size_t recv(void **buf, double timeout = 0.1) = 0; + + /*! + * Return/free receive buffer + */ + virtual void put_rx_buf(void) = 0; + + /*! + * Get the last IP address as seen by recv(). + * Only use this with the broadcast socket. + */ + virtual std::string get_recv_addr(void) = 0; + + /*! + * Get the IP address for the destination + */ + virtual std::string get_send_addr(void) = 0; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_DPDK_SIMPLE_HPP */ diff --git a/host/lib/include/uhdlib/transport/uhd-dpdk.h b/host/lib/include/uhdlib/transport/uhd-dpdk.h index 8d46912bd..ae7d31383 100644 --- a/host/lib/include/uhdlib/transport/uhd-dpdk.h +++ b/host/lib/include/uhdlib/transport/uhd-dpdk.h @@ -31,12 +31,22 @@ enum uhd_dpdk_sock_type { }; /** - * Init UHD-DPDK environment and bring up ports (link UP). - * - * Offload capabilities will be used if available + * Init UHD-DPDK environment, including DPDK's EAL. + * This will make available information about the DPDK-assigned NIC devices. * * @param argc passed directly to rte_eal_init() * @param argv passed directly to rte_eal_init() + * + * @return Returns negative error code if there were issues, else 0 + */ +int uhd_dpdk_init(int argc, const char **argv); + +/** + * Start UHD-DPDK networking stack. Bring ports up (link UP). + * uhd_dpdk_init() must be called first. + * + * Offload capabilities will be used if available + * * @param num_ports number of network interfaces to map * @param port_thread_mapping array of num_ports entries specifying which thread * will drive the I/O for a given port (determined by array index) @@ -47,9 +57,8 @@ enum uhd_dpdk_sock_type { * * @return Returns negative error code if there were issues, else 0 */ -int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports, - int *port_thread_mapping, int num_mbufs, int mbuf_cache_size, - int mtu); +int uhd_dpdk_start(unsigned int num_ports, int *port_thread_mapping, + int num_mbufs, int mbuf_cache_size, int mtu); /** * @return Returns number of ports registered to DPDK. @@ -58,6 +67,12 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports, int uhd_dpdk_port_count(void); /** + * @return Returns 0 if link is down, 1 if link is up, and negative error code + * if error occurred. + */ +int uhd_dpdk_port_link_status(unsigned int portid); + +/** * @return Returns Ethernet MAC address of requested port * * @param portid ID number of network interface @@ -121,17 +136,21 @@ int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock); /** * Arguments for a UDP socket - * All data should be provided in network format + * All address/port data should be provided in network format */ struct uhd_dpdk_sockarg_udp { /*! True for TX socket, false for RX socket */ bool is_tx; + /*! True to filter broadcast packets, else recv */ + bool filter_bcast; /*! Local udp port. This is dst_port for RX, src_port for TX */ uint16_t local_port; /*! Remote udp port. This is dst_port for TX */ uint16_t remote_port; /*! IPv4 address for destination (TX) */ uint32_t dst_addr; + /*! Number of buffers in ring */ + size_t num_bufs; }; /** diff --git a/host/lib/include/uhdlib/utils/prefs.hpp b/host/lib/include/uhdlib/utils/prefs.hpp index 62831a875..e528450cd 100644 --- a/host/lib/include/uhdlib/utils/prefs.hpp +++ b/host/lib/include/uhdlib/utils/prefs.hpp @@ -48,6 +48,40 @@ namespace uhd { namespace prefs { */ uhd::device_addr_t get_usrp_args(const uhd::device_addr_t &user_args); + /*! Convenience function to update global DPDK args with settings from + * config files. + * + * Searches for a profile attached to the dpdk-conf key, like this: + * [dpdk-conf=myconfig] + * num_mbufs=4095 + * mbuf_cache_size=315 + * mtu=8000 + * + * \param user_args After getting the device args from the config + * files, all of these key/value pairs will be applied + * and will overwrite the settings from config files + * if they exist. + */ + uhd::device_addr_t get_dpdk_args(const uhd::device_addr_t &user_args); + + /*! Convenience function to update per-NIC DPDK args with settings from + * config files. + * + * Grabs settings based on provided MAC address. Sections created like so: + * [dpdk-mac=00:01:02:03:04:05] + * dpdk-ipv4 = 192.168.20.1/24 + * dpdk-io-cpu = 1 + * + * [dpdk-mac=00:01:02:03:04:06] + * dpdk-ipv4 = 192.168.40.1/24 + * dpdk-io-cpu = 1 + * + * \param user_args After getting the device args from the config + * files, all of these key/value pairs will be applied + * and will overwrite the settings from config files + * if they exist. + */ + uhd::device_addr_t get_dpdk_nic_args(const uhd::device_addr_t &user_args); }} /* namespace uhd::prefs */ #endif /* INCLUDED_LIBUHD_UTILS_PREFS_HPP */ diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 63d44c80b..7794f0760 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -141,6 +141,7 @@ if(ENABLE_DPDK) INCLUDE_SUBDIRECTORY(uhd-dpdk) LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_zero_copy.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_simple.cpp ) endif(ENABLE_DPDK) diff --git a/host/lib/transport/dpdk_simple.cpp b/host/lib/transport/dpdk_simple.cpp new file mode 100644 index 000000000..74bb979ef --- /dev/null +++ b/host/lib/transport/dpdk_simple.cpp @@ -0,0 +1,192 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/utils/log.hpp> +#include <uhdlib/transport/dpdk_simple.hpp> +#include <uhdlib/transport/uhd-dpdk.h> +#include <arpa/inet.h> + +namespace uhd { namespace transport { + +namespace { + constexpr uint64_t USEC = 1000000; + // Non-data fields are headers (Ethernet + IPv4 + UDP) + CRC + constexpr size_t DPDK_SIMPLE_NONDATA_SIZE = 14 + 20 + 8 + 4; +} + +class dpdk_simple_impl : public dpdk_simple { +public: + dpdk_simple_impl(struct uhd_dpdk_ctx &ctx, const std::string &addr, + const std::string &port, bool filter_bcast) + { + UHD_ASSERT_THROW(ctx.is_init_done()); + + // Get NIC that can route to addr + int port_id = ctx.get_route(addr); + UHD_ASSERT_THROW(port_id >= 0); + + _port_id = port_id; + uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str()); + uint16_t dst_port = htons(std::stoi(port, NULL, 0)); + + struct uhd_dpdk_sockarg_udp sockarg = { + .is_tx = false, + .filter_bcast = filter_bcast, + .local_port = 0, + .remote_port = dst_port, + .dst_addr = dst_ipv4, + .num_bufs = 1 + }; + _rx_sock = uhd_dpdk_sock_open(_port_id, UHD_DPDK_SOCK_UDP, &sockarg); + UHD_ASSERT_THROW(_rx_sock != nullptr); + + // Backfill the local port, in case it was auto-assigned + uhd_dpdk_udp_get_info(_rx_sock, &sockarg); + sockarg.is_tx = true; + sockarg.remote_port = dst_port; + sockarg.dst_addr = dst_ipv4; + sockarg.num_bufs = 1; + _tx_sock = uhd_dpdk_sock_open(_port_id, UHD_DPDK_SOCK_UDP, &sockarg); + UHD_ASSERT_THROW(_tx_sock != nullptr); + UHD_LOG_TRACE("DPDK", "Created simple transports between " << addr << ":" + << ntohs(dst_port) << " and NIC(" << _port_id + << "):" << ntohs(sockarg.local_port)); + } + + ~dpdk_simple_impl(void) + { + if (_rx_mbuf) + uhd_dpdk_free_buf(_rx_mbuf); + if (_tx_mbuf) + uhd_dpdk_free_buf(_tx_mbuf); + } + + /*! + * Request a single send buffer of specified size. + * + * \param buf a pointer to place to write buffer location + * \return the maximum length of the buffer + */ + size_t get_tx_buf(void** buf) + { + UHD_ASSERT_THROW(!_tx_mbuf); + int bufs = uhd_dpdk_request_tx_bufs(_tx_sock, &_tx_mbuf, 1, 0); + if (bufs != 1 || !_tx_mbuf) { + *buf = nullptr; + return 0; + } + *buf = uhd_dpdk_buf_to_data(_tx_sock, _tx_mbuf); + return _mtu - DPDK_SIMPLE_NONDATA_SIZE; + } + + /*! + * Send and release outstanding buffer + * + * \param length bytes of data to send + * \return number of bytes sent (releases buffer if sent) + */ + size_t send(size_t length) + { + UHD_ASSERT_THROW(_tx_mbuf) + _tx_mbuf->pkt_len = length; + _tx_mbuf->data_len = length; + int num_tx = uhd_dpdk_send(_tx_sock, &_tx_mbuf, 1); + if (num_tx == 0) + return 0; + _tx_mbuf = nullptr; + return length; + } + + /*! + * Receive a single packet. + * Buffer provided by transport (must be freed before next operation). + * + * \param buf a pointer to place to write buffer location + * \param timeout the timeout in seconds + * \return the number of bytes received or zero on timeout + */ + size_t recv(void **buf, double timeout) + { + UHD_ASSERT_THROW(!_rx_mbuf); + int bufs = uhd_dpdk_recv(_rx_sock, &_rx_mbuf, 1, (int) (timeout*USEC)); + if (bufs != 1 || _rx_mbuf == nullptr) { + *buf = nullptr; + return 0; + } + if ((_tx_mbuf->ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) { + uhd_dpdk_free_buf(_rx_mbuf); + _rx_mbuf = nullptr; + return 0; + } + uhd_dpdk_get_src_ipv4(_rx_sock, _rx_mbuf, &_last_recv_addr); + *buf = uhd_dpdk_buf_to_data(_rx_sock, _rx_mbuf); + return uhd_dpdk_get_len(_rx_sock, _rx_mbuf); + } + + /*! + * Return/free receive buffer + * Can also use to free un-sent TX bufs + */ + void put_rx_buf(void) + { + UHD_ASSERT_THROW(_rx_mbuf) + uhd_dpdk_free_buf(_rx_mbuf); + } + + /*! + * Get the last IP address as seen by recv(). + * Only use this with the broadcast socket. + */ + std::string get_recv_addr(void) + { + char addr_str[INET_ADDRSTRLEN]; + struct in_addr ipv4_addr; + ipv4_addr.s_addr = _last_recv_addr; + inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str)); + return std::string(addr_str); + } + + /*! + * Get the IP address for the destination + */ + std::string get_send_addr(void) + { + struct in_addr ipv4_addr; + int status = uhd_dpdk_get_ipv4_addr(_port_id, &ipv4_addr.s_addr, nullptr); + UHD_ASSERT_THROW(status); + char addr_str[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str)); + return std::string(addr_str); + } +private: + unsigned int _port_id; + size_t _mtu; + struct uhd_dpdk_socket *_tx_sock; + struct rte_mbuf *_tx_mbuf = nullptr; + struct uhd_dpdk_socket *_rx_sock; + struct rte_mbuf *_rx_mbuf = nullptr; + uint32_t _last_recv_addr; +}; + +dpdk_simple::~dpdk_simple(void) {} + +/*********************************************************************** + * DPDK simple transport public make functions + **********************************************************************/ +dpdk_simple::sptr dpdk_simple::make_connected( + struct uhd_dpdk_ctx &ctx, const std::string &addr, const std::string &port +){ + return sptr(new dpdk_simple_impl(ctx, addr, port, true)); +} + +dpdk_simple::sptr dpdk_simple::make_broadcast( + struct uhd_dpdk_ctx &ctx, const std::string &addr, const std::string &port +){ + return sptr(new dpdk_simple_impl(ctx, addr, port, false)); +} +}} // namespace uhd::transport + + diff --git a/host/lib/transport/dpdk_zero_copy.cpp b/host/lib/transport/dpdk_zero_copy.cpp index 51c9b9dba..9649c1cfb 100644 --- a/host/lib/transport/dpdk_zero_copy.cpp +++ b/host/lib/transport/dpdk_zero_copy.cpp @@ -7,24 +7,28 @@ #include "dpdk_zero_copy.hpp" #include <uhd/config.hpp> #include <uhd/utils/log.hpp> -#include <uhd/utils/static.hpp> #include <uhdlib/transport/uhd-dpdk.h> -#include <arpa/inet.h> -#include <sys/syslog.h> +#include <uhdlib/utils/prefs.hpp> +#include <boost/algorithm/string.hpp> +#include <boost/format.hpp> #include <boost/make_shared.hpp> #include <stack> +#include <sys/syslog.h> +#include <arpa/inet.h> namespace uhd { namespace transport { namespace { -static constexpr uint64_t USEC = 1000000; -// FIXME: Make configurable and have uhd-dpdk library track buffer sizes -static constexpr size_t DEFAULT_FRAME_SIZE = 8000; - -inline char* eal_add_opt( - std::vector<const char*>& argv, size_t n, char* dst, const char* opt, const char* arg) +constexpr uint64_t USEC = 1000000; +constexpr size_t DEFAULT_FRAME_SIZE = 8000; +constexpr int DEFAULT_NUM_MBUFS = 4095; +constexpr int DEFAULT_MBUF_CACHE_SIZE = 315; +constexpr size_t UHD_DPDK_HEADERS_SIZE = 14 + 20 + 8; // Ethernet + IPv4 + UDP + +inline char * eal_add_opt(std::vector<const char*> &argv, size_t n, + char *dst, const char *opt, const char *arg) { - char* ptr = dst; + char *ptr = dst; strncpy(ptr, opt, n); argv.push_back(ptr); ptr += strlen(opt) + 1; @@ -34,6 +38,78 @@ inline char* eal_add_opt( ptr += strlen(arg) + 1; return ptr; } + +inline void uhd_dpdk_eal_init(const device_addr_t &eal_args) +{ + /* Build up argc and argv */ + std::vector<const char *> argv; + argv.push_back("uhd-dpdk"); + auto args = new std::array<char, 4096>(); + char *opt = args->data(); + char *end = args->data() + args->size(); + for (std::string &key : eal_args.keys()) { + std::string val = eal_args[key]; + if (key == "dpdk-coremask") { + opt = eal_add_opt(argv, end - opt, opt, "-c", + val.c_str()); + } else if (key == "dpdk-corelist") { + /* NOTE: This arg may have commas, so limited to config file */ + opt = eal_add_opt(argv, end - opt, opt, "-l", + val.c_str()); + } else if (key == "dpdk-coremap") { + opt = eal_add_opt(argv, end - opt, opt, "--lcores", + val.c_str()); + } else if (key == "dpdk-master-lcore") { + opt = eal_add_opt(argv, end - opt, opt, "--master-lcore", + val.c_str()); + } else if (key == "dpdk-pci-blacklist") { + opt = eal_add_opt(argv, end - opt, opt, "-b", + val.c_str()); + } else if (key == "dpdk-pci-whitelist") { + opt = eal_add_opt(argv, end - opt, opt, "-w", + val.c_str()); + } else if (key == "dpdk-log-level") { + opt = eal_add_opt(argv, end - opt, opt, "--log-level", + val.c_str()); + } else if (key == "dpdk-huge-dir") { + opt = eal_add_opt(argv, end - opt, opt, "--huge-dir", + val.c_str()); + } else if (key == "dpdk-file-prefix") { + opt = eal_add_opt(argv, end - opt, opt, "--file-prefix", + val.c_str()); + } else if (key == "dpdk-driver") { + opt = eal_add_opt(argv, end - opt, opt, "-d", + val.c_str()); + } + /* TODO: Change where log goes? + int rte_openlog_stream( FILE * f) + */ + } + /* Init DPDK's EAL */ + uhd_dpdk_init(argv.size(), argv.data()); + delete args; +} + +inline std::string eth_addr_to_string(struct eth_addr mac_addr) +{ + auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx"); + mac_stream % (uint32_t) mac_addr.addr[0] % (uint32_t) mac_addr.addr[1] + % (uint32_t) mac_addr.addr[2] % (uint32_t) mac_addr.addr[3] + % (uint32_t) mac_addr.addr[4] % (uint32_t) mac_addr.addr[5]; + return mac_stream.str(); +} + +inline void separate_ipv4_addr(const std::string ipv4, + uint32_t &ipv4_addr, uint32_t &netmask) +{ + std::vector<std::string> result; + boost::algorithm::split(result, ipv4, + [](const char &in) {return in == '/';}, boost::token_compress_on); + UHD_ASSERT_THROW(result.size() == 2); + ipv4_addr = (uint32_t) inet_addr(result[0].c_str()); + int netbits = std::atoi(result[1].c_str()); + netmask = htonl(0xffffffff << (32-netbits)); +} } // namespace uhd_dpdk_ctx::uhd_dpdk_ctx(void) : _init_done(false) {} @@ -41,69 +117,82 @@ uhd_dpdk_ctx::uhd_dpdk_ctx(void) : _init_done(false) {} uhd_dpdk_ctx::~uhd_dpdk_ctx(void) {} /* Initialize uhd-dpdk (and do only once) */ -void uhd_dpdk_ctx::init(const dict<std::string, std::string>& eal_args, - unsigned int num_ports, - int* port_thread_mapping, - int num_mbufs, - int mbuf_cache_size, - size_t mtu) +void uhd_dpdk_ctx::init(const device_addr_t &user_args) { std::lock_guard<std::mutex> lock(_init_mutex); if (!_init_done) { - _mtu = mtu; - /* Build up argc and argv */ - std::vector<const char*> argv; - argv.push_back("uhd-dpdk"); - char* args = new char[4096]; - char* opt = args; - char* end = args + sizeof(args); - for (std::string& key : eal_args.keys()) { - std::string val = eal_args[key]; - if (key == "coremask") { - opt = eal_add_opt(argv, end - opt, opt, "-c", val.c_str()); - } else if (key == "corelist") { - /* NOTE: This arg may have commas, so limited to config file */ - opt = eal_add_opt(argv, end - opt, opt, "-l", val.c_str()); - } else if (key == "coremap") { - opt = eal_add_opt(argv, end - opt, opt, "--lcores", val.c_str()); - } else if (key == "master-lcore") { - opt = eal_add_opt(argv, end - opt, opt, "--master-lcore", val.c_str()); - } else if (key == "pci-blacklist") { - opt = eal_add_opt(argv, end - opt, opt, "-b", val.c_str()); - } else if (key == "pci-whitelist") { - opt = eal_add_opt(argv, end - opt, opt, "-w", val.c_str()); - } else if (key == "log-level") { - opt = eal_add_opt(argv, end - opt, opt, "--log-level", val.c_str()); - } else if (key == "huge-dir") { - opt = eal_add_opt(argv, end - opt, opt, "--huge-dir", val.c_str()); - } else if (key == "file-prefix") { - opt = eal_add_opt(argv, end - opt, opt, "--file-prefix", val.c_str()); + /* Gather global config, build args for EAL, and init UHD-DPDK */ + const device_addr_t dpdk_args = uhd::prefs::get_dpdk_args(user_args); + UHD_LOG_TRACE("DPDK", "Configuration:" << std::endl + << dpdk_args.to_pp_string()); + uhd_dpdk_eal_init(dpdk_args); + + _mtu = dpdk_args.has_key("dpdk-mtu") + ? dpdk_args.cast<size_t>("dpdk-mtu", 0) + : DEFAULT_FRAME_SIZE; + const int num_mbufs = dpdk_args.has_key("dpdk-num-mbufs") + ? dpdk_args.cast<int>("dpdk-num-mbufs", 0) + : DEFAULT_NUM_MBUFS; + const int mbuf_cache_size = dpdk_args.has_key("dpdk-mbuf-cache-size") + ? dpdk_args.cast<int>("dpdk-mbuf-cache-size", 0) + : DEFAULT_MBUF_CACHE_SIZE; + + /* Get configuration for all the NIC ports */ + device_addrs_t args = separate_device_addr(user_args); + int num_ports = uhd_dpdk_port_count(); + std::vector<int> io_cpu_map(num_ports); + device_addrs_t nics(num_ports); + for (ssize_t i = 0; i < num_ports; i++) { + struct eth_addr mac_addr = uhd_dpdk_get_eth_addr(i); + nics[i]["dpdk-mac"] = eth_addr_to_string(mac_addr); + for (const auto &arg: args) { + if (arg.has_key("dpdk-mac") + && arg["dpdk-mac"] == nics[i]["dpdk-mac"]) { + for (const auto& key: arg.keys()) { + nics[i][key] = arg[key]; + } + break; + } } + nics[i] = uhd::prefs::get_dpdk_nic_args(nics[i]); + if (nics[i].has_key("dpdk-ipv4") + && nics[i].has_key("dpdk-io-cpu")) { + uint32_t ipv4_addr, netmask; + io_cpu_map[i] = std::atoi(nics[i]["dpdk-io-cpu"].c_str()); + separate_ipv4_addr(nics[i]["dpdk-ipv4"], ipv4_addr, netmask); + uhd_dpdk_set_ipv4_addr((unsigned int) i, ipv4_addr, netmask); + } else { + /* Not enough configuration to use NIC */ + io_cpu_map[i] = -1; + } + UHD_LOG_TRACE("DPDK", "Found NIC(" << i << "):" << std::endl + << nics[i].to_pp_string()); } - uhd_dpdk_init(argv.size(), - argv.data(), - num_ports, - port_thread_mapping, - num_mbufs, - mbuf_cache_size, - _mtu); - delete args; + uhd_dpdk_start(num_ports, io_cpu_map.data(), num_mbufs, + mbuf_cache_size, _mtu); _init_done = true; } } -int uhd_dpdk_ctx::get_port_id(std::array<uint8_t, 6> mac_addr, unsigned int& port_id) +size_t uhd_dpdk_ctx::get_mtu(void) const +{ + UHD_ASSERT_THROW(is_init_done()); + return _mtu; +} + +int uhd_dpdk_ctx::get_port_id(std::array<uint8_t, 6> mac_addr, + unsigned int &port_id) const { UHD_ASSERT_THROW(is_init_done()); int num_ports = uhd_dpdk_port_count(); for (int i = 0; i < num_ports; i++) { - struct eth_addr port_mac_addr = uhd_dpdk_get_eth_addr((unsigned int)i); + struct eth_addr port_mac_addr = uhd_dpdk_get_eth_addr((unsigned int) i); for (int j = 0; j < 6; j++) { if (mac_addr[j] != port_mac_addr.addr[j]) { break; } if (j == 5) { - port_id = (unsigned int)i; + port_id = (unsigned int) i; return 0; } } @@ -111,52 +200,54 @@ int uhd_dpdk_ctx::get_port_id(std::array<uint8_t, 6> mac_addr, unsigned int& por return -1; } -int uhd_dpdk_ctx::get_route(const std::string& addr) const +int uhd_dpdk_ctx::get_route(const std::string &addr) const { - const uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str()); + const uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str()); const unsigned int num_ports = uhd_dpdk_port_count(); for (unsigned int port = 0; port < num_ports; port++) { uint32_t src_ipv4; uint32_t netmask; + if (uhd_dpdk_port_link_status(port) < 1) + continue; uhd_dpdk_get_ipv4_addr(port, &src_ipv4, &netmask); if ((src_ipv4 & netmask) == (dst_ipv4 & netmask)) { - return (int)port; - } + return (int) port; + } } return -ENODEV; } -int uhd_dpdk_ctx::set_ipv4_addr( - unsigned int port_id, uint32_t ipv4_addr, uint32_t netmask) +int uhd_dpdk_ctx::set_ipv4_addr(unsigned int port_id, uint32_t ipv4_addr, + uint32_t netmask) { return uhd_dpdk_set_ipv4_addr(port_id, ipv4_addr, netmask); } -bool uhd_dpdk_ctx::is_init_done(void) +bool uhd_dpdk_ctx::is_init_done(void) const { return _init_done.load(); } -class dpdk_zero_copy_msb : public managed_send_buffer -{ +class dpdk_zero_copy_msb : public managed_send_buffer { public: - dpdk_zero_copy_msb(struct uhd_dpdk_socket* sock, - std::stack<dpdk_zero_copy_msb*, std::vector<dpdk_zero_copy_msb*>>& free_bufs) - : _sock(sock), _buf(nullptr), _free_bufs(free_bufs){}; + dpdk_zero_copy_msb(struct uhd_dpdk_socket *sock, + std::stack<dpdk_zero_copy_msb *, std::vector<dpdk_zero_copy_msb *>> &free_bufs, + size_t frame_size) + : _sock(sock), _buf(nullptr), _free_bufs(free_bufs), + _frame_size(frame_size) {}; ~dpdk_zero_copy_msb(void) {} void release(void) { if (_buf) { - _buf->pkt_len = _length; + _buf->pkt_len = _length; _buf->data_len = _length; - int num_tx = uhd_dpdk_send(_sock, &_buf, 1); + int num_tx = uhd_dpdk_send(_sock, &_buf, 1); if (num_tx == 0) { /* Drop packet and free buffer (do not share sockets!) */ - UHD_LOG_ERROR( - "DPDK", "Invalid shared socket usage detected. Dropping packet..."); + UHD_LOG_ERROR("DPDK", "Invalid shared socket usage detected. Dropping packet..."); uhd_dpdk_free_buf(_buf); } // Push back into pool @@ -170,21 +261,22 @@ public: if (bufs != 1 || !_buf) return sptr(); - return make(this, uhd_dpdk_buf_to_data(_sock, _buf), DEFAULT_FRAME_SIZE); + return make(this, uhd_dpdk_buf_to_data(_sock, _buf), + _frame_size); } private: - struct uhd_dpdk_socket* _sock; - struct rte_mbuf* _buf; - std::stack<dpdk_zero_copy_msb*, std::vector<dpdk_zero_copy_msb*>>& _free_bufs; + struct uhd_dpdk_socket *_sock; + struct rte_mbuf *_buf; + std::stack<dpdk_zero_copy_msb *, std::vector<dpdk_zero_copy_msb *>> &_free_bufs; + size_t _frame_size; }; -class dpdk_zero_copy_mrb : public managed_recv_buffer -{ +class dpdk_zero_copy_mrb : public managed_recv_buffer { public: - dpdk_zero_copy_mrb(struct uhd_dpdk_socket* sock, - std::stack<dpdk_zero_copy_mrb*, std::vector<dpdk_zero_copy_mrb*>>& free_bufs) - : _sock(sock), _buf(nullptr), _free_bufs(free_bufs){}; + dpdk_zero_copy_mrb(struct uhd_dpdk_socket *sock, + std::stack<dpdk_zero_copy_mrb*, std::vector<dpdk_zero_copy_mrb*>> &free_bufs) + : _sock(sock), _buf(nullptr), _free_bufs(free_bufs) {}; ~dpdk_zero_copy_mrb(void) {} void release(void) @@ -197,78 +289,78 @@ public: sptr get_new(double timeout) { - int bufs = uhd_dpdk_recv(_sock, &_buf, 1, (int)(timeout * USEC)); + int bufs = uhd_dpdk_recv(_sock, &_buf, 1, (int) (timeout*USEC)); if (bufs != 1 || _buf == nullptr) { // Push back into pool if we didn't get a real buffer _free_bufs.push(this); return sptr(); } - if ((_buf->ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) { - UHD_LOG_WARNING("DPDK", "IP checksum failure detected. Dropping packet..."); - uhd_dpdk_free_buf(_buf); - _free_bufs.push(this); - return sptr(); - } - return make( - this, uhd_dpdk_buf_to_data(_sock, _buf), uhd_dpdk_get_len(_sock, _buf)); + return make(this, uhd_dpdk_buf_to_data(_sock, _buf), + uhd_dpdk_get_len(_sock, _buf)); } private: - struct uhd_dpdk_socket* _sock; - struct rte_mbuf* _buf; - std::stack<dpdk_zero_copy_mrb*, std::vector<dpdk_zero_copy_mrb*>>& _free_bufs; + struct uhd_dpdk_socket *_sock; + struct rte_mbuf *_buf; + std::stack<dpdk_zero_copy_mrb*, std::vector<dpdk_zero_copy_mrb*>> &_free_bufs; }; -class dpdk_zero_copy_impl : public dpdk_zero_copy -{ +class dpdk_zero_copy_impl : public dpdk_zero_copy { public: - dpdk_zero_copy_impl(struct uhd_dpdk_ctx& ctx, - const unsigned int dpdk_port_id, - const std::string& addr, - const std::string& remote_port, - const std::string& local_port, - const zero_copy_xport_params& xport_params) - : _num_send_frames(xport_params.num_send_frames) - , _send_frame_size(xport_params.send_frame_size) - , _num_recv_frames(xport_params.num_recv_frames) - , _recv_frame_size(xport_params.recv_frame_size) - , _port_id(dpdk_port_id) - , _rx_empty_count(0) - , _tx_empty_count(0) + + dpdk_zero_copy_impl(const struct uhd_dpdk_ctx &ctx, + const unsigned int dpdk_port_id, + const std::string &addr, + const std::string &remote_port, + const std::string &local_port, + const zero_copy_xport_params& xport_params) + : _num_send_frames(xport_params.num_send_frames), + _send_frame_size(xport_params.send_frame_size), + _num_recv_frames(xport_params.num_recv_frames), + _recv_frame_size(xport_params.recv_frame_size), + _port_id(dpdk_port_id), + _rx_empty_count(0), + _tx_empty_count(0) { - // TODO: Handle xport_params UHD_ASSERT_THROW(xport_params.recv_frame_size > 0); UHD_ASSERT_THROW(xport_params.send_frame_size > 0); UHD_ASSERT_THROW(xport_params.num_send_frames > 0); UHD_ASSERT_THROW(xport_params.num_recv_frames > 0); UHD_ASSERT_THROW(ctx.is_init_done()); + UHD_ASSERT_THROW(xport_params.recv_frame_size < ctx.get_mtu() - UHD_DPDK_HEADERS_SIZE); + UHD_ASSERT_THROW(xport_params.send_frame_size < ctx.get_mtu() - UHD_DPDK_HEADERS_SIZE); const int num_ports = uhd_dpdk_port_count(); UHD_ASSERT_THROW(num_ports > 0); - UHD_ASSERT_THROW(dpdk_port_id < (unsigned int)num_ports); + UHD_ASSERT_THROW(dpdk_port_id < (unsigned int) num_ports); // Convert ipv4 addr from string to uint32_t, network format - uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str()); + uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str()); // Convert port from string to uint16_t, network format uint16_t dst_port = htons(std::stoi(remote_port, NULL, 0)); uint16_t src_port = htons(std::stoi(local_port, NULL, 0)); // Create RX socket first - struct uhd_dpdk_sockarg_udp sockarg = {.is_tx = false, - .local_port = src_port, - .remote_port = dst_port, - .dst_addr = dst_ipv4}; + struct uhd_dpdk_sockarg_udp sockarg = { + .is_tx = false, + .filter_bcast = true, + .local_port = src_port, + .remote_port = dst_port, + .dst_addr = dst_ipv4, + .num_bufs = _num_recv_frames + }; _rx_sock = uhd_dpdk_sock_open(dpdk_port_id, UHD_DPDK_SOCK_UDP, &sockarg); UHD_ASSERT_THROW(_rx_sock != nullptr); // Backfill the local port, in case it was auto-assigned uhd_dpdk_udp_get_info(_rx_sock, &sockarg); - sockarg.is_tx = true; + sockarg.is_tx = true; + sockarg.num_bufs = _num_send_frames; sockarg.remote_port = dst_port; - sockarg.dst_addr = dst_ipv4; + sockarg.dst_addr = dst_ipv4; _tx_sock = uhd_dpdk_sock_open(dpdk_port_id, UHD_DPDK_SOCK_UDP, &sockarg); UHD_ASSERT_THROW(_tx_sock != nullptr); @@ -277,13 +369,12 @@ public: _mrb_pool.push(new dpdk_zero_copy_mrb(_rx_sock, _mrb_pool)); } for (size_t i = 0; i < _num_send_frames; i++) { - _msb_pool.push(new dpdk_zero_copy_msb(_tx_sock, _msb_pool)); + _msb_pool.push(new dpdk_zero_copy_msb(_tx_sock, _msb_pool, _send_frame_size)); } - UHD_LOG_TRACE("DPDK", - "Created transports between " << addr << ":" << remote_port << " and NIC(" - << dpdk_port_id - << "):" << ntohs(sockarg.local_port)); + UHD_LOG_TRACE("DPDK", "Created transports between " << addr << ":" + << remote_port << " and NIC(" << dpdk_port_id + << "):" << ntohs(sockarg.local_port)); } ~dpdk_zero_copy_impl(void) @@ -292,23 +383,18 @@ public: size_t count; uhd_dpdk_udp_get_info(_rx_sock, &sockarg); uhd_dpdk_get_drop_count(_rx_sock, &count); - UHD_LOG_TRACE("DPDK", - "Closing transports between " << sockarg.dst_addr << ":" - << ntohs(sockarg.remote_port) - << " and local:" << ntohs(sockarg.local_port)); - UHD_LOG_TRACE("DPDK", - "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") " - << " Dropped " << count << " packets"); + UHD_LOG_TRACE("DPDK", "Closing transports between " << sockarg.dst_addr << ":" + << ntohs(sockarg.remote_port) << " and local:" + << ntohs(sockarg.local_port)); + UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") " + << " Dropped "<< count << " packets"); uhd_dpdk_get_xfer_count(_rx_sock, &count); - UHD_LOG_TRACE("DPDK", - "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") " - << " Received " << count << " packets"); - UHD_LOG_TRACE("DPDK", - "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") " - << "RX empty count is " << _rx_empty_count); - UHD_LOG_TRACE("DPDK", - "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") " - << "TX empty count is " << _tx_empty_count); + UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") " + << " Received "<< count << " packets"); + UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") " + << "RX empty count is " << _rx_empty_count); + UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") " + << "TX empty count is " << _tx_empty_count); uhd_dpdk_sock_close(_rx_sock); uhd_dpdk_sock_close(_tx_sock); } @@ -320,7 +406,7 @@ public: return managed_recv_buffer::sptr(); } - dpdk_zero_copy_mrb* mrb = _mrb_pool.top(); + dpdk_zero_copy_mrb *mrb = _mrb_pool.top(); _mrb_pool.pop(); managed_recv_buffer::sptr buff = mrb->get_new(timeout); if (!buff) @@ -345,7 +431,7 @@ public: return managed_send_buffer::sptr(); } - dpdk_zero_copy_msb* msb = _msb_pool.top(); + dpdk_zero_copy_msb *msb = _msb_pool.top(); _msb_pool.pop(); managed_send_buffer::sptr buff = msb->get_new(timeout); if (!buff) @@ -367,18 +453,18 @@ public: { struct uhd_dpdk_sockarg_udp sockarg; int status = uhd_dpdk_udp_get_info(_rx_sock, &sockarg); + UHD_ASSERT_THROW(status == 0); return ntohs(sockarg.local_port); } std::string get_local_addr(void) const { - uint32_t ipv4_addr; - int status = uhd_dpdk_get_ipv4_addr(_port_id, &ipv4_addr, NULL); - auto retval = std::to_string(ipv4_addr >> 0 & 0xff) + "." - + std::to_string(ipv4_addr >> 8 & 0xff) + "." - + std::to_string(ipv4_addr >> 16 & 0xff) + "." - + std::to_string(ipv4_addr >> 24 & 0xff); - return retval; + struct in_addr ipv4_addr; + int status = uhd_dpdk_get_ipv4_addr(_port_id, &ipv4_addr.s_addr, NULL); + UHD_ASSERT_THROW(status == 0); + char addr_str[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str)); + return std::string(addr_str); } uint32_t get_drop_count(void) const @@ -387,10 +473,9 @@ public: uhd_dpdk_get_drop_count(_rx_sock, &drop_count); return drop_count; } - private: - struct uhd_dpdk_socket* _rx_sock; - struct uhd_dpdk_socket* _tx_sock; + struct uhd_dpdk_socket *_rx_sock; + struct uhd_dpdk_socket *_tx_sock; const size_t _num_send_frames; const size_t _send_frame_size; const size_t _num_recv_frames; @@ -399,20 +484,22 @@ private: unsigned int _rx_empty_count; unsigned int _tx_empty_count; - std::stack<dpdk_zero_copy_mrb*, std::vector<dpdk_zero_copy_mrb*>> _mrb_pool; - std::stack<dpdk_zero_copy_msb*, std::vector<dpdk_zero_copy_msb*>> _msb_pool; + std::stack<dpdk_zero_copy_mrb *, std::vector<dpdk_zero_copy_mrb *>> _mrb_pool; + std::stack<dpdk_zero_copy_msb *, std::vector<dpdk_zero_copy_msb *>> _msb_pool; }; -dpdk_zero_copy::sptr dpdk_zero_copy::make(struct uhd_dpdk_ctx& ctx, +dpdk_zero_copy::sptr dpdk_zero_copy::make( + const struct uhd_dpdk_ctx &ctx, const unsigned int dpdk_port_id, - const std::string& addr, - const std::string& remote_port, - const std::string& local_port, - const zero_copy_xport_params& default_buff_args, - const device_addr_t& hints) + const std::string &addr, + const std::string &remote_port, + const std::string &local_port, + const zero_copy_xport_params &default_buff_args, + const device_addr_t &/*hints*/) { - return dpdk_zero_copy::sptr(new dpdk_zero_copy_impl( - ctx, dpdk_port_id, addr, remote_port, local_port, default_buff_args)); + return dpdk_zero_copy::sptr( + new dpdk_zero_copy_impl(ctx, dpdk_port_id, addr, remote_port, local_port, default_buff_args) + ); } }} // namespace uhd::transport diff --git a/host/lib/transport/dpdk_zero_copy.hpp b/host/lib/transport/dpdk_zero_copy.hpp index f90e73d0e..8dcce6eee 100644 --- a/host/lib/transport/dpdk_zero_copy.hpp +++ b/host/lib/transport/dpdk_zero_copy.hpp @@ -7,95 +7,31 @@ #ifndef DPDK_ZERO_COPY_HPP #define DPDK_ZERO_COPY_HPP -#include <uhd/config.hpp> -#include <uhd/transport/zero_copy.hpp> +#include <uhdlib/transport/dpdk_common.hpp> #include <uhd/types/device_addr.hpp> -#include <uhd/utils/log.hpp> -#include <uhd/utils/static.hpp> +#include <uhd/transport/zero_copy.hpp> #include <boost/shared_ptr.hpp> -#include <mutex> #include <string> -#include <vector> namespace uhd { namespace transport { -class uhd_dpdk_ctx : boost::noncopyable -{ -public: - UHD_SINGLETON_FCN(uhd_dpdk_ctx, get); - - ~uhd_dpdk_ctx(void); - - /*! - * Initialize uhd-dpdk (and do only once) - * \param eal_args Arguments to pass to DPDK rte_eal_init() function - * \param num_ports Size of port_thread_mapping array (also number in use) - * \param port_thread_mapping Map NICs to threads: index=port, value=thread - * \param num_mbufs Number of packet buffers for each port's memory pool - * \param mbuf_cache_size Size of per-core packet buffer cache from mempool - * \param mtu MTU of NIC ports - */ - void init(const dict<std::string, std::string>& eal_args, - unsigned int num_ports, - int* port_thread_mapping, - int num_mbufs, - int mbuf_cache_size, - size_t mtu); - - /*! - * Get port ID from provided MAC address - * \param mac_addr MAC address - * \param port_id Int to write ID of port corresponding to MAC address - * \return 0 if match found, else no match - */ - int get_port_id(std::array<uint8_t, 6> mac_addr, unsigned int& port_id); - - /*! - * Get port ID for routing packet destined for given address - * \param addr Destination address - * \return port ID from routing table - */ - int get_route(const std::string& addr) const; - - /*! - * Set IPv4 address and subnet mask of given NIC port - * Not thread-safe. Should only be written before ports are in use. - * \param port_id NIC port ID - * \param ipv4_addr IPv4 address to write - * \param netmask Subnet mask identifying network number in ipv4_addr - * \return 0 if successful, else error - */ - int set_ipv4_addr(unsigned int port_id, uint32_t ipv4_addr, uint32_t netmask); - - /*! - * \return whether init() has been called - */ - bool is_init_done(void); - -private: - uhd_dpdk_ctx(void); - - size_t _mtu; - std::mutex _init_mutex; - std::atomic<bool> _init_done; -}; - /*! * A zero copy transport interface to the dpdk DMA library. */ -class dpdk_zero_copy : public virtual zero_copy_if -{ +class dpdk_zero_copy : public virtual zero_copy_if { public: typedef boost::shared_ptr<dpdk_zero_copy> sptr; - static sptr make(struct uhd_dpdk_ctx& ctx, + static sptr make( + const struct uhd_dpdk_ctx &ctx, const unsigned int dpdk_port_id, - const std::string& addr, - const std::string& remote_port, - const std::string& local_port, /* 0 = auto-assign */ - const zero_copy_xport_params& default_buff_args, - const device_addr_t& hints); + const std::string &addr, + const std::string &remote_port, + const std::string &local_port, /* 0 = auto-assign */ + const zero_copy_xport_params &default_buff_args, + const device_addr_t &hints + ); virtual uint16_t get_local_port(void) const = 0; diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk.c b/host/lib/transport/uhd-dpdk/uhd_dpdk.c index 1be6b2335..09897eb11 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk.c +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk.c @@ -34,6 +34,20 @@ int uhd_dpdk_port_count(void) return ctx->num_ports; } +int uhd_dpdk_port_link_status(unsigned int portid) +{ + if (!ctx) + return -ENODEV; + + struct uhd_dpdk_port *p = find_port(portid); + if (p) { + struct rte_eth_link link; + rte_eth_link_get_nowait(p->id, &link); + return link.link_status; + } + return -ENODEV; +} + struct eth_addr uhd_dpdk_get_eth_addr(unsigned int portid) { struct eth_addr retval; @@ -43,6 +57,7 @@ struct eth_addr uhd_dpdk_get_eth_addr(unsigned int portid) if (p) { memcpy(retval.addr, p->mac_addr.addr_bytes, ETHER_ADDR_LEN); } + return retval; } @@ -211,19 +226,12 @@ static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int lco return 0; } - -int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports, - int *port_thread_mapping, int num_mbufs, int mbuf_cache_size, - int mtu) +int uhd_dpdk_init(int argc, const char **argv) { /* Init context only once */ if (ctx) return 1; - if ((num_ports == 0) || (port_thread_mapping == NULL)) { - return -EINVAL; - } - /* Grabs arguments intended for DPDK's EAL */ int ret = rte_eal_init(argc, (char **) argv); if (ret < 0) @@ -241,8 +249,6 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports, ctx->num_ports = rte_eth_dev_count(); if (ctx->num_ports < 1) rte_exit(EXIT_FAILURE, "Error: Found no ports\n"); - if (ctx->num_ports < num_ports) - rte_exit(EXIT_FAILURE, "Error: User requested more ports than available\n"); /* Get memory for thread and port data structures */ ctx->threads = rte_zmalloc("uhd_dpdk_thread", RTE_MAX_LCORE*sizeof(struct uhd_dpdk_thread), 0); @@ -252,6 +258,28 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports, if (!ctx->ports) rte_exit(EXIT_FAILURE, "Error: Could not allocate memory for port data\n"); + for (size_t i = 0; i < ctx->num_ports; i++) { + struct uhd_dpdk_port *port = &ctx->ports[i]; + port->id = i; + rte_eth_macaddr_get(port->id, &port->mac_addr); + } + + return 0; +} + +int uhd_dpdk_start(unsigned int num_ports, int *port_thread_mapping, + int num_mbufs, int mbuf_cache_size, int mtu) +{ + if (!ctx) + return -EIO; + + if ((num_ports == 0) || (port_thread_mapping == NULL)) { + return -EINVAL; + } + + if (ctx->num_ports < num_ports) + rte_exit(EXIT_FAILURE, "Error: User requested more ports than available\n"); + /* Initialize the thread data structures */ for (int i = rte_get_next_lcore(-1, 1, 0); (i < RTE_MAX_LCORE); @@ -306,7 +334,6 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports, rte_exit(EXIT_FAILURE, "Requested inactive lcore %u for port %u\n", (unsigned int) thread_id, i); struct uhd_dpdk_port *port = &ctx->ports[i]; - port->id = i; port->parent = &ctx->threads[thread_id]; ctx->threads[thread_id].num_ports++; LIST_INSERT_HEAD(&ctx->threads[thread_id].port_list, port, port_entry); diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h index d497e5d5a..6f43ae1cf 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h @@ -24,7 +24,7 @@ #define UHD_DPDK_MAX_WAITERS UHD_DPDK_MAX_SOCKET_CNT #define UHD_DPDK_TXQ_SIZE 64 #define UHD_DPDK_TX_BURST_SIZE (UHD_DPDK_TXQ_SIZE - 1) -#define UHD_DPDK_RXQ_SIZE 64 +#define UHD_DPDK_RXQ_SIZE 128 #define UHD_DPDK_RX_BURST_SIZE (UHD_DPDK_RXQ_SIZE - 1) struct uhd_dpdk_port; @@ -256,13 +256,10 @@ static inline struct uhd_dpdk_port * find_port(unsigned int portid) if (!ctx) return NULL; - for (unsigned int i = 0; i < ctx->num_threads; i++) { - struct uhd_dpdk_thread *t = &ctx->threads[i]; - struct uhd_dpdk_port *p; - LIST_FOREACH(p, &t->port_list, port_entry) { - if (p->id == portid) { - return p; - } + for (unsigned int i = 0; i < ctx->num_ports; i++) { + struct uhd_dpdk_port *p = &ctx->ports[i]; + if (p->id == portid) { + return p; } } return NULL; diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c index f603f1f8f..61ed836df 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c @@ -43,7 +43,6 @@ int _uhd_dpdk_arp_reply(struct uhd_dpdk_port *port, struct arp_hdr *arp_req) mbuf->pkt_len = 42; mbuf->data_len = 42; - mbuf->ol_flags = PKT_TX_IP_CKSUM; if (rte_eth_tx_burst(port->id, 0, &mbuf, 1) != 1) { RTE_LOG(WARNING, RING, "%s: TX descriptor ring is full\n", __func__); @@ -125,7 +124,6 @@ int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port, uint32_t ip) mbuf->pkt_len = 42; mbuf->data_len = 42; - mbuf->ol_flags = PKT_TX_IP_CKSUM; if (rte_eth_tx_burst(port->id, 0, &mbuf, 1) != 1) { RTE_LOG(WARNING, RING, "%s: TX descriptor ring is full\n", __func__); @@ -135,7 +133,8 @@ int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port, uint32_t ip) return 0; } -int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt) +int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, + struct udp_hdr *pkt, bool bcast) { int status = 0; struct uhd_dpdk_ipv4_5tuple ht_key = { @@ -155,6 +154,10 @@ int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, str } struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) entry->sock->priv; + if (bcast && pdata->filter_bcast) { + // Filter broadcast packets if not listening + goto udp_rx_drop; + } status = rte_ring_enqueue(entry->sock->rx_ring, mbuf); if (entry->waiter) { _uhd_dpdk_waiter_wake(entry->waiter, port->parent); @@ -172,14 +175,16 @@ udp_rx_drop: return status; } -int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt) +int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, + struct ipv4_hdr *pkt) { - if (pkt->dst_addr != port->ipv4_addr) { + bool bcast = is_broadcast(port, pkt->dst_addr); + if (pkt->dst_addr != port->ipv4_addr && !bcast) { rte_pktmbuf_free(mbuf); return -ENODEV; } if (pkt->next_proto_id == 0x11) { - return _uhd_dpdk_process_udp(port, mbuf, (struct udp_hdr *) &pkt[1]); + return _uhd_dpdk_process_udp(port, mbuf, (struct udp_hdr *) &pkt[1], bcast); } rte_pktmbuf_free(mbuf); return -EINVAL; @@ -417,7 +422,7 @@ static inline void _uhd_dpdk_rx_burst(struct uhd_dpdk_port *port) rte_pktmbuf_free(bufs[buf]); break; case ETHER_TYPE_IPv4: - if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */ + if (ol_flags & PKT_RX_IP_CKSUM_BAD) { RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf); } else { _uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data); diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h index b0d3e42cd..f94a678ba 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h @@ -20,7 +20,8 @@ static inline bool is_broadcast(struct uhd_dpdk_port *port, uint32_t dst_ipv4_ad int _uhd_dpdk_process_arp(struct uhd_dpdk_port *port, struct arp_hdr *arp_frame); -int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt); +int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, + struct udp_hdr *pkt, bool bcast); int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt); int _uhd_dpdk_send_udp(struct uhd_dpdk_port *port, struct uhd_dpdk_socket *sock, diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c index 6ea77b930..4fc375b77 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c @@ -19,7 +19,8 @@ * I/O thread ONLY */ -static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk_tx_queue **queue) +static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, + struct uhd_dpdk_tx_queue **queue, size_t num_bufs) { *queue = NULL; struct uhd_dpdk_tx_queue *q = rte_zmalloc(NULL, sizeof(*q), 0); @@ -31,25 +32,25 @@ static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk LIST_INIT(&q->tx_list); char name[32]; - snprintf(name, sizeof(name), "tx_ring_udp_%u.%u", port->id, tid); + snprintf(name, sizeof(name), "tx_q%u.%0lx", port->id, (unsigned long) q); q->queue = rte_ring_create( name, - UHD_DPDK_TXQ_SIZE, + num_bufs, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ ); - snprintf(name, sizeof(name), "buffer_ring_udp_%u.%u", port->id, tid); + snprintf(name, sizeof(name), "free_q%u.%0lx", port->id, (unsigned long) q); q->freebufs = rte_ring_create( name, - UHD_DPDK_TXQ_SIZE, + num_bufs, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ ); /* Set up retry queue */ - snprintf(name, sizeof(name), "retry_queue_%u", port->id); + snprintf(name, sizeof(name), "redo_q%u.%0lx", port->id, (unsigned long) q); q->retry_queue = rte_ring_create( name, - UHD_DPDK_TXQ_SIZE, + num_bufs, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ ); @@ -65,24 +66,39 @@ static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk rte_free(q); return -ENOMEM; } - struct rte_mbuf *bufs[UHD_DPDK_TXQ_SIZE]; - unsigned int num_bufs = rte_ring_free_count(q->freebufs); - int buf_stat = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, bufs, num_bufs); - if (buf_stat) { - rte_ring_free(q->freebufs); - rte_ring_free(q->queue); - rte_ring_free(q->retry_queue); - rte_free(q); - RTE_LOG(ERR, USER1, "%s: Cannot allocate packet buffers\n", __func__); - return -ENOENT; - } - unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) bufs, num_bufs, NULL); - if (enqd != num_bufs) { - RTE_LOG(ERR, USER1, "%s: Cannot enqueue freebufs\n", __func__); - } + + do { + struct rte_mbuf *bufs[UHD_DPDK_TXQ_SIZE]; + num_bufs = rte_ring_free_count(q->freebufs); + if (num_bufs > 0) { + num_bufs = num_bufs > UHD_DPDK_TXQ_SIZE ? UHD_DPDK_TXQ_SIZE : num_bufs; + int buf_stat = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, bufs, num_bufs); + if (buf_stat) { + RTE_LOG(ERR, USER1, "%s: Cannot allocate packet buffers\n", __func__); + goto unwind_txq; + } + unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) bufs, num_bufs, NULL); + if (enqd != num_bufs) { + RTE_LOG(ERR, USER1, "%s: Cannot enqueue freebufs\n", __func__); + goto unwind_txq; + } + } + } while (num_bufs > 0); LIST_INSERT_HEAD(&port->txq_list, q, entry); *queue = q; return 0; + +unwind_txq: + while (!rte_ring_empty(q->freebufs)) { + struct rte_mbuf *buf; + if (rte_ring_dequeue(q->freebufs, (void **) &buf) == 0) + rte_free(buf); + } + rte_ring_free(q->freebufs); + rte_ring_free(q->queue); + rte_ring_free(q->retry_queue); + rte_free(q); + return -ENOENT; } /* Finish setting up UDP socket (unless ARP needs to be done) @@ -134,11 +150,14 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) return -EADDRINUSE; } + size_t num_bufs = (pdata->xferd_pkts < (UHD_DPDK_RX_BURST_SIZE + 1)) ? + UHD_DPDK_RX_BURST_SIZE + 1 : pdata->xferd_pkts; + pdata->xferd_pkts = 0; char name[32]; snprintf(name, sizeof(name), "rx_ring_udp_%u.%u", port->id, ntohs(pdata->dst_port)); sock->rx_ring = rte_ring_create( name, - UHD_DPDK_RXQ_SIZE, + num_bufs, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ ); @@ -172,6 +191,9 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) /* Are we doing TX? */ if (sock->tx_queue) { + size_t num_bufs = (pdata->xferd_pkts < (UHD_DPDK_TX_BURST_SIZE + 1)) ? + UHD_DPDK_TX_BURST_SIZE + 1 : pdata->xferd_pkts; + pdata->xferd_pkts = 0; sock->tx_queue = NULL; struct uhd_dpdk_tx_queue *q = NULL; // FIXME Not sharing txq across all thread's sockets for now @@ -184,7 +206,7 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) // } //} if (!sock->tx_queue) { - retval = _alloc_txq(port, sock->tid, &q); + retval = _alloc_txq(port, sock->tid, &q, num_bufs); if (retval) { _uhd_dpdk_config_req_compl(req, retval); return retval; @@ -385,10 +407,13 @@ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req, data->src_port = arg->local_port; data->dst_port = arg->remote_port; sock->tx_queue = (struct uhd_dpdk_tx_queue *) sock; + data->xferd_pkts = arg->num_bufs; } else { data->src_port = arg->remote_port; data->dst_port = arg->local_port; sock->rx_ring = (struct rte_ring *) sock; + data->xferd_pkts = arg->num_bufs; + data->filter_bcast = arg->filter_bcast; } /* TODO: Add support for I/O thread calling (skip locking and sleep) */ @@ -433,6 +458,7 @@ static void uhd_dpdk_ipv4_prep(struct uhd_dpdk_port *port, ip_hdr->time_to_live = 64; ip_hdr->next_proto_id = proto_id; ip_hdr->hdr_checksum = 0; /* FIXME: Assuming hardware can offload */ + mbuf->ol_flags |= PKT_TX_IP_CKSUM; ip_hdr->src_addr = port->ipv4_addr; ip_hdr->dst_addr = dst_ipv4_addr; diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h index 39fcb8597..d7ca5609b 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h @@ -15,6 +15,7 @@ struct uhd_dpdk_udp_priv { uint32_t dst_ipv4_addr; size_t dropped_pkts; size_t xferd_pkts; + bool filter_bcast; /* TODO: Cache destination address ptr to avoid ARP table lookup cost? */ //struct uhd_dpdk_arp_entry *arp_entry; }; diff --git a/host/lib/usrp/mpmd/CMakeLists.txt b/host/lib/usrp/mpmd/CMakeLists.txt index 774ad6593..67e08fc91 100644 --- a/host/lib/usrp/mpmd/CMakeLists.txt +++ b/host/lib/usrp/mpmd/CMakeLists.txt @@ -10,6 +10,11 @@ if(ENABLE_MPMD) add_definitions(-DHAVE_LIBERIO) endif(ENABLE_LIBERIO) + if(ENABLE_DPDK) + message(STATUS "Compiling MPMD with DPDK support...") + add_definitions(-DHAVE_DPDK) + endif(ENABLE_DPDK) + LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/mpmd_find.cpp ${CMAKE_CURRENT_SOURCE_DIR}/mpmd_image_loader.cpp @@ -27,4 +32,10 @@ if(ENABLE_MPMD) ) endif(ENABLE_LIBERIO) + if(ENABLE_DPDK) + LIBUHD_APPEND_SOURCES( + ${CMAKE_CURRENT_SOURCE_DIR}/mpmd_xport_ctrl_dpdk_udp.cpp + ) + endif(ENABLE_DPDK) + endif(ENABLE_MPMD) diff --git a/host/lib/usrp/mpmd/mpmd_find.cpp b/host/lib/usrp/mpmd/mpmd_find.cpp index 5d2406b30..2b8e1350d 100644 --- a/host/lib/usrp/mpmd/mpmd_find.cpp +++ b/host/lib/usrp/mpmd/mpmd_find.cpp @@ -8,6 +8,7 @@ #include "mpmd_devices.hpp" #include "mpmd_impl.hpp" +#include <uhdlib/transport/dpdk_common.hpp> #include <uhd/transport/if_addrs.hpp> #include <uhd/transport/udp_simple.hpp> #include <uhd/types/device_addr.hpp> @@ -193,6 +194,15 @@ device_addrs_t mpmd_find_with_bcast(const device_addr_t& hint) */ device_addrs_t mpmd_find(const device_addr_t& hint_) { +#ifdef HAVE_DPDK + // Start DPDK so links come up + if (hint_.has_key("use_dpdk")) { + auto& dpdk_ctx = uhd::transport::uhd_dpdk_ctx::get(); + if (not dpdk_ctx.is_init_done()) { + dpdk_ctx.init(hint_); + } + } +#endif device_addrs_t hints = separate_device_addr(hint_); if (hint_.has_key("type")) { if (std::find(MPM_DEVICE_TYPES.cbegin(), MPM_DEVICE_TYPES.cend(), hint_["type"]) diff --git a/host/lib/usrp/mpmd/mpmd_xport_ctrl_dpdk_udp.cpp b/host/lib/usrp/mpmd/mpmd_xport_ctrl_dpdk_udp.cpp new file mode 100644 index 000000000..f739942b4 --- /dev/null +++ b/host/lib/usrp/mpmd/mpmd_xport_ctrl_dpdk_udp.cpp @@ -0,0 +1,266 @@ +// +// Copyright 2017 Ettus Research, National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include "mpmd_impl.hpp" +#include "mpmd_xport_mgr.hpp" +#include "mpmd_xport_ctrl_dpdk_udp.hpp" +#include "../../transport/dpdk_zero_copy.hpp" +#include <uhd/transport/udp_simple.hpp> +#include <uhd/transport/udp_constants.hpp> +#include <arpa/inet.h> + + +using namespace uhd; +using namespace uhd::mpmd::xport; + +namespace { +constexpr unsigned int MPMD_UDP_RESERVED_FRAME_SIZE = 64; + +//! Maximum CHDR packet size in bytes +const size_t MPMD_10GE_DATA_FRAME_MAX_SIZE = 4000; + +//! Number of send/recv frames +const size_t MPMD_ETH_NUM_SEND_FRAMES = 32; +const size_t MPMD_ETH_NUM_RECV_FRAMES = 128; +const size_t MPMD_ETH_NUM_CTRL_FRAMES = 32; + +//! For MTU discovery, the time we wait for a packet before calling it +// oversized (seconds). +const double MPMD_MTU_DISCOVERY_TIMEOUT = 0.02; + +std::vector<std::string> get_addrs_from_mb_args( + const uhd::device_addr_t& mb_args +) { + // mb_args must always include addr + if (not mb_args.has_key(FIRST_ADDR_KEY)) { + throw uhd::runtime_error("The " + FIRST_ADDR_KEY + " key must be specified in " + "device args to create an Ethernet transport to an RFNoC block"); + } + std::vector<std::string> addrs{mb_args[FIRST_ADDR_KEY]}; + if (mb_args.has_key(SECOND_ADDR_KEY)){ + addrs.push_back(mb_args[SECOND_ADDR_KEY]); + } + return addrs; +} + +/*! Do a binary search to discover MTU + * + * Uses the MPM echo service to figure out MTU. We simply send a bunch of + * packets and see if they come back until we converged on the path MTU. + * The end result must lie between \p min_frame_size and \p max_frame_size. + * + * \param address IP address + * \param port UDP port + * \param min_frame_size Minimum frame size, initialize algorithm to start + * with this value + * \param max_frame_size Maximum frame size, initialize algorithm to start + * with this value + * \param echo_timeout Timeout value in seconds. For frame sizes that + * exceed the MTU, we don't expect a response, and this + * is the amount of time we'll wait before we assume + * the frame size exceeds the MTU. + */ +size_t discover_mtu( + const std::string &address, + const std::string &port, + size_t min_frame_size, + size_t max_frame_size, + const double echo_timeout = 0.020 +) { + const auto &ctx = uhd::transport::uhd_dpdk_ctx::get(); + const size_t echo_prefix_offset = + uhd::mpmd::mpmd_impl::MPM_ECHO_CMD.size(); + const size_t mtu_hdr_len = echo_prefix_offset + 10; + const int port_id = ctx.get_route(address); + UHD_ASSERT_THROW(port_id >= 0); + UHD_ASSERT_THROW(min_frame_size < max_frame_size); + UHD_ASSERT_THROW(min_frame_size % 4 == 0); + UHD_ASSERT_THROW(max_frame_size % 4 == 0); + UHD_ASSERT_THROW(min_frame_size >= echo_prefix_offset + mtu_hdr_len); + using namespace uhd::transport; + uhd::transport::zero_copy_xport_params buff_args; + buff_args.recv_frame_size = max_frame_size; + buff_args.send_frame_size = max_frame_size; + buff_args.num_send_frames = 1; + buff_args.num_recv_frames = 1; + auto dev_addr = uhd::device_addr_t(); + dpdk_zero_copy::sptr sock = dpdk_zero_copy::make(ctx, + (unsigned int) port_id, address, port, "0", buff_args, dev_addr); + std::string send_buf(uhd::mpmd::mpmd_impl::MPM_ECHO_CMD); + send_buf.resize(max_frame_size, '#'); + UHD_ASSERT_THROW(send_buf.size() == max_frame_size); + + // Little helper to check returned packets match the sent ones + auto require_bufs_match = [&send_buf, mtu_hdr_len]( + const uint8_t *recv_buf, + const size_t len + ){ + if (len < mtu_hdr_len or std::memcmp( + (void *) &recv_buf[0], + (void *) &send_buf[0], + mtu_hdr_len + ) != 0) { + throw uhd::runtime_error("Unexpected content of MTU " + "discovery return packet!"); + } + }; + UHD_LOG_TRACE("MPMD", "Determining UDP MTU... "); + size_t seq_no = 0; + while (min_frame_size < max_frame_size) { + managed_send_buffer::sptr msbuf = sock->get_send_buff(0); + UHD_ASSERT_THROW(msbuf.get() != nullptr); + max_frame_size = std::min(msbuf->size(), max_frame_size); + // Only test multiples of 4 bytes! + const size_t test_frame_size = + (max_frame_size/2 + min_frame_size/2 + 3) & ~size_t(3); + // Encode sequence number and current size in the string, makes it + // easy to debug in code or Wireshark. Is also used for identifying + // response packets. + std::sprintf( + &send_buf[echo_prefix_offset], + ";%04lu,%04lu", + seq_no++, + test_frame_size + ); + // Copy to real buffer + UHD_LOG_TRACE("MPMD", "Testing frame size " << test_frame_size); + auto *tx_buf = msbuf->cast<uint8_t *>(); + std::memcpy(tx_buf, &send_buf[0], test_frame_size); + msbuf->commit(test_frame_size); + msbuf.reset(); + + managed_recv_buffer::sptr mrbuf = sock->get_recv_buff(echo_timeout); + if (mrbuf.get() == nullptr || mrbuf->size() == 0) { + // Nothing received, so this is probably too big + max_frame_size = test_frame_size - 4; + } else if (mrbuf->size() >= test_frame_size) { + // Size went through, so bump the minimum + require_bufs_match(mrbuf->cast<uint8_t *>(), mrbuf->size()); + min_frame_size = test_frame_size; + } else if (mrbuf->size() < test_frame_size) { + // This is an odd case. Something must have snipped the packet + // on the way back. Still, we'll just back off and try + // something smaller. + UHD_LOG_DEBUG("MPMD", + "Unexpected packet truncation during MTU discovery."); + require_bufs_match(mrbuf->cast<uint8_t *>(), mrbuf->size()); + max_frame_size = mrbuf->size(); + } + mrbuf.reset(); + } + UHD_LOG_DEBUG("MPMD", + "Path MTU for address " << address << ": " << min_frame_size); + return min_frame_size; +} + +} + + +mpmd_xport_ctrl_dpdk_udp::mpmd_xport_ctrl_dpdk_udp( + const uhd::device_addr_t& mb_args +) : _mb_args(mb_args) + , _ctx(uhd::transport::uhd_dpdk_ctx::get()) + , _recv_args(filter_args(mb_args, "recv")) + , _send_args(filter_args(mb_args, "send")) + , _available_addrs(get_addrs_from_mb_args(mb_args)) + , _mtu(MPMD_10GE_DATA_FRAME_MAX_SIZE) +{ + if (not _ctx.is_init_done()) { + _ctx.init(mb_args); + } + const std::string mpm_discovery_port = _mb_args.get( + mpmd_impl::MPM_DISCOVERY_PORT_KEY, + std::to_string(mpmd_impl::MPM_DISCOVERY_PORT) + ); + auto discover_mtu_for_ip = [mpm_discovery_port](const std::string &ip_addr){ + return discover_mtu( + ip_addr, + mpm_discovery_port, + IP_PROTOCOL_MIN_MTU_SIZE-IP_PROTOCOL_UDP_PLUS_IP_HEADER, + MPMD_10GE_DATA_FRAME_MAX_SIZE, + MPMD_MTU_DISCOVERY_TIMEOUT + ); + }; + + for (const auto &ip_addr : _available_addrs) { + _mtu = std::min(_mtu, discover_mtu_for_ip(ip_addr)); + } +} + +uhd::both_xports_t +mpmd_xport_ctrl_dpdk_udp::make_transport( + mpmd_xport_mgr::xport_info_t &xport_info, + const usrp::device3_impl::xport_type_t xport_type, + const uhd::device_addr_t& xport_args_ +) { + auto xport_args = xport_args_; + + transport::zero_copy_xport_params default_buff_args; + // Create actual UHD-DPDK UDP transport + default_buff_args.recv_frame_size = + xport_args.cast<size_t>("recv_frame_size", get_mtu(uhd::RX_DIRECTION)); + default_buff_args.send_frame_size = + xport_args.cast<size_t>("send_frame_size", get_mtu(uhd::TX_DIRECTION)); + if (xport_type == usrp::device3_impl::ASYNC_MSG or + xport_type == usrp::device3_impl::CTRL) { + default_buff_args.num_recv_frames = + xport_args.cast<size_t>("num_recv_frames", MPMD_ETH_NUM_CTRL_FRAMES); + default_buff_args.num_send_frames = + xport_args.cast<size_t>("num_send_frames", MPMD_ETH_NUM_CTRL_FRAMES); + } else { + default_buff_args.num_recv_frames = + xport_args.cast<size_t>("num_recv_frames", MPMD_ETH_NUM_RECV_FRAMES); + default_buff_args.num_send_frames = + xport_args.cast<size_t>("num_send_frames", MPMD_ETH_NUM_SEND_FRAMES); + } + + UHD_LOG_TRACE("BUFF", "num_recv_frames=" << default_buff_args.num_recv_frames + << ", num_send_frames=" << default_buff_args.num_send_frames + << ", recv_frame_size=" << default_buff_args.recv_frame_size + << ", send_frame_size=" << default_buff_args.send_frame_size); + + int dpdk_port_id = _ctx.get_route(xport_info["ipv4"]); + if (dpdk_port_id < 0) { + throw uhd::runtime_error("Could not find a DPDK port with route to " + + xport_info["ipv4"]); + } + auto recv = transport::dpdk_zero_copy::make( + _ctx, + (const unsigned int) dpdk_port_id, + xport_info["ipv4"], + xport_info["port"], + "0", + default_buff_args, + xport_args + ); + const uint16_t port = recv->get_local_port(); + const std::string src_ip_addr = recv->get_local_addr(); + xport_info["src_port"] = std::to_string(port); + xport_info["src_ipv4"] = src_ip_addr; + + // Create both_xports_t object and finish: + both_xports_t xports; + xports.endianness = uhd::ENDIANNESS_BIG; + xports.send_sid = sid_t(xport_info["send_sid"]); + xports.recv_sid = xports.send_sid.reversed(); + xports.recv_buff_size = (default_buff_args.recv_frame_size-MPMD_UDP_RESERVED_FRAME_SIZE)*default_buff_args.num_recv_frames; + xports.send_buff_size = (default_buff_args.send_frame_size-MPMD_UDP_RESERVED_FRAME_SIZE)*default_buff_args.num_send_frames; + xports.recv = recv; // Note: This is a type cast! + xports.send = recv; // This too + return xports; +} + +bool mpmd_xport_ctrl_dpdk_udp::is_valid( + const mpmd_xport_mgr::xport_info_t& xport_info +) const { + int dpdk_port_id = _ctx.get_route(xport_info.at("ipv4")); + return (dpdk_port_id >= 0); +} + +size_t mpmd_xport_ctrl_dpdk_udp::get_mtu(const uhd::direction_t /*dir*/) const +{ + return _mtu; +} diff --git a/host/lib/usrp/mpmd/mpmd_xport_ctrl_dpdk_udp.hpp b/host/lib/usrp/mpmd/mpmd_xport_ctrl_dpdk_udp.hpp new file mode 100644 index 000000000..1ee1549e0 --- /dev/null +++ b/host/lib/usrp/mpmd/mpmd_xport_ctrl_dpdk_udp.hpp @@ -0,0 +1,55 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_MPMD_XPORT_CTRL_DPDK_UDP_HPP +#define INCLUDED_MPMD_XPORT_CTRL_DPDK_UDP_HPP + +#include "mpmd_xport_ctrl_base.hpp" +#include "../transport/dpdk_zero_copy.hpp" +#include <uhd/types/device_addr.hpp> +#include "../device3/device3_impl.hpp" + +namespace uhd { namespace mpmd { namespace xport { + +/*! UDP transport manager + * + * Opens UDP sockets + */ +class mpmd_xport_ctrl_dpdk_udp : public mpmd_xport_ctrl_base +{ +public: + mpmd_xport_ctrl_dpdk_udp( + const uhd::device_addr_t& mb_args + ); + + both_xports_t make_transport( + mpmd_xport_mgr::xport_info_t& xport_info, + const usrp::device3_impl::xport_type_t xport_type, + const uhd::device_addr_t& xport_args + ); + + bool is_valid( + const mpmd_xport_mgr::xport_info_t& xport_info + ) const; + + size_t get_mtu( + const uhd::direction_t dir + ) const; + +private: + const uhd::device_addr_t _mb_args; + uhd::transport::uhd_dpdk_ctx &_ctx; + const uhd::dict<std::string, std::string> _recv_args; + const uhd::dict<std::string, std::string> _send_args; + //! A list of IP addresses we can connect our CHDR connections to + const std::vector<std::string> _available_addrs; + //! MTU + size_t _mtu; +}; + +}}} /* namespace uhd::mpmd::xport */ + +#endif /* INCLUDED_MPMD_XPORT_CTRL_DPDK_UDP_HPP */ diff --git a/host/lib/usrp/mpmd/mpmd_xport_mgr.cpp b/host/lib/usrp/mpmd/mpmd_xport_mgr.cpp index c2200c66a..d3023e3af 100644 --- a/host/lib/usrp/mpmd/mpmd_xport_mgr.cpp +++ b/host/lib/usrp/mpmd/mpmd_xport_mgr.cpp @@ -11,6 +11,9 @@ #ifdef HAVE_LIBERIO # include "mpmd_xport_ctrl_liberio.hpp" #endif +#ifdef HAVE_DPDK +# include "mpmd_xport_ctrl_dpdk_udp.hpp" +#endif uhd::dict<std::string, std::string> uhd::mpmd::xport::filter_args( const uhd::device_addr_t& args, const std::string& prefix) @@ -114,6 +117,11 @@ private: const std::string& xport_medium, const uhd::device_addr_t& mb_args) const { if (xport_medium == "UDP") { +#ifdef HAVE_DPDK + if (mb_args.has_key("use_dpdk")) { + return mpmd_xport_ctrl_base::uptr(new mpmd_xport_ctrl_dpdk_udp(mb_args)); + } +#endif return mpmd_xport_ctrl_base::uptr(new mpmd_xport_ctrl_udp(mb_args)); #ifdef HAVE_LIBERIO } else if (xport_medium == "liberio") { diff --git a/host/lib/utils/prefs.cpp b/host/lib/utils/prefs.cpp index 2ccc538fc..88be300cb 100644 --- a/host/lib/utils/prefs.cpp +++ b/host/lib/utils/prefs.cpp @@ -48,6 +48,23 @@ namespace { uhd::prefs::get_uhd_config().get<std::string>(key_str, key); } } + + device_addr_t get_args( + const uhd::device_addr_t& user_args, + const std::vector<std::string>& keys_to_update_from + ) { + device_addr_t args; + for (const auto& key : keys_to_update_from) { + update_from_key(key, user_args.get(key, ""), args); + } + + // Finally, copy over the original user args: + for (const auto& user_key : user_args.keys()) { + args[user_key] = user_args[user_key]; + } + + return args; + } } config_parser& uhd::prefs::get_uhd_config() @@ -81,22 +98,28 @@ config_parser& uhd::prefs::get_uhd_config() device_addr_t uhd::prefs::get_usrp_args( const uhd::device_addr_t &user_args ) { - device_addr_t usrp_args; const std::vector<std::string> keys_to_update_from = { "type", "product", "serial" }; + return get_args(user_args, keys_to_update_from); +} - for (const auto& key : keys_to_update_from) { - update_from_key(key, user_args.get(key, ""), usrp_args); - } - - // Finally, copy over the original user args: - for (const auto &user_key : user_args.keys()) { - usrp_args[user_key] = user_args[user_key]; - } - - return usrp_args; +device_addr_t uhd::prefs::get_dpdk_args( + const uhd::device_addr_t &user_args +) { + const std::vector<std::string> keys_to_update_from = { + "use_dpdk" + }; + return get_args(user_args, keys_to_update_from); } +device_addr_t uhd::prefs::get_dpdk_nic_args( + const uhd::device_addr_t &user_args +) { + const std::vector<std::string> keys_to_update_from = { + "dpdk-mac" + }; + return get_args(user_args, keys_to_update_from); +} diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt index 9eb48eedb..01076540d 100644 --- a/host/tests/CMakeLists.txt +++ b/host/tests/CMakeLists.txt @@ -113,6 +113,10 @@ if(ENABLE_DPDK) include_directories(${DPDK_INCLUDE_DIR}) add_executable(dpdk_test dpdk_test.cpp + ${CMAKE_SOURCE_DIR}/lib/utils/config_parser.cpp + ${CMAKE_SOURCE_DIR}/lib/utils/paths.cpp + ${CMAKE_SOURCE_DIR}/lib/utils/pathslib.cpp + ${CMAKE_SOURCE_DIR}/lib/utils/prefs.cpp ${CMAKE_SOURCE_DIR}/lib/transport/dpdk_zero_copy.cpp ) target_link_libraries(dpdk_test uhd ${Boost_LIBRARIES} ${DPDK_LIBRARIES}) diff --git a/host/tests/dpdk_test.cpp b/host/tests/dpdk_test.cpp index 43ef3d388..b506c7b30 100644 --- a/host/tests/dpdk_test.cpp +++ b/host/tests/dpdk_test.cpp @@ -43,7 +43,7 @@ struct dpdk_test_args std::string src_port; std::string dst_ip; std::string dst_port; - pthread_cond_t* cond; + pthread_cond_t *cond; pthread_mutex_t mutex; bool started; int cpu; @@ -61,7 +61,7 @@ struct dpdk_test_stats }; -static void process_udp(int id, uint32_t* udp_data, struct dpdk_test_stats* stats) +static void process_udp(int id, uint32_t *udp_data, struct dpdk_test_stats *stats) { if (udp_data[0] != stats[id].last_seqno + 1) { stats[id].lasts[stats[id].dropped_packets & 0xf] = stats[id].last_seqno; @@ -76,21 +76,21 @@ static void process_udp(int id, uint32_t* udp_data, struct dpdk_test_stats* stat static void send_udp(uhd::transport::dpdk_zero_copy::sptr& stream, int id, bool fc_only, - struct dpdk_test_stats* stats) + struct dpdk_test_stats *stats) { uhd::transport::managed_send_buffer::sptr mbuf = stream->get_send_buff(0); if (mbuf.get() == nullptr) { printf("Could not get TX buffer!\n"); return; } - auto* tx_data = mbuf->cast<uint32_t*>(); + auto *tx_data = mbuf->cast<uint32_t *>(); tx_data[0] = fc_only ? stats[id].tx_seqno - 1 : stats[id].tx_seqno; tx_data[1] = stats[id].last_seqno; if (!fc_only) { - memset(&tx_data[2], stats[id].last_seqno, 8 * BENCH_SPP); - stats[id].tx_xfer += 8 * BENCH_SPP; + memset(&tx_data[2], stats[id].last_seqno, 8*BENCH_SPP); + stats[id].tx_xfer += 8*BENCH_SPP; } - size_t num_bytes = 8 + (fc_only ? 0 : 8 * BENCH_SPP); + size_t num_bytes = 8 + (fc_only ? 0 : 8*BENCH_SPP); mbuf->commit(num_bytes); mbuf.reset(); @@ -100,12 +100,11 @@ static void send_udp(uhd::transport::dpdk_zero_copy::sptr& stream, } static void bench( - uhd::transport::dpdk_zero_copy::sptr* stream, uint32_t nb_ports, double timeout) + uhd::transport::dpdk_zero_copy::sptr *stream, uint32_t nb_ports, double timeout) { uint64_t total_xfer[NUM_PORTS]; uint32_t id; - struct dpdk_test_stats* stats = - (struct dpdk_test_stats*)malloc(sizeof(*stats) * nb_ports); + struct dpdk_test_stats *stats = (struct dpdk_test_stats *) malloc(sizeof(*stats)*nb_ports); for (id = 0; id < nb_ports; id++) { stats[id].tx_seqno = 1; stats[id].tx_xfer = 0; @@ -137,7 +136,7 @@ static void bench( if (nb_rx <= 0) { consec_no_rx++; if (consec_no_rx >= 100000) { - uint32_t skt_drops = stream[id]->get_drop_count(); + // uint32_t skt_drops = stream[id]->get_drop_count(); // printf("TX seq %d, TX ack %d, RX seq %d, %d drops!\n", // stats[id].tx_seqno, stats[id].last_ackno, stats[id].last_seqno, // skt_drops); @@ -151,7 +150,7 @@ static void bench( for (unsigned int buf = 0; buf < nb_rx; buf++) { total_xfer[id] += bufs[buf]->size(); - auto data = bufs[buf]->cast<uint32_t*>(); + auto data = bufs[buf]->cast<uint32_t *>(); process_udp(id, data, stats); } @@ -185,15 +184,15 @@ static void bench( printf("Bytes received = %ld\n", total_xfer[id]); printf("Bytes sent = %ld\n", stats[id].tx_xfer); printf("Time taken = %ld us\n", - (bench_end.tv_sec - bench_start.tv_sec) * 1000000 + (bench_end.tv_sec - bench_start.tv_sec)*1000000 + (bench_end.tv_usec - bench_start.tv_usec)); - double elapsed_time = (bench_end.tv_sec - bench_start.tv_sec) * 1000000 - + (bench_end.tv_usec - bench_start.tv_usec); + double elapsed_time = (bench_end.tv_sec - bench_start.tv_sec)*1000000 + + (bench_end.tv_usec - bench_start.tv_usec); elapsed_time *= 1.0e-6; double elapsed_bytes = total_xfer[id]; - printf("RX Performance = %e Gbps\n", elapsed_bytes * 8.0 / 1.0e9 / elapsed_time); + printf("RX Performance = %e Gbps\n", elapsed_bytes*8.0/1.0e9/elapsed_time); elapsed_bytes = stats[id].tx_xfer; - printf("TX Performance = %e Gbps\n", elapsed_bytes * 8.0 / 1.0e9 / elapsed_time); + printf("TX Performance = %e Gbps\n", elapsed_bytes*8.0/1.0e9/elapsed_time); uint32_t skt_drops = stream[id]->get_drop_count(); printf("Dropped %d packets\n", stats[id].dropped_packets); printf("Socket reports dropped %d packets\n", skt_drops); @@ -221,9 +220,9 @@ static inline void set_cpu(pthread_t t, int cpu) } } -void* prepare_and_bench_blocking(void* arg) +void *prepare_and_bench_blocking(void *arg) { - struct dpdk_test_args* args = (struct dpdk_test_args*)arg; + struct dpdk_test_args *args = (struct dpdk_test_args *) arg; pthread_mutex_lock(&args->mutex); pthread_t t = pthread_self(); set_cpu(t, args->cpu); @@ -236,14 +235,16 @@ void* prepare_and_bench_blocking(void* arg) buff_args.send_frame_size = 8000; buff_args.num_send_frames = 8; buff_args.num_recv_frames = 8; - auto dev_addr = uhd::device_addr_t(); - eth_data[0] = uhd::transport::dpdk_zero_copy::make(ctx, + auto dev_addr = uhd::device_addr_t(); + eth_data[0] = uhd::transport::dpdk_zero_copy::make( + ctx, args->portid, args->dst_ip, args->src_port, args->dst_port, buff_args, - dev_addr); + dev_addr + ); bench(eth_data, 1, 0.1); return 0; @@ -274,29 +275,31 @@ void prepare_and_bench_polling(void) buff_args.num_recv_frames = 8; auto dev_addr = uhd::device_addr_t(); for (unsigned int i = 0; i < NUM_PORTS; i++) { - eth_data[i] = uhd::transport::dpdk_zero_copy::make(ctx, + eth_data[i] = uhd::transport::dpdk_zero_copy::make( + ctx, bench_args[i].portid, bench_args[i].dst_ip, bench_args[i].src_port, bench_args[i].dst_port, buff_args, - dev_addr); + dev_addr + ); } bench(eth_data, NUM_PORTS, 0.0); } -int main(int argc, char** argv) +int main(int argc, char **argv) { - int retval, io0_cpu = 1, io1_cpu = 1, user0_cpu = 0, user1_cpu = 2; + int retval, user0_cpu = 0, user1_cpu = 2; std::string args, cpusets; po::options_description desc("Allowed options"); - desc.add_options()("help", "help message")( - "args", po::value<std::string>(&args)->default_value(""), "UHD-DPDK args")( - "polling-mode", "Use polling mode (single thread on own core)")("cpusets", - po::value<std::string>(&cpusets)->default_value(""), - "which core(s) to use for a given thread (specify something like " - "\"io0=1,io1=1,user0=0,user1=2\")"); + desc.add_options() + ("help", "help message") + ("args", po::value<std::string>(&args)->default_value(""), "UHD-DPDK args") + ("polling-mode", "Use polling mode (single thread on own core)") + ("cpusets", po::value<std::string>(&cpusets)->default_value(""), "which core(s) to use for a given thread (specify something like \"user0=0,user1=2\")") + ; po::variables_map vm; po::store(po::parse_command_line(argc, argv, desc), vm); po::notify(vm); @@ -317,20 +320,15 @@ int main(int argc, char** argv) auto cpuset_map = uhd::device_addr_t(cpusets); for (std::string& key : cpuset_map.keys()) { - if (key == "io0") { - io0_cpu = std::stoi(cpuset_map[key], NULL, 0); - } else if (key == "io1") { - io1_cpu = std::stoi(cpuset_map[key], NULL, 0); - } else if (key == "user0") { + if (key == "user0") { user0_cpu = std::stoi(cpuset_map[key], NULL, 0); } else if (key == "user1") { user1_cpu = std::stoi(cpuset_map[key], NULL, 0); } } - int port_thread_mapping[2] = {io0_cpu, io1_cpu}; auto& ctx = uhd::transport::uhd_dpdk_ctx::get(); - ctx.init(dpdk_args, 2, &port_thread_mapping[0], NUM_MBUFS, MBUF_CACHE_SIZE, 9000); + ctx.init(dpdk_args); uint32_t eth_ip = htonl(0xc0a80003); uint32_t eth_mask = htonl(0xffffff00); @@ -391,12 +389,12 @@ int main(int argc, char** argv) pthread_cond_broadcast(&cond); - status = pthread_join(threads[0], (void**)&retval); + status = pthread_join(threads[0], (void **) &retval); if (status) { perror("Error while joining thread"); return status; } - status = pthread_join(threads[1], (void**)&retval); + status = pthread_join(threads[1], (void **) &retval); if (status) { perror("Error while joining thread"); return status; |