author | Alex Williams <alex.williams@ni.com> | 2019-11-04 16:01:20 -0800
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 12:21:33 -0800
commit | f5861562d9fde84d73c381aa51753602a75546f2 (patch)
tree | c9c95799e3ce14b6d48da9187384f14f57e5d9ae /host/lib
parent | 07e04cd76015d13cbb100fc8eb61dae935fd632b (diff)
transport: Add new base for DPDK links, based on 18.11
dpdk_ctx represents the central context and manager of all memory
and threads allocated via the DPDK EAL. In this commit, it parses
the user's arguments, configures all the ports, and brings them up.
dpdk_port represents each DPDK NIC port's configuration, and it
manages the allocation of individual queues and their flow rules.
It will also provide access to an ARP table and functions for
handling ARP requests and responses; the flow rules and ARP
functions are not yet implemented.
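The commit message above sketches the intended split between dpdk_ctx and dpdk_port. As a rough orientation only, a hypothetical consumer (for example a later dpdk_udp_link) might drive the new API as in the sketch below; the function name, device args, and the null flow pattern are placeholder assumptions, not code from this change, and per the commit message alloc_queue() does not yet install flow rules.

```cpp
// Illustrative sketch only (not part of this patch).
#include <uhd/exception.hpp>
#include <uhdlib/transport/dpdk/common.hpp>
#include <string>

void example_dpdk_link_bringup(
    const uhd::device_addr_t& user_args, const std::string& remote_addr)
{
    using namespace uhd::transport::dpdk;

    // One process-wide context: parses EAL args, configures and starts all ports
    dpdk_ctx::sptr ctx = dpdk_ctx::get();
    ctx->init(user_args);

    // Pick the NIC whose subnet contains the remote address
    const int route = ctx->get_route(remote_addr);
    if (route < 0) {
        throw uhd::runtime_error("DPDK: no route to " + remote_addr);
    }
    dpdk_port* port = ctx->get_port(static_cast<port_id_t>(route));

    // Reserve a TX/RX DMA queue pair for this logical link; queue 0 stays
    // reserved for ARP. A real caller would pass an ingress flow pattern
    // that steers its packets to the new RX queue (not yet implemented).
    const queue_id_t queue = port->alloc_queue(nullptr);

    // The link would then allocate mbufs from the port's TX mempool and
    // size its frames against the negotiated MTU.
    struct rte_mempool* tx_pool = port->get_tx_pktbuf_pool();
    const size_t mtu            = port->get_mtu();
    (void)queue;
    (void)tx_pool;
    (void)mtu;
}
```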
Diffstat (limited to 'host/lib')
-rw-r--r-- | host/lib/include/uhdlib/transport/dpdk/common.hpp | 319
-rw-r--r-- | host/lib/transport/uhd-dpdk/CMakeLists.txt | 2
-rw-r--r-- | host/lib/transport/uhd-dpdk/dpdk_common.cpp | 635
3 files changed, 956 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/dpdk/common.hpp b/host/lib/include/uhdlib/transport/dpdk/common.hpp new file mode 100644 index 000000000..1f4466a0f --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk/common.hpp @@ -0,0 +1,319 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_ +#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_ + +#include <uhd/config.hpp> +#include <uhd/types/device_addr.hpp> +#include <uhd/utils/noncopyable.hpp> +#include <uhd/utils/static.hpp> +#include <rte_ethdev.h> +#include <rte_ether.h> +#include <rte_flow.h> +#include <rte_mbuf.h> +#include <rte_mempool.h> +#include <rte_version.h> +#include <unordered_map> +#include <array> +#include <atomic> +#include <mutex> +#include <string> + +/* NOTE: There are changes to rte_eth_addr in 19.x */ + +namespace uhd { namespace transport { namespace dpdk { + +using queue_id_t = uint16_t; +using port_id_t = uint16_t; +using ipv4_addr = uint32_t; + +/*! + * Class representing a DPDK NIC port + * + * The dpdk_port object possesses all the data needed to send and receive + * packets between this port and a remote host. A logical link should specify + * which packets are destined for it and allocate a DMA queue with the + * dpdk_port::alloc_queue() function. A logical link should not, however, + * specify ARP packets for its set of received packets. That functionality is + * reserved for the special queue 0. + * + * The logical link can then get the packet buffer pools associated with this + * NIC port and use them to send and receive packets. + * + * dpdk_port objects are _only_ created by the dpdk_ctx. + */ +class dpdk_port +{ +public: + using uptr = std::unique_ptr<dpdk_port>; + + /*! Construct a DPDK NIC port and bring the link up + * + * \param port The port ID + * \param mtu The intended MTU for the port + * \param num_queues Number of DMA queues to reserve for this port + * \param num_mbufs The number of packet buffers per queue + * \param rx_pktbuf_pool A pointer to the port's RX packet buffer pool + * \param tx_pktbuf_pool A pointer to the port's TX packet buffer pool + * \param ipv4_address The IPv4 network address (w/ netmask) + * \return A unique_ptr to a dpdk_port object + */ + static dpdk_port::uptr make(port_id_t port, + size_t mtu, + uint16_t num_queues, + size_t num_mbufs, + struct rte_mempool* rx_pktbuf_pool, + struct rte_mempool* tx_pktbuf_pool, + std::string ipv4_address); + + dpdk_port(port_id_t port, + size_t mtu, + uint16_t num_queues, + size_t num_mbufs, + struct rte_mempool* rx_pktbuf_pool, + struct rte_mempool* tx_pktbuf_pool, + std::string ipv4_address); + + /*! Getter for this port's ID + * \return this port's ID + */ + inline port_id_t get_port_id() const + { + return _port; + } + + /*! Getter for this port's MTU + * \return this port's MTU + */ + inline size_t get_mtu() const + { + return _mtu; + } + + /*! Getter for this port's IPv4 address + * \return this port's IPv4 address (in network order) + */ + inline ipv4_addr get_ipv4() const + { + return _ipv4; + } + + /*! Getter for this port's subnet mask + * \return this port's subnet mask (in network order) + */ + inline ipv4_addr get_netmask() const + { + return _netmask; + } + + /*! 
Getter for this port's total DMA queue count, including initialized, + * but unallocated queues + * + * \return The number of queues initialized on this port + */ + inline size_t get_queue_count() const + { + return _num_queues; + } + + /*! Getter for this port's RX packet buffer memory pool + * + * \return The RX packet buffer pool + */ + inline struct rte_mempool* get_rx_pktbuf_pool() const + { + return _rx_pktbuf_pool; + } + + /*! Getter for this port's TX packet buffer memory pool + * + * \return The TX packet buffer pool + */ + inline struct rte_mempool* get_tx_pktbuf_pool() const + { + return _tx_pktbuf_pool; + } + + /*! Determine if the destination address is a broadcast address for this port + * \param dst_ipv4_addr The destination IPv4 address (in network order) + * \return whether the destination address matches this port's broadcast address + */ + inline bool dst_is_broadcast(const uint32_t dst_ipv4_addr) const + { + uint32_t network = _netmask | ((~_netmask) & dst_ipv4_addr); + return (network == 0xffffffff); + } + + /*! Allocate a DMA queue (TX/RX pair) and use the specified flow pattern + * to route packets to the RX queue. + * + * \pattern recv_pattern The flow pattern to use for directing traffic to + * the allocated RX queue. + * \return The queue ID for the allocated queue + * \throw uhd::runtime_error when there are no free queues + */ + queue_id_t alloc_queue(struct rte_flow_pattern recv_pattern[]); + + /*! Free a previously allocated queue and tear down the associated flow rule + * \param queue The ID of the queue to free + * \throw std::out_of_range when the queue ID is not currently allocated + */ + void free_queue(queue_id_t queue); + + /*! + * Process ARP request/reply + */ + // int process_arp(struct rte_mempool *tx_pktbuf_pool, struct arp_hdr *arp_frame); + +private: + /*! + * Construct and transmit an ARP reply (for the given ARP request) + */ + int _arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* arp_req); + + port_id_t _port; + size_t _mtu; + struct rte_mempool* _rx_pktbuf_pool; + struct rte_mempool* _tx_pktbuf_pool; + struct ether_addr _mac_addr; + ipv4_addr _ipv4; + ipv4_addr _netmask; + size_t _num_queues; + std::vector<queue_id_t> _free_queues; + std::unordered_map<queue_id_t, struct rte_flow*> _flow_rules; + /* Need ARP table + * To implement ARP service, maybe create ARP xport + * Then need dpdk_udp_link and dpdk_raw_link + * + * ...Or just do it inline with dpdk_ctx + * + * And link can just save the result (do it during constructor) + * + * + * But what about the service that _responds_ to ARP requests?! + * + * Maybe have to connect a DPDK link in stages: + * First, create the ARP service and attach it to the dpdk_ctx + * dpdk_ctx must own the links...? + * Or! Always burn a DMA engine for ARP + * + * Maybe have a shared_ptr to an ARP service here? + */ + std::mutex _mutex; +}; + + +/*! + * Handles initialization of DPDK, configuration of ports, setting up DMA + * engines/queues, etc. + */ +class dpdk_ctx : uhd::noncopyable, public std::enable_shared_from_this<dpdk_ctx> +{ +public: + using sptr = std::shared_ptr<dpdk_ctx>; + + /*! Factory to generate the single dpdk_ctx instance, but with the ability + * to release the resources + * + * FIXME: Without C++17, this mechanism has a race condition between + * creating and destroying the single instance. Don't do those at the + * same time. (shared_from_this() doesn't check the weak_ptr in C++14) + */ + static dpdk_ctx::sptr get(); + + dpdk_ctx(void); + ~dpdk_ctx(void); + + /*! 
+ * Init DPDK environment, including DPDK's EAL. + * This will make available information about the DPDK-assigned NIC devices. + * + * \param user_args User args passed in to override config files + */ + void init(const device_addr_t& user_args); + + /*! + * Get port from provided MAC address + * \param mac_addr MAC address + * \return pointer to port if match found, else nullptr + */ + dpdk_port* get_port(struct ether_addr mac_addr) const; + + /*! + * Get port structure from provided port ID + * \param port Port ID + * \return pointer to port if match found, else nullptr + */ + dpdk_port* get_port(port_id_t port) const; + + /*! + * \return Returns number of ports registered to DPDK. + */ + int get_port_count(void); + + /*! + * \param port_id NIC port ID + * \return Returns number of DMA queues available for a given port + */ + int get_port_queue_count(port_id_t portid); + + /*! + * \param portid NIC port ID + * \return Returns 0 if link is down, 1 if link is up + */ + int get_port_link_status(port_id_t portid) const; + + /*! + * Get port ID for routing packet destined for given address + * \param addr Destination address + * \return port ID from routing table + */ + int get_route(const std::string& addr) const; + + /*! + * \return whether init() has been called + */ + bool is_init_done(void) const; + +private: + /*! Convert the args to DPDK's EAL args and Initialize the EAL + * + * \param eal_args The global DPDK configuration + */ + void _eal_init(const device_addr_t& eal_args); + + /*! Either allocate or return a pointer to the RX packet buffer pool for the + * given CPU socket + * + * \param cpu_socket The CPU socket ID + * \param num_bufs Number of buffers to allocate to the pool + * \return A pointer to the memory pool + */ + struct rte_mempool* _get_rx_pktbuf_pool(unsigned int cpu_socket, size_t num_bufs); + + /*! 
Either allocate or return a pointer to the TX packet buffer pool for the + * given CPU socket + * + * \param cpu_socket The CPU socket ID + * \param num_bufs Number of buffers to allocate to the pool + * \return A pointer to the memory pool + */ + struct rte_mempool* _get_tx_pktbuf_pool(unsigned int cpu_socket, size_t num_bufs); + + size_t _mtu; + int _num_mbufs; + int _mbuf_cache_size; + std::mutex _init_mutex; + std::atomic<bool> _init_done; + uhd::dict<uint32_t, port_id_t> _routes; + std::unordered_map<port_id_t, dpdk_port::uptr> _ports; + std::vector<struct rte_mempool*> _rx_pktbuf_pools; + std::vector<struct rte_mempool*> _tx_pktbuf_pools; +}; + +}}} // namespace uhd::transport::dpdk + +#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_ */ diff --git a/host/lib/transport/uhd-dpdk/CMakeLists.txt b/host/lib/transport/uhd-dpdk/CMakeLists.txt index 653c09f39..01cd0957c 100644 --- a/host/lib/transport/uhd-dpdk/CMakeLists.txt +++ b/host/lib/transport/uhd-dpdk/CMakeLists.txt @@ -23,6 +23,7 @@ if(ENABLE_DPDK) ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_fops.c ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_udp.c ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_wait.c + ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_common.cpp ) set_source_files_properties( ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk.c @@ -30,6 +31,7 @@ if(ENABLE_DPDK) ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_fops.c ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_udp.c ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_wait.c + ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_common.cpp PROPERTIES COMPILE_FLAGS "${UHD_DPDK_CFLAGS} -D_GNU_SOURCE" ) include_directories(${DPDK_INCLUDE_DIR}) diff --git a/host/lib/transport/uhd-dpdk/dpdk_common.cpp b/host/lib/transport/uhd-dpdk/dpdk_common.cpp new file mode 100644 index 000000000..43a1507bb --- /dev/null +++ b/host/lib/transport/uhd-dpdk/dpdk_common.cpp @@ -0,0 +1,635 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#include <uhd/utils/log.hpp> +#include <uhdlib/transport/dpdk/common.hpp> +#include <uhdlib/utils/prefs.hpp> +#include <arpa/inet.h> +#include <rte_arp.h> +#include <boost/algorithm/string.hpp> + +namespace uhd { namespace transport { namespace dpdk { + +namespace { +constexpr uint64_t USEC = 1000000; +constexpr size_t DEFAULT_FRAME_SIZE = 8000; +constexpr int DEFAULT_NUM_MBUFS = 1024; +constexpr int DEFAULT_MBUF_CACHE_SIZE = 315; +constexpr size_t DPDK_HEADERS_SIZE = 14 + 20 + 8; // Ethernet + IPv4 + UDP +constexpr uint16_t DPDK_DEFAULT_RING_SIZE = 512; + +inline char* eal_add_opt( + std::vector<const char*>& argv, size_t n, char* dst, const char* opt, const char* arg) +{ + char* ptr = dst; + strncpy(ptr, opt, n); + argv.push_back(ptr); + ptr += strlen(opt) + 1; + n -= ptr - dst; + strncpy(ptr, arg, n); + argv.push_back(ptr); + ptr += strlen(arg) + 1; + return ptr; +} + +inline std::string eth_addr_to_string(const struct ether_addr mac_addr) +{ + auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx"); + mac_stream % (uint32_t)mac_addr.addr_bytes[0] % (uint32_t)mac_addr.addr_bytes[1] + % (uint32_t)mac_addr.addr_bytes[2] % (uint32_t)mac_addr.addr_bytes[3] + % (uint32_t)mac_addr.addr_bytes[4] % (uint32_t)mac_addr.addr_bytes[5]; + return mac_stream.str(); +} + +inline void separate_ipv4_addr( + const std::string ipv4, uint32_t& ipv4_addr, uint32_t& netmask) +{ + std::vector<std::string> result; + boost::algorithm::split( + result, ipv4, [](const char& in) { return in == '/'; }, boost::token_compress_on); + UHD_ASSERT_THROW(result.size() == 2); + ipv4_addr = 
(uint32_t)inet_addr(result[0].c_str()); + int netbits = std::atoi(result[1].c_str()); + netmask = htonl(0xffffffff << (32 - netbits)); +} +} // namespace + +dpdk_port::uptr dpdk_port::make(port_id_t port, + size_t mtu, + uint16_t num_queues, + size_t num_mbufs, + struct rte_mempool* rx_pktbuf_pool, + struct rte_mempool* tx_pktbuf_pool, + std::string ipv4_address) +{ + return std::make_unique<dpdk_port>( + port, mtu, num_queues, num_mbufs, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address); +} + +dpdk_port::dpdk_port(port_id_t port, + size_t mtu, + uint16_t num_queues, + size_t num_mbufs, + struct rte_mempool* rx_pktbuf_pool, + struct rte_mempool* tx_pktbuf_pool, + std::string ipv4_address) + : _port(port) + , _mtu(mtu) + , _rx_pktbuf_pool(rx_pktbuf_pool) + , _tx_pktbuf_pool(tx_pktbuf_pool) +{ + /* 1. Set MTU and IPv4 address */ + int retval; + + retval = rte_eth_dev_set_mtu(_port, _mtu); + if (retval) { + uint16_t actual_mtu; + UHD_LOGGER_WARNING("DPDK") + << boost::format("Port %d: Could not set mtu to %d") % _port % _mtu; + rte_eth_dev_get_mtu(_port, &actual_mtu); + UHD_LOGGER_WARNING("DPDK") + << boost::format("Port %d: Current mtu=%d") % _port % _mtu; + _mtu = actual_mtu; + } + + separate_ipv4_addr(ipv4_address, _ipv4, _netmask); + + /* 2. Set hardware offloads */ + struct rte_eth_dev_info dev_info; + rte_eth_dev_info_get(_port, &dev_info); + uint64_t rx_offloads = DEV_RX_OFFLOAD_IPV4_CKSUM; + uint64_t tx_offloads = DEV_TX_OFFLOAD_IPV4_CKSUM; + if ((dev_info.rx_offload_capa & rx_offloads) != rx_offloads) { + UHD_LOGGER_ERROR("DPDK") << boost::format("%d: Only supports RX offloads 0x%0llx") + % _port % dev_info.rx_offload_capa; + throw uhd::runtime_error("DPDK: Missing required RX offloads"); + } + if ((dev_info.tx_offload_capa & tx_offloads) != tx_offloads) { + UHD_LOGGER_ERROR("DPDK") << boost::format("%d: Only supports TX offloads 0x%0llx") + % _port % dev_info.tx_offload_capa; + throw uhd::runtime_error("DPDK: Missing required TX offloads"); + } + + // Check number of available queues + if (dev_info.max_rx_queues < num_queues || dev_info.max_tx_queues < num_queues) { + _num_queues = std::min(dev_info.max_rx_queues, dev_info.max_tx_queues); + UHD_LOGGER_WARNING("DPDK") + << boost::format("%d: Maximum queues supported is %d") % _port % _num_queues; + } else { + _num_queues = num_queues; + } + + struct rte_eth_conf port_conf = {}; + port_conf.rxmode.offloads = rx_offloads | DEV_RX_OFFLOAD_JUMBO_FRAME; + port_conf.rxmode.max_rx_pkt_len = _mtu; + port_conf.txmode.offloads = tx_offloads; + + retval = rte_eth_dev_configure(_port, _num_queues, _num_queues, &port_conf); + if (retval != 0) { + UHD_LOG_ERROR("DPDK", "Failed to configure the device"); + throw uhd::runtime_error("DPDK: Failed to configure the device"); + } + + /* 3. 
Set descriptor ring sizes */ + uint16_t rx_desc = num_mbufs; + if (dev_info.rx_desc_lim.nb_max < rx_desc || dev_info.rx_desc_lim.nb_min > rx_desc + || (dev_info.rx_desc_lim.nb_align - 1) & rx_desc) { + UHD_LOGGER_ERROR("DPDK") + << boost::format("%d: %d RX descriptors requested, but must be in [%d,%d]") + % _port % num_mbufs % dev_info.rx_desc_lim.nb_min + % dev_info.rx_desc_lim.nb_max; + UHD_LOGGER_ERROR("DPDK") + << boost::format("Num RX descriptors must also be aligned to 0x%x") + % dev_info.rx_desc_lim.nb_align; + throw uhd::runtime_error("DPDK: Failed to allocate RX descriptors"); + } + + uint16_t tx_desc = num_mbufs; + if (dev_info.tx_desc_lim.nb_max < tx_desc || dev_info.tx_desc_lim.nb_min > tx_desc + || (dev_info.tx_desc_lim.nb_align - 1) & tx_desc) { + UHD_LOGGER_ERROR("DPDK") + << boost::format("%d: %d TX descriptors requested, but must be in [%d,%d]") + % _port % num_mbufs % dev_info.tx_desc_lim.nb_min + % dev_info.tx_desc_lim.nb_max; + UHD_LOGGER_ERROR("DPDK") + << boost::format("Num TX descriptors must also be aligned to 0x%x") + % dev_info.tx_desc_lim.nb_align; + throw uhd::runtime_error("DPDK: Failed to allocate TX descriptors"); + } + + retval = rte_eth_dev_adjust_nb_rx_tx_desc(_port, &rx_desc, &tx_desc); + if (retval != 0) { + UHD_LOG_ERROR("DPDK", "Failed to configure the DMA queues "); + throw uhd::runtime_error("DPDK: Failed to configure the DMA queues"); + } + + /* 4. Set up the RX and TX DMA queues (May not be generally supported after + * eth_dev_start) */ + unsigned int cpu_socket = rte_eth_dev_socket_id(_port); + for (uint16_t i = 0; i < _num_queues; i++) { + retval = + rte_eth_rx_queue_setup(_port, i, rx_desc, cpu_socket, NULL, _rx_pktbuf_pool); + if (retval < 0) { + UHD_LOGGER_ERROR("DPDK") + << boost::format("Port %d: Could not init RX queue %d") % _port % i; + throw uhd::runtime_error("DPDK: Failure to init RX queue"); + } + + struct rte_eth_txconf txconf = dev_info.default_txconf; + txconf.offloads = DEV_TX_OFFLOAD_IPV4_CKSUM; + retval = rte_eth_tx_queue_setup(_port, i, tx_desc, cpu_socket, &txconf); + if (retval < 0) { + UHD_LOGGER_ERROR("DPDK") + << boost::format("Port %d: Could not init TX queue %d") % _port % i; + throw uhd::runtime_error("DPDK: Failure to init TX queue"); + } + } + + /* 5. Set up initial flow table */ + + /* Append all free queues except 0, which is reserved for ARP */ + _free_queues.reserve(_num_queues - 1); + for (unsigned int i = 1; i < _num_queues; i++) { + _free_queues.push_back(i); + } + + // struct rte_flow_attr flow_attr; + // flow_attr.group = 0; + // flow_attr.priority = 1; + // flow_attr.ingress = 1; + // flow_attr.egress = 0; + // flow_attr.transfer = 0; + // flow_attr.reserved = 0; + + // struct rte_flow_item[] flow_pattern = { + //}; + // int rte_flow_validate(uint16_t port_id, + // const struct rte_flow_attr *attr, + // const struct rte_flow_item pattern[], + // const struct rte_flow_action actions[], + // struct rte_flow_error *error); + // struct rte_flow * rte_flow_create(uint16_t port_id, + // const struct rte_flow_attr *attr, + // const struct rte_flow_item pattern[], + // const struct rte_flow_action *actions[], + // struct rte_flow_error *error); + + /* 6. Start the Ethernet device */ + retval = rte_eth_dev_start(_port); + if (retval < 0) { + UHD_LOGGER_ERROR("DPDK") + << boost::format("Port %d: Could not start device") % _port; + throw uhd::runtime_error("DPDK: Failure to start device"); + } + + /* Grab and display the port MAC address. 
*/ + rte_eth_macaddr_get(_port, &_mac_addr); + UHD_LOGGER_TRACE("DPDK") << "Port " << _port + << " MAC: " << eth_addr_to_string(_mac_addr); +} + +/* TODO: Do flow directions */ +queue_id_t dpdk_port::alloc_queue(struct rte_flow_pattern recv_pattern[]) +{ + std::lock_guard<std::mutex> lock(_mutex); + UHD_ASSERT_THROW(_free_queues.size() != 0); + auto queue = _free_queues.back(); + _free_queues.pop_back(); + return queue; +} + +void dpdk_port::free_queue(queue_id_t queue) +{ + std::lock_guard<std::mutex> lock(_mutex); + auto flow = _flow_rules.at(queue); + int status = rte_flow_destroy(_port, flow, NULL); + if (status) { + UHD_LOGGER_ERROR("DPDK") + << boost::format("Failed to destroy flow rule on port %u, queue %u") % _port + % queue; + throw uhd::runtime_error("DPDK: Failed to destroy flow rule"); + } else { + _flow_rules.erase(queue); + } + _free_queues.push_back(queue); +} + +int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* arp_req) +{ + struct rte_mbuf* mbuf; + struct ether_hdr* hdr; + struct arp_hdr* arp_frame; + + mbuf = rte_pktmbuf_alloc(tx_pktbuf_pool); + if (unlikely(mbuf == NULL)) { + UHD_LOG_WARNING("DPDK", "Could not allocate packet buffer for ARP response"); + return -ENOMEM; + } + + hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*); + arp_frame = (struct arp_hdr*)&hdr[1]; + + ether_addr_copy(&arp_req->arp_data.arp_sha, &hdr->d_addr); + ether_addr_copy(&_mac_addr, &hdr->s_addr); + hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP); + + arp_frame->arp_hrd = rte_cpu_to_be_16(ARP_HRD_ETHER); + arp_frame->arp_pro = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + arp_frame->arp_hln = 6; + arp_frame->arp_pln = 4; + arp_frame->arp_op = rte_cpu_to_be_16(ARP_OP_REPLY); + ether_addr_copy(&_mac_addr, &arp_frame->arp_data.arp_sha); + arp_frame->arp_data.arp_sip = _ipv4; + ether_addr_copy(&hdr->d_addr, &arp_frame->arp_data.arp_tha); + arp_frame->arp_data.arp_tip = arp_req->arp_data.arp_sip; + + mbuf->pkt_len = 42; + mbuf->data_len = 42; + + // ARP replies always on queue 0 + if (rte_eth_tx_burst(_port, 0, &mbuf, 1) != 1) { + UHD_LOGGER_WARNING("DPDK") + << boost::format("%s: TX descriptor ring is full") % __func__; + rte_pktmbuf_free(mbuf); + return -EAGAIN; + } + return 0; +} + +// TODO: ARP processing for queue 0 +// int dpdk_port::process_arp(struct rte_mempool *tx_pktbuf_pool, struct arp_hdr +// *arp_frame) +//{ +// std::lock_guard<std::mutex> lock(_mutex); +// uint32_t dest_ip = arp_frame->arp_data.arp_sip; +// struct ether_addr dest_addr = arp_frame->arp_data.arp_sha; +// +// /* Add entry to ARP table */ +// struct uhd_dpdk_arp_entry *entry = NULL; +// rte_hash_lookup_data(_arp_table, &dest_ip, (void **) &entry); +// if (!entry) { +// entry = rte_zmalloc(NULL, sizeof(*entry), 0); +// if (!entry) { +// return -ENOMEM; +// } +// LIST_INIT(&entry->pending_list); +// ether_addr_copy(&dest_addr, &entry->mac_addr); +// if (rte_hash_add_key_data(port->arp_table, &dest_ip, entry) < 0) { +// rte_free(entry); +// return -ENOSPC; +// } +// } else { +// struct uhd_dpdk_config_req *req = NULL; +// ether_addr_copy(&dest_addr, &entry->mac_addr); +// /* Now wake any config reqs waiting for the ARP */ +// LIST_FOREACH(req, &entry->pending_list, entry) { +// _uhd_dpdk_config_req_compl(req, 0); +// } +// while (entry->pending_list.lh_first != NULL) { +// LIST_REMOVE(entry->pending_list.lh_first, entry); +// } +// } +// /* Respond if this was an ARP request */ +// if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST) && +// arp_frame->arp_data.arp_tip == port->ipv4_addr) { +// 
_arp_reply(tx_pktbuf_pool, arp_frame); +// } +// +// return 0; +// +//} + +static dpdk_ctx* global_ctx = nullptr; +static std::mutex global_ctx_mutex; + +dpdk_ctx::sptr dpdk_ctx::get() +{ + std::lock_guard<std::mutex> lock(global_ctx_mutex); + if (!global_ctx) { + auto new_ctx = std::make_shared<dpdk_ctx>(); + global_ctx = new_ctx.get(); + return new_ctx; + } + return global_ctx->shared_from_this(); +} + +dpdk_ctx::dpdk_ctx(void) : _init_done(false) {} + +dpdk_ctx::~dpdk_ctx(void) +{ + std::lock_guard<std::mutex> lock(global_ctx_mutex); + global_ctx = nullptr; + // Stop all the ports + for (auto& port : _ports) { + rte_eth_dev_stop(port.first); + } + // Free mempools + for (auto& pool : _rx_pktbuf_pools) { + rte_mempool_free(pool); + } + for (auto& pool : _tx_pktbuf_pools) { + rte_mempool_free(pool); + } + // Free EAL resources + rte_eal_cleanup(); +} + +void dpdk_ctx::_eal_init(const device_addr_t& eal_args) +{ + /* Build up argc and argv */ + std::vector<const char*> argv; + argv.push_back("uhd::transport::dpdk"); + auto args = new std::array<char, 4096>(); + char* opt = args->data(); + char* end = args->data() + args->size(); + for (std::string& key : eal_args.keys()) { + std::string val = eal_args[key]; + if (key == "dpdk_coremask") { + opt = eal_add_opt(argv, end - opt, opt, "-c", val.c_str()); + } else if (key == "dpdk_corelist") { + /* NOTE: This arg may have commas, so limited to config file */ + opt = eal_add_opt(argv, end - opt, opt, "-l", val.c_str()); + } else if (key == "dpdk_coremap") { + opt = eal_add_opt(argv, end - opt, opt, "--lcores", val.c_str()); + } else if (key == "dpdk_master_lcore") { + opt = eal_add_opt(argv, end - opt, opt, "--master-lcore", val.c_str()); + } else if (key == "dpdk_pci_blacklist") { + opt = eal_add_opt(argv, end - opt, opt, "-b", val.c_str()); + } else if (key == "dpdk_pci_whitelist") { + opt = eal_add_opt(argv, end - opt, opt, "-w", val.c_str()); + } else if (key == "dpdk_log_level") { + opt = eal_add_opt(argv, end - opt, opt, "--log-level", val.c_str()); + } else if (key == "dpdk_huge_dir") { + opt = eal_add_opt(argv, end - opt, opt, "--huge-dir", val.c_str()); + } else if (key == "dpdk_file_prefix") { + opt = eal_add_opt(argv, end - opt, opt, "--file-prefix", val.c_str()); + } else if (key == "dpdk_driver") { + opt = eal_add_opt(argv, end - opt, opt, "-d", val.c_str()); + } + /* TODO: Change where log goes? + int rte_openlog_stream( FILE * f) + */ + } + /* Init DPDK's EAL */ + int ret = rte_eal_init(argv.size(), (char**)argv.data()); + /* Done with the temporaries */ + delete args; + + if (ret < 0) { + UHD_LOG_ERROR("DPDK", "Error with EAL initialization"); + throw uhd::runtime_error("Error with EAL initialization"); + } + + /* Create pktbuf pool entries, but only allocate on use */ + int socket_count = rte_socket_count(); + for (int i = 0; i < socket_count; i++) { + _rx_pktbuf_pools.push_back(NULL); + _tx_pktbuf_pools.push_back(NULL); + } +} + +/** + * Init DPDK environment, including DPDK's EAL. + * This will make available information about the DPDK-assigned NIC devices. 
+ * + * \param user_args User args passed in to override config files + */ +void dpdk_ctx::init(const device_addr_t& user_args) +{ + unsigned int i; + std::lock_guard<std::mutex> lock(_init_mutex); + if (!_init_done) { + /* Gather global config, build args for EAL, and init UHD-DPDK */ + const device_addr_t dpdk_args = uhd::prefs::get_dpdk_args(user_args); + UHD_LOG_TRACE("DPDK", "Configuration:" << std::endl << dpdk_args.to_pp_string()); + _eal_init(dpdk_args); + + /* TODO: Should MTU be defined per-port? */ + _mtu = dpdk_args.cast<size_t>("dpdk_mtu", DEFAULT_FRAME_SIZE); + /* This is per queue */ + _num_mbufs = dpdk_args.cast<int>("dpdk_num_mbufs", DEFAULT_NUM_MBUFS); + _mbuf_cache_size = + dpdk_args.cast<int>("dpdk_mbuf_cache_size", DEFAULT_MBUF_CACHE_SIZE); + + /* Get device info for all the NIC ports */ + int num_dpdk_ports = rte_eth_dev_count_avail(); + UHD_ASSERT_THROW(num_dpdk_ports > 0); + device_addrs_t nics(num_dpdk_ports); + RTE_ETH_FOREACH_DEV(i) + { + struct ether_addr mac_addr; + rte_eth_macaddr_get(i, &mac_addr); + nics[i]["dpdk_mac"] = eth_addr_to_string(mac_addr); + } + + /* Get user configuration for each NIC port */ + device_addrs_t args = separate_device_addr(user_args); + size_t queue_count = 0; + RTE_ETH_FOREACH_DEV(i) + { + auto& nic = nics.at(i); + for (const auto& arg : args) { + /* Match DPDK-discovered NICs and user config via MAC addr */ + if (arg.has_key("dpdk_mac") && nic["dpdk_mac"] == arg["dpdk_mac"]) { + /* Copy user args for discovered NICs */ + nic.update(arg, false); + break; + } + } + /* Now combine user args with conf file */ + auto conf = uhd::prefs::get_dpdk_nic_args(nic); + + /* Update config, and remove ports that aren't fully configured */ + if (conf.has_key("dpdk_ipv4")) { + nics[i] = conf; + /* Update queue count, to generate a large enough mempool */ + queue_count += conf.cast<uint16_t>("dpdk_num_queues", rte_lcore_count()); + } else { + nics[i] = device_addr_t(); + } + } + + RTE_ETH_FOREACH_DEV(i) + { + auto& conf = nics.at(i); + if (conf.has_key("dpdk_ipv4")) { + // Allocating enough buffers for all DMA queues for each CPU socket + // - This is a bit inefficient for larger systems, since NICs may not + // all be on one socket + auto cpu_socket = rte_eth_dev_socket_id(i); + auto rx_pool = _get_rx_pktbuf_pool(cpu_socket, _num_mbufs * queue_count); + auto tx_pool = _get_tx_pktbuf_pool(cpu_socket, _num_mbufs * queue_count); + UHD_LOG_TRACE("DPDK", + "Initializing NIC(" << i << "):" << std::endl + << conf.to_pp_string()); + _ports[i] = dpdk_port::make(i, + _mtu, + conf.cast<uint16_t>("dpdk_num_queues", rte_lcore_count()), + _num_mbufs, + rx_pool, + tx_pool, + conf["dpdk_ipv4"]); + } + } + + UHD_LOG_TRACE("DPDK", "Waiting for links to come up..."); + rte_delay_ms(1000); + for (auto& port : _ports) { + struct rte_eth_link link; + auto portid = port.second->get_port_id(); + rte_eth_link_get(portid, &link); + unsigned int link_status = link.link_status; + unsigned int link_speed = link.link_speed; + UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps\n") + % portid % link_status % link_speed; + } + UHD_LOG_TRACE("DPDK", "Init DONE!"); + + _init_done = true; + } +} + +dpdk_port* dpdk_ctx::get_port(port_id_t port) const +{ + assert(is_init_done()); + if (_ports.count(port) == 0) { + return nullptr; + } + return _ports.at(port).get(); +} + +dpdk_port* dpdk_ctx::get_port(struct ether_addr mac_addr) const +{ + assert(is_init_done()); + for (const auto& port : _ports) { + struct ether_addr port_mac_addr; + rte_eth_macaddr_get(port.first, 
&port_mac_addr); + for (int j = 0; j < 6; j++) { + if (mac_addr.addr_bytes[j] != port_mac_addr.addr_bytes[j]) { + break; + } + if (j == 5) { + return port.second.get(); + } + } + } + return nullptr; +} + +int dpdk_ctx::get_port_count(void) +{ + assert(is_init_done()); + return _ports.size(); +} + +int dpdk_ctx::get_port_queue_count(port_id_t portid) +{ + assert(is_init_done()); + return _ports.at(portid)->get_queue_count(); +} + +int dpdk_ctx::get_port_link_status(port_id_t portid) const +{ + struct rte_eth_link link; + rte_eth_link_get_nowait(portid, &link); + return link.link_status; +} + +int dpdk_ctx::get_route(const std::string& addr) const +{ + const uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str()); + for (const auto& port : _ports) { + if (get_port_link_status(port.first) < 1) + continue; + uint32_t src_ipv4 = port.second->get_ipv4(); + uint32_t netmask = port.second->get_netmask(); + if ((src_ipv4 & netmask) == (dst_ipv4 & netmask)) { + return (int)port.first; + } + } + return -ENODEV; +} + + +bool dpdk_ctx::is_init_done(void) const +{ + return _init_done.load(); +} + +struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool( + unsigned int cpu_socket, size_t num_bufs) +{ + if (!_rx_pktbuf_pools.at(cpu_socket)) { + const int mbuf_size = _mtu + RTE_PKTMBUF_HEADROOM; + char name[32]; + snprintf(name, sizeof(name), "rx_mbuf_pool_%u", cpu_socket); + _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create( + name, num_bufs, _mbuf_cache_size, 0, mbuf_size, SOCKET_ID_ANY); + if (!_rx_pktbuf_pools.at(cpu_socket)) { + UHD_LOG_ERROR("DPDK", "Could not allocate RX pktbuf pool"); + throw uhd::runtime_error("DPDK: Could not allocate RX pktbuf pool"); + } + } + return _rx_pktbuf_pools.at(cpu_socket); +} + +struct rte_mempool* dpdk_ctx::_get_tx_pktbuf_pool( + unsigned int cpu_socket, size_t num_bufs) +{ + if (!_tx_pktbuf_pools.at(cpu_socket)) { + const int mbuf_size = _mtu + RTE_PKTMBUF_HEADROOM; + char name[32]; + snprintf(name, sizeof(name), "tx_mbuf_pool_%u", cpu_socket); + _tx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create( + name, num_bufs, _mbuf_cache_size, 0, mbuf_size, SOCKET_ID_ANY); + if (!_tx_pktbuf_pools.at(cpu_socket)) { + UHD_LOG_ERROR("DPDK", "Could not allocate TX pktbuf pool"); + throw uhd::runtime_error("DPDK: Could not allocate TX pktbuf pool"); + } + } + return _tx_pktbuf_pools.at(cpu_socket); +} + +}}} // namespace uhd::transport::dpdk |
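A note on the address handling in dpdk_common.cpp: separate_ipv4_addr() keeps both the parsed address and the derived netmask in network byte order, dpdk_port::dst_is_broadcast() then checks that every host bit of the destination is set, and dpdk_ctx::get_route() matches on the network prefix. A minimal standalone sketch of that arithmetic follows; it is a plain test program written for illustration, not code from the patch.

```cpp
// Standalone illustration of the IPv4 math used by separate_ipv4_addr(),
// dst_is_broadcast(), and get_route(). All values stay in network byte order.
#include <arpa/inet.h>
#include <cstdint>
#include <cstdio>

int main()
{
    // "192.168.10.2/24" -> address and netmask, both in network order
    const uint32_t ipv4    = inet_addr("192.168.10.2");
    const int      netbits = 24;
    const uint32_t netmask = htonl(0xffffffff << (32 - netbits));

    // Broadcast check: destination is a broadcast iff all host bits are set
    const uint32_t dst       = inet_addr("192.168.10.255");
    const bool     broadcast = (netmask | (~netmask & dst)) == 0xffffffff;

    // Route check: a port can reach dst iff both share the same network prefix
    const bool same_subnet = (ipv4 & netmask) == (dst & netmask);

    std::printf("broadcast=%d same_subnet=%d\n", (int)broadcast, (int)same_subnet);
    return 0;
}
```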