diff options
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/CMakeLists.txt | 6 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/CMakeLists.txt | 36 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/test/Makefile | 53 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/test/test.c | 303 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk.c | 363 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h | 253 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c | 401 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h | 32 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c | 306 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h | 15 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c | 456 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h | 30 |
12 files changed, 2252 insertions, 2 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 15771697a..7c79bc67c 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -130,8 +130,6 @@ IF(ENABLE_X300) ENDIF(ENABLE_X300) IF(ENABLE_LIBERIO) - MESSAGE(STATUS "") - MESSAGE(STATUS "liberio support enabled.") INCLUDE_DIRECTORIES(${LIBERIO_INCLUDE_DIRS}) LIBUHD_APPEND_LIBS(${LIBERIO_LIBRARIES}) LIBUHD_APPEND_SOURCES( @@ -139,6 +137,10 @@ IF(ENABLE_LIBERIO) ) ENDIF(ENABLE_LIBERIO) +IF(ENABLE_DPDK) + INCLUDE_SUBDIRECTORY(uhd-dpdk) +ENDIF(ENABLE_DPDK) + # Verbose Debug output for send/recv SET( UHD_TXRX_DEBUG_PRINTS OFF CACHE BOOL "Use verbose debug output for send/recv" ) OPTION( UHD_TXRX_DEBUG_PRINTS "Use verbose debug output for send/recv" "" ) diff --git a/host/lib/transport/uhd-dpdk/CMakeLists.txt b/host/lib/transport/uhd-dpdk/CMakeLists.txt new file mode 100644 index 000000000..be683aaf2 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/CMakeLists.txt @@ -0,0 +1,36 @@ +# +# Copyright 2018 Ettus Research, a National Instruments Company +# +# SPDX-License-Identifier: GPL-3.0 +# + +######################################################################## +# Add the subdirectories +######################################################################## +if(ENABLE_DPDK) + if(NOT DEFINED UHD_DPDK_CFLAGS) + message(STATUS "") + set(UHD_DPDK_CFLAGS "-march=native" + CACHE STRING "CFLAGS to use when building uhd-dpdk sources") + message(STATUS "DPDK: Using default UHD_DPDK_CFLAGS=" ${UHD_DPDK_CFLAGS}) + endif(NOT DEFINED UHD_DPDK_CFLAGS) + + include_directories(${CMAKE_CURRENT_SOURCE_DIR}) + + LIBUHD_APPEND_SOURCES( + ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk.c + ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_driver.c + ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_fops.c + ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_udp.c + ) + set_source_files_properties( + ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk.c + ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_driver.c + ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_fops.c + ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_udp.c + PROPERTIES COMPILE_FLAGS ${UHD_DPDK_CFLAGS} + ) + include_directories(${DPDK_INCLUDE_DIR}) + LIBUHD_APPEND_LIBS(${DPDK_LIBRARIES}) +endif(ENABLE_DPDK) + diff --git a/host/lib/transport/uhd-dpdk/test/Makefile b/host/lib/transport/uhd-dpdk/test/Makefile new file mode 100644 index 000000000..9d6e60372 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/test/Makefile @@ -0,0 +1,53 @@ +# BSD LICENSE +# +# Copyright(c) 2010-2014 Intel Corporation. All rights reserved. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Intel Corporation nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +ifeq ($(RTE_SDK),) +$(error "Please define RTE_SDK environment variable") +endif + +# Default target, can be overridden by command line or environment +RTE_TARGET ?= x86_64-native-linuxapp-gcc + +include $(RTE_SDK)/mk/rte.vars.mk + +# binary name +APP = test + +# all source are stored in SRCS-y +SRCS-y := test.c + +CFLAGS += -O0 -g +CFLAGS += $(WERROR_FLAGS) + +EXTRA_CFLAGS=-I${S}/../include +EXTRA_LDFLAGS=-L${O}/lib -luhd-dpdk + +include $(RTE_SDK)/mk/rte.extapp.mk diff --git a/host/lib/transport/uhd-dpdk/test/test.c b/host/lib/transport/uhd-dpdk/test/test.c new file mode 100644 index 000000000..c324561de --- /dev/null +++ b/host/lib/transport/uhd-dpdk/test/test.c @@ -0,0 +1,303 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +/** + * Benchmark program to check performance of 2 simultaneous links + */ + + +#include <uhd-dpdk.h> +#include <stdio.h> +#include <stdbool.h> +#include <string.h> +#include <unistd.h> +#include <sys/time.h> +#include <errno.h> +#include <arpa/inet.h> + +#define NUM_MBUFS 4095 +#define MBUF_CACHE_SIZE 315 +#define BURST_SIZE 64 + +#define NUM_PORTS 2 +#define TX_CREDITS 28 +#define RX_CREDITS 64 +#define BENCH_SPP 700 +#define BENCH_IFG 220 + + +static uint32_t last_seqno[NUM_PORTS]; +static uint32_t dropped_packets[NUM_PORTS]; +static uint32_t lasts[NUM_PORTS][16], drops[NUM_PORTS][16]; +static uint32_t last_ackno[NUM_PORTS]; +static uint32_t tx_seqno[NUM_PORTS]; +static uint64_t tx_xfer[NUM_PORTS]; + +static void process_udp(int id, uint32_t *udp_data) +{ + if (udp_data[0] != last_seqno[id] + 1) { + lasts[id][dropped_packets[id] & 0xf] = last_seqno[id]; + drops[id][dropped_packets[id] & 0xf] = udp_data[0]; + dropped_packets[id]++; + } + + last_seqno[id] = udp_data[0]; + last_ackno[id] = udp_data[1]; +} + +static void send_ctrl(struct uhd_dpdk_socket *sock, uint32_t run) +{ + struct rte_mbuf *mbuf = NULL; + uhd_dpdk_request_tx_bufs(sock, &mbuf, 1); + if (unlikely(mbuf == NULL)) + return; + uint32_t *tx_data = uhd_dpdk_buf_to_data(sock, mbuf); + tx_data[0] = (BENCH_SPP << 16) | (TX_CREDITS << 4) | run; // spp, tx_credits, run + tx_data[1] = (RX_CREDITS << 16) | (BENCH_IFG); // credits, ifg + mbuf->pkt_len = 8; + mbuf->data_len = 8; + uhd_dpdk_send(sock, &mbuf, 1); +} + +static void send_udp(struct uhd_dpdk_socket *sock, int id, bool fc_only) +{ + struct rte_mbuf *mbuf = NULL; + uhd_dpdk_request_tx_bufs(sock, &mbuf, 1); + if (unlikely(mbuf == NULL)) + return; + uint32_t *tx_data = uhd_dpdk_buf_to_data(sock, mbuf); + tx_data[0] = fc_only ? tx_seqno[id] - 1 : tx_seqno[id]; + tx_data[1] = last_seqno[id]; + if (!fc_only) { + memset(&tx_data[2], last_seqno[id], 8*BENCH_SPP); + tx_xfer[id] += 8*BENCH_SPP; + } + mbuf->pkt_len = 8 + (fc_only ? 0 : 8*BENCH_SPP); + mbuf->data_len = 8 + (fc_only ? 0 : 8*BENCH_SPP); + + uhd_dpdk_send(sock, &mbuf, 1); + + if (!fc_only) { + tx_seqno[id]++; + } +} + +static void bench(struct uhd_dpdk_socket **tx, struct uhd_dpdk_socket **rx, struct uhd_dpdk_socket **ctrl, uint32_t nb_ports) +{ + uint64_t total_xfer[NUM_PORTS]; + uint32_t id; + for (id = 0; id < nb_ports; id++) { + tx_seqno[id] = 1; + tx_xfer[id] = 0; + last_ackno[id] = 0; + last_seqno[id] = 0; + dropped_packets[id] = 0; + total_xfer[id] = 0; + } + sleep(1); + struct timeval bench_start, bench_end; + gettimeofday(&bench_start, NULL); + for (id = 0; id < nb_ports; id++) { + send_ctrl(ctrl[id], 0); + for (int pktno = 0; (pktno < TX_CREDITS*3/4); pktno++) { + send_udp(tx[id], id, false); + } + } + for (id = 0; id < nb_ports; id++) { + send_ctrl(ctrl[id], 1); + } + /* + * The test... + */ + uint64_t total_received = 0; + uint32_t consec_no_rx = 0; + while (total_received < 1000000 ) { //&& consec_no_rx < 10000) { + for (id = 0; id < nb_ports; id++) { + + /* Get burst of RX packets, from first port of pair. */ + struct rte_mbuf *bufs[BURST_SIZE]; + const int64_t nb_rx = uhd_dpdk_recv(rx[id], bufs, BURST_SIZE); + + if (unlikely(nb_rx <= 0)) { + consec_no_rx++; + if (consec_no_rx >= 100000) { + uint32_t skt_drops = 0; + uhd_dpdk_get_drop_count(rx[id], &skt_drops); + printf("TX seq %d, TX ack %d, RX seq %d, %d, drops!\n", tx_seqno[id], last_ackno[id], last_seqno[id], skt_drops); + consec_no_rx = 0; + break; + } + continue; + } else { + consec_no_rx = 0; + } + + for (int buf = 0; buf < nb_rx; buf++) { + total_xfer[id] += bufs[buf]->pkt_len; + uint64_t ol_flags = bufs[buf]->ol_flags; + uint32_t *data = (uint32_t *) uhd_dpdk_buf_to_data(rx[id], bufs[buf]); + if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* FIXME: Deprecated/changed in later release */ + printf("Buf %d: Bad IP cksum\n", buf); + } else { + process_udp(id, data); + } + } + + /* Free buffers. */ + for (int buf = 0; buf < nb_rx; buf++) + uhd_dpdk_free_buf(bufs[buf]); + total_received += nb_rx; + } + + for (id = 0; id < nb_ports; id++) { + /* TX portion */ + uint32_t window_end = last_ackno[id] + TX_CREDITS; + //uint32_t window_end = last_seqno[port] + TX_CREDITS; + if (window_end <= tx_seqno[id]) { + if (consec_no_rx == 9999) { + send_udp(tx[id], id, true); + } + //send_udp(tx[id], id, true); + ; + } else { + for (int pktno = 0; (pktno < BURST_SIZE) && (tx_seqno[id] < window_end); pktno++) { + send_udp(tx[id], id, false); + } + } + } + } + for (id = 0; id < nb_ports; id++) { + send_ctrl(ctrl[id], 0); + } + gettimeofday(&bench_end, NULL); + printf("Benchmark complete\n\n"); + + for (id = 0; id < nb_ports; id++) { + printf("\n"); + printf("Bytes received = %ld\n", total_xfer[id]); + printf("Bytes sent = %ld\n", tx_xfer[id]); + printf("Time taken = %ld us\n", (bench_end.tv_sec - bench_start.tv_sec)*1000000 + (bench_end.tv_usec - bench_start.tv_usec)); + double elapsed_time = (bench_end.tv_sec - bench_start.tv_sec)*1000000 + (bench_end.tv_usec - bench_start.tv_usec); + elapsed_time *= 1.0e-6; + double elapsed_bytes = total_xfer[id]; + printf("RX Performance = %e Gbps\n", elapsed_bytes*8.0/1.0e9/elapsed_time); + elapsed_bytes = tx_xfer[id]; + printf("TX Performance = %e Gbps\n", elapsed_bytes*8.0/1.0e9/elapsed_time); + uint32_t skt_drops = 0; + uhd_dpdk_get_drop_count(rx[id], &skt_drops); + printf("Dropped %d packets\n", dropped_packets[id]); + printf("Socket reports dropped %d packets\n", skt_drops); + for (unsigned int i = 0; i < 16; i++) { + if (i >= dropped_packets[id]) + break; + printf("Last(%u), Recv(%u)\n", lasts[id][i], drops[id][i]); + } + //printf("%d missed/dropped packets\n", errors); + printf("\n"); + } + +} + +int main(int argc, char **argv) +{ + int port_thread_mapping[2] = {1, 1}; + int status = uhd_dpdk_init(argc, argv, 2, &port_thread_mapping[0], NUM_MBUFS, MBUF_CACHE_SIZE); + if (status) { + printf("%d: Something wrong?\n", status); + return status; + } + + uint32_t eth_ip = htonl(0xc0a80008); + uint32_t eth_mask = htonl(0xffffff00); + status = uhd_dpdk_set_ipv4_addr(0, eth_ip, eth_mask); + if (status) { + printf("Error while setting IP0: %d\n", status); + return status; + } + status = uhd_dpdk_set_ipv4_addr(1, eth_ip, eth_mask); + if (status) { + printf("Error while setting IP1: %d\n", status); + return status; + } + + struct uhd_dpdk_socket *eth_rx[2]; + struct uhd_dpdk_socket *eth_tx[2]; + struct uhd_dpdk_socket *eth_ctrl[2]; + struct uhd_dpdk_sockarg_udp sockarg = { + .is_tx = false, + .local_port = htons(0xBEE7), + .remote_port = htons(0xBEE7), + .dst_addr = htonl(0xc0a80004) + }; + eth_rx[0] = uhd_dpdk_sock_open(0, UHD_DPDK_SOCK_UDP, &sockarg); + if (!eth_rx[0]) { + printf("!eth0_rx\n"); + return -ENODEV; + } + eth_rx[1] = uhd_dpdk_sock_open(1, UHD_DPDK_SOCK_UDP, &sockarg); + if (!eth_rx[1]) { + printf("!eth1_rx\n"); + return -ENODEV; + } + + sockarg.is_tx = true; + eth_tx[0] = uhd_dpdk_sock_open(0, UHD_DPDK_SOCK_UDP, &sockarg); + if (!eth_tx[0]) { + printf("!eth0_tx\n"); + return -ENODEV; + } + eth_tx[1] = uhd_dpdk_sock_open(1, UHD_DPDK_SOCK_UDP, &sockarg); + if (!eth_tx[1]) { + printf("!eth1_tx\n"); + return -ENODEV; + } + + sockarg.local_port = htons(0xB4D); + sockarg.remote_port = htons(0xB4D); + eth_ctrl[0] = uhd_dpdk_sock_open(0, UHD_DPDK_SOCK_UDP, &sockarg); + if (!eth_ctrl[0]) { + printf("!eth0_ctrl\n"); + return -ENODEV; + } + eth_ctrl[1] = uhd_dpdk_sock_open(1, UHD_DPDK_SOCK_UDP, &sockarg); + if (!eth_ctrl[1]) { + printf("!eth1_ctrl\n"); + return -ENODEV; + } + + bench(eth_tx, eth_rx, eth_ctrl, 2); + + status = uhd_dpdk_sock_close(eth_rx[0]); + if (status) { + printf("Bad close RX0 %d\n", status); + return status; + } + status = uhd_dpdk_sock_close(eth_rx[1]); + if (status) { + printf("Bad close RX1 %d\n", status); + return status; + } + status = uhd_dpdk_sock_close(eth_tx[0]); + if (status) { + printf("Bad close TX0 %d\n", status); + return status; + } + status = uhd_dpdk_sock_close(eth_tx[1]); + if (status) { + printf("Bad close TX1 %d\n", status); + return status; + } + status = uhd_dpdk_sock_close(eth_ctrl[0]); + if (status) { + printf("Bad close Ctrl0 %d\n", status); + return status; + } + status = uhd_dpdk_sock_close(eth_ctrl[1]); + if (status) { + printf("Bad close Ctrl1 %d\n", status); + return status; + } + return status; +} diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk.c b/host/lib/transport/uhd-dpdk/uhd_dpdk.c new file mode 100644 index 000000000..2ee74a201 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk.c @@ -0,0 +1,363 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#include "uhd_dpdk_ctx.h" +#include "uhd_dpdk_udp.h" +#include "uhd_dpdk_driver.h" +#include <stdlib.h> +#include <rte_errno.h> +#include <rte_malloc.h> +#include <rte_log.h> + +/* FIXME: Replace with configurable values */ +#define DEFAULT_RING_SIZE 512 + +/* FIXME: This needs to be protected */ +struct uhd_dpdk_ctx *ctx = NULL; + +/** + * TODO: Probably should provide way to get access to thread for a given port + * UHD's first calling thread will be the master thread + * In UHD, maybe check thread, and if it is different, pass work to that thread and optionally wait() on it (some condition variable) + */ + +/* TODO: For nice scheduling options later, make sure to separate RX and TX activity */ + + +int uhd_dpdk_port_count(void) +{ + if (!ctx) + return -ENODEV; + return ctx->num_ports; +} + +struct eth_addr uhd_dpdk_get_eth_addr(unsigned int portid) +{ + struct eth_addr retval; + memset(retval.addr, 0xff, ETHER_ADDR_LEN); + + struct uhd_dpdk_port *p = find_port(portid); + if (p) { + memcpy(retval.addr, p->mac_addr.addr_bytes, ETHER_ADDR_LEN); + } + return retval; +} + +int uhd_dpdk_get_ipv4_addr(unsigned int portid, uint32_t *ipv4_addr, uint32_t *netmask) +{ + if (!ipv4_addr) + return -EINVAL; + struct uhd_dpdk_port *p = find_port(portid); + if (p) { + *ipv4_addr = p->ipv4_addr; + if (netmask) { + *netmask = p->netmask; + } + return 0; + } + return -ENODEV; +} + +int uhd_dpdk_set_ipv4_addr(unsigned int portid, uint32_t ipv4_addr, uint32_t netmask) +{ + struct uhd_dpdk_port *p = find_port(portid); + if (p) { + p->ipv4_addr = ipv4_addr; + p->netmask = netmask; + return 0; + } + return -ENODEV; +} + +/* + * Initialize a given port using default settings and with the RX buffers + * coming from the mbuf_pool passed as a parameter. + * FIXME: Starting with assumption of one thread/core per port + */ +static inline int uhd_dpdk_port_init(struct uhd_dpdk_port *port, + struct rte_mempool *rx_mbuf_pool, + unsigned int mtu) +{ + int retval; + + /* Check for a valid port */ + if (port->id >= rte_eth_dev_count()) + return -ENODEV; + + /* Set up Ethernet device with defaults (1 RX ring, 1 TX ring) */ + /* FIXME: Check if hw_ip_checksum is possible */ + struct rte_eth_conf port_conf = { + .rxmode = { + .max_rx_pkt_len = mtu, + .jumbo_frame = 1, + .hw_ip_checksum = 1, + } + }; + retval = rte_eth_dev_configure(port->id, 1, 1, &port_conf); + if (retval != 0) + return retval; + + retval = rte_eth_rx_queue_setup(port->id, 0, DEFAULT_RING_SIZE, + rte_eth_dev_socket_id(port->id), NULL, rx_mbuf_pool); + if (retval < 0) + return retval; + + retval = rte_eth_tx_queue_setup(port->id, 0, DEFAULT_RING_SIZE, + rte_eth_dev_socket_id(port->id), NULL); + if (retval < 0) + goto port_init_fail; + + /* Create the hash table for the RX sockets */ + char name[32]; + snprintf(name, sizeof(name), "rx_table_%u", port->id); + struct rte_hash_parameters hash_params = { + .name = name, + .entries = UHD_DPDK_MAX_SOCKET_CNT, + .key_len = sizeof(struct uhd_dpdk_ipv4_5tuple), + .hash_func = NULL, + .hash_func_init_val = 0, + }; + port->rx_table = rte_hash_create(&hash_params); + if (port->rx_table == NULL) { + retval = rte_errno; + goto port_init_fail; + } + + /* Create ARP table */ + snprintf(name, sizeof(name), "arp_table_%u", port->id); + hash_params.name = name; + hash_params.entries = UHD_DPDK_MAX_SOCKET_CNT; + hash_params.key_len = sizeof(uint32_t); + hash_params.hash_func = NULL; + hash_params.hash_func_init_val = 0; + port->arp_table = rte_hash_create(&hash_params); + if (port->arp_table == NULL) { + retval = rte_errno; + goto free_rx_table; + } + + /* Set up list for TX queues */ + LIST_INIT(&port->txq_list); + + /* Start the Ethernet port. */ + retval = rte_eth_dev_start(port->id); + if (retval < 0) { + goto free_arp_table; + } + + /* Display the port MAC address. */ + rte_eth_macaddr_get(port->id, &port->mac_addr); + RTE_LOG(INFO, EAL, "Port %u MAC: %02x %02x %02x %02x %02x %02x\n", + (unsigned)port->id, + port->mac_addr.addr_bytes[0], port->mac_addr.addr_bytes[1], + port->mac_addr.addr_bytes[2], port->mac_addr.addr_bytes[3], + port->mac_addr.addr_bytes[4], port->mac_addr.addr_bytes[5]); + + struct rte_eth_link link; + rte_eth_link_get(port->id, &link); + RTE_LOG(INFO, EAL, "Port %u UP: %d\n", port->id, link.link_status); + + return 0; + +free_arp_table: + rte_hash_free(port->arp_table); +free_rx_table: + rte_hash_free(port->rx_table); +port_init_fail: + return rte_errno; +} + +static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int id) +{ + if (!ctx || !thread) + return -EINVAL; + + unsigned int socket_id = rte_lcore_to_socket_id(id); + thread->id = id; + thread->rx_pktbuf_pool = ctx->rx_pktbuf_pools[socket_id]; + thread->tx_pktbuf_pool = ctx->tx_pktbuf_pools[socket_id]; + LIST_INIT(&thread->port_list); + + char name[32]; + snprintf(name, sizeof(name), "sockreq_ring_%u", id); + thread->sock_req_ring = rte_ring_create( + name, + UHD_DPDK_MAX_PENDING_SOCK_REQS, + socket_id, + RING_F_SC_DEQ + ); + if (!thread->sock_req_ring) + return -ENOMEM; + return 0; +} + + +int uhd_dpdk_init(int argc, char **argv, unsigned int num_ports, + int *port_thread_mapping, int num_mbufs, int mbuf_cache_size, + int mtu) +{ + /* Init context only once */ + if (ctx) + return 1; + + if ((num_ports == 0) || (port_thread_mapping == NULL)) { + return -EINVAL; + } + + /* Grabs arguments intended for DPDK's EAL */ + int ret = rte_eal_init(argc, argv); + if (ret < 0) + rte_exit(EXIT_FAILURE, "Error with EAL initialization\n"); + + ctx = (struct uhd_dpdk_ctx *) rte_zmalloc("uhd_dpdk_ctx", sizeof(*ctx), rte_socket_id()); + if (!ctx) + return -ENOMEM; + + ctx->num_threads = rte_lcore_count(); + if (ctx->num_threads <= 1) + rte_exit(EXIT_FAILURE, "Error: No worker threads enabled\n"); + + /* Check that we have ports to send/receive on */ + ctx->num_ports = rte_eth_dev_count(); + if (ctx->num_ports < 1) + rte_exit(EXIT_FAILURE, "Error: Found no ports\n"); + if (ctx->num_ports < num_ports) + rte_exit(EXIT_FAILURE, "Error: User requested more ports than available\n"); + + /* Get memory for thread and port data structures */ + ctx->threads = rte_zmalloc("uhd_dpdk_thread", RTE_MAX_LCORE*sizeof(struct uhd_dpdk_thread), 0); + if (!ctx->threads) + rte_exit(EXIT_FAILURE, "Error: Could not allocate memory for thread data\n"); + ctx->ports = rte_zmalloc("uhd_dpdk_port", ctx->num_ports*sizeof(struct uhd_dpdk_port), 0); + if (!ctx->ports) + rte_exit(EXIT_FAILURE, "Error: Could not allocate memory for port data\n"); + + /* Initialize the thread data structures */ + for (int i = rte_get_next_lcore(-1, 1, 0); + (i < RTE_MAX_LCORE); + i = rte_get_next_lcore(i, 1, 0)) + { + /* Do one mempool of RX/TX per socket */ + unsigned int socket_id = rte_lcore_to_socket_id(i); + /* FIXME Probably want to take into account actual number of ports per socket */ + if (ctx->tx_pktbuf_pools[socket_id] == NULL) { + /* Creates a new mempool in memory to hold the mbufs. + * This is done for each CPU socket + */ + const int mbuf_size = mtu + 2048 + RTE_PKTMBUF_HEADROOM; + char name[32]; + snprintf(name, sizeof(name), "rx_mbuf_pool_%u", socket_id); + ctx->rx_pktbuf_pools[socket_id] = rte_pktmbuf_pool_create( + name, + ctx->num_ports*num_mbufs, + mbuf_cache_size, + 0, + mbuf_size, + socket_id + ); + snprintf(name, sizeof(name), "tx_mbuf_pool_%u", socket_id); + ctx->tx_pktbuf_pools[socket_id] = rte_pktmbuf_pool_create( + name, + ctx->num_ports*num_mbufs, + mbuf_cache_size, + 0, + mbuf_size, + socket_id + ); + if ((ctx->rx_pktbuf_pools[socket_id]== NULL) || + (ctx->tx_pktbuf_pools[socket_id]== NULL)) + rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n"); + } + + if (uhd_dpdk_thread_init(&ctx->threads[i], i) < 0) + rte_exit(EXIT_FAILURE, "Error initializing thread %i\n", i); + } + + unsigned master_lcore = rte_get_master_lcore(); + + /* Assign ports to threads and initialize the port data structures */ + for (unsigned int i = 0; i < num_ports; i++) { + int thread_id = port_thread_mapping[i]; + if (thread_id < 0) + continue; + if (((unsigned int) thread_id) == master_lcore) + RTE_LOG(WARNING, EAL, "User requested master lcore for port %u\n", i); + if (ctx->threads[thread_id].id != (unsigned int) thread_id) + rte_exit(EXIT_FAILURE, "Requested inactive lcore %u for port %u\n", (unsigned int) thread_id, i); + + struct uhd_dpdk_port *port = &ctx->ports[i]; + port->id = i; + port->parent = &ctx->threads[thread_id]; + ctx->threads[thread_id].num_ports++; + LIST_INSERT_HEAD(&ctx->threads[thread_id].port_list, port, port_entry); + + /* Initialize port. */ + if (uhd_dpdk_port_init(port, port->parent->rx_pktbuf_pool, mtu) != 0) + rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n", + i); + } + + RTE_LOG(INFO, EAL, "Init DONE!\n"); + + /* FIXME: Create functions to do this */ + RTE_LOG(INFO, EAL, "Starting I/O threads!\n"); + + for (int i = rte_get_next_lcore(-1, 1, 0); + (i < RTE_MAX_LCORE); + i = rte_get_next_lcore(i, 1, 0)) + { + struct uhd_dpdk_thread *t = &ctx->threads[i]; + if (!LIST_EMPTY(&t->port_list)) { + rte_eal_remote_launch(_uhd_dpdk_driver_main, NULL, ctx->threads[i].id); + } + } + return 0; +} + +/* FIXME: This will be changed once we have functions to handle the threads */ +int uhd_dpdk_destroy(void) +{ + if (!ctx) + return -ENODEV; + + struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0); + if (!req) + return -ENOMEM; + + req->req_type = UHD_DPDK_LCORE_TERM; + + for (int i = rte_get_next_lcore(-1, 1, 0); + (i < RTE_MAX_LCORE); + i = rte_get_next_lcore(i, 1, 0)) + { + struct uhd_dpdk_thread *t = &ctx->threads[i]; + + if (LIST_EMPTY(&t->port_list)) + continue; + + if (rte_eal_get_lcore_state(t->id) == FINISHED) + continue; + + pthread_mutex_init(&req->mutex, NULL); + pthread_cond_init(&req->cond, NULL); + pthread_mutex_lock(&req->mutex); + if (rte_ring_enqueue(t->sock_req_ring, req)) { + pthread_mutex_unlock(&req->mutex); + RTE_LOG(ERR, USER2, "Failed to terminate thread %d\n", i); + rte_free(req); + return -ENOSPC; + } + struct timespec timeout = { + .tv_sec = 1, + .tv_nsec = 0 + }; + pthread_cond_timedwait(&req->cond, &req->mutex, &timeout); + pthread_mutex_unlock(&req->mutex); + } + + rte_free(req); + return 0; +} + diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h new file mode 100644 index 000000000..31c9dba0c --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h @@ -0,0 +1,253 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _UHD_DPDK_CTX_H_ +#define _UHD_DPDK_CTX_H_ + +#include <stdint.h> +#include <sys/queue.h> +#include <sys/types.h> +#include <rte_ethdev.h> +#include <rte_mbuf.h> +#include <rte_hash.h> +#include <rte_eal.h> +#include <uhd/transport/uhd-dpdk.h> +//#include <pthread.h> + +/* For nice scheduling options later, make sure to separate RX and TX activity */ + +#define UHD_DPDK_MAX_SOCKET_CNT 1024 +#define UHD_DPDK_MAX_PENDING_SOCK_REQS 16 +#define UHD_DPDK_TXQ_SIZE 64 +#define UHD_DPDK_TX_BURST_SIZE (UHD_DPDK_TXQ_SIZE - 1) +#define UHD_DPDK_RXQ_SIZE 64 +#define UHD_DPDK_RX_BURST_SIZE (UHD_DPDK_RXQ_SIZE - 1) + +struct uhd_dpdk_port; + +/** + * + * All memory allocation for port, rx_ring, and tx_ring owned by I/O thread + * Rest owned by user thread + * + * port: port servicing this socket + * tid: thread ID that owns this socket (to be associated with TX queue) + * sock_type: Type of socket + * priv: Private data, based on sock_type + * rx_ring: pointer to individual rx_ring (created during init--Also used as free buffer ring for TX) + * tx_ring: pointer to shared tx_ring (with all sockets for this tid) + * tx_buf_count: Number of buffers currently outside the rings + * tx_entry: List node for TX Queue tracking + * + * If a user closes a socket without outstanding TX buffers, user must free the + * buffers. Otherwise, that memory will be leaked, and usage will grow. + */ +struct uhd_dpdk_socket { + struct uhd_dpdk_port *port; + pid_t tid; + enum uhd_dpdk_sock_type sock_type; + void *priv; + struct rte_ring *rx_ring; + struct rte_ring *tx_ring; + int tx_buf_count; + LIST_ENTRY(uhd_dpdk_socket) tx_entry; +}; +LIST_HEAD(uhd_dpdk_tx_head, uhd_dpdk_socket); + +/************************************************ + * Configuration + ************************************************/ +enum uhd_dpdk_sock_req { + UHD_DPDK_SOCK_OPEN = 0, + UHD_DPDK_SOCK_CLOSE, + UHD_DPDK_LCORE_TERM, + UHD_DPDK_SOCK_REQ_COUNT +}; + +/** + * port: port associated with this request + * sock: socket associated with this request + * req_type: Open, Close, or terminate lcore + * sock_type: Only udp is supported + * cond: Used to sleep until socket creation is finished + * mutex: associated with cond + * entry: List node for requests pending ARP responses + * priv: private data + * retval: Result of call (needed post-wakeup) + */ +struct uhd_dpdk_config_req { + struct uhd_dpdk_port *port; + struct uhd_dpdk_socket *sock; + enum uhd_dpdk_sock_req req_type; + enum uhd_dpdk_sock_type sock_type; + pthread_cond_t cond; + pthread_mutex_t mutex; + LIST_ENTRY(uhd_dpdk_config_req) entry; + void *priv; + int retval; +}; +LIST_HEAD(uhd_dpdk_config_head, uhd_dpdk_config_req); + +/************************************************ + * RX Table + ************************************************/ +struct uhd_dpdk_arp_entry { + struct ether_addr mac_addr; + struct uhd_dpdk_config_head pending_list; /* Config reqs pending ARP--Thread-unsafe */ +}; + +struct uhd_dpdk_ipv4_5tuple { + enum uhd_dpdk_sock_type sock_type; + uint32_t src_ip; + uint32_t dst_ip; + uint16_t src_port; + uint16_t dst_port; +}; + +/************************************************ + * TX Queues + * + * 1 TX Queue per thread sending through a hardware port + * All memory allocation owned by I/O thread + * + * tid: thread id + * queue: TX queue holding threads prepared packets (via send()) + * retry_queue: queue holding packets that couldn't be sent + * freebufs: queue holding empty buffers + * tx_list: list of sockets using this queue + * entry: list node for port to track TX queues + * + * queue, retry_queue, and freebufs are single-producer, single-consumer queues + * retry_queue wholly-owned by I/O thread + * For queue, user thread is producer, I/O thread is consumer + * For freebufs, user thread is consumer, I/O thread is consumer + * + * All queues are same size, and they are shared between all sockets on one + * thread (tid is the identifier) + * 1. Buffers start in freebufs (user gets buffers from freebufs) + * 2. User submits packet to queue + * 3. If packet couldn't be sent, it is (re)enqueued on retry_queue + ************************************************/ +struct uhd_dpdk_tx_queue { + pid_t tid; + struct rte_ring *queue; + struct rte_ring *retry_queue; + struct rte_ring *freebufs; + struct uhd_dpdk_tx_head tx_list; + LIST_ENTRY(uhd_dpdk_tx_queue) entry; +}; +LIST_HEAD(uhd_dpdk_txq_head, uhd_dpdk_tx_queue); + +/************************************************ + * Port structure + * + * All memory allocation owned by I/O thread + * + * id: hardware port id (for DPDK) + * parent: I/O thread servicing this port + * mac_addr: MAC address of this port + * ipv4_addr: IPv4 address of this port + * netmask: Subnet mask of this port + * arp_table: ARP cache for this port + * rx_table: Mapping of 5-tuple key to sockets for RX + * txq_list: List of TX queues associated with this port + * port_entry: List node entry for I/O thread to track + ************************************************/ +struct uhd_dpdk_port { + unsigned int id; + struct uhd_dpdk_thread *parent; + struct ether_addr mac_addr; + uint32_t ipv4_addr; /* FIXME: Check this before allowing a socket!!! */ + uint32_t netmask; + /* Key = IP addr + * Value = MAC addr (ptr to uhd_dpdk_arp_entry) + */ + struct rte_hash *arp_table; + /* hash map of RX sockets + * Key = uhd_dpdk_ipv4_5tuple + * Value = uhd_dpdk_socket + */ + struct rte_hash *rx_table; + /* doubly-linked list of TX sockets */ + struct uhd_dpdk_txq_head txq_list; + LIST_ENTRY(uhd_dpdk_port) port_entry; +}; + +LIST_HEAD(uhd_dpdk_port_head, uhd_dpdk_port); + +/************************************************ + * Thread/lcore-private data structure + * + * All data owned by global context + * + * id: lcore id (from DPDK) + * rx_pktbuf_pool: memory pool for generating buffers for RX packets + * tx_pktbuf_pool: memory pool for generating buffers for TX packets + * num_ports: Number of ports this lcore is servicing + * port_list: List of ports this lcore is servicing + * sock_req_ring: Queue for user threads to submit service requests to the lcore + * + * sock_req_ring is a multi-producer, single-consumer queue + * + * For threads that have ports: + * Launch individually + * For threads without ports: + * Do not launch unless user specifically does it themselves. + * Should also have master lcore returned to user + * REMEMBER: Without args, DPDK creates an lcore for each CPU core! + */ +struct uhd_dpdk_thread { + unsigned int id; + struct rte_mempool *rx_pktbuf_pool; + struct rte_mempool *tx_pktbuf_pool; + int num_ports; + struct uhd_dpdk_port_head port_list; + struct rte_ring *sock_req_ring; +}; + + +/************************************************ + * One global context + * + * num_threads: Number of DPDK lcores tracked + * num_ports: Number of DPDK/NIC ports tracked + * threads: Array of all lcores/threads + * ports: Array of all DPDK/NIC ports + * rx_pktbuf_pools: Array of all packet buffer pools for RX + * tx_pktbuf_pools: Array of all packet buffer pools for TX + * + * The packet buffer pools are memory pools that are associated with a CPU + * socket. They will provide storage close to the socket to accommodate NUMA + * nodes. + ************************************************/ +struct uhd_dpdk_ctx { + unsigned int num_threads; + unsigned int num_ports; + struct uhd_dpdk_thread *threads; + struct uhd_dpdk_port *ports; + struct rte_mempool *rx_pktbuf_pools[RTE_MAX_NUMA_NODES]; + struct rte_mempool *tx_pktbuf_pools[RTE_MAX_NUMA_NODES]; +}; + +extern struct uhd_dpdk_ctx *ctx; + +static inline struct uhd_dpdk_port * find_port(unsigned int portid) +{ + if (!ctx) + return NULL; + + for (unsigned int i = 0; i < ctx->num_threads; i++) { + struct uhd_dpdk_thread *t = &ctx->threads[i]; + struct uhd_dpdk_port *p; + LIST_FOREACH(p, &t->port_list, port_entry) { + if (p->id == portid) { + return p; + } + } + } + return NULL; +} + +#endif /* _UHD_DPDK_CTX_H_ */ diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c new file mode 100644 index 000000000..0af9cc4e5 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c @@ -0,0 +1,401 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#include "uhd_dpdk_driver.h" +#include "uhd_dpdk_fops.h" +#include "uhd_dpdk_udp.h" +#include <rte_malloc.h> +#include <rte_mempool.h> +#include <arpa/inet.h> +#include <unistd.h> + +int _uhd_dpdk_process_arp(struct uhd_dpdk_port *port, struct arp_hdr *arp_frame) +{ + uint32_t dest_ip = arp_frame->arp_data.arp_sip; + struct ether_addr dest_addr = arp_frame->arp_data.arp_sha; + + struct uhd_dpdk_arp_entry *entry = NULL; + rte_hash_lookup_data(port->arp_table, &dest_ip, (void **) &entry); + if (!entry) { + entry = rte_zmalloc(NULL, sizeof(*entry), 0); + if (!entry) { + return -ENOMEM; + } + LIST_INIT(&entry->pending_list); + ether_addr_copy(&dest_addr, &entry->mac_addr); + if (rte_hash_add_key_data(port->arp_table, &dest_ip, entry) < 0) { + rte_free(entry); + return -ENOSPC; + } + } else { + struct uhd_dpdk_config_req *req = NULL; + ether_addr_copy(&dest_addr, &entry->mac_addr); + /* Now wake any config reqs waiting for the ARP */ + LIST_FOREACH(req, &entry->pending_list, entry) { + _uhd_dpdk_config_req_compl(req, 0); + } + } + + return 0; +} + +int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port, uint32_t ip) +{ + struct rte_mbuf *mbuf; + struct ether_hdr *hdr; + struct arp_hdr *arp_frame; + + mbuf = rte_pktmbuf_alloc(port->parent->tx_pktbuf_pool); + if (unlikely(mbuf == NULL)) { + RTE_LOG(WARNING, MEMPOOL, "Could not allocate packet buffer for ARP request\n"); + return -ENOMEM; + } + + hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *); + arp_frame = (struct arp_hdr *) &hdr[1]; + + memset(hdr->d_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN); + ether_addr_copy(&port->mac_addr, &hdr->s_addr); + hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP); + + arp_frame->arp_hrd = rte_cpu_to_be_16(ARP_HRD_ETHER); + arp_frame->arp_pro = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + arp_frame->arp_hln = 6; + arp_frame->arp_pln = 4; + arp_frame->arp_op = rte_cpu_to_be_16(ARP_OP_REQUEST); + ether_addr_copy(&port->mac_addr, &arp_frame->arp_data.arp_sha); + arp_frame->arp_data.arp_sip = port->ipv4_addr; + memset(arp_frame->arp_data.arp_tha.addr_bytes, 0x00, ETHER_ADDR_LEN); + arp_frame->arp_data.arp_tip = ip; + + mbuf->pkt_len = 42; + mbuf->data_len = 42; + mbuf->ol_flags = PKT_TX_IP_CKSUM; + + if (rte_eth_tx_burst(port->id, 0, &mbuf, 1) != 1) { + RTE_LOG(WARNING, RING, "%s: TX descriptor ring is full\n", __func__); + rte_pktmbuf_free(mbuf); + return -EAGAIN; + } + return 0; +} + + +int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt) +{ + int status = 0; + struct uhd_dpdk_ipv4_5tuple ht_key = { + .sock_type = UHD_DPDK_SOCK_UDP, + .src_ip = 0, + .src_port = 0, + .dst_ip = 0, + .dst_port = pkt->dst_port + }; + + struct uhd_dpdk_socket *sock = NULL; + rte_hash_lookup_data(port->rx_table, &ht_key, (void **) &sock); + if (!sock) { + status = -ENODEV; + goto udp_rx_drop; + } + + struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; + status = rte_ring_enqueue(sock->rx_ring, mbuf); + if (status) { + pdata->dropped_pkts++; + goto udp_rx_drop; + } + return 0; + +udp_rx_drop: + rte_pktmbuf_free(mbuf); + return status; +} + +int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt) +{ + if (pkt->dst_addr != port->ipv4_addr) { + rte_pktmbuf_free(mbuf); + return -ENODEV; + } + if (pkt->next_proto_id == 0x11) { + return _uhd_dpdk_process_udp(port, mbuf, (struct udp_hdr *) &pkt[1]); + } + rte_pktmbuf_free(mbuf); + return -EINVAL; +} + +static int _uhd_dpdk_fill_ipv4_addr(struct uhd_dpdk_port *port, + struct rte_mbuf *mbuf) +{ + struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *); + struct ipv4_hdr *ip_hdr = (struct ipv4_hdr *) ð_hdr[1]; + if (is_broadcast(port, ip_hdr->dst_addr)) { + memset(eth_hdr->d_addr.addr_bytes, 0xff, ETHER_ADDR_LEN); + } else { + /* Lookup dest_addr */ + struct uhd_dpdk_arp_entry *entry = NULL; + rte_hash_lookup_data(port->arp_table, &ip_hdr->dst_addr, (void **) &entry); + if (!entry) { + RTE_LOG(ERR, USER1, "TX packet on port %d to addr 0x%08x has no ARP entry\n", port->id, ip_hdr->dst_addr); + return -ENODEV; + } + + ether_addr_copy(&entry->mac_addr, ð_hdr->d_addr); + } + return 0; +} + +static int _uhd_dpdk_send(struct uhd_dpdk_port *port, + struct uhd_dpdk_tx_queue *txq, + struct rte_ring *q) +{ + struct rte_mbuf *buf; + + unsigned int num_tx = rte_ring_count(q); + num_tx = (num_tx < UHD_DPDK_TX_BURST_SIZE) ? num_tx : UHD_DPDK_TX_BURST_SIZE; + for (unsigned int i = 0; i < num_tx; i++) { + int status = rte_ring_dequeue(q, (void **) &buf); + if (status) { + RTE_LOG(ERR, USER1, "%s: Q Count doesn't match actual\n", __func__); + break; + } + struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(buf, struct ether_hdr *); + if (eth_hdr->ether_type == rte_cpu_to_be_16(ETHER_TYPE_IPv4)) { + status = _uhd_dpdk_fill_ipv4_addr(port, buf); + if (status) { + return status; + } + } + status = rte_eth_tx_burst(port->id, 0, &buf, 1); /* Automatically frees mbuf */ + if (status != 1) { + status = rte_ring_enqueue(txq->retry_queue, buf); + if (status) + RTE_LOG(WARNING, USER1, "%s: Could not re-enqueue pkt %d\n", __func__, i); + num_tx = i; + rte_pktmbuf_free(buf); + break; + } + } + + return num_tx; +} + +static inline int _uhd_dpdk_restore_bufs(struct uhd_dpdk_port *port, + struct uhd_dpdk_tx_queue *q, + unsigned int num_bufs) +{ + /* Allocate more buffers to replace the sent ones */ + struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE]; + int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, num_bufs); + if (status) { + RTE_LOG(ERR, USER1, "%d %s: Could not restore %u pktmbufs in bulk!\n", status, __func__, num_bufs); + } + + /* Enqueue the buffers for the user thread to retrieve */ + unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) freebufs, num_bufs, NULL); + if (enqd != num_bufs) { + RTE_LOG(ERR, USER1, "Could not enqueue pktmbufs!\n"); + return status; + } + + return num_bufs; +} + +static inline void _uhd_dpdk_disable_ports(struct uhd_dpdk_thread *t) +{ + struct uhd_dpdk_port *port = NULL; + LIST_FOREACH(port, &t->port_list, port_entry) { + rte_eth_dev_stop(port->id); + } +} + +static inline int _uhd_dpdk_driver_cleanup(struct uhd_dpdk_thread *t) +{ + /* Close sockets upon request, but reply to other service requests with + * errors + */ + struct uhd_dpdk_config_req *sock_req; + if (rte_ring_dequeue(t->sock_req_ring, (void **) &sock_req)) { + switch (sock_req->req_type) { + case UHD_DPDK_SOCK_CLOSE: + _uhd_dpdk_sock_release(sock_req); + break; + default: + _uhd_dpdk_config_req_compl(sock_req, -ENODEV); + break; + } + } + + /* Do nothing if there are users remaining */ + struct uhd_dpdk_port *port = NULL; + LIST_FOREACH(port, &t->port_list, port_entry) { + /* Check for RX sockets */ + const void *hash_key; + void *hash_sock; + uint32_t hash_next = 0; + if (rte_hash_iterate(port->rx_table, &hash_key, + &hash_sock, &hash_next) != -ENOENT) + return -EAGAIN; + + /* Check for TX sockets */ + struct uhd_dpdk_tx_queue *q = NULL; + LIST_FOREACH(q, &port->txq_list, entry) { + if (!LIST_EMPTY(&q->tx_list)) + return -EAGAIN; + } + } + + /* Now can free memory */ + LIST_FOREACH(port, &t->port_list, port_entry) { + rte_hash_free(port->rx_table); + + struct uhd_dpdk_tx_queue *q = LIST_FIRST(&port->txq_list); + while (!LIST_EMPTY(&port->txq_list)) { + struct uhd_dpdk_tx_queue *nextq = LIST_NEXT(q, entry); + while (!rte_ring_empty(q->queue)) { + struct rte_buf *buf = NULL; + rte_ring_dequeue(q->queue, (void **) &buf); + rte_free(buf); + } + while (!rte_ring_empty(q->freebufs)) { + struct rte_buf *buf = NULL; + rte_ring_dequeue(q->freebufs, (void **) &buf); + rte_free(buf); + } + while (!rte_ring_empty(q->retry_queue)) { + struct rte_buf *buf = NULL; + rte_ring_dequeue(q->retry_queue, (void **) &buf); + rte_free(buf); + } + rte_ring_free(q->queue); + rte_ring_free(q->freebufs); + rte_ring_free(q->retry_queue); + rte_free(q); + q = nextq; + } + + const void *arp_key; + uint32_t arp_key_next = 0; + struct uhd_dpdk_arp_entry *arp_entry = NULL; + while (rte_hash_iterate(port->arp_table, &arp_key, + (void **) &arp_entry, &arp_key_next) >= 0) { + rte_free(arp_entry); + } + rte_hash_free(port->arp_table); + } + + return 0; +} + +int _uhd_dpdk_driver_main(void *arg) +{ + + /* Don't currently have use for arguments */ + if (arg) + return -EINVAL; + + /* Check that this is a valid lcore */ + unsigned int lcore_id = rte_lcore_id(); + if (lcore_id == LCORE_ID_ANY) + return -ENODEV; + + /* Check that this lcore has ports */ + struct uhd_dpdk_thread *t = &ctx->threads[lcore_id]; + if (t->id != lcore_id) + return -ENODEV; + + RTE_LOG(INFO, USER2, "Thread %d started\n", lcore_id); + int status = 0; + while (!status) { + /* Check for open()/close() requests and service 1 at a time */ + struct uhd_dpdk_config_req *sock_req; + if (rte_ring_dequeue(t->sock_req_ring, (void **) &sock_req) == 0) { + /* FIXME: Not checking return vals */ + switch (sock_req->req_type) { + case UHD_DPDK_SOCK_OPEN: + _uhd_dpdk_sock_setup(sock_req); + break; + case UHD_DPDK_SOCK_CLOSE: + _uhd_dpdk_sock_release(sock_req); + break; + case UHD_DPDK_LCORE_TERM: + RTE_LOG(INFO, EAL, "Terminating lcore %u\n", lcore_id); + status = 1; + _uhd_dpdk_config_req_compl(sock_req, 0); + break; + default: + RTE_LOG(ERR, USER2, "Invalid socket request %d\n", sock_req->req_type); + break; + } + } + /* For each port, attempt to receive packets and process */ + struct uhd_dpdk_port *port = NULL; + LIST_FOREACH(port, &t->port_list, port_entry) { + struct ether_hdr *hdr; + char *l2_data; + struct rte_mbuf *bufs[UHD_DPDK_RX_BURST_SIZE]; + const uint16_t num_rx = rte_eth_rx_burst(port->id, 0, + bufs, UHD_DPDK_RX_BURST_SIZE); + if (unlikely(num_rx == 0)) { + continue; + } + + for (int buf = 0; buf < num_rx; buf++) { + uint64_t ol_flags = bufs[buf]->ol_flags; + hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr *); + l2_data = (char *) &hdr[1]; + switch (rte_be_to_cpu_16(hdr->ether_type)) { + case ETHER_TYPE_ARP: + _uhd_dpdk_process_arp(port, (struct arp_hdr *) l2_data); + rte_pktmbuf_free(bufs[buf]); + break; + case ETHER_TYPE_IPv4: + if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */ + RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf); + } else { + _uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data); + } + break; + default: + rte_pktmbuf_free(bufs[buf]); + break; + } + } + } + /* For each port's TX queues, do TX */ + LIST_FOREACH(port, &t->port_list, port_entry) { + struct uhd_dpdk_tx_queue *q = NULL; + LIST_FOREACH(q, &port->txq_list, entry) { + if (!rte_ring_empty(q->retry_queue)) { + int num_retry = _uhd_dpdk_send(port, q, q->retry_queue); + _uhd_dpdk_restore_bufs(port, q, num_retry); + if (!rte_ring_empty(q->retry_queue)) { + break; + } + } + if (rte_ring_empty(q->queue)) { + continue; + } + int num_tx = _uhd_dpdk_send(port, q, q->queue); + if (num_tx > 0) { + _uhd_dpdk_restore_bufs(port, q, num_tx); + } else { + break; + } + } + } + } + + /* Now turn off ports */ + _uhd_dpdk_disable_ports(t); + + /* Now clean up before exiting */ + int cleaning = -EAGAIN; + while (cleaning == -EAGAIN) { + cleaning = _uhd_dpdk_driver_cleanup(t); + } + return status; +} diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h new file mode 100644 index 000000000..b0d3e42cd --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h @@ -0,0 +1,32 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _UHD_DPDK_DRIVER_H_ +#define _UHD_DPDK_DRIVER_H_ + +#include "uhd_dpdk_ctx.h" +#include <rte_mbuf.h> +#include <rte_arp.h> +#include <rte_udp.h> +#include <rte_ip.h> + +static inline bool is_broadcast(struct uhd_dpdk_port *port, uint32_t dst_ipv4_addr) +{ + uint32_t network = port->netmask | ((~port->netmask) & dst_ipv4_addr); + return (network == 0xffffffff); +} + + +int _uhd_dpdk_process_arp(struct uhd_dpdk_port *port, struct arp_hdr *arp_frame); +int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt); +int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt); +int _uhd_dpdk_send_udp(struct uhd_dpdk_port *port, + struct uhd_dpdk_socket *sock, + struct rte_mbuf *mbuf); +int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port, + uint32_t ip); + +int _uhd_dpdk_driver_main(void *arg); +#endif /* _UHD_DPDK_DRIVER_H_ */ diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c new file mode 100644 index 000000000..3acc3d709 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c @@ -0,0 +1,306 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#include "uhd_dpdk_fops.h" +#include "uhd_dpdk_udp.h" +#include <rte_malloc.h> +#include <rte_ip.h> + +/************************************************ + * I/O thread ONLY + * + * TODO: Decide whether to allow blocking on mutex + * This would cause the I/O thread to sleep, which isn't desireable + * Could throw in a "request completion cleanup" section in I/O thread's + * main loop, though. Just keep trying until the requesting thred is woken + * up. This would be to handle the case where the thread hadn't finished + * setting itself up to wait on the condition variable, but the I/O thread + * still got the request. + */ +int _uhd_dpdk_config_req_compl(struct uhd_dpdk_config_req *req, int retval) +{ + req->retval = retval; + int stat = pthread_mutex_trylock(&req->mutex); + if (stat) { + RTE_LOG(ERR, USER1, "%s: Could not lock req mutex\n", __func__); + return stat; + } + stat = pthread_cond_signal(&req->cond); + pthread_mutex_unlock(&req->mutex); + if (stat) { + RTE_LOG(ERR, USER1, "%s: Could not signal req cond\n", __func__); + return stat; + } + return 0; +} + +int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req) +{ + int stat = 0; + switch (req->sock_type) { + case UHD_DPDK_SOCK_UDP: + stat = _uhd_dpdk_udp_setup(req); + break; + default: + stat = -EINVAL; + _uhd_dpdk_config_req_compl(req, -EINVAL); + } + return stat; +} + +int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req) +{ + int stat = 0; + switch (req->sock_type) { + case UHD_DPDK_SOCK_UDP: + stat = _uhd_dpdk_udp_release(req); + break; + default: + stat = -EINVAL; + _uhd_dpdk_config_req_compl(req, -EINVAL); + } + + return stat; +} + +/************************************************ + * API calls + */ +struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid, + enum uhd_dpdk_sock_type t, void *sockarg) +{ + if (!ctx || (t >= UHD_DPDK_SOCK_TYPE_COUNT)) { + return NULL; + } + + struct uhd_dpdk_port *port = find_port(portid); + if (!port) { + return NULL; + } + + if (!port->ipv4_addr) { + RTE_LOG(WARNING, EAL, "Please set IPv4 address for port %u before opening socket\n", portid); + return NULL; + } + + + struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0); + if (!req) { + return NULL; + } + + struct uhd_dpdk_socket *s = (struct uhd_dpdk_socket *) rte_zmalloc(NULL, sizeof(*s), 0); + if (!s) { + goto sock_open_end; + } + + s->port = port; + req->sock = s; + req->req_type = UHD_DPDK_SOCK_OPEN; + req->sock_type = t; + req->retval = -ETIMEDOUT; + pthread_mutex_init(&req->mutex, NULL); + pthread_condattr_t condattr; + pthread_condattr_init(&condattr); + pthread_condattr_setclock(&condattr, CLOCK_MONOTONIC); + pthread_cond_init(&req->cond, &condattr); + + switch (t) { + case UHD_DPDK_SOCK_UDP: + uhd_dpdk_udp_open(req, sockarg); + break; + default: + break; + } + + if (req->retval) { + rte_free(s); + s = NULL; + } + +sock_open_end: + rte_free(req); + return s; +} + +int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock) +{ + if (!ctx || !sock) + return -EINVAL; + + struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0); + if (!req) + return -ENOMEM; + req->sock = sock; + req->req_type = UHD_DPDK_SOCK_CLOSE; + req->sock_type = sock->sock_type; + req->retval = -ETIMEDOUT; + pthread_mutex_init(&req->mutex, NULL); + pthread_cond_init(&req->cond, NULL); + + switch (sock->sock_type) { + case UHD_DPDK_SOCK_UDP: + uhd_dpdk_udp_close(req); + break; + default: + break; + } + + if (req->retval) { + rte_free(req); + return req->retval; + } + + rte_free(sock); + return 0; +} + +/* + * TODO: + * Add blocking calls with timeout + * Implementation would involve a condition variable, like config reqs + * Also would create a cleanup section in I/O main loop (like config reqs) + */ +int uhd_dpdk_request_tx_bufs(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, + unsigned int num_bufs) +{ + if (!sock || !bufs || !num_bufs) { + return -EINVAL; + } + *bufs = NULL; + + if (!sock->tx_ring) + return -EINVAL; + + unsigned int num_tx = rte_ring_count(sock->rx_ring); + num_tx = (num_tx < num_bufs) ? num_tx : num_bufs; + if (rte_ring_dequeue_bulk(sock->rx_ring, (void **) bufs, num_tx, NULL) == 0) + return -ENOENT; + sock->tx_buf_count += num_tx; + return num_tx; +} + +int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, + unsigned int num_bufs) +{ + if (!sock || !bufs || !num_bufs) + return -EINVAL; + if (!sock->tx_ring) + return -EINVAL; + unsigned int num_tx = rte_ring_free_count(sock->tx_ring); + num_tx = (num_tx < num_bufs) ? num_tx : num_bufs; + switch (sock->sock_type) { + case UHD_DPDK_SOCK_UDP: + for (unsigned int i = 0; i < num_tx; i++) { + uhd_dpdk_udp_prep(sock, bufs[i]); + } + break; + default: + RTE_LOG(ERR, USER1, "%s: Unsupported sock type\n", __func__); + return -EINVAL; + } + int status = rte_ring_enqueue_bulk(sock->tx_ring, (void **) bufs, num_tx, NULL); + if (status == 0) { + RTE_LOG(ERR, USER1, "Invalid shared usage of TX ring detected\n"); + return status; + } + sock->tx_buf_count -= num_tx; + return num_tx; +} + +/* + * TODO: + * Add blocking calls with timeout + */ +int uhd_dpdk_recv(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, + unsigned int num_bufs, unsigned int timeout) +{ + if (!sock || !bufs || !num_bufs) + return -EINVAL; + if (!sock->rx_ring) + return -EINVAL; + unsigned int num_rx = rte_ring_count(sock->rx_ring); + num_rx = (num_rx < num_bufs) ? num_rx : num_bufs; + if (num_rx) { + unsigned int avail = 0; + unsigned int status = rte_ring_dequeue_bulk(sock->rx_ring, + (void **) bufs, num_rx, &avail); + if (status == 0) { + RTE_LOG(ERR, USER1, "Invalid shared usage of RX ring detected\n"); + RTE_LOG(ERR, USER1, "Requested %u, but %u available\n", + num_rx, avail); + return -ENOENT; + } + } + return num_rx; +} + +void uhd_dpdk_free_buf(struct rte_mbuf *buf) +{ + rte_pktmbuf_free(buf); +} + +void * uhd_dpdk_buf_to_data(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf) +{ + if (!sock || !buf) + return NULL; + + /* TODO: Support for more types? */ + switch (sock->sock_type) { + case UHD_DPDK_SOCK_UDP: + return rte_pktmbuf_mtod_offset(buf, void *, sizeof(struct ether_hdr) + + sizeof(struct ipv4_hdr) + + sizeof(struct udp_hdr)); + default: + return NULL; + } +} + + +int uhd_dpdk_get_len(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf) +{ + if (!sock || !buf) + return -EINVAL; + + if (sock->sock_type != UHD_DPDK_SOCK_UDP) + return -EINVAL; + + struct udp_hdr *hdr = (struct udp_hdr *) ((uint8_t *) uhd_dpdk_buf_to_data(sock, buf) - sizeof(struct udp_hdr)); + if (!hdr) + return -EINVAL; + + /* Report dgram length - header */ + return ntohs(hdr->dgram_len) - 8; +} + +int uhd_dpdk_get_src_ipv4(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf, + uint32_t *ipv4_addr) +{ + if (!sock || !buf || !ipv4_addr) + return -EINVAL; + + if (sock->sock_type != UHD_DPDK_SOCK_UDP) + return -EINVAL; + + struct ipv4_hdr *hdr = rte_pktmbuf_mtod_offset(buf, struct ipv4_hdr *, + sizeof(struct ether_hdr)); + + *ipv4_addr = hdr->src_addr; + return 0; +} + +int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, uint32_t *count) +{ + if (!sock) + return -EINVAL; + if (sock->sock_type != UHD_DPDK_SOCK_UDP) + return -EINVAL; + if (!sock->priv) + return -ENODEV; + + struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; + *count = pdata->dropped_pkts; + return 0; +} diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h new file mode 100644 index 000000000..c07c1913a --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h @@ -0,0 +1,15 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _UHD_DPDK_FOPS_H_ +#define _UHD_DPDK_FOPS_H_ + +#include "uhd_dpdk_ctx.h" + +int _uhd_dpdk_config_req_compl(struct uhd_dpdk_config_req *req, int retval); + +int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req); +int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req); +#endif /* _UHD_DPDK_FOPS_H_ */ diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c new file mode 100644 index 000000000..26cfd43e1 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c @@ -0,0 +1,456 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#include "uhd_dpdk_fops.h" +#include "uhd_dpdk_udp.h" +#include "uhd_dpdk_driver.h" +#include <rte_ring.h> +#include <rte_malloc.h> +#include <unistd.h> +#include <sys/syscall.h> +#include <arpa/inet.h> + +/************************************************ + * I/O thread ONLY + */ + +static int _alloc_txq(struct uhd_dpdk_port *port, pid_t tid, struct uhd_dpdk_tx_queue **queue) +{ + *queue = NULL; + struct uhd_dpdk_tx_queue *q = rte_zmalloc(NULL, sizeof(*q), 0); + if (!q) { + RTE_LOG(ERR, USER1, "%s: Cannot allocate TX queue\n", __func__); + return -ENOMEM; + } + q->tid = tid; + LIST_INIT(&q->tx_list); + + char name[32]; + snprintf(name, sizeof(name), "tx_ring_udp_%u.%u", port->id, tid); + q->queue = rte_ring_create( + name, + UHD_DPDK_TXQ_SIZE, + rte_socket_id(), + RING_F_SC_DEQ | RING_F_SP_ENQ + ); + snprintf(name, sizeof(name), "buffer_ring_udp_%u.%u", port->id, tid); + q->freebufs = rte_ring_create( + name, + UHD_DPDK_TXQ_SIZE, + rte_socket_id(), + RING_F_SC_DEQ | RING_F_SP_ENQ + ); + /* Set up retry queue */ + snprintf(name, sizeof(name), "retry_queue_%u", port->id); + q->retry_queue = rte_ring_create( + name, + UHD_DPDK_TXQ_SIZE, + rte_socket_id(), + RING_F_SC_DEQ | RING_F_SP_ENQ + ); + + if (!q->queue || !q->freebufs || !q->retry_queue) { + RTE_LOG(ERR, USER1, "%s: Cannot allocate TX rings\n", __func__); + if (q->queue) + rte_ring_free(q->queue); + if (q->freebufs) + rte_ring_free(q->freebufs); + if (q->retry_queue) + rte_ring_free(q->retry_queue); + rte_free(q); + return -ENOMEM; + } + struct rte_mbuf *bufs[UHD_DPDK_TXQ_SIZE]; + unsigned int num_bufs = rte_ring_free_count(q->freebufs); + int buf_stat = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, bufs, num_bufs); + if (buf_stat) { + rte_ring_free(q->freebufs); + rte_ring_free(q->queue); + rte_ring_free(q->retry_queue); + rte_free(q); + RTE_LOG(ERR, USER1, "%s: Cannot allocate packet buffers\n", __func__); + return -ENOENT; + } + unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) bufs, num_bufs, NULL); + if (enqd != num_bufs) { + RTE_LOG(ERR, USER1, "%s: Cannot enqueue freebufs\n", __func__); + } + LIST_INSERT_HEAD(&port->txq_list, q, entry); + *queue = q; + return 0; +} + +/* Finish setting up UDP socket (unless ARP needs to be done) + * Not multi-thread safe! + * This call should only be used by the thread servicing the port + * In addition, the code below assumes simplex sockets and unique receive ports + * FIXME: May need some way to help clean up abandoned socket requests (refcnt check...) + */ +int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) +{ + int retval = 0; + struct uhd_dpdk_socket *sock = req->sock; + struct uhd_dpdk_udp_priv *pdata = sock->priv; + struct uhd_dpdk_port *port = req->sock->port; + + struct uhd_dpdk_ipv4_5tuple ht_key = { + .sock_type = UHD_DPDK_SOCK_UDP, + .src_ip = 0, + .src_port = 0, + .dst_ip = 0, + .dst_port = pdata->dst_port + }; + + /* Are we doing RX? */ + if (sock->rx_ring) { + /* Add to rx table */ + if (pdata->dst_port == 0) { + /* Assign unused one in a very slow fashion */ + for (uint16_t i = 1; i > 0; i++) { + ht_key.dst_port = htons(i); + if (rte_hash_lookup(port->rx_table, &ht_key) == -ENOENT) { + pdata->dst_port = htons(i); + break; + } + } + } + + /* Is the port STILL invalid? */ + if (pdata->dst_port == 0) { + RTE_LOG(ERR, USER1, "%s: No available UDP ports\n", __func__); + _uhd_dpdk_config_req_compl(req, -EADDRINUSE); + return -EADDRINUSE; + } + + ht_key.dst_port = pdata->dst_port; + if (rte_hash_lookup(port->rx_table, &ht_key) > 0) { + RTE_LOG(ERR, USER1, "%s: Cannot add to RX table\n", __func__); + _uhd_dpdk_config_req_compl(req, -EADDRINUSE); + return -EADDRINUSE; + } + + char name[32]; + snprintf(name, sizeof(name), "rx_ring_udp_%u.%u", port->id, ntohs(pdata->dst_port)); + sock->rx_ring = rte_ring_create( + name, + UHD_DPDK_RXQ_SIZE, + rte_socket_id(), + RING_F_SC_DEQ | RING_F_SP_ENQ + ); + if (!sock->rx_ring) { + RTE_LOG(ERR, USER1, "%s: Cannot allocate RX ring\n", __func__); + _uhd_dpdk_config_req_compl(req, -ENOMEM); + return -ENOMEM; + } + retval = rte_hash_add_key_data(port->rx_table, &ht_key, sock); + if (retval != 0) { + RTE_LOG(WARNING, TABLE, "Could not add new RX socket to port %d: %d\n", port->id, retval); + rte_ring_free(sock->rx_ring); + _uhd_dpdk_config_req_compl(req, retval); + return retval; + } + _uhd_dpdk_config_req_compl(req, 0); + } + + /* Are we doing TX? */ + if (sock->tx_ring) { + sock->tx_ring = NULL; + struct uhd_dpdk_tx_queue *q = NULL; + LIST_FOREACH(q, &port->txq_list, entry) { + if (q->tid == sock->tid) { + LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry); + sock->tx_ring = q->queue; + sock->rx_ring = q->freebufs; + break; + } + } + if (!sock->tx_ring) { + retval = _alloc_txq(port, sock->tid, &q); + if (retval) { + _uhd_dpdk_config_req_compl(req, retval); + return retval; + } + sock->tx_ring = q->queue; + sock->rx_ring = q->freebufs; + } + /* If a broadcast type, just finish setup and return */ + if (is_broadcast(port, pdata->dst_ipv4_addr)) { + LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry); + _uhd_dpdk_config_req_compl(req, 0); + return 0; + } + /* Otherwise... Check for entry in ARP table */ + struct uhd_dpdk_arp_entry *entry = NULL; + int arp_table_stat = rte_hash_lookup_data(port->arp_table, &pdata->dst_ipv4_addr, (void **) &entry); + if (entry) { + /* Check for null entry */ + if ((entry->mac_addr.addr_bytes[0] == 0xFF) && + (entry->mac_addr.addr_bytes[1] == 0xFF) && + (entry->mac_addr.addr_bytes[2] == 0xFF) && + (entry->mac_addr.addr_bytes[3] == 0xFF) && + (entry->mac_addr.addr_bytes[4] == 0xFF) && + (entry->mac_addr.addr_bytes[5] == 0xFF)) { + arp_table_stat = -ENOENT; + } + } else { + /* No entry -> Add null entry */ + entry = rte_zmalloc(NULL, sizeof(*entry), 0); + if (!entry) { + RTE_LOG(ERR, USER1, "%s: Cannot allocate ARP entry\n", __func__); + _uhd_dpdk_config_req_compl(req, -ENOMEM); + return -ENOMEM; + } + memset(entry->mac_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN); + LIST_INIT(&entry->pending_list); + + if (rte_hash_add_key_data(port->arp_table, &pdata->dst_ipv4_addr, entry) < 0) { + rte_free(entry); + RTE_LOG(ERR, USER1, "%s: Cannot add entry to ARP table\n", __func__); + _uhd_dpdk_config_req_compl(req, -ENOMEM); + return -ENOMEM; + } + } + + /* Was there a valid address? */ + if (arp_table_stat == -ENOENT) { + /* Get valid address and have requestor continue waiting */ + int arp_stat = 0; + do { /* Keep trying to send request if no descriptor */ + arp_stat = _uhd_dpdk_arp_request(port, pdata->dst_ipv4_addr); + } while (arp_stat == -EAGAIN); + + if (arp_stat) { + /* Config request errors out */ + RTE_LOG(ERR, USER1, "%s: Cannot make ARP request\n", __func__); + _uhd_dpdk_config_req_compl(req, arp_stat); + return arp_stat; + } + /* Append req to pending list. Will signal later. */ + LIST_INSERT_HEAD(&entry->pending_list, req, entry); + LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry); + } else { + /* We have a valid address. All good. */ + LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry); + _uhd_dpdk_config_req_compl(req, 0); + } + } + return 0; +} + +int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req) +{ + struct uhd_dpdk_socket *sock = req->sock; + struct uhd_dpdk_port *port = req->sock->port; + struct uhd_dpdk_config_req *conf_req = NULL; + struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; + if (sock->tx_ring) { + /* Remove from tx_list */ + LIST_REMOVE(sock, tx_entry); + /* Check for entry in ARP table */ + struct uhd_dpdk_arp_entry *entry = NULL; + rte_hash_lookup_data(port->arp_table, &pdata->dst_ipv4_addr, (void **) &entry); + if (entry) { + LIST_FOREACH(conf_req, &entry->pending_list, entry) { + if (conf_req->sock == sock) { + LIST_REMOVE(conf_req, entry); + break; + } + } + } + + /* Add outstanding buffers back to TX queue's freebufs */ + struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE]; + int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, sock->tx_buf_count); + if (status) { + RTE_LOG(ERR, USER1, "%d %s: Could not restore %u TX buffers in bulk!\n", status, __func__, sock->tx_buf_count); + } + + unsigned int enqd = rte_ring_enqueue_bulk(sock->rx_ring, (void **) freebufs, sock->tx_buf_count, NULL); + if (enqd != (unsigned int) sock->tx_buf_count) { + RTE_LOG(ERR, USER1, "%s: Could not enqueue TX buffers!\n", __func__); + return status; + } + } else if (sock->rx_ring) { + struct uhd_dpdk_ipv4_5tuple ht_key = { + .sock_type = UHD_DPDK_SOCK_UDP, + .src_ip = 0, + .src_port = 0, + .dst_ip = 0, + .dst_port = pdata->dst_port + }; + rte_hash_del_key(port->rx_table, &ht_key); + struct rte_mbuf *mbuf = NULL; + while (rte_ring_dequeue(sock->rx_ring, (void **) &mbuf) == 0) { + rte_pktmbuf_free(mbuf); + } + rte_ring_free(sock->rx_ring); + } + + _uhd_dpdk_config_req_compl(req, 0); + return 0; +} + +/* Configure a socket for UDP + * TODO: Make sure EVERYTHING is configured in socket + * FIXME: Make all of this part of the user thread, except hash table interaction + * and list handling + */ +void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req, + struct uhd_dpdk_sockarg_udp *arg) +{ + if (!req) + return; + + if (!arg) { + req->retval = -EINVAL; + return; + } + + struct uhd_dpdk_socket *sock = req->sock; + pid_t tid = syscall(__NR_gettid); + sock->tid = tid; + + /* Create private data */ + struct uhd_dpdk_udp_priv *data = (struct uhd_dpdk_udp_priv *) rte_zmalloc(NULL, sizeof(*data), 0); + if (!data) { + req->retval = -ENOMEM; + return; + } + sock->priv = data; + + data->dst_ipv4_addr = arg->dst_addr; + if (arg->is_tx) { + data->src_port = arg->local_port; + data->dst_port = arg->remote_port; + sock->tx_ring = (struct rte_ring *) sock; + } else { + data->src_port = arg->remote_port; + data->dst_port = arg->local_port; + sock->rx_ring = (struct rte_ring *) sock; + } + + /* TODO: Add support for I/O thread calling (skip locking and sleep) */ + /* Add to port's config queue */ + pthread_mutex_lock(&req->mutex); + if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) { + pthread_mutex_unlock(&req->mutex); + rte_free(data); + req->retval = -ENOSPC; + return; + } + struct timespec timeout; + clock_gettime(CLOCK_MONOTONIC, &timeout); + timeout.tv_sec += 1; + pthread_cond_timedwait(&req->cond, &req->mutex, &timeout); + pthread_mutex_unlock(&req->mutex); + + if (req->retval) + rte_free(data); +} + +void uhd_dpdk_udp_close(struct uhd_dpdk_config_req *req) +{ + if (!req) + return; + + pthread_mutex_lock(&req->mutex); + if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) { + pthread_mutex_unlock(&req->mutex); + rte_free(req->sock->priv); + req->retval = -ENOSPC; + return; + } + struct timespec timeout; + clock_gettime(CLOCK_MONOTONIC, &timeout); + timeout.tv_sec += 1; + pthread_cond_timedwait(&req->cond, &req->mutex, &timeout); + pthread_mutex_unlock(&req->mutex); + rte_free(req->sock->priv); +} + +/* + * Note: I/O thread will fill in destination MAC address (doesn't happen here) + */ +static void uhd_dpdk_ipv4_prep(struct uhd_dpdk_port *port, + struct rte_mbuf *mbuf, + uint32_t dst_ipv4_addr, + uint8_t proto_id, + uint32_t payload_len) +{ + struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *); + struct ipv4_hdr *ip_hdr = (struct ipv4_hdr *) ð_hdr[1]; + + ether_addr_copy(&port->mac_addr, ð_hdr->s_addr); + eth_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + + ip_hdr->version_ihl = 0x40 | 5; + ip_hdr->type_of_service = 0; + ip_hdr->total_length = rte_cpu_to_be_16(20 + payload_len); + ip_hdr->packet_id = 0; + ip_hdr->fragment_offset = rte_cpu_to_be_16(IPV4_HDR_DF_FLAG); + ip_hdr->time_to_live = 64; + ip_hdr->next_proto_id = proto_id; + ip_hdr->hdr_checksum = 0; /* FIXME: Assuming hardware can offload */ + ip_hdr->src_addr = port->ipv4_addr; + ip_hdr->dst_addr = dst_ipv4_addr; + + mbuf->pkt_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len; + mbuf->data_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len; +} + +int uhd_dpdk_udp_prep(struct uhd_dpdk_socket *sock, + struct rte_mbuf *mbuf) +{ + struct ether_hdr *eth_hdr; + struct ipv4_hdr *ip_hdr; + struct udp_hdr *tx_hdr; + struct uhd_dpdk_port *port = sock->port; + struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; + + if (unlikely(mbuf == NULL || pdata == NULL || port == NULL)) + return -EINVAL; + + uint32_t udp_data_len = mbuf->data_len; + uhd_dpdk_ipv4_prep(port, + mbuf, + pdata->dst_ipv4_addr, + 0x11, + 8 + udp_data_len); + + eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *); + ip_hdr = (struct ipv4_hdr *) ð_hdr[1]; + tx_hdr = (struct udp_hdr *) &ip_hdr[1]; + + tx_hdr->src_port = pdata->src_port; + tx_hdr->dst_port = pdata->dst_port; + tx_hdr->dgram_len = rte_cpu_to_be_16(8 + udp_data_len); + tx_hdr->dgram_cksum = 0; + + return 0; +} + +int uhd_dpdk_udp_get_info(struct uhd_dpdk_socket *sock, + struct uhd_dpdk_sockarg_udp *sockarg) +{ + if (unlikely(sock == NULL || sockarg == NULL)) + return -EINVAL; + if (sock->sock_type != UHD_DPDK_SOCK_UDP) + return -EINVAL; + + struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; + if (sock->tx_ring) { + sockarg->is_tx = true; + sockarg->local_port = pdata->src_port; + sockarg->remote_port = pdata->dst_port; + sockarg->dst_addr = pdata->dst_ipv4_addr; + } else { + sockarg->is_tx = false; + sockarg->local_port = pdata->dst_port; + sockarg->remote_port = pdata->src_port; + sockarg->dst_addr = 0; + } + return 0; +} + diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h new file mode 100644 index 000000000..651ae144e --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h @@ -0,0 +1,30 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _UHD_DPDK_UDP_H_ +#define _UHD_DPDK_UDP_H_ + +#include "uhd_dpdk_ctx.h" +#include <rte_udp.h> + +struct uhd_dpdk_udp_priv { + uint16_t src_port; + uint16_t dst_port; + uint32_t dst_ipv4_addr; + uint32_t dropped_pkts; + /* TODO: Cache destination address ptr to avoid ARP table lookup cost? */ + //struct uhd_dpdk_arp_entry *arp_entry; +}; + +int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req); +int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req); + +void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req, + struct uhd_dpdk_sockarg_udp *arg); +void uhd_dpdk_udp_close(struct uhd_dpdk_config_req *req); + +int uhd_dpdk_udp_prep(struct uhd_dpdk_socket *sock, + struct rte_mbuf *mbuf); +#endif /* _UHD_DPDK_UDP_H_ */ |