// // Copyright 2019 Ettus Research, a National Instruments brand // // SPDX-License-Identifier: GPL-3.0-or-later // #include #include #include #include #include #include namespace uhd { namespace transport { namespace dpdk { namespace { constexpr uint64_t USEC = 1000000; constexpr size_t DEFAULT_FRAME_SIZE = 8000; constexpr int DEFAULT_NUM_MBUFS = 1024; constexpr int DEFAULT_MBUF_CACHE_SIZE = 315; constexpr size_t DPDK_HEADERS_SIZE = 14 + 20 + 8; // Ethernet + IPv4 + UDP constexpr uint16_t DPDK_DEFAULT_RING_SIZE = 512; inline char* eal_add_opt( std::vector& argv, size_t n, char* dst, const char* opt, const char* arg) { char* ptr = dst; strncpy(ptr, opt, n); argv.push_back(ptr); ptr += strlen(opt) + 1; n -= ptr - dst; strncpy(ptr, arg, n); argv.push_back(ptr); ptr += strlen(arg) + 1; return ptr; } inline std::string eth_addr_to_string(const struct ether_addr mac_addr) { auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx"); mac_stream % (uint32_t)mac_addr.addr_bytes[0] % (uint32_t)mac_addr.addr_bytes[1] % (uint32_t)mac_addr.addr_bytes[2] % (uint32_t)mac_addr.addr_bytes[3] % (uint32_t)mac_addr.addr_bytes[4] % (uint32_t)mac_addr.addr_bytes[5]; return mac_stream.str(); } inline void separate_ipv4_addr( const std::string ipv4, uint32_t& ipv4_addr, uint32_t& netmask) { std::vector result; boost::algorithm::split( result, ipv4, [](const char& in) { return in == '/'; }, boost::token_compress_on); UHD_ASSERT_THROW(result.size() == 2); ipv4_addr = (uint32_t)inet_addr(result[0].c_str()); int netbits = std::atoi(result[1].c_str()); netmask = htonl(0xffffffff << (32 - netbits)); } } // namespace dpdk_port::uptr dpdk_port::make(port_id_t port, size_t mtu, uint16_t num_queues, size_t num_mbufs, struct rte_mempool* rx_pktbuf_pool, struct rte_mempool* tx_pktbuf_pool, std::string ipv4_address) { return std::make_unique( port, mtu, num_queues, num_mbufs, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address); } dpdk_port::dpdk_port(port_id_t port, size_t mtu, uint16_t num_queues, size_t num_mbufs, struct rte_mempool* rx_pktbuf_pool, struct rte_mempool* tx_pktbuf_pool, std::string ipv4_address) : _port(port) , _mtu(mtu) , _rx_pktbuf_pool(rx_pktbuf_pool) , _tx_pktbuf_pool(tx_pktbuf_pool) { /* 1. Set MTU and IPv4 address */ int retval; retval = rte_eth_dev_set_mtu(_port, _mtu); if (retval) { uint16_t actual_mtu; UHD_LOGGER_WARNING("DPDK") << boost::format("Port %d: Could not set mtu to %d") % _port % _mtu; rte_eth_dev_get_mtu(_port, &actual_mtu); UHD_LOGGER_WARNING("DPDK") << boost::format("Port %d: Current mtu=%d") % _port % _mtu; _mtu = actual_mtu; } separate_ipv4_addr(ipv4_address, _ipv4, _netmask); /* 2. Set hardware offloads */ struct rte_eth_dev_info dev_info; rte_eth_dev_info_get(_port, &dev_info); uint64_t rx_offloads = DEV_RX_OFFLOAD_IPV4_CKSUM; uint64_t tx_offloads = DEV_TX_OFFLOAD_IPV4_CKSUM; if ((dev_info.rx_offload_capa & rx_offloads) != rx_offloads) { UHD_LOGGER_ERROR("DPDK") << boost::format("%d: Only supports RX offloads 0x%0llx") % _port % dev_info.rx_offload_capa; throw uhd::runtime_error("DPDK: Missing required RX offloads"); } if ((dev_info.tx_offload_capa & tx_offloads) != tx_offloads) { UHD_LOGGER_ERROR("DPDK") << boost::format("%d: Only supports TX offloads 0x%0llx") % _port % dev_info.tx_offload_capa; throw uhd::runtime_error("DPDK: Missing required TX offloads"); } // Check number of available queues if (dev_info.max_rx_queues < num_queues || dev_info.max_tx_queues < num_queues) { _num_queues = std::min(dev_info.max_rx_queues, dev_info.max_tx_queues); UHD_LOGGER_WARNING("DPDK") << boost::format("%d: Maximum queues supported is %d") % _port % _num_queues; } else { _num_queues = num_queues; } struct rte_eth_conf port_conf = {}; port_conf.rxmode.offloads = rx_offloads | DEV_RX_OFFLOAD_JUMBO_FRAME; port_conf.rxmode.max_rx_pkt_len = _mtu; port_conf.txmode.offloads = tx_offloads; retval = rte_eth_dev_configure(_port, _num_queues, _num_queues, &port_conf); if (retval != 0) { UHD_LOG_ERROR("DPDK", "Failed to configure the device"); throw uhd::runtime_error("DPDK: Failed to configure the device"); } /* 3. Set descriptor ring sizes */ uint16_t rx_desc = num_mbufs; if (dev_info.rx_desc_lim.nb_max < rx_desc || dev_info.rx_desc_lim.nb_min > rx_desc || (dev_info.rx_desc_lim.nb_align - 1) & rx_desc) { UHD_LOGGER_ERROR("DPDK") << boost::format("%d: %d RX descriptors requested, but must be in [%d,%d]") % _port % num_mbufs % dev_info.rx_desc_lim.nb_min % dev_info.rx_desc_lim.nb_max; UHD_LOGGER_ERROR("DPDK") << boost::format("Num RX descriptors must also be aligned to 0x%x") % dev_info.rx_desc_lim.nb_align; throw uhd::runtime_error("DPDK: Failed to allocate RX descriptors"); } uint16_t tx_desc = num_mbufs; if (dev_info.tx_desc_lim.nb_max < tx_desc || dev_info.tx_desc_lim.nb_min > tx_desc || (dev_info.tx_desc_lim.nb_align - 1) & tx_desc) { UHD_LOGGER_ERROR("DPDK") << boost::format("%d: %d TX descriptors requested, but must be in [%d,%d]") % _port % num_mbufs % dev_info.tx_desc_lim.nb_min % dev_info.tx_desc_lim.nb_max; UHD_LOGGER_ERROR("DPDK") << boost::format("Num TX descriptors must also be aligned to 0x%x") % dev_info.tx_desc_lim.nb_align; throw uhd::runtime_error("DPDK: Failed to allocate TX descriptors"); } retval = rte_eth_dev_adjust_nb_rx_tx_desc(_port, &rx_desc, &tx_desc); if (retval != 0) { UHD_LOG_ERROR("DPDK", "Failed to configure the DMA queues "); throw uhd::runtime_error("DPDK: Failed to configure the DMA queues"); } /* 4. Set up the RX and TX DMA queues (May not be generally supported after * eth_dev_start) */ unsigned int cpu_socket = rte_eth_dev_socket_id(_port); for (uint16_t i = 0; i < _num_queues; i++) { retval = rte_eth_rx_queue_setup(_port, i, rx_desc, cpu_socket, NULL, _rx_pktbuf_pool); if (retval < 0) { UHD_LOGGER_ERROR("DPDK") << boost::format("Port %d: Could not init RX queue %d") % _port % i; throw uhd::runtime_error("DPDK: Failure to init RX queue"); } struct rte_eth_txconf txconf = dev_info.default_txconf; txconf.offloads = DEV_TX_OFFLOAD_IPV4_CKSUM; retval = rte_eth_tx_queue_setup(_port, i, tx_desc, cpu_socket, &txconf); if (retval < 0) { UHD_LOGGER_ERROR("DPDK") << boost::format("Port %d: Could not init TX queue %d") % _port % i; throw uhd::runtime_error("DPDK: Failure to init TX queue"); } } /* 5. Set up initial flow table */ /* Append all free queues except 0, which is reserved for ARP */ _free_queues.reserve(_num_queues - 1); for (unsigned int i = 1; i < _num_queues; i++) { _free_queues.push_back(i); } // struct rte_flow_attr flow_attr; // flow_attr.group = 0; // flow_attr.priority = 1; // flow_attr.ingress = 1; // flow_attr.egress = 0; // flow_attr.transfer = 0; // flow_attr.reserved = 0; // struct rte_flow_item[] flow_pattern = { //}; // int rte_flow_validate(uint16_t port_id, // const struct rte_flow_attr *attr, // const struct rte_flow_item pattern[], // const struct rte_flow_action actions[], // struct rte_flow_error *error); // struct rte_flow * rte_flow_create(uint16_t port_id, // const struct rte_flow_attr *attr, // const struct rte_flow_item pattern[], // const struct rte_flow_action *actions[], // struct rte_flow_error *error); /* 6. Start the Ethernet device */ retval = rte_eth_dev_start(_port); if (retval < 0) { UHD_LOGGER_ERROR("DPDK") << boost::format("Port %d: Could not start device") % _port; throw uhd::runtime_error("DPDK: Failure to start device"); } /* Grab and display the port MAC address. */ rte_eth_macaddr_get(_port, &_mac_addr); UHD_LOGGER_TRACE("DPDK") << "Port " << _port << " MAC: " << eth_addr_to_string(_mac_addr); } /* TODO: Do flow directions */ queue_id_t dpdk_port::alloc_queue(struct rte_flow_pattern recv_pattern[]) { std::lock_guard lock(_mutex); UHD_ASSERT_THROW(_free_queues.size() != 0); auto queue = _free_queues.back(); _free_queues.pop_back(); return queue; } void dpdk_port::free_queue(queue_id_t queue) { std::lock_guard lock(_mutex); auto flow = _flow_rules.at(queue); int status = rte_flow_destroy(_port, flow, NULL); if (status) { UHD_LOGGER_ERROR("DPDK") << boost::format("Failed to destroy flow rule on port %u, queue %u") % _port % queue; throw uhd::runtime_error("DPDK: Failed to destroy flow rule"); } else { _flow_rules.erase(queue); } _free_queues.push_back(queue); } int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* arp_req) { struct rte_mbuf* mbuf; struct ether_hdr* hdr; struct arp_hdr* arp_frame; mbuf = rte_pktmbuf_alloc(tx_pktbuf_pool); if (unlikely(mbuf == NULL)) { UHD_LOG_WARNING("DPDK", "Could not allocate packet buffer for ARP response"); return -ENOMEM; } hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*); arp_frame = (struct arp_hdr*)&hdr[1]; ether_addr_copy(&arp_req->arp_data.arp_sha, &hdr->d_addr); ether_addr_copy(&_mac_addr, &hdr->s_addr); hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP); arp_frame->arp_hrd = rte_cpu_to_be_16(ARP_HRD_ETHER); arp_frame->arp_pro = rte_cpu_to_be_16(ETHER_TYPE_IPv4); arp_frame->arp_hln = 6; arp_frame->arp_pln = 4; arp_frame->arp_op = rte_cpu_to_be_16(ARP_OP_REPLY); ether_addr_copy(&_mac_addr, &arp_frame->arp_data.arp_sha); arp_frame->arp_data.arp_sip = _ipv4; ether_addr_copy(&hdr->d_addr, &arp_frame->arp_data.arp_tha); arp_frame->arp_data.arp_tip = arp_req->arp_data.arp_sip; mbuf->pkt_len = 42; mbuf->data_len = 42; // ARP replies always on queue 0 if (rte_eth_tx_burst(_port, 0, &mbuf, 1) != 1) { UHD_LOGGER_WARNING("DPDK") << boost::format("%s: TX descriptor ring is full") % __func__; rte_pktmbuf_free(mbuf); return -EAGAIN; } return 0; } // TODO: ARP processing for queue 0 // int dpdk_port::process_arp(struct rte_mempool *tx_pktbuf_pool, struct arp_hdr // *arp_frame) //{ // std::lock_guard lock(_mutex); // uint32_t dest_ip = arp_frame->arp_data.arp_sip; // struct ether_addr dest_addr = arp_frame->arp_data.arp_sha; // // /* Add entry to ARP table */ // struct uhd_dpdk_arp_entry *entry = NULL; // rte_hash_lookup_data(_arp_table, &dest_ip, (void **) &entry); // if (!entry) { // entry = rte_zmalloc(NULL, sizeof(*entry), 0); // if (!entry) { // return -ENOMEM; // } // LIST_INIT(&entry->pending_list); // ether_addr_copy(&dest_addr, &entry->mac_addr); // if (rte_hash_add_key_data(port->arp_table, &dest_ip, entry) < 0) { // rte_free(entry); // return -ENOSPC; // } // } else { // struct uhd_dpdk_config_req *req = NULL; // ether_addr_copy(&dest_addr, &entry->mac_addr); // /* Now wake any config reqs waiting for the ARP */ // LIST_FOREACH(req, &entry->pending_list, entry) { // _uhd_dpdk_config_req_compl(req, 0); // } // while (entry->pending_list.lh_first != NULL) { // LIST_REMOVE(entry->pending_list.lh_first, entry); // } // } // /* Respond if this was an ARP request */ // if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST) && // arp_frame->arp_data.arp_tip == port->ipv4_addr) { // _arp_reply(tx_pktbuf_pool, arp_frame); // } // // return 0; // //} static dpdk_ctx* global_ctx = nullptr; static std::mutex global_ctx_mutex; dpdk_ctx::sptr dpdk_ctx::get() { std::lock_guard lock(global_ctx_mutex); if (!global_ctx) { auto new_ctx = std::make_shared(); global_ctx = new_ctx.get(); return new_ctx; } return global_ctx->shared_from_this(); } dpdk_ctx::dpdk_ctx(void) : _init_done(false) {} dpdk_ctx::~dpdk_ctx(void) { std::lock_guard lock(global_ctx_mutex); global_ctx = nullptr; // Stop all the ports for (auto& port : _ports) { rte_eth_dev_stop(port.first); } // Free mempools for (auto& pool : _rx_pktbuf_pools) { rte_mempool_free(pool); } for (auto& pool : _tx_pktbuf_pools) { rte_mempool_free(pool); } // Free EAL resources rte_eal_cleanup(); } void dpdk_ctx::_eal_init(const device_addr_t& eal_args) { /* Build up argc and argv */ std::vector argv; argv.push_back("uhd::transport::dpdk"); auto args = new std::array(); char* opt = args->data(); char* end = args->data() + args->size(); for (std::string& key : eal_args.keys()) { std::string val = eal_args[key]; if (key == "dpdk_coremask") { opt = eal_add_opt(argv, end - opt, opt, "-c", val.c_str()); } else if (key == "dpdk_corelist") { /* NOTE: This arg may have commas, so limited to config file */ opt = eal_add_opt(argv, end - opt, opt, "-l", val.c_str()); } else if (key == "dpdk_coremap") { opt = eal_add_opt(argv, end - opt, opt, "--lcores", val.c_str()); } else if (key == "dpdk_master_lcore") { opt = eal_add_opt(argv, end - opt, opt, "--master-lcore", val.c_str()); } else if (key == "dpdk_pci_blacklist") { opt = eal_add_opt(argv, end - opt, opt, "-b", val.c_str()); } else if (key == "dpdk_pci_whitelist") { opt = eal_add_opt(argv, end - opt, opt, "-w", val.c_str()); } else if (key == "dpdk_log_level") { opt = eal_add_opt(argv, end - opt, opt, "--log-level", val.c_str()); } else if (key == "dpdk_huge_dir") { opt = eal_add_opt(argv, end - opt, opt, "--huge-dir", val.c_str()); } else if (key == "dpdk_file_prefix") { opt = eal_add_opt(argv, end - opt, opt, "--file-prefix", val.c_str()); } else if (key == "dpdk_driver") { opt = eal_add_opt(argv, end - opt, opt, "-d", val.c_str()); } /* TODO: Change where log goes? int rte_openlog_stream( FILE * f) */ } /* Init DPDK's EAL */ int ret = rte_eal_init(argv.size(), (char**)argv.data()); /* Done with the temporaries */ delete args; if (ret < 0) { UHD_LOG_ERROR("DPDK", "Error with EAL initialization"); throw uhd::runtime_error("Error with EAL initialization"); } /* Create pktbuf pool entries, but only allocate on use */ int socket_count = rte_socket_count(); for (int i = 0; i < socket_count; i++) { _rx_pktbuf_pools.push_back(NULL); _tx_pktbuf_pools.push_back(NULL); } } /** * Init DPDK environment, including DPDK's EAL. * This will make available information about the DPDK-assigned NIC devices. * * \param user_args User args passed in to override config files */ void dpdk_ctx::init(const device_addr_t& user_args) { unsigned int i; std::lock_guard lock(_init_mutex); if (!_init_done) { /* Gather global config, build args for EAL, and init UHD-DPDK */ const device_addr_t dpdk_args = uhd::prefs::get_dpdk_args(user_args); UHD_LOG_TRACE("DPDK", "Configuration:" << std::endl << dpdk_args.to_pp_string()); _eal_init(dpdk_args); /* TODO: Should MTU be defined per-port? */ _mtu = dpdk_args.cast("dpdk_mtu", DEFAULT_FRAME_SIZE); /* This is per queue */ _num_mbufs = dpdk_args.cast("dpdk_num_mbufs", DEFAULT_NUM_MBUFS); _mbuf_cache_size = dpdk_args.cast("dpdk_mbuf_cache_size", DEFAULT_MBUF_CACHE_SIZE); /* Get device info for all the NIC ports */ int num_dpdk_ports = rte_eth_dev_count_avail(); UHD_ASSERT_THROW(num_dpdk_ports > 0); device_addrs_t nics(num_dpdk_ports); RTE_ETH_FOREACH_DEV(i) { struct ether_addr mac_addr; rte_eth_macaddr_get(i, &mac_addr); nics[i]["dpdk_mac"] = eth_addr_to_string(mac_addr); } /* Get user configuration for each NIC port */ device_addrs_t args = separate_device_addr(user_args); size_t queue_count = 0; RTE_ETH_FOREACH_DEV(i) { auto& nic = nics.at(i); for (const auto& arg : args) { /* Match DPDK-discovered NICs and user config via MAC addr */ if (arg.has_key("dpdk_mac") && nic["dpdk_mac"] == arg["dpdk_mac"]) { /* Copy user args for discovered NICs */ nic.update(arg, false); break; } } /* Now combine user args with conf file */ auto conf = uhd::prefs::get_dpdk_nic_args(nic); /* Update config, and remove ports that aren't fully configured */ if (conf.has_key("dpdk_ipv4")) { nics[i] = conf; /* Update queue count, to generate a large enough mempool */ queue_count += conf.cast("dpdk_num_queues", rte_lcore_count()); } else { nics[i] = device_addr_t(); } } RTE_ETH_FOREACH_DEV(i) { auto& conf = nics.at(i); if (conf.has_key("dpdk_ipv4")) { // Allocating enough buffers for all DMA queues for each CPU socket // - This is a bit inefficient for larger systems, since NICs may not // all be on one socket auto cpu_socket = rte_eth_dev_socket_id(i); auto rx_pool = _get_rx_pktbuf_pool(cpu_socket, _num_mbufs * queue_count); auto tx_pool = _get_tx_pktbuf_pool(cpu_socket, _num_mbufs * queue_count); UHD_LOG_TRACE("DPDK", "Initializing NIC(" << i << "):" << std::endl << conf.to_pp_string()); _ports[i] = dpdk_port::make(i, _mtu, conf.cast("dpdk_num_queues", rte_lcore_count()), _num_mbufs, rx_pool, tx_pool, conf["dpdk_ipv4"]); } } UHD_LOG_TRACE("DPDK", "Waiting for links to come up..."); rte_delay_ms(1000); for (auto& port : _ports) { struct rte_eth_link link; auto portid = port.second->get_port_id(); rte_eth_link_get(portid, &link); unsigned int link_status = link.link_status; unsigned int link_speed = link.link_speed; UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps\n") % portid % link_status % link_speed; } UHD_LOG_TRACE("DPDK", "Init DONE!"); _init_done = true; } } dpdk_port* dpdk_ctx::get_port(port_id_t port) const { assert(is_init_done()); if (_ports.count(port) == 0) { return nullptr; } return _ports.at(port).get(); } dpdk_port* dpdk_ctx::get_port(struct ether_addr mac_addr) const { assert(is_init_done()); for (const auto& port : _ports) { struct ether_addr port_mac_addr; rte_eth_macaddr_get(port.first, &port_mac_addr); for (int j = 0; j < 6; j++) { if (mac_addr.addr_bytes[j] != port_mac_addr.addr_bytes[j]) { break; } if (j == 5) { return port.second.get(); } } } return nullptr; } int dpdk_ctx::get_port_count(void) { assert(is_init_done()); return _ports.size(); } int dpdk_ctx::get_port_queue_count(port_id_t portid) { assert(is_init_done()); return _ports.at(portid)->get_queue_count(); } int dpdk_ctx::get_port_link_status(port_id_t portid) const { struct rte_eth_link link; rte_eth_link_get_nowait(portid, &link); return link.link_status; } int dpdk_ctx::get_route(const std::string& addr) const { const uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str()); for (const auto& port : _ports) { if (get_port_link_status(port.first) < 1) continue; uint32_t src_ipv4 = port.second->get_ipv4(); uint32_t netmask = port.second->get_netmask(); if ((src_ipv4 & netmask) == (dst_ipv4 & netmask)) { return (int)port.first; } } return -ENODEV; } bool dpdk_ctx::is_init_done(void) const { return _init_done.load(); } struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool( unsigned int cpu_socket, size_t num_bufs) { if (!_rx_pktbuf_pools.at(cpu_socket)) { const int mbuf_size = _mtu + RTE_PKTMBUF_HEADROOM; char name[32]; snprintf(name, sizeof(name), "rx_mbuf_pool_%u", cpu_socket); _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create( name, num_bufs, _mbuf_cache_size, 0, mbuf_size, SOCKET_ID_ANY); if (!_rx_pktbuf_pools.at(cpu_socket)) { UHD_LOG_ERROR("DPDK", "Could not allocate RX pktbuf pool"); throw uhd::runtime_error("DPDK: Could not allocate RX pktbuf pool"); } } return _rx_pktbuf_pools.at(cpu_socket); } struct rte_mempool* dpdk_ctx::_get_tx_pktbuf_pool( unsigned int cpu_socket, size_t num_bufs) { if (!_tx_pktbuf_pools.at(cpu_socket)) { const int mbuf_size = _mtu + RTE_PKTMBUF_HEADROOM; char name[32]; snprintf(name, sizeof(name), "tx_mbuf_pool_%u", cpu_socket); _tx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create( name, num_bufs, _mbuf_cache_size, 0, mbuf_size, SOCKET_ID_ANY); if (!_tx_pktbuf_pools.at(cpu_socket)) { UHD_LOG_ERROR("DPDK", "Could not allocate TX pktbuf pool"); throw uhd::runtime_error("DPDK: Could not allocate TX pktbuf pool"); } } return _tx_pktbuf_pools.at(cpu_socket); } }}} // namespace uhd::transport::dpdk