From 4e38eef817813c1bbd8a9cf972e4cf0134d24308 Mon Sep 17 00:00:00 2001
From: Alex Williams
Date: Sun, 1 Dec 2019 21:58:13 -0800
Subject: dpdk: Add new DPDK stack to integrate with I/O services

docs: Update DPDK docs with new parameters:

Parameter names have had their hyphens changed to underscores, and the
I/O CPU argument is now named after the lcores and reflects the naming
used by DPDK.

transport: Add new udp_dpdk_link, based atop the new APIs:

This link is tightly coupled with the DPDK I/O service. The link class
carries all the address information to communicate with the other host,
and it can send packets directly through the DPDK NIC ports. However,
for receiving packets, the I/O service must pull the packets from the
DMA queue and attach them to the appropriate link object. The link
object merely formats the frame_buff object underneath, which is
embedded in the rte_mbuf container. For get_recv_buff, the link will
pull buffers only from its internal queue (the one filled by the I/O
service).

transport: Add DPDK-specific I/O service:

The I/O service is split into two parts, the user threads and the I/O
worker threads. The user threads submit requests through various
appropriate queues, and the I/O threads perform all the I/O on their
behalf. This includes routing UDP packets to the correct receiver and
getting the MAC address of a destination (by performing the ARP request
and handling the ARP replies).

The DPDK context stores I/O services. The context spawns all I/O
services on init(), and I/O services can be fetched from the dpdk_ctx
object by using a port ID.

I/O service clients:

The clients have two lockless ring buffers. One is to get a buffer from
the I/O service; the other is to release a buffer back to the I/O
service. Threads sleeping on buffer I/O are kept in a separate list from
the service queue and are processed in the course of doing RX or TX. The
list nodes are embedded in the dpdk_io_if, and the head of the list is
on the dpdk_io_service. The I/O service will transfer the embedded
wait_req to the list if it cannot acquire the mutex to complete the
condition for waking.
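As an illustration of the client-side buffer exchange described above,
here is a minimal sketch using DPDK's rte_ring API. The names
(client_rings, client_get_recv_buff, client_release_buff) are
hypothetical stand-ins for illustration, not the identifiers this patch
introduces:

    #include <rte_mbuf.h>
    #include <rte_ring.h>

    // One pair of lockless rings per I/O service client.
    struct client_rings
    {
        struct rte_ring* recv_ring;    // I/O service -> client: filled buffers
        struct rte_ring* release_ring; // client -> I/O service: spent buffers
    };

    // Client side: pull a received buffer without taking any locks.
    // Returns NULL when the ring is empty; the caller may then block on
    // its wait_req until the I/O thread wakes it.
    static struct rte_mbuf* client_get_recv_buff(struct client_rings* c)
    {
        void* obj = NULL;
        if (rte_ring_dequeue(c->recv_ring, &obj) < 0) {
            return NULL;
        }
        return static_cast<struct rte_mbuf*>(obj);
    }

    // Client side: hand a consumed buffer back so the I/O thread can
    // recycle the underlying rte_mbuf during its RX/TX loop.
    static int client_release_buff(struct client_rings* c, struct rte_mbuf* m)
    {
        return rte_ring_enqueue(c->release_ring, m); // 0 on success
    }

Keeping the sleep/wake bookkeeping on the separate wait_req list means
the fast path stays a plain lockless ring operation on both sides. The
context lookup this patch adds can then be used roughly as follows (a
sketch only; error handling and namespace qualifiers elided, address
literal is an example):

    auto ctx = dpdk_ctx::get();                   // lazily constructs the singleton
    ctx->init(user_args);                         // EAL init, port setup, spawns I/O services
    auto io_srv = ctx->get_io_service(port_id);   // I/O service that owns this port
    auto* port = ctx->get_route("192.168.10.2");  // matching dpdk_port, or NULL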
Co-authored-by: Martin Braun
Co-authored-by: Ciro Nishiguchi
Co-authored-by: Brent Stapleton
---
 host/lib/transport/uhd-dpdk/dpdk_common.cpp | 258 ++++++++++++++--------------
 1 file changed, 129 insertions(+), 129 deletions(-)

diff --git a/host/lib/transport/uhd-dpdk/dpdk_common.cpp b/host/lib/transport/uhd-dpdk/dpdk_common.cpp
index 43a1507bb..46818e973 100644
--- a/host/lib/transport/uhd-dpdk/dpdk_common.cpp
+++ b/host/lib/transport/uhd-dpdk/dpdk_common.cpp
@@ -3,8 +3,13 @@
 //
 // SPDX-License-Identifier: GPL-3.0-or-later
 //
+
+#include
 #include
+#include
 #include
+#include
+#include
 #include
 #include
 #include
@@ -23,6 +28,7 @@ constexpr uint16_t DPDK_DEFAULT_RING_SIZE = 512;
 inline char* eal_add_opt(
     std::vector& argv, size_t n, char* dst, const char* opt, const char* arg)
 {
+    UHD_LOG_TRACE("DPDK", opt << " " << arg);
     char* ptr = dst;
     strncpy(ptr, opt, n);
     argv.push_back(ptr);
@@ -34,15 +40,6 @@ inline char* eal_add_opt(
     return ptr;
 }
 
-inline std::string eth_addr_to_string(const struct ether_addr mac_addr)
-{
-    auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx");
-    mac_stream % (uint32_t)mac_addr.addr_bytes[0] % (uint32_t)mac_addr.addr_bytes[1]
-        % (uint32_t)mac_addr.addr_bytes[2] % (uint32_t)mac_addr.addr_bytes[3]
-        % (uint32_t)mac_addr.addr_bytes[4] % (uint32_t)mac_addr.addr_bytes[5];
-    return mac_stream.str();
-}
-
 inline void separate_ipv4_addr(
     const std::string ipv4, uint32_t& ipv4_addr, uint32_t& netmask)
 {
@@ -59,28 +56,29 @@ inline void separate_ipv4_addr(
 dpdk_port::uptr dpdk_port::make(port_id_t port,
     size_t mtu,
     uint16_t num_queues,
-    size_t num_mbufs,
+    uint16_t num_desc,
     struct rte_mempool* rx_pktbuf_pool,
     struct rte_mempool* tx_pktbuf_pool,
     std::string ipv4_address)
 {
     return std::make_unique<dpdk_port>(
-        port, mtu, num_queues, num_mbufs, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address);
+        port, mtu, num_queues, num_desc, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address);
 }
 
 dpdk_port::dpdk_port(port_id_t port,
     size_t mtu,
     uint16_t num_queues,
-    size_t num_mbufs,
+    uint16_t num_desc,
     struct rte_mempool* rx_pktbuf_pool,
     struct rte_mempool* tx_pktbuf_pool,
     std::string ipv4_address)
     : _port(port)
     , _mtu(mtu)
+    , _num_queues(num_queues)
     , _rx_pktbuf_pool(rx_pktbuf_pool)
     , _tx_pktbuf_pool(tx_pktbuf_pool)
 {
-    /* 1. Set MTU and IPv4 address */
+    /* Set MTU and IPv4 address */
     int retval;
 
     retval = rte_eth_dev_set_mtu(_port, _mtu);
@@ -96,7 +94,7 @@ dpdk_port::dpdk_port(port_id_t port,
 
     separate_ipv4_addr(ipv4_address, _ipv4, _netmask);
 
-    /* 2. Set hardware offloads */
+    /* Set hardware offloads */
     struct rte_eth_dev_info dev_info;
     rte_eth_dev_info_get(_port, &dev_info);
     uint64_t rx_offloads = DEV_RX_OFFLOAD_IPV4_CKSUM;
@@ -132,13 +130,13 @@ dpdk_port::dpdk_port(port_id_t port,
         throw uhd::runtime_error("DPDK: Failed to configure the device");
     }
 
-    /* 3. Set descriptor ring sizes */
-    uint16_t rx_desc = num_mbufs;
+    /* Set descriptor ring sizes */
+    uint16_t rx_desc = num_desc;
     if (dev_info.rx_desc_lim.nb_max < rx_desc || dev_info.rx_desc_lim.nb_min > rx_desc
         || (dev_info.rx_desc_lim.nb_align - 1) & rx_desc) {
         UHD_LOGGER_ERROR("DPDK")
             << boost::format("%d: %d RX descriptors requested, but must be in [%d,%d]")
-                   % _port % num_mbufs % dev_info.rx_desc_lim.nb_min
+                   % _port % num_desc % dev_info.rx_desc_lim.nb_min
                    % dev_info.rx_desc_lim.nb_max;
         UHD_LOGGER_ERROR("DPDK")
             << boost::format("Num RX descriptors must also be aligned to 0x%x")
@@ -146,12 +144,12 @@ dpdk_port::dpdk_port(port_id_t port,
         throw uhd::runtime_error("DPDK: Failed to allocate RX descriptors");
     }
 
-    uint16_t tx_desc = num_mbufs;
+    uint16_t tx_desc = num_desc;
     if (dev_info.tx_desc_lim.nb_max < tx_desc || dev_info.tx_desc_lim.nb_min > tx_desc
         || (dev_info.tx_desc_lim.nb_align - 1) & tx_desc) {
         UHD_LOGGER_ERROR("DPDK")
             << boost::format("%d: %d TX descriptors requested, but must be in [%d,%d]")
-                   % _port % num_mbufs % dev_info.tx_desc_lim.nb_min
+                   % _port % num_desc % dev_info.tx_desc_lim.nb_min
                    % dev_info.tx_desc_lim.nb_max;
         UHD_LOGGER_ERROR("DPDK")
             << boost::format("Num TX descriptors must also be aligned to 0x%x")
@@ -165,7 +163,7 @@ dpdk_port::dpdk_port(port_id_t port,
         throw uhd::runtime_error("DPDK: Failed to configure the DMA queues");
     }
 
-    /* 4. Set up the RX and TX DMA queues (May not be generally supported after
+    /* Set up the RX and TX DMA queues (May not be generally supported after
      * eth_dev_start) */
     unsigned int cpu_socket = rte_eth_dev_socket_id(_port);
     for (uint16_t i = 0; i < _num_queues; i++) {
@@ -187,36 +185,9 @@ dpdk_port::dpdk_port(port_id_t port,
         }
     }
 
-    /* 5. Set up initial flow table */
+    /* TODO: Enable multiple queues (only support 1 right now) */
 
-    /* Append all free queues except 0, which is reserved for ARP */
-    _free_queues.reserve(_num_queues - 1);
-    for (unsigned int i = 1; i < _num_queues; i++) {
-        _free_queues.push_back(i);
-    }
-
-    // struct rte_flow_attr flow_attr;
-    // flow_attr.group    = 0;
-    // flow_attr.priority = 1;
-    // flow_attr.ingress  = 1;
-    // flow_attr.egress   = 0;
-    // flow_attr.transfer = 0;
-    // flow_attr.reserved = 0;
-
-    // struct rte_flow_item[] flow_pattern = {
-    //};
-    // int rte_flow_validate(uint16_t port_id,
-    //    const struct rte_flow_attr *attr,
-    //    const struct rte_flow_item pattern[],
-    //    const struct rte_flow_action actions[],
-    //    struct rte_flow_error *error);
-    // struct rte_flow * rte_flow_create(uint16_t port_id,
-    //    const struct rte_flow_attr *attr,
-    //    const struct rte_flow_item pattern[],
-    //    const struct rte_flow_action *actions[],
-    //    struct rte_flow_error *error);
-
-    /* 6. Start the Ethernet device */
+    /* Start the Ethernet device */
     retval = rte_eth_dev_start(_port);
     if (retval < 0) {
         UHD_LOGGER_ERROR("DPDK")
@@ -230,39 +201,60 @@ dpdk_port::dpdk_port(port_id_t port,
                              << " MAC: " << eth_addr_to_string(_mac_addr);
 }
 
-/* TODO: Do flow directions */
-queue_id_t dpdk_port::alloc_queue(struct rte_flow_pattern recv_pattern[])
+dpdk_port::~dpdk_port()
 {
-    std::lock_guard<std::mutex> lock(_mutex);
-    UHD_ASSERT_THROW(_free_queues.size() != 0);
-    auto queue = _free_queues.back();
-    _free_queues.pop_back();
-    return queue;
+    rte_eth_dev_stop(_port);
+    rte_spinlock_lock(&_spinlock);
+    for (auto kv : _arp_table) {
+        for (auto req : kv.second->reqs) {
+            req->cond.notify_one();
+        }
+        rte_free(kv.second);
+    }
+    _arp_table.clear();
+    rte_spinlock_unlock(&_spinlock);
 }
 
-void dpdk_port::free_queue(queue_id_t queue)
+uint16_t dpdk_port::alloc_udp_port(uint16_t udp_port)
 {
+    uint16_t port_selected;
     std::lock_guard<std::mutex> lock(_mutex);
-    auto flow = _flow_rules.at(queue);
-    int status = rte_flow_destroy(_port, flow, NULL);
-    if (status) {
-        UHD_LOGGER_ERROR("DPDK")
-            << boost::format("Failed to destroy flow rule on port %u, queue %u") % _port
-                   % queue;
-        throw uhd::runtime_error("DPDK: Failed to destroy flow rule");
+    if (udp_port) {
+        if (_udp_ports.count(rte_be_to_cpu_16(udp_port))) {
+            return 0;
+        }
+        port_selected = rte_be_to_cpu_16(udp_port);
     } else {
-        _flow_rules.erase(queue);
+        if (_udp_ports.size() >= 65535) {
+            UHD_LOG_WARNING("DPDK", "Attempted to allocate UDP port, but none remain");
+            return 0;
+        }
+        port_selected = _next_udp_port;
+        while (true) {
+            if (port_selected == 0) {
+                continue;
+            }
+            if (_udp_ports.count(port_selected) == 0) {
+                _next_udp_port = port_selected - 1;
+                break;
+            }
+            if (port_selected - 1 == _next_udp_port) {
+                return 0;
+            }
+            port_selected--;
+        }
     }
-    _free_queues.push_back(queue);
+    _udp_ports.insert(port_selected);
+    return rte_cpu_to_be_16(port_selected);
 }
 
-int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* arp_req)
+int dpdk_port::_arp_reply(queue_id_t queue_id, struct arp_hdr* arp_req)
 {
     struct rte_mbuf* mbuf;
     struct ether_hdr* hdr;
     struct arp_hdr* arp_frame;
 
-    mbuf = rte_pktmbuf_alloc(tx_pktbuf_pool);
+    mbuf = rte_pktmbuf_alloc(_tx_pktbuf_pool);
     if (unlikely(mbuf == NULL)) {
         UHD_LOG_WARNING("DPDK", "Could not allocate packet buffer for ARP response");
         return -ENOMEM;
@@ -288,8 +280,7 @@ int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* ar
     mbuf->pkt_len = 42;
     mbuf->data_len = 42;
 
-    // ARP replies always on queue 0
-    if (rte_eth_tx_burst(_port, 0, &mbuf, 1) != 1) {
+    if (rte_eth_tx_burst(_port, queue_id, &mbuf, 1) != 1) {
         UHD_LOGGER_WARNING("DPDK")
             << boost::format("%s: TX descriptor ring is full") % __func__;
         rte_pktmbuf_free(mbuf);
@@ -298,61 +289,16 @@ int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* ar
     return 0;
 }
 
-// TODO: ARP processing for queue 0
-// int dpdk_port::process_arp(struct rte_mempool *tx_pktbuf_pool, struct arp_hdr
-// *arp_frame)
-//{
-//    std::lock_guard<std::mutex> lock(_mutex);
-//    uint32_t dest_ip = arp_frame->arp_data.arp_sip;
-//    struct ether_addr dest_addr = arp_frame->arp_data.arp_sha;
-//
-//    /* Add entry to ARP table */
-//    struct uhd_dpdk_arp_entry *entry = NULL;
-//    rte_hash_lookup_data(_arp_table, &dest_ip, (void **) &entry);
-//    if (!entry) {
-//        entry = rte_zmalloc(NULL, sizeof(*entry), 0);
-//        if (!entry) {
-//            return -ENOMEM;
-//        }
-//        LIST_INIT(&entry->pending_list);
-//        ether_addr_copy(&dest_addr, &entry->mac_addr);
-//        if (rte_hash_add_key_data(port->arp_table, &dest_ip, entry) < 0) {
-//            rte_free(entry);
-//            return -ENOSPC;
-//        }
-//    } else {
-//        struct uhd_dpdk_config_req *req = NULL;
-//        ether_addr_copy(&dest_addr, &entry->mac_addr);
-//        /* Now wake any config reqs waiting for the ARP */
-//        LIST_FOREACH(req, &entry->pending_list, entry) {
-//            _uhd_dpdk_config_req_compl(req, 0);
-//        }
-//        while (entry->pending_list.lh_first != NULL) {
-//            LIST_REMOVE(entry->pending_list.lh_first, entry);
-//        }
-//    }
-//    /* Respond if this was an ARP request */
-//    if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST) &&
-//        arp_frame->arp_data.arp_tip == port->ipv4_addr) {
-//        _arp_reply(tx_pktbuf_pool, arp_frame);
-//    }
-//
-//    return 0;
-//
-//}
-
-static dpdk_ctx* global_ctx = nullptr;
+static dpdk_ctx::sptr global_ctx = nullptr;
 static std::mutex global_ctx_mutex;
 
 dpdk_ctx::sptr dpdk_ctx::get()
 {
     std::lock_guard<std::mutex> lock(global_ctx_mutex);
     if (!global_ctx) {
-        auto new_ctx = std::make_shared<dpdk_ctx>();
-        global_ctx = new_ctx.get();
-        return new_ctx;
+        global_ctx = std::make_shared<dpdk_ctx>();
     }
-    return global_ctx->shared_from_this();
+    return global_ctx;
 }
 
 dpdk_ctx::dpdk_ctx(void) : _init_done(false) {}
@@ -384,6 +330,7 @@ void dpdk_ctx::_eal_init(const device_addr_t& eal_args)
     auto args = new std::array();
     char* opt = args->data();
     char* end = args->data() + args->size();
+    UHD_LOG_TRACE("DPDK", "EAL init options: ");
     for (std::string& key : eal_args.keys()) {
         std::string val = eal_args[key];
         if (key == "dpdk_coremask") {
@@ -452,10 +399,16 @@ void dpdk_ctx::init(const device_addr_t& user_args)
         _num_mbufs = dpdk_args.cast("dpdk_num_mbufs", DEFAULT_NUM_MBUFS);
         _mbuf_cache_size =
             dpdk_args.cast("dpdk_mbuf_cache_size", DEFAULT_MBUF_CACHE_SIZE);
+        UHD_LOG_TRACE("DPDK",
+            "mtu: " << _mtu << " num_mbufs: " << _num_mbufs
+                    << " mbuf_cache_size: " << _mbuf_cache_size);
 
         /* Get device info for all the NIC ports */
         int num_dpdk_ports = rte_eth_dev_count_avail();
-        UHD_ASSERT_THROW(num_dpdk_ports > 0);
+        if (num_dpdk_ports == 0) {
+            UHD_LOG_ERROR("DPDK", "No available DPDK devices (ports) found!");
+            throw uhd::runtime_error("No available DPDK devices (ports) found!");
+        }
         device_addrs_t nics(num_dpdk_ports);
         RTE_ETH_FOREACH_DEV(i)
         {
@@ -480,6 +433,8 @@ void dpdk_ctx::init(const device_addr_t& user_args)
             }
             /* Now combine user args with conf file */
             auto conf = uhd::prefs::get_dpdk_nic_args(nic);
+            // TODO: Enable the use of multiple DMA queues
+            conf["dpdk_num_queues"] = "1";
 
             /* Update config, and remove ports that aren't fully configured */
             if (conf.has_key("dpdk_ipv4")) {
@@ -491,10 +446,17 @@ void dpdk_ctx::init(const device_addr_t& user_args)
             }
         }
 
+        std::map<size_t, std::vector<size_t>> lcore_to_port_id_map;
         RTE_ETH_FOREACH_DEV(i)
        {
             auto& conf = nics.at(i);
             if (conf.has_key("dpdk_ipv4")) {
+                UHD_ASSERT_THROW(conf.has_key("dpdk_lcore"));
+                const size_t lcore_id = conf.cast("dpdk_lcore", 0);
+                if (!lcore_to_port_id_map.count(lcore_id)) {
+                    lcore_to_port_id_map.insert({lcore_id, {}});
+                }
+
                 // Allocating enough buffers for all DMA queues for each CPU socket
                 // - This is a bit inefficient for larger systems, since NICs may not
                 //   all be on one socket
@@ -507,10 +469,13 @@ void dpdk_ctx::init(const device_addr_t& user_args)
                 _ports[i] = dpdk_port::make(i,
                     _mtu,
                     conf.cast("dpdk_num_queues", rte_lcore_count()),
-                    _num_mbufs,
+                    conf.cast("dpdk_num_desc", DPDK_DEFAULT_RING_SIZE),
                     rx_pool,
                     tx_pool,
                     conf["dpdk_ipv4"]);
+
+                // Remember all port IDs that map to an lcore
+                lcore_to_port_id_map.at(lcore_id).push_back(i);
             }
         }
 
@@ -522,12 +487,29 @@ void dpdk_ctx::init(const device_addr_t& user_args)
             rte_eth_link_get(portid, &link);
             unsigned int link_status = link.link_status;
             unsigned int link_speed = link.link_speed;
-            UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps\n")
-                                            % portid % link_status % link_speed;
+            UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps") % portid
+                                            % link_status % link_speed;
         }
-        UHD_LOG_TRACE("DPDK", "Init DONE!");
-
+        UHD_LOG_TRACE("DPDK", "Init done -- spawning IO services");
         _init_done = true;
+
+        // Links are up, now create one IO service per lcore
+        for (auto& lcore_portids_pair : lcore_to_port_id_map) {
+            const size_t lcore_id = lcore_portids_pair.first;
+            std::vector<dpdk_port*> dpdk_ports;
+            dpdk_ports.reserve(lcore_portids_pair.second.size());
+            for (const size_t port_id : lcore_portids_pair.second) {
+                dpdk_ports.push_back(get_port(port_id));
+            }
+            const size_t servq_depth = 32; // FIXME
+            UHD_LOG_TRACE("DPDK",
+                "Creating I/O service for lcore "
+                    << lcore_id << ", servicing " << dpdk_ports.size()
+                    << " ports, service queue depth " << servq_depth);
+            _io_srv_portid_map.insert(
+                {uhd::transport::dpdk_io_service::make(lcore_id, dpdk_ports, servq_depth),
+                    lcore_portids_pair.second});
+        }
     }
 }
 
@@ -577,7 +559,7 @@ int dpdk_ctx::get_port_link_status(port_id_t portid) const
     return link.link_status;
 }
 
-int dpdk_ctx::get_route(const std::string& addr) const
+dpdk_port* dpdk_ctx::get_route(const std::string& addr) const
 {
     const uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str());
     for (const auto& port : _ports) {
@@ -586,10 +568,10 @@ int dpdk_ctx::get_route(const std::string& addr) const
         uint32_t src_ipv4 = port.second->get_ipv4();
         uint32_t netmask = port.second->get_netmask();
         if ((src_ipv4 & netmask) == (dst_ipv4 & netmask)) {
-            return (int)port.first;
+            return port.second.get();
         }
     }
-    return -ENODEV;
+    return NULL;
 }
 
 
@@ -598,6 +580,20 @@ bool dpdk_ctx::is_init_done(void) const
     return _init_done.load();
 }
 
+uhd::transport::dpdk_io_service::sptr dpdk_ctx::get_io_service(const size_t port_id)
+{
+    for (auto& io_srv_portid_pair : _io_srv_portid_map) {
+        if (uhd::has(io_srv_portid_pair.second, port_id)) {
+            return io_srv_portid_pair.first;
+        }
+    }
+
+    std::string err_msg = std::string("Cannot look up I/O service for port ID: ")
+                          + std::to_string(port_id) + ". No such port ID!";
+    UHD_LOG_ERROR("DPDK", err_msg);
+    throw uhd::lookup_error(err_msg);
+}
+
 struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool(
     unsigned int cpu_socket, size_t num_bufs)
 {
@@ -605,8 +601,12 @@ struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool(
     const int mbuf_size = _mtu + RTE_PKTMBUF_HEADROOM;
     char name[32];
     snprintf(name, sizeof(name), "rx_mbuf_pool_%u", cpu_socket);
-    _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create(
-        name, num_bufs, _mbuf_cache_size, 0, mbuf_size, SOCKET_ID_ANY);
+    _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create(name,
+        num_bufs,
+        _mbuf_cache_size,
+        DPDK_MBUF_PRIV_SIZE,
+        mbuf_size,
+        SOCKET_ID_ANY);
     if (!_rx_pktbuf_pools.at(cpu_socket)) {
         UHD_LOG_ERROR("DPDK", "Could not allocate RX pktbuf pool");
         throw uhd::runtime_error("DPDK: Could not allocate RX pktbuf pool");
-- 
cgit v1.2.3