diff options
Diffstat (limited to 'host/lib/transport/uhd-dpdk')
-rw-r--r-- | host/lib/transport/uhd-dpdk/CMakeLists.txt | 2 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/dpdk_common.cpp | 258 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/dpdk_io_service.cpp | 950 |
3 files changed, 1081 insertions, 129 deletions
diff --git a/host/lib/transport/uhd-dpdk/CMakeLists.txt b/host/lib/transport/uhd-dpdk/CMakeLists.txt index ea78aa1e8..a13886653 100644 --- a/host/lib/transport/uhd-dpdk/CMakeLists.txt +++ b/host/lib/transport/uhd-dpdk/CMakeLists.txt @@ -19,9 +19,11 @@ if(ENABLE_DPDK) LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_common.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_io_service.cpp ) set_source_files_properties( ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_common.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_io_service.cpp PROPERTIES COMPILE_FLAGS "${UHD_DPDK_CFLAGS} -D_GNU_SOURCE" ) include_directories(${DPDK_INCLUDE_DIR}) diff --git a/host/lib/transport/uhd-dpdk/dpdk_common.cpp b/host/lib/transport/uhd-dpdk/dpdk_common.cpp index 43a1507bb..46818e973 100644 --- a/host/lib/transport/uhd-dpdk/dpdk_common.cpp +++ b/host/lib/transport/uhd-dpdk/dpdk_common.cpp @@ -3,8 +3,13 @@ // // SPDX-License-Identifier: GPL-3.0-or-later // + +#include <uhd/utils/algorithm.hpp> #include <uhd/utils/log.hpp> +#include <uhdlib/transport/dpdk/arp.hpp> #include <uhdlib/transport/dpdk/common.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/dpdk_io_service.hpp> #include <uhdlib/utils/prefs.hpp> #include <arpa/inet.h> #include <rte_arp.h> @@ -23,6 +28,7 @@ constexpr uint16_t DPDK_DEFAULT_RING_SIZE = 512; inline char* eal_add_opt( std::vector<const char*>& argv, size_t n, char* dst, const char* opt, const char* arg) { + UHD_LOG_TRACE("DPDK", opt << " " << arg); char* ptr = dst; strncpy(ptr, opt, n); argv.push_back(ptr); @@ -34,15 +40,6 @@ inline char* eal_add_opt( return ptr; } -inline std::string eth_addr_to_string(const struct ether_addr mac_addr) -{ - auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx"); - mac_stream % (uint32_t)mac_addr.addr_bytes[0] % (uint32_t)mac_addr.addr_bytes[1] - % (uint32_t)mac_addr.addr_bytes[2] % (uint32_t)mac_addr.addr_bytes[3] - % (uint32_t)mac_addr.addr_bytes[4] % (uint32_t)mac_addr.addr_bytes[5]; - return mac_stream.str(); -} - inline void separate_ipv4_addr( const std::string ipv4, uint32_t& ipv4_addr, uint32_t& netmask) { @@ -59,28 +56,29 @@ inline void separate_ipv4_addr( dpdk_port::uptr dpdk_port::make(port_id_t port, size_t mtu, uint16_t num_queues, - size_t num_mbufs, + uint16_t num_desc, struct rte_mempool* rx_pktbuf_pool, struct rte_mempool* tx_pktbuf_pool, std::string ipv4_address) { return std::make_unique<dpdk_port>( - port, mtu, num_queues, num_mbufs, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address); + port, mtu, num_queues, num_desc, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address); } dpdk_port::dpdk_port(port_id_t port, size_t mtu, uint16_t num_queues, - size_t num_mbufs, + uint16_t num_desc, struct rte_mempool* rx_pktbuf_pool, struct rte_mempool* tx_pktbuf_pool, std::string ipv4_address) : _port(port) , _mtu(mtu) + , _num_queues(num_queues) , _rx_pktbuf_pool(rx_pktbuf_pool) , _tx_pktbuf_pool(tx_pktbuf_pool) { - /* 1. Set MTU and IPv4 address */ + /* Set MTU and IPv4 address */ int retval; retval = rte_eth_dev_set_mtu(_port, _mtu); @@ -96,7 +94,7 @@ dpdk_port::dpdk_port(port_id_t port, separate_ipv4_addr(ipv4_address, _ipv4, _netmask); - /* 2. Set hardware offloads */ + /* Set hardware offloads */ struct rte_eth_dev_info dev_info; rte_eth_dev_info_get(_port, &dev_info); uint64_t rx_offloads = DEV_RX_OFFLOAD_IPV4_CKSUM; @@ -132,13 +130,13 @@ dpdk_port::dpdk_port(port_id_t port, throw uhd::runtime_error("DPDK: Failed to configure the device"); } - /* 3. Set descriptor ring sizes */ - uint16_t rx_desc = num_mbufs; + /* Set descriptor ring sizes */ + uint16_t rx_desc = num_desc; if (dev_info.rx_desc_lim.nb_max < rx_desc || dev_info.rx_desc_lim.nb_min > rx_desc || (dev_info.rx_desc_lim.nb_align - 1) & rx_desc) { UHD_LOGGER_ERROR("DPDK") << boost::format("%d: %d RX descriptors requested, but must be in [%d,%d]") - % _port % num_mbufs % dev_info.rx_desc_lim.nb_min + % _port % num_desc % dev_info.rx_desc_lim.nb_min % dev_info.rx_desc_lim.nb_max; UHD_LOGGER_ERROR("DPDK") << boost::format("Num RX descriptors must also be aligned to 0x%x") @@ -146,12 +144,12 @@ dpdk_port::dpdk_port(port_id_t port, throw uhd::runtime_error("DPDK: Failed to allocate RX descriptors"); } - uint16_t tx_desc = num_mbufs; + uint16_t tx_desc = num_desc; if (dev_info.tx_desc_lim.nb_max < tx_desc || dev_info.tx_desc_lim.nb_min > tx_desc || (dev_info.tx_desc_lim.nb_align - 1) & tx_desc) { UHD_LOGGER_ERROR("DPDK") << boost::format("%d: %d TX descriptors requested, but must be in [%d,%d]") - % _port % num_mbufs % dev_info.tx_desc_lim.nb_min + % _port % num_desc % dev_info.tx_desc_lim.nb_min % dev_info.tx_desc_lim.nb_max; UHD_LOGGER_ERROR("DPDK") << boost::format("Num TX descriptors must also be aligned to 0x%x") @@ -165,7 +163,7 @@ dpdk_port::dpdk_port(port_id_t port, throw uhd::runtime_error("DPDK: Failed to configure the DMA queues"); } - /* 4. Set up the RX and TX DMA queues (May not be generally supported after + /* Set up the RX and TX DMA queues (May not be generally supported after * eth_dev_start) */ unsigned int cpu_socket = rte_eth_dev_socket_id(_port); for (uint16_t i = 0; i < _num_queues; i++) { @@ -187,36 +185,9 @@ dpdk_port::dpdk_port(port_id_t port, } } - /* 5. Set up initial flow table */ + /* TODO: Enable multiple queues (only support 1 right now) */ - /* Append all free queues except 0, which is reserved for ARP */ - _free_queues.reserve(_num_queues - 1); - for (unsigned int i = 1; i < _num_queues; i++) { - _free_queues.push_back(i); - } - - // struct rte_flow_attr flow_attr; - // flow_attr.group = 0; - // flow_attr.priority = 1; - // flow_attr.ingress = 1; - // flow_attr.egress = 0; - // flow_attr.transfer = 0; - // flow_attr.reserved = 0; - - // struct rte_flow_item[] flow_pattern = { - //}; - // int rte_flow_validate(uint16_t port_id, - // const struct rte_flow_attr *attr, - // const struct rte_flow_item pattern[], - // const struct rte_flow_action actions[], - // struct rte_flow_error *error); - // struct rte_flow * rte_flow_create(uint16_t port_id, - // const struct rte_flow_attr *attr, - // const struct rte_flow_item pattern[], - // const struct rte_flow_action *actions[], - // struct rte_flow_error *error); - - /* 6. Start the Ethernet device */ + /* Start the Ethernet device */ retval = rte_eth_dev_start(_port); if (retval < 0) { UHD_LOGGER_ERROR("DPDK") @@ -230,39 +201,60 @@ dpdk_port::dpdk_port(port_id_t port, << " MAC: " << eth_addr_to_string(_mac_addr); } -/* TODO: Do flow directions */ -queue_id_t dpdk_port::alloc_queue(struct rte_flow_pattern recv_pattern[]) +dpdk_port::~dpdk_port() { - std::lock_guard<std::mutex> lock(_mutex); - UHD_ASSERT_THROW(_free_queues.size() != 0); - auto queue = _free_queues.back(); - _free_queues.pop_back(); - return queue; + rte_eth_dev_stop(_port); + rte_spinlock_lock(&_spinlock); + for (auto kv : _arp_table) { + for (auto req : kv.second->reqs) { + req->cond.notify_one(); + } + rte_free(kv.second); + } + _arp_table.clear(); + rte_spinlock_unlock(&_spinlock); } -void dpdk_port::free_queue(queue_id_t queue) +uint16_t dpdk_port::alloc_udp_port(uint16_t udp_port) { + uint16_t port_selected; std::lock_guard<std::mutex> lock(_mutex); - auto flow = _flow_rules.at(queue); - int status = rte_flow_destroy(_port, flow, NULL); - if (status) { - UHD_LOGGER_ERROR("DPDK") - << boost::format("Failed to destroy flow rule on port %u, queue %u") % _port - % queue; - throw uhd::runtime_error("DPDK: Failed to destroy flow rule"); + if (udp_port) { + if (_udp_ports.count(rte_be_to_cpu_16(udp_port))) { + return 0; + } + port_selected = rte_be_to_cpu_16(udp_port); } else { - _flow_rules.erase(queue); + if (_udp_ports.size() >= 65535) { + UHD_LOG_WARNING("DPDK", "Attempted to allocate UDP port, but none remain"); + return 0; + } + port_selected = _next_udp_port; + while (true) { + if (port_selected == 0) { + continue; + } + if (_udp_ports.count(port_selected) == 0) { + _next_udp_port = port_selected - 1; + break; + } + if (port_selected - 1 == _next_udp_port) { + return 0; + } + port_selected--; + } } - _free_queues.push_back(queue); + _udp_ports.insert(port_selected); + return rte_cpu_to_be_16(port_selected); } -int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* arp_req) +int dpdk_port::_arp_reply(queue_id_t queue_id, struct arp_hdr* arp_req) { struct rte_mbuf* mbuf; struct ether_hdr* hdr; struct arp_hdr* arp_frame; - mbuf = rte_pktmbuf_alloc(tx_pktbuf_pool); + mbuf = rte_pktmbuf_alloc(_tx_pktbuf_pool); if (unlikely(mbuf == NULL)) { UHD_LOG_WARNING("DPDK", "Could not allocate packet buffer for ARP response"); return -ENOMEM; @@ -288,8 +280,7 @@ int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* ar mbuf->pkt_len = 42; mbuf->data_len = 42; - // ARP replies always on queue 0 - if (rte_eth_tx_burst(_port, 0, &mbuf, 1) != 1) { + if (rte_eth_tx_burst(_port, queue_id, &mbuf, 1) != 1) { UHD_LOGGER_WARNING("DPDK") << boost::format("%s: TX descriptor ring is full") % __func__; rte_pktmbuf_free(mbuf); @@ -298,61 +289,16 @@ int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* ar return 0; } -// TODO: ARP processing for queue 0 -// int dpdk_port::process_arp(struct rte_mempool *tx_pktbuf_pool, struct arp_hdr -// *arp_frame) -//{ -// std::lock_guard<std::mutex> lock(_mutex); -// uint32_t dest_ip = arp_frame->arp_data.arp_sip; -// struct ether_addr dest_addr = arp_frame->arp_data.arp_sha; -// -// /* Add entry to ARP table */ -// struct uhd_dpdk_arp_entry *entry = NULL; -// rte_hash_lookup_data(_arp_table, &dest_ip, (void **) &entry); -// if (!entry) { -// entry = rte_zmalloc(NULL, sizeof(*entry), 0); -// if (!entry) { -// return -ENOMEM; -// } -// LIST_INIT(&entry->pending_list); -// ether_addr_copy(&dest_addr, &entry->mac_addr); -// if (rte_hash_add_key_data(port->arp_table, &dest_ip, entry) < 0) { -// rte_free(entry); -// return -ENOSPC; -// } -// } else { -// struct uhd_dpdk_config_req *req = NULL; -// ether_addr_copy(&dest_addr, &entry->mac_addr); -// /* Now wake any config reqs waiting for the ARP */ -// LIST_FOREACH(req, &entry->pending_list, entry) { -// _uhd_dpdk_config_req_compl(req, 0); -// } -// while (entry->pending_list.lh_first != NULL) { -// LIST_REMOVE(entry->pending_list.lh_first, entry); -// } -// } -// /* Respond if this was an ARP request */ -// if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST) && -// arp_frame->arp_data.arp_tip == port->ipv4_addr) { -// _arp_reply(tx_pktbuf_pool, arp_frame); -// } -// -// return 0; -// -//} - -static dpdk_ctx* global_ctx = nullptr; +static dpdk_ctx::sptr global_ctx = nullptr; static std::mutex global_ctx_mutex; dpdk_ctx::sptr dpdk_ctx::get() { std::lock_guard<std::mutex> lock(global_ctx_mutex); if (!global_ctx) { - auto new_ctx = std::make_shared<dpdk_ctx>(); - global_ctx = new_ctx.get(); - return new_ctx; + global_ctx = std::make_shared<dpdk_ctx>(); } - return global_ctx->shared_from_this(); + return global_ctx; } dpdk_ctx::dpdk_ctx(void) : _init_done(false) {} @@ -384,6 +330,7 @@ void dpdk_ctx::_eal_init(const device_addr_t& eal_args) auto args = new std::array<char, 4096>(); char* opt = args->data(); char* end = args->data() + args->size(); + UHD_LOG_TRACE("DPDK", "EAL init options: "); for (std::string& key : eal_args.keys()) { std::string val = eal_args[key]; if (key == "dpdk_coremask") { @@ -452,10 +399,16 @@ void dpdk_ctx::init(const device_addr_t& user_args) _num_mbufs = dpdk_args.cast<int>("dpdk_num_mbufs", DEFAULT_NUM_MBUFS); _mbuf_cache_size = dpdk_args.cast<int>("dpdk_mbuf_cache_size", DEFAULT_MBUF_CACHE_SIZE); + UHD_LOG_TRACE("DPDK", + "mtu: " << _mtu << " num_mbufs: " << _num_mbufs + << " mbuf_cache_size: " << _mbuf_cache_size); /* Get device info for all the NIC ports */ int num_dpdk_ports = rte_eth_dev_count_avail(); - UHD_ASSERT_THROW(num_dpdk_ports > 0); + if (num_dpdk_ports == 0) { + UHD_LOG_ERROR("DPDK", "No available DPDK devices (ports) found!"); + throw uhd::runtime_error("No available DPDK devices (ports) found!"); + } device_addrs_t nics(num_dpdk_ports); RTE_ETH_FOREACH_DEV(i) { @@ -480,6 +433,8 @@ void dpdk_ctx::init(const device_addr_t& user_args) } /* Now combine user args with conf file */ auto conf = uhd::prefs::get_dpdk_nic_args(nic); + // TODO: Enable the use of multiple DMA queues + conf["dpdk_num_queues"] = "1"; /* Update config, and remove ports that aren't fully configured */ if (conf.has_key("dpdk_ipv4")) { @@ -491,10 +446,17 @@ void dpdk_ctx::init(const device_addr_t& user_args) } } + std::map<size_t, std::vector<size_t>> lcore_to_port_id_map; RTE_ETH_FOREACH_DEV(i) { auto& conf = nics.at(i); if (conf.has_key("dpdk_ipv4")) { + UHD_ASSERT_THROW(conf.has_key("dpdk_lcore")); + const size_t lcore_id = conf.cast<size_t>("dpdk_lcore", 0); + if (!lcore_to_port_id_map.count(lcore_id)) { + lcore_to_port_id_map.insert({lcore_id, {}}); + } + // Allocating enough buffers for all DMA queues for each CPU socket // - This is a bit inefficient for larger systems, since NICs may not // all be on one socket @@ -507,10 +469,13 @@ void dpdk_ctx::init(const device_addr_t& user_args) _ports[i] = dpdk_port::make(i, _mtu, conf.cast<uint16_t>("dpdk_num_queues", rte_lcore_count()), - _num_mbufs, + conf.cast<uint16_t>("dpdk_num_desc", DPDK_DEFAULT_RING_SIZE), rx_pool, tx_pool, conf["dpdk_ipv4"]); + + // Remember all port IDs that map to an lcore + lcore_to_port_id_map.at(lcore_id).push_back(i); } } @@ -522,12 +487,29 @@ void dpdk_ctx::init(const device_addr_t& user_args) rte_eth_link_get(portid, &link); unsigned int link_status = link.link_status; unsigned int link_speed = link.link_speed; - UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps\n") - % portid % link_status % link_speed; + UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps") % portid + % link_status % link_speed; } - UHD_LOG_TRACE("DPDK", "Init DONE!"); - + UHD_LOG_TRACE("DPDK", "Init done -- spawning IO services"); _init_done = true; + + // Links are up, now create one IO service per lcore + for (auto& lcore_portids_pair : lcore_to_port_id_map) { + const size_t lcore_id = lcore_portids_pair.first; + std::vector<dpdk_port*> dpdk_ports; + dpdk_ports.reserve(lcore_portids_pair.second.size()); + for (const size_t port_id : lcore_portids_pair.second) { + dpdk_ports.push_back(get_port(port_id)); + } + const size_t servq_depth = 32; // FIXME + UHD_LOG_TRACE("DPDK", + "Creating I/O service for lcore " + << lcore_id << ", servicing " << dpdk_ports.size() + << " ports, service queue depth " << servq_depth); + _io_srv_portid_map.insert( + {uhd::transport::dpdk_io_service::make(lcore_id, dpdk_ports, servq_depth), + lcore_portids_pair.second}); + } } } @@ -577,7 +559,7 @@ int dpdk_ctx::get_port_link_status(port_id_t portid) const return link.link_status; } -int dpdk_ctx::get_route(const std::string& addr) const +dpdk_port* dpdk_ctx::get_route(const std::string& addr) const { const uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str()); for (const auto& port : _ports) { @@ -586,10 +568,10 @@ int dpdk_ctx::get_route(const std::string& addr) const uint32_t src_ipv4 = port.second->get_ipv4(); uint32_t netmask = port.second->get_netmask(); if ((src_ipv4 & netmask) == (dst_ipv4 & netmask)) { - return (int)port.first; + return port.second.get(); } } - return -ENODEV; + return NULL; } @@ -598,6 +580,20 @@ bool dpdk_ctx::is_init_done(void) const return _init_done.load(); } +uhd::transport::dpdk_io_service::sptr dpdk_ctx::get_io_service(const size_t port_id) +{ + for (auto& io_srv_portid_pair : _io_srv_portid_map) { + if (uhd::has(io_srv_portid_pair.second, port_id)) { + return io_srv_portid_pair.first; + } + } + + std::string err_msg = std::string("Cannot look up I/O service for port ID: ") + + std::to_string(port_id) + ". No such port ID!"; + UHD_LOG_ERROR("DPDK", err_msg); + throw uhd::lookup_error(err_msg); +} + struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool( unsigned int cpu_socket, size_t num_bufs) { @@ -605,8 +601,12 @@ struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool( const int mbuf_size = _mtu + RTE_PKTMBUF_HEADROOM; char name[32]; snprintf(name, sizeof(name), "rx_mbuf_pool_%u", cpu_socket); - _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create( - name, num_bufs, _mbuf_cache_size, 0, mbuf_size, SOCKET_ID_ANY); + _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create(name, + num_bufs, + _mbuf_cache_size, + DPDK_MBUF_PRIV_SIZE, + mbuf_size, + SOCKET_ID_ANY); if (!_rx_pktbuf_pools.at(cpu_socket)) { UHD_LOG_ERROR("DPDK", "Could not allocate RX pktbuf pool"); throw uhd::runtime_error("DPDK: Could not allocate RX pktbuf pool"); diff --git a/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp b/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp new file mode 100644 index 000000000..1fcedca51 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp @@ -0,0 +1,950 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/utils/log.hpp> +#include <uhd/utils/thread.hpp> +#include <uhdlib/transport/dpdk/arp.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/dpdk_io_service_client.hpp> +#include <uhdlib/utils/narrow.hpp> +#include <cmath> + +/* + * Memory management + * + * Every object that allocates and frees DPDK memory has a reference to the + * dpdk_ctx. + * + * Ownership hierarchy: + * + * dpdk_io_service_mgr (1) => + * dpdk_ctx::sptr + * dpdk_io_service::sptr + * + * xport (1) => + * dpdk_send_io::sptr + * dpdk_recv_io::sptr + * + * usrp link_mgr (1) => + * udp_dpdk_link::sptr + * + * dpdk_send_io (2) => + * dpdk_ctx::sptr + * dpdk_io_service::sptr + * + * dpdk_recv_io (2) => + * dpdk_ctx::sptr + * dpdk_io_service::sptr + * + * dpdk_io_service (3) => + * dpdk_ctx::wptr (weak_ptr) + * udp_dpdk_link::sptr + * + * udp_dpdk_link (4) => + * dpdk_ctx::sptr + */ + +using namespace uhd::transport; + +dpdk_io_service::dpdk_io_service( + unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth) + : _ctx(dpdk::dpdk_ctx::get()) + , _lcore_id(lcore_id) + , _ports(ports) + , _servq(servq_depth, lcore_id) +{ + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Launching I/O service for lcore " << lcore_id); + for (auto port : _ports) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "lcore_id " << lcore_id << ": Adding port index " << port->get_port_id()); + _tx_queues[port->get_port_id()] = std::list<dpdk_send_io*>(); + _recv_xport_map[port->get_port_id()] = std::list<dpdk_recv_io*>(); + } + int status = rte_eal_remote_launch(_io_worker, this, lcore_id); + if (status) { + throw uhd::runtime_error("DPDK: I/O service cannot launch on busy lcore"); + } +} + +dpdk_io_service::sptr dpdk_io_service::make( + unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth) +{ + return dpdk_io_service::sptr(new dpdk_io_service(lcore_id, ports, servq_depth)); +} + +dpdk_io_service::~dpdk_io_service() +{ + UHD_LOG_TRACE( + "DPDK::IO_SERVICE", "Shutting down I/O service for lcore " << _lcore_id); + dpdk::wait_req* req = dpdk::wait_req_alloc(dpdk::wait_type::WAIT_LCORE_TERM, NULL); + if (!req) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", + "Could not allocate request for lcore termination for lcore " << _lcore_id); + return; + } + dpdk::wait_req_get(req); + _servq.submit(req, std::chrono::microseconds(-1)); + dpdk::wait_req_put(req); +} + +void dpdk_io_service::attach_recv_link(recv_link_if::sptr link) +{ + struct dpdk_flow_data data; + data.link = dynamic_cast<udp_dpdk_link*>(link.get()); + data.is_recv = true; + assert(data.link); + auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data); + if (!req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req to attach recv_link"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach recv_link"); + } + _servq.submit(req, std::chrono::microseconds(-1)); + dpdk::wait_req_put(req); + { + std::lock_guard<std::mutex> lock(_mutex); + _recv_links.push_back(link); + } +} + +void dpdk_io_service::attach_send_link(send_link_if::sptr link) +{ + udp_dpdk_link* dpdk_link = dynamic_cast<udp_dpdk_link*>(link.get()); + assert(dpdk_link); + + // First, fill in destination MAC address + struct dpdk::arp_request arp_data; + arp_data.tpa = dpdk_link->get_remote_ipv4(); + arp_data.port = dpdk_link->get_port()->get_port_id(); + if (dpdk_link->get_port()->dst_is_broadcast(arp_data.tpa)) { + // If a broadcast IP, skip the ARP and fill with broadcast MAC addr + memset(arp_data.tha.addr_bytes, 0xFF, 6); + } else { + auto arp_req = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data); + if (!arp_req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req for ARP request"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req for ARP request"); + } + if (_servq.submit(arp_req, std::chrono::microseconds(3000000))) { + // Try one more time... + auto arp_req2 = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data); + if (_servq.submit(arp_req2, std::chrono::microseconds(30000000))) { + wait_req_put(arp_req); + wait_req_put(arp_req2); + throw uhd::io_error("DPDK: Could not reach host"); + } + wait_req_put(arp_req2); + } + wait_req_put(arp_req); + } + dpdk_link->set_remote_mac(arp_data.tha); + + // Then, submit the link to the I/O service thread + struct dpdk_flow_data data; + data.link = dpdk_link; + data.is_recv = false; + auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data); + if (!req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req to attach send_link"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach send_link"); + } + _servq.submit(req, std::chrono::microseconds(-1)); + wait_req_put(req); + { + std::lock_guard<std::mutex> lock(_mutex); + _send_links.push_back(link); + } +} + +void dpdk_io_service::detach_recv_link(recv_link_if::sptr link) +{ + auto link_ptr = link.get(); + struct dpdk_flow_data data; + data.link = dynamic_cast<udp_dpdk_link*>(link_ptr); + data.is_recv = true; + auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data); + if (!req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req to detach recv_link"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach recv_link"); + } + _servq.submit(req, std::chrono::microseconds(-1)); + wait_req_put(req); + { + std::lock_guard<std::mutex> lock(_mutex); + _recv_links.remove_if( + [link_ptr](recv_link_if::sptr& item) { return item.get() == link_ptr; }); + } +} + +void dpdk_io_service::detach_send_link(send_link_if::sptr link) +{ + auto link_ptr = link.get(); + struct dpdk_flow_data data; + data.link = dynamic_cast<udp_dpdk_link*>(link_ptr); + data.is_recv = false; + auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data); + if (!req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req to detach send_link"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach send_link"); + } + _servq.submit(req, std::chrono::microseconds(-1)); + wait_req_put(req); + { + std::lock_guard<std::mutex> lock(_mutex); + _send_links.remove_if( + [link_ptr](send_link_if::sptr& item) { return item.get() == link_ptr; }); + } +} + +recv_io_if::sptr dpdk_io_service::make_recv_client(recv_link_if::sptr data_link, + size_t num_recv_frames, + recv_callback_t cb, + send_link_if::sptr /*fc_link*/, + size_t num_send_frames, + recv_io_if::fc_callback_t fc_cb) +{ + auto link = dynamic_cast<udp_dpdk_link*>(data_link.get()); + auto recv_io = std::make_shared<dpdk_recv_io>( + shared_from_this(), link, num_recv_frames, cb, num_send_frames, fc_cb); + + // Register with I/O service + recv_io->_dpdk_io_if.io_client = static_cast<void*>(recv_io.get()); + auto xport_req = dpdk::wait_req_alloc( + dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&recv_io->_dpdk_io_if); + _servq.submit(xport_req, std::chrono::microseconds(-1)); + wait_req_put(xport_req); + return recv_io; +} + +send_io_if::sptr dpdk_io_service::make_send_client(send_link_if::sptr send_link, + size_t num_send_frames, + send_io_if::send_callback_t send_cb, + recv_link_if::sptr /*recv_link*/, + size_t num_recv_frames, + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb) +{ + auto link = dynamic_cast<udp_dpdk_link*>(send_link.get()); + auto send_io = std::make_shared<dpdk_send_io>(shared_from_this(), + link, + num_send_frames, + send_cb, + num_recv_frames, + recv_cb, + fc_cb); + + // Register with I/O service + send_io->_dpdk_io_if.io_client = static_cast<void*>(send_io.get()); + auto xport_req = dpdk::wait_req_alloc( + dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&send_io->_dpdk_io_if); + _servq.submit(xport_req, std::chrono::microseconds(-1)); + wait_req_put(xport_req); + return send_io; +} + + +int dpdk_io_service::_io_worker(void* arg) +{ + if (!arg) + return -EINVAL; + dpdk_io_service* srv = (dpdk_io_service*)arg; + + /* Check that this is a valid lcore */ + unsigned int lcore_id = rte_lcore_id(); + if (lcore_id == LCORE_ID_ANY) + return -ENODEV; + + /* Check that this lcore has ports */ + if (srv->_ports.size() == 0) + return -ENODEV; + + char name[16]; + snprintf(name, sizeof(name), "dpdk-io_%hu", (uint16_t)lcore_id); + rte_thread_setname(pthread_self(), name); + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "I/O service thread '" << name << "' started on lcore " << lcore_id); + + uhd::set_thread_priority_safe(); + + snprintf(name, sizeof(name), "rx-tbl_%hu", (uint16_t)lcore_id); + struct rte_hash_parameters hash_params = {.name = name, + .entries = MAX_FLOWS, + .reserved = 0, + .key_len = sizeof(struct dpdk::ipv4_5tuple), + .hash_func = NULL, + .hash_func_init_val = 0, + .socket_id = uhd::narrow_cast<int>(rte_socket_id()), + .extra_flag = 0}; + srv->_rx_table = rte_hash_create(&hash_params); + if (srv->_rx_table == NULL) { + return rte_errno; + } + + int status = 0; + while (!status) { + /* For each port, attempt to receive packets and process */ + for (auto port : srv->_ports) { + srv->_rx_burst(port, 0); + } + /* For each port's TX queues, do TX */ + for (auto port : srv->_ports) { + srv->_tx_burst(port); + } + /* For each port's RX release queues, release buffers */ + for (auto port : srv->_ports) { + srv->_rx_release(port); + } + /* Retry waking clients */ + if (srv->_retry_head) { + dpdk_io_if* node = srv->_retry_head; + dpdk_io_if* end = srv->_retry_head->prev; + while (true) { + dpdk_io_if* next = node->next; + srv->_wake_client(node); + if (node == end) { + break; + } else { + node = next; + next = node->next; + } + } + } + /* Check for open()/close()/term() requests and service 1 at a time + * Leave this last so we immediately terminate if requested + */ + status = srv->_service_requests(); + } + + return status; +} + +int dpdk_io_service::_service_requests() +{ + for (int i = 0; i < MAX_PENDING_SERVICE_REQS; i++) { + /* Dequeue */ + dpdk::wait_req* req = _servq.pop(); + if (!req) { + break; + } + switch (req->reason) { + case dpdk::wait_type::WAIT_SIMPLE: + while (_servq.complete(req) == -ENOBUFS) + ; + break; + case dpdk::wait_type::WAIT_RX: + case dpdk::wait_type::WAIT_TX_BUF: + throw uhd::not_implemented_error( + "DPDK: _service_requests(): DPDK is still a WIP"); + case dpdk::wait_type::WAIT_FLOW_OPEN: + _service_flow_open(req); + break; + case dpdk::wait_type::WAIT_FLOW_CLOSE: + _service_flow_close(req); + break; + case dpdk::wait_type::WAIT_XPORT_CONNECT: + _service_xport_connect(req); + break; + case dpdk::wait_type::WAIT_XPORT_DISCONNECT: + _service_xport_disconnect(req); + break; + case dpdk::wait_type::WAIT_ARP: { + assert(req->data != NULL); + int arp_status = _service_arp_request(req); + assert(arp_status != -ENOMEM); + if (arp_status == 0) { + while (_servq.complete(req) == -ENOBUFS) + ; + } + break; + } + case dpdk::wait_type::WAIT_LCORE_TERM: + rte_free(_rx_table); + while (_servq.complete(req) == -ENOBUFS) + ; + // Return a positive value to indicate we should terminate + return 1; + default: + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Invalid reason associated with wait request"); + while (_servq.complete(req) == -ENOBUFS) + ; + break; + } + } + return 0; +} + +void dpdk_io_service::_service_flow_open(dpdk::wait_req* req) +{ + auto flow_req_data = (struct dpdk_flow_data*)req->data; + assert(flow_req_data); + if (flow_req_data->is_recv) { + // If RX, add to RX table. Currently, nothing to do for TX. + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = flow_req_data->link->get_port()->get_ipv4(), + .src_port = 0, + .dst_port = flow_req_data->link->get_local_port()}; + // Check the UDP port isn't in use + if (rte_hash_lookup(_rx_table, &ht_key) > 0) { + req->retval = -EADDRINUSE; + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add to RX table"); + while (_servq.complete(req) == -ENOBUFS) + ; + return; + } + // Add xport list for this UDP port + auto rx_entry = new std::list<dpdk_io_if*>(); + if (rte_hash_add_key_data(_rx_table, &ht_key, rx_entry)) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Could not add new RX list to table"); + delete rx_entry; + req->retval = -ENOMEM; + while (_servq.complete(req) == -ENOBUFS) + ; + return; + } + } + while (_servq.complete(req) == -ENOBUFS) + ; +} + +void dpdk_io_service::_service_flow_close(dpdk::wait_req* req) +{ + auto flow_req_data = (struct dpdk_flow_data*)req->data; + assert(flow_req_data); + if (flow_req_data->is_recv) { + // If RX, remove from RX table. Currently, nothing to do for TX. + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = flow_req_data->link->get_port()->get_ipv4(), + .src_port = 0, + .dst_port = flow_req_data->link->get_local_port()}; + std::list<dpdk_io_if*>* xport_list; + + if (rte_hash_lookup_data(_rx_table, &ht_key, (void**)&xport_list) > 0) { + UHD_ASSERT_THROW(xport_list->empty()); + delete xport_list; + rte_hash_del_key(_rx_table, &ht_key); + while (_servq.complete(req) == -ENOBUFS) + ; + return; + } + } + while (_servq.complete(req) == -ENOBUFS) + ; +} + +void dpdk_io_service::_service_xport_connect(dpdk::wait_req* req) +{ + auto dpdk_io = static_cast<dpdk_io_if*>(req->data); + UHD_ASSERT_THROW(dpdk_io); + auto port = dpdk_io->link->get_port(); + if (dpdk_io->recv_cb) { + // Add to RX table only if have a callback. + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = port->get_ipv4(), + .src_port = 0, + .dst_port = dpdk_io->link->get_local_port()}; + void* hash_data; + if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) < 0) { + req->retval = -ENOENT; + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add xport to RX table"); + while (_servq.complete(req) == -ENOBUFS) + ; + return; + } + // Add to xport list for this UDP port + auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); + rx_entry->push_back(dpdk_io); + } + if (dpdk_io->is_recv) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX connect request..."); + // Add to xport list for this NIC port + auto& xport_list = _recv_xport_map.at(port->get_port_id()); + xport_list.push_back((dpdk_recv_io*)dpdk_io->io_client); + } else { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX connect request..."); + dpdk_send_io* send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client); + // Add to xport list for this NIC port + auto& xport_list = _tx_queues.at(port->get_port_id()); + xport_list.push_back(send_io); + for (size_t i = 0; i < send_io->_num_send_frames; i++) { + auto buff_ptr = + (dpdk::dpdk_frame_buff*)dpdk_io->link->get_send_buff(0).release(); + if (!buff_ptr) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", + "TX mempool out of memory. Please increase dpdk_num_mbufs."); + break; + } + if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) { + rte_pktmbuf_free(buff_ptr->get_pktmbuf()); + break; + } + send_io->_num_frames_in_use++; + } + } + while (_servq.complete(req) == -ENOBUFS) + ; +} + +void dpdk_io_service::_service_xport_disconnect(dpdk::wait_req* req) +{ + auto dpdk_io = (struct dpdk_io_if*)req->data; + assert(dpdk_io); + auto port = dpdk_io->link->get_port(); + if (dpdk_io->recv_cb) { + // Remove from RX table only if have a callback. + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = port->get_ipv4(), + .src_port = 0, + .dst_port = dpdk_io->link->get_local_port()}; + void* hash_data; + if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) >= 0) { + // Remove from xport list for this UDP port + auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); + rx_entry->remove(dpdk_io); + } else { + req->retval = -EINVAL; + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot remove xport from RX table"); + } + } + if (dpdk_io->is_recv) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX disconnect request..."); + dpdk_recv_io* recv_client = static_cast<dpdk_recv_io*>(dpdk_io->io_client); + // Remove from xport list for this NIC port + auto& xport_list = _recv_xport_map.at(port->get_port_id()); + xport_list.remove(recv_client); + while (!rte_ring_empty(recv_client->_recv_queue)) { + frame_buff* buff_ptr; + rte_ring_dequeue(recv_client->_recv_queue, (void**)&buff_ptr); + dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr)); + } + while (!rte_ring_empty(recv_client->_release_queue)) { + frame_buff* buff_ptr; + rte_ring_dequeue(recv_client->_release_queue, (void**)&buff_ptr); + dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr)); + } + } else { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX disconnect request..."); + dpdk_send_io* send_client = static_cast<dpdk_send_io*>(dpdk_io->io_client); + // Remove from xport list for this NIC port + auto& xport_list = _tx_queues.at(port->get_port_id()); + xport_list.remove(send_client); + while (!rte_ring_empty(send_client->_send_queue)) { + frame_buff* buff_ptr; + rte_ring_dequeue(send_client->_send_queue, (void**)&buff_ptr); + dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr)); + } + while (!rte_ring_empty(send_client->_buffer_queue)) { + frame_buff* buff_ptr; + rte_ring_dequeue(send_client->_buffer_queue, (void**)&buff_ptr); + dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr)); + } + } + // Now remove the node if it's on the retry list + if ((_retry_head == dpdk_io) && (dpdk_io->next == dpdk_io)) { + _retry_head = NULL; + } else if (_retry_head) { + dpdk_io_if* node = _retry_head->next; + while (node != _retry_head) { + if (node == dpdk_io) { + dpdk_io->prev->next = dpdk_io->next; + dpdk_io->next->prev = dpdk_io->prev; + break; + } + node = node->next; + } + } + while (_servq.complete(req) == -ENOBUFS) + ; +} + +int dpdk_io_service::_service_arp_request(dpdk::wait_req* req) +{ + int status = 0; + auto arp_req_data = (struct dpdk::arp_request*)req->data; + dpdk::ipv4_addr dst_addr = arp_req_data->tpa; + auto ctx_sptr = _ctx.lock(); + UHD_ASSERT_THROW(ctx_sptr); + dpdk::dpdk_port* port = ctx_sptr->get_port(arp_req_data->port); + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "ARP: Requesting address for " << dpdk::ipv4_num_to_str(dst_addr)); + + rte_spinlock_lock(&port->_spinlock); + struct dpdk::arp_entry* entry = NULL; + if (port->_arp_table.count(dst_addr) == 0) { + entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0); + if (!entry) { + status = -ENOMEM; + goto arp_end; + } + entry = new (entry) dpdk::arp_entry(); + entry->reqs.push_back(req); + port->_arp_table[dst_addr] = entry; + status = -EAGAIN; + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Address not in table. Sending ARP request."); + _send_arp_request(port, 0, arp_req_data->tpa); + } else { + entry = port->_arp_table.at(dst_addr); + if (is_zero_ether_addr(&entry->mac_addr)) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "ARP: Address in table, but not populated yet. Resending ARP request."); + port->_arp_table.at(dst_addr)->reqs.push_back(req); + status = -EAGAIN; + _send_arp_request(port, 0, arp_req_data->tpa); + } else { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "ARP: Address in table."); + ether_addr_copy(&entry->mac_addr, &arp_req_data->tha); + status = 0; + } + } +arp_end: + rte_spinlock_unlock(&port->_spinlock); + return status; +} + +int dpdk_io_service::_send_arp_request( + dpdk::dpdk_port* port, dpdk::queue_id_t queue, dpdk::ipv4_addr ip) +{ + struct rte_mbuf* mbuf; + struct ether_hdr* hdr; + struct arp_hdr* arp_frame; + + mbuf = rte_pktmbuf_alloc(port->get_tx_pktbuf_pool()); + if (unlikely(mbuf == NULL)) { + UHD_LOG_WARNING( + "DPDK::IO_SERVICE", "Could not allocate packet buffer for ARP request"); + return -ENOMEM; + } + + hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*); + arp_frame = (struct arp_hdr*)&hdr[1]; + + memset(hdr->d_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN); + hdr->s_addr = port->get_mac_addr(); + hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP); + + arp_frame->arp_hrd = rte_cpu_to_be_16(ARP_HRD_ETHER); + arp_frame->arp_pro = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + arp_frame->arp_hln = 6; + arp_frame->arp_pln = 4; + arp_frame->arp_op = rte_cpu_to_be_16(ARP_OP_REQUEST); + arp_frame->arp_data.arp_sha = port->get_mac_addr(); + arp_frame->arp_data.arp_sip = port->get_ipv4(); + memset(arp_frame->arp_data.arp_tha.addr_bytes, 0x00, ETHER_ADDR_LEN); + arp_frame->arp_data.arp_tip = ip; + + mbuf->pkt_len = 42; + mbuf->data_len = 42; + + if (rte_eth_tx_burst(port->get_port_id(), queue, &mbuf, 1) != 1) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "ARP request not sent: Descriptor ring full"); + rte_pktmbuf_free(mbuf); + return -EAGAIN; + } + return 0; +} + +/* Do a burst of RX on port */ +int dpdk_io_service::_rx_burst(dpdk::dpdk_port* port, dpdk::queue_id_t queue) +{ + struct ether_hdr* hdr; + char* l2_data; + struct rte_mbuf* bufs[RX_BURST_SIZE]; + const uint16_t num_rx = + rte_eth_rx_burst(port->get_port_id(), queue, bufs, RX_BURST_SIZE); + if (unlikely(num_rx == 0)) { + return 0; + } + + for (int buf = 0; buf < num_rx; buf++) { + uint64_t ol_flags = bufs[buf]->ol_flags; + hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr*); + l2_data = (char*)&hdr[1]; + switch (rte_be_to_cpu_16(hdr->ether_type)) { + case ETHER_TYPE_ARP: + _process_arp(port, queue, (struct arp_hdr*)l2_data); + rte_pktmbuf_free(bufs[buf]); + break; + case ETHER_TYPE_IPv4: + if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet has bad IP cksum"); + } else if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_NONE) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet missing IP cksum"); + } else { + _process_ipv4(port, bufs[buf], (struct ipv4_hdr*)l2_data); + } + break; + default: + rte_pktmbuf_free(bufs[buf]); + break; + } + } + return num_rx; +} + +int dpdk_io_service::_process_arp( + dpdk::dpdk_port* port, dpdk::queue_id_t queue_id, struct arp_hdr* arp_frame) +{ + uint32_t dest_ip = arp_frame->arp_data.arp_sip; + struct ether_addr dest_addr = arp_frame->arp_data.arp_sha; + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "Processing ARP packet: " << dpdk::ipv4_num_to_str(dest_ip) << " -> " + << dpdk::eth_addr_to_string(dest_addr)); + /* Add entry to ARP table */ + rte_spinlock_lock(&port->_spinlock); + struct dpdk::arp_entry* entry = NULL; + if (port->_arp_table.count(dest_ip) == 0) { + entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0); + if (!entry) { + return -ENOMEM; + } + entry = new (entry) dpdk::arp_entry(); + ether_addr_copy(&dest_addr, &entry->mac_addr); + port->_arp_table[dest_ip] = entry; + } else { + entry = port->_arp_table.at(dest_ip); + ether_addr_copy(&dest_addr, &entry->mac_addr); + for (auto req : entry->reqs) { + auto arp_data = (struct dpdk::arp_request*)req->data; + ether_addr_copy(&dest_addr, &arp_data->tha); + while (_servq.complete(req) == -ENOBUFS) + ; + } + entry->reqs.clear(); + } + rte_spinlock_unlock(&port->_spinlock); + + /* Respond if this was an ARP request */ + if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST) + && arp_frame->arp_data.arp_tip == port->get_ipv4()) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Sending ARP reply."); + port->_arp_reply(queue_id, arp_frame); + } + + return 0; +} + +int dpdk_io_service::_process_ipv4( + dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct ipv4_hdr* pkt) +{ + bool bcast = port->dst_is_broadcast(pkt->dst_addr); + if (pkt->dst_addr != port->get_ipv4() && !bcast) { + rte_pktmbuf_free(mbuf); + return -ENODEV; + } + if (pkt->next_proto_id == IPPROTO_UDP) { + return _process_udp(port, mbuf, (struct udp_hdr*)&pkt[1], bcast); + } + rte_pktmbuf_free(mbuf); + return -EINVAL; +} + + +int dpdk_io_service::_process_udp( + dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct udp_hdr* pkt, bool /*bcast*/) +{ + // Get the link + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = port->get_ipv4(), + .src_port = 0, + .dst_port = pkt->dst_port}; + void* hash_data; + if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) < 0) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No link entry in rx table"); + rte_pktmbuf_free(mbuf); + return -ENOENT; + } + // Get xport list for this UDP port + auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); + if (rx_entry->empty()) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No xports for link"); + rte_pktmbuf_free(mbuf); + return -ENOENT; + } + // Turn rte_mbuf -> dpdk_frame_buff + auto link = rx_entry->front()->link; + link->enqueue_recv_mbuf(mbuf); + auto buff = link->get_recv_buff(0); + bool rcvr_found = false; + for (auto client_if : *rx_entry) { + // Check all the muxed receivers... + if (client_if->recv_cb(buff, link, link)) { + rcvr_found = true; + if (buff) { + assert(client_if->is_recv); + auto recv_io = (dpdk_recv_io*)client_if->io_client; + auto buff_ptr = (dpdk::dpdk_frame_buff*)buff.release(); + if (rte_ring_enqueue(recv_io->_recv_queue, buff_ptr)) { + rte_pktmbuf_free(buff_ptr->get_pktmbuf()); + UHD_LOG_WARNING( + "DPDK::IO_SERVICE", "Dropping packet: No space in recv queue"); + } else { + recv_io->_num_frames_in_use++; + assert(recv_io->_num_frames_in_use <= recv_io->_num_recv_frames); + _wake_client(client_if); + } + } + break; + } + } + if (!rcvr_found) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No receiver xport found"); + // Release the buffer if no receiver found + link->release_recv_buff(std::move(buff)); + return -ENOENT; + } + return 0; +} + +/* Do a burst of TX on port's tx queues */ +int dpdk_io_service::_tx_burst(dpdk::dpdk_port* port) +{ + unsigned int total_tx = 0; + auto& queues = _tx_queues.at(port->get_port_id()); + + for (auto& send_io : queues) { + unsigned int num_tx = rte_ring_count(send_io->_send_queue); + num_tx = (num_tx < TX_BURST_SIZE) ? num_tx : TX_BURST_SIZE; + bool replaced_buffers = false; + for (unsigned int i = 0; i < num_tx; i++) { + size_t frame_size = send_io->_dpdk_io_if.link->get_send_frame_size(); + if (send_io->_fc_cb && !send_io->_fc_cb(frame_size)) { + break; + } + dpdk::dpdk_frame_buff* buff_ptr; + int status = rte_ring_dequeue(send_io->_send_queue, (void**)&buff_ptr); + if (status) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", "TX Q Count doesn't match actual"); + break; + } + send_io->_send_cb(frame_buff::uptr(buff_ptr), send_io->_dpdk_io_if.link); + // Attempt to replace buffer + buff_ptr = (dpdk::dpdk_frame_buff*)send_io->_dpdk_io_if.link->get_send_buff(0) + .release(); + if (!buff_ptr) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", + "TX mempool out of memory. Please increase dpdk_num_mbufs."); + send_io->_num_frames_in_use--; + } else if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) { + rte_pktmbuf_free(buff_ptr->get_pktmbuf()); + send_io->_num_frames_in_use--; + } else { + replaced_buffers = true; + } + } + if (replaced_buffers) { + _wake_client(&send_io->_dpdk_io_if); + } + total_tx += num_tx; + } + + return total_tx; +} + +int dpdk_io_service::_rx_release(dpdk::dpdk_port* port) +{ + unsigned int total_bufs = 0; + auto& queues = _recv_xport_map.at(port->get_port_id()); + + for (auto& recv_io : queues) { + unsigned int num_buf = rte_ring_count(recv_io->_release_queue); + num_buf = (num_buf < RX_BURST_SIZE) ? num_buf : RX_BURST_SIZE; + for (unsigned int i = 0; i < num_buf; i++) { + dpdk::dpdk_frame_buff* buff_ptr; + int status = rte_ring_dequeue(recv_io->_release_queue, (void**)&buff_ptr); + if (status) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", "RX Q Count doesn't match actual"); + break; + } + recv_io->_fc_cb(frame_buff::uptr(buff_ptr), + recv_io->_dpdk_io_if.link, + recv_io->_dpdk_io_if.link); + recv_io->_num_frames_in_use--; + } + total_bufs += num_buf; + } + + return total_bufs; +} + +uint16_t dpdk_io_service::_get_unique_client_id() +{ + std::lock_guard<std::mutex> lock(_mutex); + if (_client_id_set.size() >= MAX_CLIENTS) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Exceeded maximum number of clients"); + throw uhd::runtime_error("DPDK::IO_SERVICE: Exceeded maximum number of clients"); + } + + uint16_t id = _next_client_id++; + while (_client_id_set.count(id)) { + id = _next_client_id++; + } + _client_id_set.insert(id); + return id; +} + +void dpdk_io_service::_wake_client(dpdk_io_if* dpdk_io) +{ + dpdk::wait_req* req; + if (dpdk_io->is_recv) { + auto recv_io = static_cast<dpdk_recv_io*>(dpdk_io->io_client); + req = recv_io->_waiter; + } else { + auto send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client); + req = send_io->_waiter; + } + bool stat = req->mutex.try_lock(); + if (stat) { + bool active_req = !req->complete; + if (dpdk_io->next) { + // On the list: Take it off + if (dpdk_io->next == dpdk_io) { + // Only node on the list + _retry_head = NULL; + } else { + // Other nodes are on the list + if (_retry_head == dpdk_io) { + // Move list head to next + _retry_head = dpdk_io->next; + } + dpdk_io->next->prev = dpdk_io->prev; + dpdk_io->prev->next = dpdk_io->next; + } + dpdk_io->next = NULL; + dpdk_io->prev = NULL; + } + if (active_req) { + req->complete = true; + req->cond.notify_one(); + } + req->mutex.unlock(); + if (active_req) { + wait_req_put(req); + } + } else { + // Put on the retry list, if it isn't already + if (!dpdk_io->next) { + if (_retry_head) { + dpdk_io->next = _retry_head; + dpdk_io->prev = _retry_head->prev; + _retry_head->prev->next = dpdk_io; + _retry_head->prev = dpdk_io; + } else { + _retry_head = dpdk_io; + dpdk_io->next = dpdk_io; + dpdk_io->prev = dpdk_io; + } + } + } +} |