diff options
-rw-r--r-- | host/lib/include/uhdlib/transport/dpdk/common.hpp | 319 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/CMakeLists.txt | 2 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/dpdk_common.cpp | 635 |
3 files changed, 956 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/dpdk/common.hpp b/host/lib/include/uhdlib/transport/dpdk/common.hpp new file mode 100644 index 000000000..1f4466a0f --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk/common.hpp @@ -0,0 +1,319 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_ +#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_ + +#include <uhd/config.hpp> +#include <uhd/types/device_addr.hpp> +#include <uhd/utils/noncopyable.hpp> +#include <uhd/utils/static.hpp> +#include <rte_ethdev.h> +#include <rte_ether.h> +#include <rte_flow.h> +#include <rte_mbuf.h> +#include <rte_mempool.h> +#include <rte_version.h> +#include <unordered_map> +#include <array> +#include <atomic> +#include <mutex> +#include <string> + +/* NOTE: There are changes to rte_eth_addr in 19.x */ + +namespace uhd { namespace transport { namespace dpdk { + +using queue_id_t = uint16_t; +using port_id_t = uint16_t; +using ipv4_addr = uint32_t; + +/*! + * Class representing a DPDK NIC port + * + * The dpdk_port object possesses all the data needed to send and receive + * packets between this port and a remote host. A logical link should specify + * which packets are destined for it and allocate a DMA queue with the + * dpdk_port::alloc_queue() function. A logical link should not, however, + * specify ARP packets for its set of received packets. That functionality is + * reserved for the special queue 0. + * + * The logical link can then get the packet buffer pools associated with this + * NIC port and use them to send and receive packets. + * + * dpdk_port objects are _only_ created by the dpdk_ctx. + */ +class dpdk_port +{ +public: + using uptr = std::unique_ptr<dpdk_port>; + + /*! Construct a DPDK NIC port and bring the link up + * + * \param port The port ID + * \param mtu The intended MTU for the port + * \param num_queues Number of DMA queues to reserve for this port + * \param num_mbufs The number of packet buffers per queue + * \param rx_pktbuf_pool A pointer to the port's RX packet buffer pool + * \param tx_pktbuf_pool A pointer to the port's TX packet buffer pool + * \param ipv4_address The IPv4 network address (w/ netmask) + * \return A unique_ptr to a dpdk_port object + */ + static dpdk_port::uptr make(port_id_t port, + size_t mtu, + uint16_t num_queues, + size_t num_mbufs, + struct rte_mempool* rx_pktbuf_pool, + struct rte_mempool* tx_pktbuf_pool, + std::string ipv4_address); + + dpdk_port(port_id_t port, + size_t mtu, + uint16_t num_queues, + size_t num_mbufs, + struct rte_mempool* rx_pktbuf_pool, + struct rte_mempool* tx_pktbuf_pool, + std::string ipv4_address); + + /*! Getter for this port's ID + * \return this port's ID + */ + inline port_id_t get_port_id() const + { + return _port; + } + + /*! Getter for this port's MTU + * \return this port's MTU + */ + inline size_t get_mtu() const + { + return _mtu; + } + + /*! Getter for this port's IPv4 address + * \return this port's IPv4 address (in network order) + */ + inline ipv4_addr get_ipv4() const + { + return _ipv4; + } + + /*! Getter for this port's subnet mask + * \return this port's subnet mask (in network order) + */ + inline ipv4_addr get_netmask() const + { + return _netmask; + } + + /*! Getter for this port's total DMA queue count, including initialized, + * but unallocated queues + * + * \return The number of queues initialized on this port + */ + inline size_t get_queue_count() const + { + return _num_queues; + } + + /*! Getter for this port's RX packet buffer memory pool + * + * \return The RX packet buffer pool + */ + inline struct rte_mempool* get_rx_pktbuf_pool() const + { + return _rx_pktbuf_pool; + } + + /*! Getter for this port's TX packet buffer memory pool + * + * \return The TX packet buffer pool + */ + inline struct rte_mempool* get_tx_pktbuf_pool() const + { + return _tx_pktbuf_pool; + } + + /*! Determine if the destination address is a broadcast address for this port + * \param dst_ipv4_addr The destination IPv4 address (in network order) + * \return whether the destination address matches this port's broadcast address + */ + inline bool dst_is_broadcast(const uint32_t dst_ipv4_addr) const + { + uint32_t network = _netmask | ((~_netmask) & dst_ipv4_addr); + return (network == 0xffffffff); + } + + /*! Allocate a DMA queue (TX/RX pair) and use the specified flow pattern + * to route packets to the RX queue. + * + * \pattern recv_pattern The flow pattern to use for directing traffic to + * the allocated RX queue. + * \return The queue ID for the allocated queue + * \throw uhd::runtime_error when there are no free queues + */ + queue_id_t alloc_queue(struct rte_flow_pattern recv_pattern[]); + + /*! Free a previously allocated queue and tear down the associated flow rule + * \param queue The ID of the queue to free + * \throw std::out_of_range when the queue ID is not currently allocated + */ + void free_queue(queue_id_t queue); + + /*! + * Process ARP request/reply + */ + // int process_arp(struct rte_mempool *tx_pktbuf_pool, struct arp_hdr *arp_frame); + +private: + /*! + * Construct and transmit an ARP reply (for the given ARP request) + */ + int _arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* arp_req); + + port_id_t _port; + size_t _mtu; + struct rte_mempool* _rx_pktbuf_pool; + struct rte_mempool* _tx_pktbuf_pool; + struct ether_addr _mac_addr; + ipv4_addr _ipv4; + ipv4_addr _netmask; + size_t _num_queues; + std::vector<queue_id_t> _free_queues; + std::unordered_map<queue_id_t, struct rte_flow*> _flow_rules; + /* Need ARP table + * To implement ARP service, maybe create ARP xport + * Then need dpdk_udp_link and dpdk_raw_link + * + * ...Or just do it inline with dpdk_ctx + * + * And link can just save the result (do it during constructor) + * + * + * But what about the service that _responds_ to ARP requests?! + * + * Maybe have to connect a DPDK link in stages: + * First, create the ARP service and attach it to the dpdk_ctx + * dpdk_ctx must own the links...? + * Or! Always burn a DMA engine for ARP + * + * Maybe have a shared_ptr to an ARP service here? + */ + std::mutex _mutex; +}; + + +/*! + * Handles initialization of DPDK, configuration of ports, setting up DMA + * engines/queues, etc. + */ +class dpdk_ctx : uhd::noncopyable, public std::enable_shared_from_this<dpdk_ctx> +{ +public: + using sptr = std::shared_ptr<dpdk_ctx>; + + /*! Factory to generate the single dpdk_ctx instance, but with the ability + * to release the resources + * + * FIXME: Without C++17, this mechanism has a race condition between + * creating and destroying the single instance. Don't do those at the + * same time. (shared_from_this() doesn't check the weak_ptr in C++14) + */ + static dpdk_ctx::sptr get(); + + dpdk_ctx(void); + ~dpdk_ctx(void); + + /*! + * Init DPDK environment, including DPDK's EAL. + * This will make available information about the DPDK-assigned NIC devices. + * + * \param user_args User args passed in to override config files + */ + void init(const device_addr_t& user_args); + + /*! + * Get port from provided MAC address + * \param mac_addr MAC address + * \return pointer to port if match found, else nullptr + */ + dpdk_port* get_port(struct ether_addr mac_addr) const; + + /*! + * Get port structure from provided port ID + * \param port Port ID + * \return pointer to port if match found, else nullptr + */ + dpdk_port* get_port(port_id_t port) const; + + /*! + * \return Returns number of ports registered to DPDK. + */ + int get_port_count(void); + + /*! + * \param port_id NIC port ID + * \return Returns number of DMA queues available for a given port + */ + int get_port_queue_count(port_id_t portid); + + /*! + * \param portid NIC port ID + * \return Returns 0 if link is down, 1 if link is up + */ + int get_port_link_status(port_id_t portid) const; + + /*! + * Get port ID for routing packet destined for given address + * \param addr Destination address + * \return port ID from routing table + */ + int get_route(const std::string& addr) const; + + /*! + * \return whether init() has been called + */ + bool is_init_done(void) const; + +private: + /*! Convert the args to DPDK's EAL args and Initialize the EAL + * + * \param eal_args The global DPDK configuration + */ + void _eal_init(const device_addr_t& eal_args); + + /*! Either allocate or return a pointer to the RX packet buffer pool for the + * given CPU socket + * + * \param cpu_socket The CPU socket ID + * \param num_bufs Number of buffers to allocate to the pool + * \return A pointer to the memory pool + */ + struct rte_mempool* _get_rx_pktbuf_pool(unsigned int cpu_socket, size_t num_bufs); + + /*! Either allocate or return a pointer to the TX packet buffer pool for the + * given CPU socket + * + * \param cpu_socket The CPU socket ID + * \param num_bufs Number of buffers to allocate to the pool + * \return A pointer to the memory pool + */ + struct rte_mempool* _get_tx_pktbuf_pool(unsigned int cpu_socket, size_t num_bufs); + + size_t _mtu; + int _num_mbufs; + int _mbuf_cache_size; + std::mutex _init_mutex; + std::atomic<bool> _init_done; + uhd::dict<uint32_t, port_id_t> _routes; + std::unordered_map<port_id_t, dpdk_port::uptr> _ports; + std::vector<struct rte_mempool*> _rx_pktbuf_pools; + std::vector<struct rte_mempool*> _tx_pktbuf_pools; +}; + +}}} // namespace uhd::transport::dpdk + +#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_ */ diff --git a/host/lib/transport/uhd-dpdk/CMakeLists.txt b/host/lib/transport/uhd-dpdk/CMakeLists.txt index 653c09f39..01cd0957c 100644 --- a/host/lib/transport/uhd-dpdk/CMakeLists.txt +++ b/host/lib/transport/uhd-dpdk/CMakeLists.txt @@ -23,6 +23,7 @@ if(ENABLE_DPDK) ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_fops.c ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_udp.c ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_wait.c + ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_common.cpp ) set_source_files_properties( ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk.c @@ -30,6 +31,7 @@ if(ENABLE_DPDK) ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_fops.c ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_udp.c ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_wait.c + ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_common.cpp PROPERTIES COMPILE_FLAGS "${UHD_DPDK_CFLAGS} -D_GNU_SOURCE" ) include_directories(${DPDK_INCLUDE_DIR}) diff --git a/host/lib/transport/uhd-dpdk/dpdk_common.cpp b/host/lib/transport/uhd-dpdk/dpdk_common.cpp new file mode 100644 index 000000000..43a1507bb --- /dev/null +++ b/host/lib/transport/uhd-dpdk/dpdk_common.cpp @@ -0,0 +1,635 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#include <uhd/utils/log.hpp> +#include <uhdlib/transport/dpdk/common.hpp> +#include <uhdlib/utils/prefs.hpp> +#include <arpa/inet.h> +#include <rte_arp.h> +#include <boost/algorithm/string.hpp> + +namespace uhd { namespace transport { namespace dpdk { + +namespace { +constexpr uint64_t USEC = 1000000; +constexpr size_t DEFAULT_FRAME_SIZE = 8000; +constexpr int DEFAULT_NUM_MBUFS = 1024; +constexpr int DEFAULT_MBUF_CACHE_SIZE = 315; +constexpr size_t DPDK_HEADERS_SIZE = 14 + 20 + 8; // Ethernet + IPv4 + UDP +constexpr uint16_t DPDK_DEFAULT_RING_SIZE = 512; + +inline char* eal_add_opt( + std::vector<const char*>& argv, size_t n, char* dst, const char* opt, const char* arg) +{ + char* ptr = dst; + strncpy(ptr, opt, n); + argv.push_back(ptr); + ptr += strlen(opt) + 1; + n -= ptr - dst; + strncpy(ptr, arg, n); + argv.push_back(ptr); + ptr += strlen(arg) + 1; + return ptr; +} + +inline std::string eth_addr_to_string(const struct ether_addr mac_addr) +{ + auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx"); + mac_stream % (uint32_t)mac_addr.addr_bytes[0] % (uint32_t)mac_addr.addr_bytes[1] + % (uint32_t)mac_addr.addr_bytes[2] % (uint32_t)mac_addr.addr_bytes[3] + % (uint32_t)mac_addr.addr_bytes[4] % (uint32_t)mac_addr.addr_bytes[5]; + return mac_stream.str(); +} + +inline void separate_ipv4_addr( + const std::string ipv4, uint32_t& ipv4_addr, uint32_t& netmask) +{ + std::vector<std::string> result; + boost::algorithm::split( + result, ipv4, [](const char& in) { return in == '/'; }, boost::token_compress_on); + UHD_ASSERT_THROW(result.size() == 2); + ipv4_addr = (uint32_t)inet_addr(result[0].c_str()); + int netbits = std::atoi(result[1].c_str()); + netmask = htonl(0xffffffff << (32 - netbits)); +} +} // namespace + +dpdk_port::uptr dpdk_port::make(port_id_t port, + size_t mtu, + uint16_t num_queues, + size_t num_mbufs, + struct rte_mempool* rx_pktbuf_pool, + struct rte_mempool* tx_pktbuf_pool, + std::string ipv4_address) +{ + return std::make_unique<dpdk_port>( + port, mtu, num_queues, num_mbufs, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address); +} + +dpdk_port::dpdk_port(port_id_t port, + size_t mtu, + uint16_t num_queues, + size_t num_mbufs, + struct rte_mempool* rx_pktbuf_pool, + struct rte_mempool* tx_pktbuf_pool, + std::string ipv4_address) + : _port(port) + , _mtu(mtu) + , _rx_pktbuf_pool(rx_pktbuf_pool) + , _tx_pktbuf_pool(tx_pktbuf_pool) +{ + /* 1. Set MTU and IPv4 address */ + int retval; + + retval = rte_eth_dev_set_mtu(_port, _mtu); + if (retval) { + uint16_t actual_mtu; + UHD_LOGGER_WARNING("DPDK") + << boost::format("Port %d: Could not set mtu to %d") % _port % _mtu; + rte_eth_dev_get_mtu(_port, &actual_mtu); + UHD_LOGGER_WARNING("DPDK") + << boost::format("Port %d: Current mtu=%d") % _port % _mtu; + _mtu = actual_mtu; + } + + separate_ipv4_addr(ipv4_address, _ipv4, _netmask); + + /* 2. Set hardware offloads */ + struct rte_eth_dev_info dev_info; + rte_eth_dev_info_get(_port, &dev_info); + uint64_t rx_offloads = DEV_RX_OFFLOAD_IPV4_CKSUM; + uint64_t tx_offloads = DEV_TX_OFFLOAD_IPV4_CKSUM; + if ((dev_info.rx_offload_capa & rx_offloads) != rx_offloads) { + UHD_LOGGER_ERROR("DPDK") << boost::format("%d: Only supports RX offloads 0x%0llx") + % _port % dev_info.rx_offload_capa; + throw uhd::runtime_error("DPDK: Missing required RX offloads"); + } + if ((dev_info.tx_offload_capa & tx_offloads) != tx_offloads) { + UHD_LOGGER_ERROR("DPDK") << boost::format("%d: Only supports TX offloads 0x%0llx") + % _port % dev_info.tx_offload_capa; + throw uhd::runtime_error("DPDK: Missing required TX offloads"); + } + + // Check number of available queues + if (dev_info.max_rx_queues < num_queues || dev_info.max_tx_queues < num_queues) { + _num_queues = std::min(dev_info.max_rx_queues, dev_info.max_tx_queues); + UHD_LOGGER_WARNING("DPDK") + << boost::format("%d: Maximum queues supported is %d") % _port % _num_queues; + } else { + _num_queues = num_queues; + } + + struct rte_eth_conf port_conf = {}; + port_conf.rxmode.offloads = rx_offloads | DEV_RX_OFFLOAD_JUMBO_FRAME; + port_conf.rxmode.max_rx_pkt_len = _mtu; + port_conf.txmode.offloads = tx_offloads; + + retval = rte_eth_dev_configure(_port, _num_queues, _num_queues, &port_conf); + if (retval != 0) { + UHD_LOG_ERROR("DPDK", "Failed to configure the device"); + throw uhd::runtime_error("DPDK: Failed to configure the device"); + } + + /* 3. Set descriptor ring sizes */ + uint16_t rx_desc = num_mbufs; + if (dev_info.rx_desc_lim.nb_max < rx_desc || dev_info.rx_desc_lim.nb_min > rx_desc + || (dev_info.rx_desc_lim.nb_align - 1) & rx_desc) { + UHD_LOGGER_ERROR("DPDK") + << boost::format("%d: %d RX descriptors requested, but must be in [%d,%d]") + % _port % num_mbufs % dev_info.rx_desc_lim.nb_min + % dev_info.rx_desc_lim.nb_max; + UHD_LOGGER_ERROR("DPDK") + << boost::format("Num RX descriptors must also be aligned to 0x%x") + % dev_info.rx_desc_lim.nb_align; + throw uhd::runtime_error("DPDK: Failed to allocate RX descriptors"); + } + + uint16_t tx_desc = num_mbufs; + if (dev_info.tx_desc_lim.nb_max < tx_desc || dev_info.tx_desc_lim.nb_min > tx_desc + || (dev_info.tx_desc_lim.nb_align - 1) & tx_desc) { + UHD_LOGGER_ERROR("DPDK") + << boost::format("%d: %d TX descriptors requested, but must be in [%d,%d]") + % _port % num_mbufs % dev_info.tx_desc_lim.nb_min + % dev_info.tx_desc_lim.nb_max; + UHD_LOGGER_ERROR("DPDK") + << boost::format("Num TX descriptors must also be aligned to 0x%x") + % dev_info.tx_desc_lim.nb_align; + throw uhd::runtime_error("DPDK: Failed to allocate TX descriptors"); + } + + retval = rte_eth_dev_adjust_nb_rx_tx_desc(_port, &rx_desc, &tx_desc); + if (retval != 0) { + UHD_LOG_ERROR("DPDK", "Failed to configure the DMA queues "); + throw uhd::runtime_error("DPDK: Failed to configure the DMA queues"); + } + + /* 4. Set up the RX and TX DMA queues (May not be generally supported after + * eth_dev_start) */ + unsigned int cpu_socket = rte_eth_dev_socket_id(_port); + for (uint16_t i = 0; i < _num_queues; i++) { + retval = + rte_eth_rx_queue_setup(_port, i, rx_desc, cpu_socket, NULL, _rx_pktbuf_pool); + if (retval < 0) { + UHD_LOGGER_ERROR("DPDK") + << boost::format("Port %d: Could not init RX queue %d") % _port % i; + throw uhd::runtime_error("DPDK: Failure to init RX queue"); + } + + struct rte_eth_txconf txconf = dev_info.default_txconf; + txconf.offloads = DEV_TX_OFFLOAD_IPV4_CKSUM; + retval = rte_eth_tx_queue_setup(_port, i, tx_desc, cpu_socket, &txconf); + if (retval < 0) { + UHD_LOGGER_ERROR("DPDK") + << boost::format("Port %d: Could not init TX queue %d") % _port % i; + throw uhd::runtime_error("DPDK: Failure to init TX queue"); + } + } + + /* 5. Set up initial flow table */ + + /* Append all free queues except 0, which is reserved for ARP */ + _free_queues.reserve(_num_queues - 1); + for (unsigned int i = 1; i < _num_queues; i++) { + _free_queues.push_back(i); + } + + // struct rte_flow_attr flow_attr; + // flow_attr.group = 0; + // flow_attr.priority = 1; + // flow_attr.ingress = 1; + // flow_attr.egress = 0; + // flow_attr.transfer = 0; + // flow_attr.reserved = 0; + + // struct rte_flow_item[] flow_pattern = { + //}; + // int rte_flow_validate(uint16_t port_id, + // const struct rte_flow_attr *attr, + // const struct rte_flow_item pattern[], + // const struct rte_flow_action actions[], + // struct rte_flow_error *error); + // struct rte_flow * rte_flow_create(uint16_t port_id, + // const struct rte_flow_attr *attr, + // const struct rte_flow_item pattern[], + // const struct rte_flow_action *actions[], + // struct rte_flow_error *error); + + /* 6. Start the Ethernet device */ + retval = rte_eth_dev_start(_port); + if (retval < 0) { + UHD_LOGGER_ERROR("DPDK") + << boost::format("Port %d: Could not start device") % _port; + throw uhd::runtime_error("DPDK: Failure to start device"); + } + + /* Grab and display the port MAC address. */ + rte_eth_macaddr_get(_port, &_mac_addr); + UHD_LOGGER_TRACE("DPDK") << "Port " << _port + << " MAC: " << eth_addr_to_string(_mac_addr); +} + +/* TODO: Do flow directions */ +queue_id_t dpdk_port::alloc_queue(struct rte_flow_pattern recv_pattern[]) +{ + std::lock_guard<std::mutex> lock(_mutex); + UHD_ASSERT_THROW(_free_queues.size() != 0); + auto queue = _free_queues.back(); + _free_queues.pop_back(); + return queue; +} + +void dpdk_port::free_queue(queue_id_t queue) +{ + std::lock_guard<std::mutex> lock(_mutex); + auto flow = _flow_rules.at(queue); + int status = rte_flow_destroy(_port, flow, NULL); + if (status) { + UHD_LOGGER_ERROR("DPDK") + << boost::format("Failed to destroy flow rule on port %u, queue %u") % _port + % queue; + throw uhd::runtime_error("DPDK: Failed to destroy flow rule"); + } else { + _flow_rules.erase(queue); + } + _free_queues.push_back(queue); +} + +int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* arp_req) +{ + struct rte_mbuf* mbuf; + struct ether_hdr* hdr; + struct arp_hdr* arp_frame; + + mbuf = rte_pktmbuf_alloc(tx_pktbuf_pool); + if (unlikely(mbuf == NULL)) { + UHD_LOG_WARNING("DPDK", "Could not allocate packet buffer for ARP response"); + return -ENOMEM; + } + + hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*); + arp_frame = (struct arp_hdr*)&hdr[1]; + + ether_addr_copy(&arp_req->arp_data.arp_sha, &hdr->d_addr); + ether_addr_copy(&_mac_addr, &hdr->s_addr); + hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP); + + arp_frame->arp_hrd = rte_cpu_to_be_16(ARP_HRD_ETHER); + arp_frame->arp_pro = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + arp_frame->arp_hln = 6; + arp_frame->arp_pln = 4; + arp_frame->arp_op = rte_cpu_to_be_16(ARP_OP_REPLY); + ether_addr_copy(&_mac_addr, &arp_frame->arp_data.arp_sha); + arp_frame->arp_data.arp_sip = _ipv4; + ether_addr_copy(&hdr->d_addr, &arp_frame->arp_data.arp_tha); + arp_frame->arp_data.arp_tip = arp_req->arp_data.arp_sip; + + mbuf->pkt_len = 42; + mbuf->data_len = 42; + + // ARP replies always on queue 0 + if (rte_eth_tx_burst(_port, 0, &mbuf, 1) != 1) { + UHD_LOGGER_WARNING("DPDK") + << boost::format("%s: TX descriptor ring is full") % __func__; + rte_pktmbuf_free(mbuf); + return -EAGAIN; + } + return 0; +} + +// TODO: ARP processing for queue 0 +// int dpdk_port::process_arp(struct rte_mempool *tx_pktbuf_pool, struct arp_hdr +// *arp_frame) +//{ +// std::lock_guard<std::mutex> lock(_mutex); +// uint32_t dest_ip = arp_frame->arp_data.arp_sip; +// struct ether_addr dest_addr = arp_frame->arp_data.arp_sha; +// +// /* Add entry to ARP table */ +// struct uhd_dpdk_arp_entry *entry = NULL; +// rte_hash_lookup_data(_arp_table, &dest_ip, (void **) &entry); +// if (!entry) { +// entry = rte_zmalloc(NULL, sizeof(*entry), 0); +// if (!entry) { +// return -ENOMEM; +// } +// LIST_INIT(&entry->pending_list); +// ether_addr_copy(&dest_addr, &entry->mac_addr); +// if (rte_hash_add_key_data(port->arp_table, &dest_ip, entry) < 0) { +// rte_free(entry); +// return -ENOSPC; +// } +// } else { +// struct uhd_dpdk_config_req *req = NULL; +// ether_addr_copy(&dest_addr, &entry->mac_addr); +// /* Now wake any config reqs waiting for the ARP */ +// LIST_FOREACH(req, &entry->pending_list, entry) { +// _uhd_dpdk_config_req_compl(req, 0); +// } +// while (entry->pending_list.lh_first != NULL) { +// LIST_REMOVE(entry->pending_list.lh_first, entry); +// } +// } +// /* Respond if this was an ARP request */ +// if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST) && +// arp_frame->arp_data.arp_tip == port->ipv4_addr) { +// _arp_reply(tx_pktbuf_pool, arp_frame); +// } +// +// return 0; +// +//} + +static dpdk_ctx* global_ctx = nullptr; +static std::mutex global_ctx_mutex; + +dpdk_ctx::sptr dpdk_ctx::get() +{ + std::lock_guard<std::mutex> lock(global_ctx_mutex); + if (!global_ctx) { + auto new_ctx = std::make_shared<dpdk_ctx>(); + global_ctx = new_ctx.get(); + return new_ctx; + } + return global_ctx->shared_from_this(); +} + +dpdk_ctx::dpdk_ctx(void) : _init_done(false) {} + +dpdk_ctx::~dpdk_ctx(void) +{ + std::lock_guard<std::mutex> lock(global_ctx_mutex); + global_ctx = nullptr; + // Stop all the ports + for (auto& port : _ports) { + rte_eth_dev_stop(port.first); + } + // Free mempools + for (auto& pool : _rx_pktbuf_pools) { + rte_mempool_free(pool); + } + for (auto& pool : _tx_pktbuf_pools) { + rte_mempool_free(pool); + } + // Free EAL resources + rte_eal_cleanup(); +} + +void dpdk_ctx::_eal_init(const device_addr_t& eal_args) +{ + /* Build up argc and argv */ + std::vector<const char*> argv; + argv.push_back("uhd::transport::dpdk"); + auto args = new std::array<char, 4096>(); + char* opt = args->data(); + char* end = args->data() + args->size(); + for (std::string& key : eal_args.keys()) { + std::string val = eal_args[key]; + if (key == "dpdk_coremask") { + opt = eal_add_opt(argv, end - opt, opt, "-c", val.c_str()); + } else if (key == "dpdk_corelist") { + /* NOTE: This arg may have commas, so limited to config file */ + opt = eal_add_opt(argv, end - opt, opt, "-l", val.c_str()); + } else if (key == "dpdk_coremap") { + opt = eal_add_opt(argv, end - opt, opt, "--lcores", val.c_str()); + } else if (key == "dpdk_master_lcore") { + opt = eal_add_opt(argv, end - opt, opt, "--master-lcore", val.c_str()); + } else if (key == "dpdk_pci_blacklist") { + opt = eal_add_opt(argv, end - opt, opt, "-b", val.c_str()); + } else if (key == "dpdk_pci_whitelist") { + opt = eal_add_opt(argv, end - opt, opt, "-w", val.c_str()); + } else if (key == "dpdk_log_level") { + opt = eal_add_opt(argv, end - opt, opt, "--log-level", val.c_str()); + } else if (key == "dpdk_huge_dir") { + opt = eal_add_opt(argv, end - opt, opt, "--huge-dir", val.c_str()); + } else if (key == "dpdk_file_prefix") { + opt = eal_add_opt(argv, end - opt, opt, "--file-prefix", val.c_str()); + } else if (key == "dpdk_driver") { + opt = eal_add_opt(argv, end - opt, opt, "-d", val.c_str()); + } + /* TODO: Change where log goes? + int rte_openlog_stream( FILE * f) + */ + } + /* Init DPDK's EAL */ + int ret = rte_eal_init(argv.size(), (char**)argv.data()); + /* Done with the temporaries */ + delete args; + + if (ret < 0) { + UHD_LOG_ERROR("DPDK", "Error with EAL initialization"); + throw uhd::runtime_error("Error with EAL initialization"); + } + + /* Create pktbuf pool entries, but only allocate on use */ + int socket_count = rte_socket_count(); + for (int i = 0; i < socket_count; i++) { + _rx_pktbuf_pools.push_back(NULL); + _tx_pktbuf_pools.push_back(NULL); + } +} + +/** + * Init DPDK environment, including DPDK's EAL. + * This will make available information about the DPDK-assigned NIC devices. + * + * \param user_args User args passed in to override config files + */ +void dpdk_ctx::init(const device_addr_t& user_args) +{ + unsigned int i; + std::lock_guard<std::mutex> lock(_init_mutex); + if (!_init_done) { + /* Gather global config, build args for EAL, and init UHD-DPDK */ + const device_addr_t dpdk_args = uhd::prefs::get_dpdk_args(user_args); + UHD_LOG_TRACE("DPDK", "Configuration:" << std::endl << dpdk_args.to_pp_string()); + _eal_init(dpdk_args); + + /* TODO: Should MTU be defined per-port? */ + _mtu = dpdk_args.cast<size_t>("dpdk_mtu", DEFAULT_FRAME_SIZE); + /* This is per queue */ + _num_mbufs = dpdk_args.cast<int>("dpdk_num_mbufs", DEFAULT_NUM_MBUFS); + _mbuf_cache_size = + dpdk_args.cast<int>("dpdk_mbuf_cache_size", DEFAULT_MBUF_CACHE_SIZE); + + /* Get device info for all the NIC ports */ + int num_dpdk_ports = rte_eth_dev_count_avail(); + UHD_ASSERT_THROW(num_dpdk_ports > 0); + device_addrs_t nics(num_dpdk_ports); + RTE_ETH_FOREACH_DEV(i) + { + struct ether_addr mac_addr; + rte_eth_macaddr_get(i, &mac_addr); + nics[i]["dpdk_mac"] = eth_addr_to_string(mac_addr); + } + + /* Get user configuration for each NIC port */ + device_addrs_t args = separate_device_addr(user_args); + size_t queue_count = 0; + RTE_ETH_FOREACH_DEV(i) + { + auto& nic = nics.at(i); + for (const auto& arg : args) { + /* Match DPDK-discovered NICs and user config via MAC addr */ + if (arg.has_key("dpdk_mac") && nic["dpdk_mac"] == arg["dpdk_mac"]) { + /* Copy user args for discovered NICs */ + nic.update(arg, false); + break; + } + } + /* Now combine user args with conf file */ + auto conf = uhd::prefs::get_dpdk_nic_args(nic); + + /* Update config, and remove ports that aren't fully configured */ + if (conf.has_key("dpdk_ipv4")) { + nics[i] = conf; + /* Update queue count, to generate a large enough mempool */ + queue_count += conf.cast<uint16_t>("dpdk_num_queues", rte_lcore_count()); + } else { + nics[i] = device_addr_t(); + } + } + + RTE_ETH_FOREACH_DEV(i) + { + auto& conf = nics.at(i); + if (conf.has_key("dpdk_ipv4")) { + // Allocating enough buffers for all DMA queues for each CPU socket + // - This is a bit inefficient for larger systems, since NICs may not + // all be on one socket + auto cpu_socket = rte_eth_dev_socket_id(i); + auto rx_pool = _get_rx_pktbuf_pool(cpu_socket, _num_mbufs * queue_count); + auto tx_pool = _get_tx_pktbuf_pool(cpu_socket, _num_mbufs * queue_count); + UHD_LOG_TRACE("DPDK", + "Initializing NIC(" << i << "):" << std::endl + << conf.to_pp_string()); + _ports[i] = dpdk_port::make(i, + _mtu, + conf.cast<uint16_t>("dpdk_num_queues", rte_lcore_count()), + _num_mbufs, + rx_pool, + tx_pool, + conf["dpdk_ipv4"]); + } + } + + UHD_LOG_TRACE("DPDK", "Waiting for links to come up..."); + rte_delay_ms(1000); + for (auto& port : _ports) { + struct rte_eth_link link; + auto portid = port.second->get_port_id(); + rte_eth_link_get(portid, &link); + unsigned int link_status = link.link_status; + unsigned int link_speed = link.link_speed; + UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps\n") + % portid % link_status % link_speed; + } + UHD_LOG_TRACE("DPDK", "Init DONE!"); + + _init_done = true; + } +} + +dpdk_port* dpdk_ctx::get_port(port_id_t port) const +{ + assert(is_init_done()); + if (_ports.count(port) == 0) { + return nullptr; + } + return _ports.at(port).get(); +} + +dpdk_port* dpdk_ctx::get_port(struct ether_addr mac_addr) const +{ + assert(is_init_done()); + for (const auto& port : _ports) { + struct ether_addr port_mac_addr; + rte_eth_macaddr_get(port.first, &port_mac_addr); + for (int j = 0; j < 6; j++) { + if (mac_addr.addr_bytes[j] != port_mac_addr.addr_bytes[j]) { + break; + } + if (j == 5) { + return port.second.get(); + } + } + } + return nullptr; +} + +int dpdk_ctx::get_port_count(void) +{ + assert(is_init_done()); + return _ports.size(); +} + +int dpdk_ctx::get_port_queue_count(port_id_t portid) +{ + assert(is_init_done()); + return _ports.at(portid)->get_queue_count(); +} + +int dpdk_ctx::get_port_link_status(port_id_t portid) const +{ + struct rte_eth_link link; + rte_eth_link_get_nowait(portid, &link); + return link.link_status; +} + +int dpdk_ctx::get_route(const std::string& addr) const +{ + const uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str()); + for (const auto& port : _ports) { + if (get_port_link_status(port.first) < 1) + continue; + uint32_t src_ipv4 = port.second->get_ipv4(); + uint32_t netmask = port.second->get_netmask(); + if ((src_ipv4 & netmask) == (dst_ipv4 & netmask)) { + return (int)port.first; + } + } + return -ENODEV; +} + + +bool dpdk_ctx::is_init_done(void) const +{ + return _init_done.load(); +} + +struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool( + unsigned int cpu_socket, size_t num_bufs) +{ + if (!_rx_pktbuf_pools.at(cpu_socket)) { + const int mbuf_size = _mtu + RTE_PKTMBUF_HEADROOM; + char name[32]; + snprintf(name, sizeof(name), "rx_mbuf_pool_%u", cpu_socket); + _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create( + name, num_bufs, _mbuf_cache_size, 0, mbuf_size, SOCKET_ID_ANY); + if (!_rx_pktbuf_pools.at(cpu_socket)) { + UHD_LOG_ERROR("DPDK", "Could not allocate RX pktbuf pool"); + throw uhd::runtime_error("DPDK: Could not allocate RX pktbuf pool"); + } + } + return _rx_pktbuf_pools.at(cpu_socket); +} + +struct rte_mempool* dpdk_ctx::_get_tx_pktbuf_pool( + unsigned int cpu_socket, size_t num_bufs) +{ + if (!_tx_pktbuf_pools.at(cpu_socket)) { + const int mbuf_size = _mtu + RTE_PKTMBUF_HEADROOM; + char name[32]; + snprintf(name, sizeof(name), "tx_mbuf_pool_%u", cpu_socket); + _tx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create( + name, num_bufs, _mbuf_cache_size, 0, mbuf_size, SOCKET_ID_ANY); + if (!_tx_pktbuf_pools.at(cpu_socket)) { + UHD_LOG_ERROR("DPDK", "Could not allocate TX pktbuf pool"); + throw uhd::runtime_error("DPDK: Could not allocate TX pktbuf pool"); + } + } + return _tx_pktbuf_pools.at(cpu_socket); +} + +}}} // namespace uhd::transport::dpdk |