aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/CMakeLists.txt6
-rw-r--r--host/lib/transport/uhd-dpdk/CMakeLists.txt36
-rw-r--r--host/lib/transport/uhd-dpdk/test/Makefile53
-rw-r--r--host/lib/transport/uhd-dpdk/test/test.c303
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk.c363
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h253
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c401
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h32
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c306
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h15
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c456
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h30
12 files changed, 2252 insertions, 2 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index 15771697a..7c79bc67c 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -130,8 +130,6 @@ IF(ENABLE_X300)
ENDIF(ENABLE_X300)
IF(ENABLE_LIBERIO)
- MESSAGE(STATUS "")
- MESSAGE(STATUS "liberio support enabled.")
INCLUDE_DIRECTORIES(${LIBERIO_INCLUDE_DIRS})
LIBUHD_APPEND_LIBS(${LIBERIO_LIBRARIES})
LIBUHD_APPEND_SOURCES(
@@ -139,6 +137,10 @@ IF(ENABLE_LIBERIO)
)
ENDIF(ENABLE_LIBERIO)
+IF(ENABLE_DPDK)
+ INCLUDE_SUBDIRECTORY(uhd-dpdk)
+ENDIF(ENABLE_DPDK)
+
# Verbose Debug output for send/recv
SET( UHD_TXRX_DEBUG_PRINTS OFF CACHE BOOL "Use verbose debug output for send/recv" )
OPTION( UHD_TXRX_DEBUG_PRINTS "Use verbose debug output for send/recv" "" )
diff --git a/host/lib/transport/uhd-dpdk/CMakeLists.txt b/host/lib/transport/uhd-dpdk/CMakeLists.txt
new file mode 100644
index 000000000..be683aaf2
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/CMakeLists.txt
@@ -0,0 +1,36 @@
+#
+# Copyright 2018 Ettus Research, a National Instruments Company
+#
+# SPDX-License-Identifier: GPL-3.0
+#
+
+########################################################################
+# Add the subdirectories
+########################################################################
+if(ENABLE_DPDK)
+ if(NOT DEFINED UHD_DPDK_CFLAGS)
+ message(STATUS "")
+ set(UHD_DPDK_CFLAGS "-march=native"
+ CACHE STRING "CFLAGS to use when building uhd-dpdk sources")
+ message(STATUS "DPDK: Using default UHD_DPDK_CFLAGS=" ${UHD_DPDK_CFLAGS})
+ endif(NOT DEFINED UHD_DPDK_CFLAGS)
+
+ include_directories(${CMAKE_CURRENT_SOURCE_DIR})
+
+ LIBUHD_APPEND_SOURCES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_driver.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_fops.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_udp.c
+ )
+ set_source_files_properties(
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_driver.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_fops.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_udp.c
+ PROPERTIES COMPILE_FLAGS ${UHD_DPDK_CFLAGS}
+ )
+ include_directories(${DPDK_INCLUDE_DIR})
+ LIBUHD_APPEND_LIBS(${DPDK_LIBRARIES})
+endif(ENABLE_DPDK)
+
diff --git a/host/lib/transport/uhd-dpdk/test/Makefile b/host/lib/transport/uhd-dpdk/test/Makefile
new file mode 100644
index 000000000..9d6e60372
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/test/Makefile
@@ -0,0 +1,53 @@
+# BSD LICENSE
+#
+# Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overridden by command line or environment
+RTE_TARGET ?= x86_64-native-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# binary name
+APP = test
+
+# all source are stored in SRCS-y
+SRCS-y := test.c
+
+CFLAGS += -O0 -g
+CFLAGS += $(WERROR_FLAGS)
+
+EXTRA_CFLAGS=-I${S}/../include
+EXTRA_LDFLAGS=-L${O}/lib -luhd-dpdk
+
+include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/host/lib/transport/uhd-dpdk/test/test.c b/host/lib/transport/uhd-dpdk/test/test.c
new file mode 100644
index 000000000..c324561de
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/test/test.c
@@ -0,0 +1,303 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+/**
+ * Benchmark program to check performance of 2 simultaneous links
+ */
+
+
+#include <uhd-dpdk.h>
+#include <stdio.h>
+#include <stdbool.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include <errno.h>
+#include <arpa/inet.h>
+
+#define NUM_MBUFS 4095
+#define MBUF_CACHE_SIZE 315
+#define BURST_SIZE 64
+
+#define NUM_PORTS 2
+#define TX_CREDITS 28
+#define RX_CREDITS 64
+#define BENCH_SPP 700
+#define BENCH_IFG 220
+
+
+static uint32_t last_seqno[NUM_PORTS];
+static uint32_t dropped_packets[NUM_PORTS];
+static uint32_t lasts[NUM_PORTS][16], drops[NUM_PORTS][16];
+static uint32_t last_ackno[NUM_PORTS];
+static uint32_t tx_seqno[NUM_PORTS];
+static uint64_t tx_xfer[NUM_PORTS];
+
+static void process_udp(int id, uint32_t *udp_data)
+{
+ if (udp_data[0] != last_seqno[id] + 1) {
+ lasts[id][dropped_packets[id] & 0xf] = last_seqno[id];
+ drops[id][dropped_packets[id] & 0xf] = udp_data[0];
+ dropped_packets[id]++;
+ }
+
+ last_seqno[id] = udp_data[0];
+ last_ackno[id] = udp_data[1];
+}
+
+static void send_ctrl(struct uhd_dpdk_socket *sock, uint32_t run)
+{
+ struct rte_mbuf *mbuf = NULL;
+ uhd_dpdk_request_tx_bufs(sock, &mbuf, 1);
+ if (unlikely(mbuf == NULL))
+ return;
+ uint32_t *tx_data = uhd_dpdk_buf_to_data(sock, mbuf);
+ tx_data[0] = (BENCH_SPP << 16) | (TX_CREDITS << 4) | run; // spp, tx_credits, run
+ tx_data[1] = (RX_CREDITS << 16) | (BENCH_IFG); // credits, ifg
+ mbuf->pkt_len = 8;
+ mbuf->data_len = 8;
+ uhd_dpdk_send(sock, &mbuf, 1);
+}
+
+static void send_udp(struct uhd_dpdk_socket *sock, int id, bool fc_only)
+{
+ struct rte_mbuf *mbuf = NULL;
+ uhd_dpdk_request_tx_bufs(sock, &mbuf, 1);
+ if (unlikely(mbuf == NULL))
+ return;
+ uint32_t *tx_data = uhd_dpdk_buf_to_data(sock, mbuf);
+ tx_data[0] = fc_only ? tx_seqno[id] - 1 : tx_seqno[id];
+ tx_data[1] = last_seqno[id];
+ if (!fc_only) {
+ memset(&tx_data[2], last_seqno[id], 8*BENCH_SPP);
+ tx_xfer[id] += 8*BENCH_SPP;
+ }
+ mbuf->pkt_len = 8 + (fc_only ? 0 : 8*BENCH_SPP);
+ mbuf->data_len = 8 + (fc_only ? 0 : 8*BENCH_SPP);
+
+ uhd_dpdk_send(sock, &mbuf, 1);
+
+ if (!fc_only) {
+ tx_seqno[id]++;
+ }
+}
+
+static void bench(struct uhd_dpdk_socket **tx, struct uhd_dpdk_socket **rx, struct uhd_dpdk_socket **ctrl, uint32_t nb_ports)
+{
+ uint64_t total_xfer[NUM_PORTS];
+ uint32_t id;
+ for (id = 0; id < nb_ports; id++) {
+ tx_seqno[id] = 1;
+ tx_xfer[id] = 0;
+ last_ackno[id] = 0;
+ last_seqno[id] = 0;
+ dropped_packets[id] = 0;
+ total_xfer[id] = 0;
+ }
+ sleep(1);
+ struct timeval bench_start, bench_end;
+ gettimeofday(&bench_start, NULL);
+ for (id = 0; id < nb_ports; id++) {
+ send_ctrl(ctrl[id], 0);
+ for (int pktno = 0; (pktno < TX_CREDITS*3/4); pktno++) {
+ send_udp(tx[id], id, false);
+ }
+ }
+ for (id = 0; id < nb_ports; id++) {
+ send_ctrl(ctrl[id], 1);
+ }
+ /*
+ * The test...
+ */
+ uint64_t total_received = 0;
+ uint32_t consec_no_rx = 0;
+ while (total_received < 1000000 ) { //&& consec_no_rx < 10000) {
+ for (id = 0; id < nb_ports; id++) {
+
+ /* Get burst of RX packets, from first port of pair. */
+ struct rte_mbuf *bufs[BURST_SIZE];
+ const int64_t nb_rx = uhd_dpdk_recv(rx[id], bufs, BURST_SIZE);
+
+ if (unlikely(nb_rx <= 0)) {
+ consec_no_rx++;
+ if (consec_no_rx >= 100000) {
+ uint32_t skt_drops = 0;
+ uhd_dpdk_get_drop_count(rx[id], &skt_drops);
+ printf("TX seq %d, TX ack %d, RX seq %d, %d, drops!\n", tx_seqno[id], last_ackno[id], last_seqno[id], skt_drops);
+ consec_no_rx = 0;
+ break;
+ }
+ continue;
+ } else {
+ consec_no_rx = 0;
+ }
+
+ for (int buf = 0; buf < nb_rx; buf++) {
+ total_xfer[id] += bufs[buf]->pkt_len;
+ uint64_t ol_flags = bufs[buf]->ol_flags;
+ uint32_t *data = (uint32_t *) uhd_dpdk_buf_to_data(rx[id], bufs[buf]);
+ if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* FIXME: Deprecated/changed in later release */
+ printf("Buf %d: Bad IP cksum\n", buf);
+ } else {
+ process_udp(id, data);
+ }
+ }
+
+ /* Free buffers. */
+ for (int buf = 0; buf < nb_rx; buf++)
+ uhd_dpdk_free_buf(bufs[buf]);
+ total_received += nb_rx;
+ }
+
+ for (id = 0; id < nb_ports; id++) {
+ /* TX portion */
+ uint32_t window_end = last_ackno[id] + TX_CREDITS;
+ //uint32_t window_end = last_seqno[port] + TX_CREDITS;
+ if (window_end <= tx_seqno[id]) {
+ if (consec_no_rx == 9999) {
+ send_udp(tx[id], id, true);
+ }
+ //send_udp(tx[id], id, true);
+ ;
+ } else {
+ for (int pktno = 0; (pktno < BURST_SIZE) && (tx_seqno[id] < window_end); pktno++) {
+ send_udp(tx[id], id, false);
+ }
+ }
+ }
+ }
+ for (id = 0; id < nb_ports; id++) {
+ send_ctrl(ctrl[id], 0);
+ }
+ gettimeofday(&bench_end, NULL);
+ printf("Benchmark complete\n\n");
+
+ for (id = 0; id < nb_ports; id++) {
+ printf("\n");
+ printf("Bytes received = %ld\n", total_xfer[id]);
+ printf("Bytes sent = %ld\n", tx_xfer[id]);
+ printf("Time taken = %ld us\n", (bench_end.tv_sec - bench_start.tv_sec)*1000000 + (bench_end.tv_usec - bench_start.tv_usec));
+ double elapsed_time = (bench_end.tv_sec - bench_start.tv_sec)*1000000 + (bench_end.tv_usec - bench_start.tv_usec);
+ elapsed_time *= 1.0e-6;
+ double elapsed_bytes = total_xfer[id];
+ printf("RX Performance = %e Gbps\n", elapsed_bytes*8.0/1.0e9/elapsed_time);
+ elapsed_bytes = tx_xfer[id];
+ printf("TX Performance = %e Gbps\n", elapsed_bytes*8.0/1.0e9/elapsed_time);
+ uint32_t skt_drops = 0;
+ uhd_dpdk_get_drop_count(rx[id], &skt_drops);
+ printf("Dropped %d packets\n", dropped_packets[id]);
+ printf("Socket reports dropped %d packets\n", skt_drops);
+ for (unsigned int i = 0; i < 16; i++) {
+ if (i >= dropped_packets[id])
+ break;
+ printf("Last(%u), Recv(%u)\n", lasts[id][i], drops[id][i]);
+ }
+ //printf("%d missed/dropped packets\n", errors);
+ printf("\n");
+ }
+
+}
+
+int main(int argc, char **argv)
+{
+ int port_thread_mapping[2] = {1, 1};
+ int status = uhd_dpdk_init(argc, argv, 2, &port_thread_mapping[0], NUM_MBUFS, MBUF_CACHE_SIZE);
+ if (status) {
+ printf("%d: Something wrong?\n", status);
+ return status;
+ }
+
+ uint32_t eth_ip = htonl(0xc0a80008);
+ uint32_t eth_mask = htonl(0xffffff00);
+ status = uhd_dpdk_set_ipv4_addr(0, eth_ip, eth_mask);
+ if (status) {
+ printf("Error while setting IP0: %d\n", status);
+ return status;
+ }
+ status = uhd_dpdk_set_ipv4_addr(1, eth_ip, eth_mask);
+ if (status) {
+ printf("Error while setting IP1: %d\n", status);
+ return status;
+ }
+
+ struct uhd_dpdk_socket *eth_rx[2];
+ struct uhd_dpdk_socket *eth_tx[2];
+ struct uhd_dpdk_socket *eth_ctrl[2];
+ struct uhd_dpdk_sockarg_udp sockarg = {
+ .is_tx = false,
+ .local_port = htons(0xBEE7),
+ .remote_port = htons(0xBEE7),
+ .dst_addr = htonl(0xc0a80004)
+ };
+ eth_rx[0] = uhd_dpdk_sock_open(0, UHD_DPDK_SOCK_UDP, &sockarg);
+ if (!eth_rx[0]) {
+ printf("!eth0_rx\n");
+ return -ENODEV;
+ }
+ eth_rx[1] = uhd_dpdk_sock_open(1, UHD_DPDK_SOCK_UDP, &sockarg);
+ if (!eth_rx[1]) {
+ printf("!eth1_rx\n");
+ return -ENODEV;
+ }
+
+ sockarg.is_tx = true;
+ eth_tx[0] = uhd_dpdk_sock_open(0, UHD_DPDK_SOCK_UDP, &sockarg);
+ if (!eth_tx[0]) {
+ printf("!eth0_tx\n");
+ return -ENODEV;
+ }
+ eth_tx[1] = uhd_dpdk_sock_open(1, UHD_DPDK_SOCK_UDP, &sockarg);
+ if (!eth_tx[1]) {
+ printf("!eth1_tx\n");
+ return -ENODEV;
+ }
+
+ sockarg.local_port = htons(0xB4D);
+ sockarg.remote_port = htons(0xB4D);
+ eth_ctrl[0] = uhd_dpdk_sock_open(0, UHD_DPDK_SOCK_UDP, &sockarg);
+ if (!eth_ctrl[0]) {
+ printf("!eth0_ctrl\n");
+ return -ENODEV;
+ }
+ eth_ctrl[1] = uhd_dpdk_sock_open(1, UHD_DPDK_SOCK_UDP, &sockarg);
+ if (!eth_ctrl[1]) {
+ printf("!eth1_ctrl\n");
+ return -ENODEV;
+ }
+
+ bench(eth_tx, eth_rx, eth_ctrl, 2);
+
+ status = uhd_dpdk_sock_close(eth_rx[0]);
+ if (status) {
+ printf("Bad close RX0 %d\n", status);
+ return status;
+ }
+ status = uhd_dpdk_sock_close(eth_rx[1]);
+ if (status) {
+ printf("Bad close RX1 %d\n", status);
+ return status;
+ }
+ status = uhd_dpdk_sock_close(eth_tx[0]);
+ if (status) {
+ printf("Bad close TX0 %d\n", status);
+ return status;
+ }
+ status = uhd_dpdk_sock_close(eth_tx[1]);
+ if (status) {
+ printf("Bad close TX1 %d\n", status);
+ return status;
+ }
+ status = uhd_dpdk_sock_close(eth_ctrl[0]);
+ if (status) {
+ printf("Bad close Ctrl0 %d\n", status);
+ return status;
+ }
+ status = uhd_dpdk_sock_close(eth_ctrl[1]);
+ if (status) {
+ printf("Bad close Ctrl1 %d\n", status);
+ return status;
+ }
+ return status;
+}
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk.c b/host/lib/transport/uhd-dpdk/uhd_dpdk.c
new file mode 100644
index 000000000..2ee74a201
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk.c
@@ -0,0 +1,363 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#include "uhd_dpdk_ctx.h"
+#include "uhd_dpdk_udp.h"
+#include "uhd_dpdk_driver.h"
+#include <stdlib.h>
+#include <rte_errno.h>
+#include <rte_malloc.h>
+#include <rte_log.h>
+
+/* FIXME: Replace with configurable values */
+#define DEFAULT_RING_SIZE 512
+
+/* FIXME: This needs to be protected */
+struct uhd_dpdk_ctx *ctx = NULL;
+
+/**
+ * TODO: Probably should provide way to get access to thread for a given port
+ * UHD's first calling thread will be the master thread
+ * In UHD, maybe check thread, and if it is different, pass work to that thread and optionally wait() on it (some condition variable)
+ */
+
+/* TODO: For nice scheduling options later, make sure to separate RX and TX activity */
+
+
+int uhd_dpdk_port_count(void)
+{
+ if (!ctx)
+ return -ENODEV;
+ return ctx->num_ports;
+}
+
+struct eth_addr uhd_dpdk_get_eth_addr(unsigned int portid)
+{
+ struct eth_addr retval;
+ memset(retval.addr, 0xff, ETHER_ADDR_LEN);
+
+ struct uhd_dpdk_port *p = find_port(portid);
+ if (p) {
+ memcpy(retval.addr, p->mac_addr.addr_bytes, ETHER_ADDR_LEN);
+ }
+ return retval;
+}
+
+int uhd_dpdk_get_ipv4_addr(unsigned int portid, uint32_t *ipv4_addr, uint32_t *netmask)
+{
+ if (!ipv4_addr)
+ return -EINVAL;
+ struct uhd_dpdk_port *p = find_port(portid);
+ if (p) {
+ *ipv4_addr = p->ipv4_addr;
+ if (netmask) {
+ *netmask = p->netmask;
+ }
+ return 0;
+ }
+ return -ENODEV;
+}
+
+int uhd_dpdk_set_ipv4_addr(unsigned int portid, uint32_t ipv4_addr, uint32_t netmask)
+{
+ struct uhd_dpdk_port *p = find_port(portid);
+ if (p) {
+ p->ipv4_addr = ipv4_addr;
+ p->netmask = netmask;
+ return 0;
+ }
+ return -ENODEV;
+}
+
+/*
+ * Initialize a given port using default settings and with the RX buffers
+ * coming from the mbuf_pool passed as a parameter.
+ * FIXME: Starting with assumption of one thread/core per port
+ */
+static inline int uhd_dpdk_port_init(struct uhd_dpdk_port *port,
+ struct rte_mempool *rx_mbuf_pool,
+ unsigned int mtu)
+{
+ int retval;
+
+ /* Check for a valid port */
+ if (port->id >= rte_eth_dev_count())
+ return -ENODEV;
+
+ /* Set up Ethernet device with defaults (1 RX ring, 1 TX ring) */
+ /* FIXME: Check if hw_ip_checksum is possible */
+ struct rte_eth_conf port_conf = {
+ .rxmode = {
+ .max_rx_pkt_len = mtu,
+ .jumbo_frame = 1,
+ .hw_ip_checksum = 1,
+ }
+ };
+ retval = rte_eth_dev_configure(port->id, 1, 1, &port_conf);
+ if (retval != 0)
+ return retval;
+
+ retval = rte_eth_rx_queue_setup(port->id, 0, DEFAULT_RING_SIZE,
+ rte_eth_dev_socket_id(port->id), NULL, rx_mbuf_pool);
+ if (retval < 0)
+ return retval;
+
+ retval = rte_eth_tx_queue_setup(port->id, 0, DEFAULT_RING_SIZE,
+ rte_eth_dev_socket_id(port->id), NULL);
+ if (retval < 0)
+ goto port_init_fail;
+
+ /* Create the hash table for the RX sockets */
+ char name[32];
+ snprintf(name, sizeof(name), "rx_table_%u", port->id);
+ struct rte_hash_parameters hash_params = {
+ .name = name,
+ .entries = UHD_DPDK_MAX_SOCKET_CNT,
+ .key_len = sizeof(struct uhd_dpdk_ipv4_5tuple),
+ .hash_func = NULL,
+ .hash_func_init_val = 0,
+ };
+ port->rx_table = rte_hash_create(&hash_params);
+ if (port->rx_table == NULL) {
+ retval = rte_errno;
+ goto port_init_fail;
+ }
+
+ /* Create ARP table */
+ snprintf(name, sizeof(name), "arp_table_%u", port->id);
+ hash_params.name = name;
+ hash_params.entries = UHD_DPDK_MAX_SOCKET_CNT;
+ hash_params.key_len = sizeof(uint32_t);
+ hash_params.hash_func = NULL;
+ hash_params.hash_func_init_val = 0;
+ port->arp_table = rte_hash_create(&hash_params);
+ if (port->arp_table == NULL) {
+ retval = rte_errno;
+ goto free_rx_table;
+ }
+
+ /* Set up list for TX queues */
+ LIST_INIT(&port->txq_list);
+
+ /* Start the Ethernet port. */
+ retval = rte_eth_dev_start(port->id);
+ if (retval < 0) {
+ goto free_arp_table;
+ }
+
+ /* Display the port MAC address. */
+ rte_eth_macaddr_get(port->id, &port->mac_addr);
+ RTE_LOG(INFO, EAL, "Port %u MAC: %02x %02x %02x %02x %02x %02x\n",
+ (unsigned)port->id,
+ port->mac_addr.addr_bytes[0], port->mac_addr.addr_bytes[1],
+ port->mac_addr.addr_bytes[2], port->mac_addr.addr_bytes[3],
+ port->mac_addr.addr_bytes[4], port->mac_addr.addr_bytes[5]);
+
+ struct rte_eth_link link;
+ rte_eth_link_get(port->id, &link);
+ RTE_LOG(INFO, EAL, "Port %u UP: %d\n", port->id, link.link_status);
+
+ return 0;
+
+free_arp_table:
+ rte_hash_free(port->arp_table);
+free_rx_table:
+ rte_hash_free(port->rx_table);
+port_init_fail:
+ return rte_errno;
+}
+
+static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int id)
+{
+ if (!ctx || !thread)
+ return -EINVAL;
+
+ unsigned int socket_id = rte_lcore_to_socket_id(id);
+ thread->id = id;
+ thread->rx_pktbuf_pool = ctx->rx_pktbuf_pools[socket_id];
+ thread->tx_pktbuf_pool = ctx->tx_pktbuf_pools[socket_id];
+ LIST_INIT(&thread->port_list);
+
+ char name[32];
+ snprintf(name, sizeof(name), "sockreq_ring_%u", id);
+ thread->sock_req_ring = rte_ring_create(
+ name,
+ UHD_DPDK_MAX_PENDING_SOCK_REQS,
+ socket_id,
+ RING_F_SC_DEQ
+ );
+ if (!thread->sock_req_ring)
+ return -ENOMEM;
+ return 0;
+}
+
+
+int uhd_dpdk_init(int argc, char **argv, unsigned int num_ports,
+ int *port_thread_mapping, int num_mbufs, int mbuf_cache_size,
+ int mtu)
+{
+ /* Init context only once */
+ if (ctx)
+ return 1;
+
+ if ((num_ports == 0) || (port_thread_mapping == NULL)) {
+ return -EINVAL;
+ }
+
+ /* Grabs arguments intended for DPDK's EAL */
+ int ret = rte_eal_init(argc, argv);
+ if (ret < 0)
+ rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
+
+ ctx = (struct uhd_dpdk_ctx *) rte_zmalloc("uhd_dpdk_ctx", sizeof(*ctx), rte_socket_id());
+ if (!ctx)
+ return -ENOMEM;
+
+ ctx->num_threads = rte_lcore_count();
+ if (ctx->num_threads <= 1)
+ rte_exit(EXIT_FAILURE, "Error: No worker threads enabled\n");
+
+ /* Check that we have ports to send/receive on */
+ ctx->num_ports = rte_eth_dev_count();
+ if (ctx->num_ports < 1)
+ rte_exit(EXIT_FAILURE, "Error: Found no ports\n");
+ if (ctx->num_ports < num_ports)
+ rte_exit(EXIT_FAILURE, "Error: User requested more ports than available\n");
+
+ /* Get memory for thread and port data structures */
+ ctx->threads = rte_zmalloc("uhd_dpdk_thread", RTE_MAX_LCORE*sizeof(struct uhd_dpdk_thread), 0);
+ if (!ctx->threads)
+ rte_exit(EXIT_FAILURE, "Error: Could not allocate memory for thread data\n");
+ ctx->ports = rte_zmalloc("uhd_dpdk_port", ctx->num_ports*sizeof(struct uhd_dpdk_port), 0);
+ if (!ctx->ports)
+ rte_exit(EXIT_FAILURE, "Error: Could not allocate memory for port data\n");
+
+ /* Initialize the thread data structures */
+ for (int i = rte_get_next_lcore(-1, 1, 0);
+ (i < RTE_MAX_LCORE);
+ i = rte_get_next_lcore(i, 1, 0))
+ {
+ /* Do one mempool of RX/TX per socket */
+ unsigned int socket_id = rte_lcore_to_socket_id(i);
+ /* FIXME Probably want to take into account actual number of ports per socket */
+ if (ctx->tx_pktbuf_pools[socket_id] == NULL) {
+ /* Creates a new mempool in memory to hold the mbufs.
+ * This is done for each CPU socket
+ */
+ const int mbuf_size = mtu + 2048 + RTE_PKTMBUF_HEADROOM;
+ char name[32];
+ snprintf(name, sizeof(name), "rx_mbuf_pool_%u", socket_id);
+ ctx->rx_pktbuf_pools[socket_id] = rte_pktmbuf_pool_create(
+ name,
+ ctx->num_ports*num_mbufs,
+ mbuf_cache_size,
+ 0,
+ mbuf_size,
+ socket_id
+ );
+ snprintf(name, sizeof(name), "tx_mbuf_pool_%u", socket_id);
+ ctx->tx_pktbuf_pools[socket_id] = rte_pktmbuf_pool_create(
+ name,
+ ctx->num_ports*num_mbufs,
+ mbuf_cache_size,
+ 0,
+ mbuf_size,
+ socket_id
+ );
+ if ((ctx->rx_pktbuf_pools[socket_id]== NULL) ||
+ (ctx->tx_pktbuf_pools[socket_id]== NULL))
+ rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
+ }
+
+ if (uhd_dpdk_thread_init(&ctx->threads[i], i) < 0)
+ rte_exit(EXIT_FAILURE, "Error initializing thread %i\n", i);
+ }
+
+ unsigned master_lcore = rte_get_master_lcore();
+
+ /* Assign ports to threads and initialize the port data structures */
+ for (unsigned int i = 0; i < num_ports; i++) {
+ int thread_id = port_thread_mapping[i];
+ if (thread_id < 0)
+ continue;
+ if (((unsigned int) thread_id) == master_lcore)
+ RTE_LOG(WARNING, EAL, "User requested master lcore for port %u\n", i);
+ if (ctx->threads[thread_id].id != (unsigned int) thread_id)
+ rte_exit(EXIT_FAILURE, "Requested inactive lcore %u for port %u\n", (unsigned int) thread_id, i);
+
+ struct uhd_dpdk_port *port = &ctx->ports[i];
+ port->id = i;
+ port->parent = &ctx->threads[thread_id];
+ ctx->threads[thread_id].num_ports++;
+ LIST_INSERT_HEAD(&ctx->threads[thread_id].port_list, port, port_entry);
+
+ /* Initialize port. */
+ if (uhd_dpdk_port_init(port, port->parent->rx_pktbuf_pool, mtu) != 0)
+ rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n",
+ i);
+ }
+
+ RTE_LOG(INFO, EAL, "Init DONE!\n");
+
+ /* FIXME: Create functions to do this */
+ RTE_LOG(INFO, EAL, "Starting I/O threads!\n");
+
+ for (int i = rte_get_next_lcore(-1, 1, 0);
+ (i < RTE_MAX_LCORE);
+ i = rte_get_next_lcore(i, 1, 0))
+ {
+ struct uhd_dpdk_thread *t = &ctx->threads[i];
+ if (!LIST_EMPTY(&t->port_list)) {
+ rte_eal_remote_launch(_uhd_dpdk_driver_main, NULL, ctx->threads[i].id);
+ }
+ }
+ return 0;
+}
+
+/* FIXME: This will be changed once we have functions to handle the threads */
+int uhd_dpdk_destroy(void)
+{
+ if (!ctx)
+ return -ENODEV;
+
+ struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0);
+ if (!req)
+ return -ENOMEM;
+
+ req->req_type = UHD_DPDK_LCORE_TERM;
+
+ for (int i = rte_get_next_lcore(-1, 1, 0);
+ (i < RTE_MAX_LCORE);
+ i = rte_get_next_lcore(i, 1, 0))
+ {
+ struct uhd_dpdk_thread *t = &ctx->threads[i];
+
+ if (LIST_EMPTY(&t->port_list))
+ continue;
+
+ if (rte_eal_get_lcore_state(t->id) == FINISHED)
+ continue;
+
+ pthread_mutex_init(&req->mutex, NULL);
+ pthread_cond_init(&req->cond, NULL);
+ pthread_mutex_lock(&req->mutex);
+ if (rte_ring_enqueue(t->sock_req_ring, req)) {
+ pthread_mutex_unlock(&req->mutex);
+ RTE_LOG(ERR, USER2, "Failed to terminate thread %d\n", i);
+ rte_free(req);
+ return -ENOSPC;
+ }
+ struct timespec timeout = {
+ .tv_sec = 1,
+ .tv_nsec = 0
+ };
+ pthread_cond_timedwait(&req->cond, &req->mutex, &timeout);
+ pthread_mutex_unlock(&req->mutex);
+ }
+
+ rte_free(req);
+ return 0;
+}
+
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h
new file mode 100644
index 000000000..31c9dba0c
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h
@@ -0,0 +1,253 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#ifndef _UHD_DPDK_CTX_H_
+#define _UHD_DPDK_CTX_H_
+
+#include <stdint.h>
+#include <sys/queue.h>
+#include <sys/types.h>
+#include <rte_ethdev.h>
+#include <rte_mbuf.h>
+#include <rte_hash.h>
+#include <rte_eal.h>
+#include <uhd/transport/uhd-dpdk.h>
+//#include <pthread.h>
+
+/* For nice scheduling options later, make sure to separate RX and TX activity */
+
+#define UHD_DPDK_MAX_SOCKET_CNT 1024
+#define UHD_DPDK_MAX_PENDING_SOCK_REQS 16
+#define UHD_DPDK_TXQ_SIZE 64
+#define UHD_DPDK_TX_BURST_SIZE (UHD_DPDK_TXQ_SIZE - 1)
+#define UHD_DPDK_RXQ_SIZE 64
+#define UHD_DPDK_RX_BURST_SIZE (UHD_DPDK_RXQ_SIZE - 1)
+
+struct uhd_dpdk_port;
+
+/**
+ *
+ * All memory allocation for port, rx_ring, and tx_ring owned by I/O thread
+ * Rest owned by user thread
+ *
+ * port: port servicing this socket
+ * tid: thread ID that owns this socket (to be associated with TX queue)
+ * sock_type: Type of socket
+ * priv: Private data, based on sock_type
+ * rx_ring: pointer to individual rx_ring (created during init--Also used as free buffer ring for TX)
+ * tx_ring: pointer to shared tx_ring (with all sockets for this tid)
+ * tx_buf_count: Number of buffers currently outside the rings
+ * tx_entry: List node for TX Queue tracking
+ *
+ * If a user closes a socket without outstanding TX buffers, user must free the
+ * buffers. Otherwise, that memory will be leaked, and usage will grow.
+ */
+struct uhd_dpdk_socket {
+ struct uhd_dpdk_port *port;
+ pid_t tid;
+ enum uhd_dpdk_sock_type sock_type;
+ void *priv;
+ struct rte_ring *rx_ring;
+ struct rte_ring *tx_ring;
+ int tx_buf_count;
+ LIST_ENTRY(uhd_dpdk_socket) tx_entry;
+};
+LIST_HEAD(uhd_dpdk_tx_head, uhd_dpdk_socket);
+
+/************************************************
+ * Configuration
+ ************************************************/
+enum uhd_dpdk_sock_req {
+ UHD_DPDK_SOCK_OPEN = 0,
+ UHD_DPDK_SOCK_CLOSE,
+ UHD_DPDK_LCORE_TERM,
+ UHD_DPDK_SOCK_REQ_COUNT
+};
+
+/**
+ * port: port associated with this request
+ * sock: socket associated with this request
+ * req_type: Open, Close, or terminate lcore
+ * sock_type: Only udp is supported
+ * cond: Used to sleep until socket creation is finished
+ * mutex: associated with cond
+ * entry: List node for requests pending ARP responses
+ * priv: private data
+ * retval: Result of call (needed post-wakeup)
+ */
+struct uhd_dpdk_config_req {
+ struct uhd_dpdk_port *port;
+ struct uhd_dpdk_socket *sock;
+ enum uhd_dpdk_sock_req req_type;
+ enum uhd_dpdk_sock_type sock_type;
+ pthread_cond_t cond;
+ pthread_mutex_t mutex;
+ LIST_ENTRY(uhd_dpdk_config_req) entry;
+ void *priv;
+ int retval;
+};
+LIST_HEAD(uhd_dpdk_config_head, uhd_dpdk_config_req);
+
+/************************************************
+ * RX Table
+ ************************************************/
+struct uhd_dpdk_arp_entry {
+ struct ether_addr mac_addr;
+ struct uhd_dpdk_config_head pending_list; /* Config reqs pending ARP--Thread-unsafe */
+};
+
+struct uhd_dpdk_ipv4_5tuple {
+ enum uhd_dpdk_sock_type sock_type;
+ uint32_t src_ip;
+ uint32_t dst_ip;
+ uint16_t src_port;
+ uint16_t dst_port;
+};
+
+/************************************************
+ * TX Queues
+ *
+ * 1 TX Queue per thread sending through a hardware port
+ * All memory allocation owned by I/O thread
+ *
+ * tid: thread id
+ * queue: TX queue holding threads prepared packets (via send())
+ * retry_queue: queue holding packets that couldn't be sent
+ * freebufs: queue holding empty buffers
+ * tx_list: list of sockets using this queue
+ * entry: list node for port to track TX queues
+ *
+ * queue, retry_queue, and freebufs are single-producer, single-consumer queues
+ * retry_queue wholly-owned by I/O thread
+ * For queue, user thread is producer, I/O thread is consumer
+ * For freebufs, user thread is consumer, I/O thread is consumer
+ *
+ * All queues are same size, and they are shared between all sockets on one
+ * thread (tid is the identifier)
+ * 1. Buffers start in freebufs (user gets buffers from freebufs)
+ * 2. User submits packet to queue
+ * 3. If packet couldn't be sent, it is (re)enqueued on retry_queue
+ ************************************************/
+struct uhd_dpdk_tx_queue {
+ pid_t tid;
+ struct rte_ring *queue;
+ struct rte_ring *retry_queue;
+ struct rte_ring *freebufs;
+ struct uhd_dpdk_tx_head tx_list;
+ LIST_ENTRY(uhd_dpdk_tx_queue) entry;
+};
+LIST_HEAD(uhd_dpdk_txq_head, uhd_dpdk_tx_queue);
+
+/************************************************
+ * Port structure
+ *
+ * All memory allocation owned by I/O thread
+ *
+ * id: hardware port id (for DPDK)
+ * parent: I/O thread servicing this port
+ * mac_addr: MAC address of this port
+ * ipv4_addr: IPv4 address of this port
+ * netmask: Subnet mask of this port
+ * arp_table: ARP cache for this port
+ * rx_table: Mapping of 5-tuple key to sockets for RX
+ * txq_list: List of TX queues associated with this port
+ * port_entry: List node entry for I/O thread to track
+ ************************************************/
+struct uhd_dpdk_port {
+ unsigned int id;
+ struct uhd_dpdk_thread *parent;
+ struct ether_addr mac_addr;
+ uint32_t ipv4_addr; /* FIXME: Check this before allowing a socket!!! */
+ uint32_t netmask;
+ /* Key = IP addr
+ * Value = MAC addr (ptr to uhd_dpdk_arp_entry)
+ */
+ struct rte_hash *arp_table;
+ /* hash map of RX sockets
+ * Key = uhd_dpdk_ipv4_5tuple
+ * Value = uhd_dpdk_socket
+ */
+ struct rte_hash *rx_table;
+ /* doubly-linked list of TX sockets */
+ struct uhd_dpdk_txq_head txq_list;
+ LIST_ENTRY(uhd_dpdk_port) port_entry;
+};
+
+LIST_HEAD(uhd_dpdk_port_head, uhd_dpdk_port);
+
+/************************************************
+ * Thread/lcore-private data structure
+ *
+ * All data owned by global context
+ *
+ * id: lcore id (from DPDK)
+ * rx_pktbuf_pool: memory pool for generating buffers for RX packets
+ * tx_pktbuf_pool: memory pool for generating buffers for TX packets
+ * num_ports: Number of ports this lcore is servicing
+ * port_list: List of ports this lcore is servicing
+ * sock_req_ring: Queue for user threads to submit service requests to the lcore
+ *
+ * sock_req_ring is a multi-producer, single-consumer queue
+ *
+ * For threads that have ports:
+ * Launch individually
+ * For threads without ports:
+ * Do not launch unless user specifically does it themselves.
+ * Should also have master lcore returned to user
+ * REMEMBER: Without args, DPDK creates an lcore for each CPU core!
+ */
+struct uhd_dpdk_thread {
+ unsigned int id;
+ struct rte_mempool *rx_pktbuf_pool;
+ struct rte_mempool *tx_pktbuf_pool;
+ int num_ports;
+ struct uhd_dpdk_port_head port_list;
+ struct rte_ring *sock_req_ring;
+};
+
+
+/************************************************
+ * One global context
+ *
+ * num_threads: Number of DPDK lcores tracked
+ * num_ports: Number of DPDK/NIC ports tracked
+ * threads: Array of all lcores/threads
+ * ports: Array of all DPDK/NIC ports
+ * rx_pktbuf_pools: Array of all packet buffer pools for RX
+ * tx_pktbuf_pools: Array of all packet buffer pools for TX
+ *
+ * The packet buffer pools are memory pools that are associated with a CPU
+ * socket. They will provide storage close to the socket to accommodate NUMA
+ * nodes.
+ ************************************************/
+struct uhd_dpdk_ctx {
+ unsigned int num_threads;
+ unsigned int num_ports;
+ struct uhd_dpdk_thread *threads;
+ struct uhd_dpdk_port *ports;
+ struct rte_mempool *rx_pktbuf_pools[RTE_MAX_NUMA_NODES];
+ struct rte_mempool *tx_pktbuf_pools[RTE_MAX_NUMA_NODES];
+};
+
+extern struct uhd_dpdk_ctx *ctx;
+
+static inline struct uhd_dpdk_port * find_port(unsigned int portid)
+{
+ if (!ctx)
+ return NULL;
+
+ for (unsigned int i = 0; i < ctx->num_threads; i++) {
+ struct uhd_dpdk_thread *t = &ctx->threads[i];
+ struct uhd_dpdk_port *p;
+ LIST_FOREACH(p, &t->port_list, port_entry) {
+ if (p->id == portid) {
+ return p;
+ }
+ }
+ }
+ return NULL;
+}
+
+#endif /* _UHD_DPDK_CTX_H_ */
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
new file mode 100644
index 000000000..0af9cc4e5
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
@@ -0,0 +1,401 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#include "uhd_dpdk_driver.h"
+#include "uhd_dpdk_fops.h"
+#include "uhd_dpdk_udp.h"
+#include <rte_malloc.h>
+#include <rte_mempool.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+
+int _uhd_dpdk_process_arp(struct uhd_dpdk_port *port, struct arp_hdr *arp_frame)
+{
+ uint32_t dest_ip = arp_frame->arp_data.arp_sip;
+ struct ether_addr dest_addr = arp_frame->arp_data.arp_sha;
+
+ struct uhd_dpdk_arp_entry *entry = NULL;
+ rte_hash_lookup_data(port->arp_table, &dest_ip, (void **) &entry);
+ if (!entry) {
+ entry = rte_zmalloc(NULL, sizeof(*entry), 0);
+ if (!entry) {
+ return -ENOMEM;
+ }
+ LIST_INIT(&entry->pending_list);
+ ether_addr_copy(&dest_addr, &entry->mac_addr);
+ if (rte_hash_add_key_data(port->arp_table, &dest_ip, entry) < 0) {
+ rte_free(entry);
+ return -ENOSPC;
+ }
+ } else {
+ struct uhd_dpdk_config_req *req = NULL;
+ ether_addr_copy(&dest_addr, &entry->mac_addr);
+ /* Now wake any config reqs waiting for the ARP */
+ LIST_FOREACH(req, &entry->pending_list, entry) {
+ _uhd_dpdk_config_req_compl(req, 0);
+ }
+ }
+
+ return 0;
+}
+
+int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port, uint32_t ip)
+{
+ struct rte_mbuf *mbuf;
+ struct ether_hdr *hdr;
+ struct arp_hdr *arp_frame;
+
+ mbuf = rte_pktmbuf_alloc(port->parent->tx_pktbuf_pool);
+ if (unlikely(mbuf == NULL)) {
+ RTE_LOG(WARNING, MEMPOOL, "Could not allocate packet buffer for ARP request\n");
+ return -ENOMEM;
+ }
+
+ hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
+ arp_frame = (struct arp_hdr *) &hdr[1];
+
+ memset(hdr->d_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN);
+ ether_addr_copy(&port->mac_addr, &hdr->s_addr);
+ hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP);
+
+ arp_frame->arp_hrd = rte_cpu_to_be_16(ARP_HRD_ETHER);
+ arp_frame->arp_pro = rte_cpu_to_be_16(ETHER_TYPE_IPv4);
+ arp_frame->arp_hln = 6;
+ arp_frame->arp_pln = 4;
+ arp_frame->arp_op = rte_cpu_to_be_16(ARP_OP_REQUEST);
+ ether_addr_copy(&port->mac_addr, &arp_frame->arp_data.arp_sha);
+ arp_frame->arp_data.arp_sip = port->ipv4_addr;
+ memset(arp_frame->arp_data.arp_tha.addr_bytes, 0x00, ETHER_ADDR_LEN);
+ arp_frame->arp_data.arp_tip = ip;
+
+ mbuf->pkt_len = 42;
+ mbuf->data_len = 42;
+ mbuf->ol_flags = PKT_TX_IP_CKSUM;
+
+ if (rte_eth_tx_burst(port->id, 0, &mbuf, 1) != 1) {
+ RTE_LOG(WARNING, RING, "%s: TX descriptor ring is full\n", __func__);
+ rte_pktmbuf_free(mbuf);
+ return -EAGAIN;
+ }
+ return 0;
+}
+
+
+int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt)
+{
+ int status = 0;
+ struct uhd_dpdk_ipv4_5tuple ht_key = {
+ .sock_type = UHD_DPDK_SOCK_UDP,
+ .src_ip = 0,
+ .src_port = 0,
+ .dst_ip = 0,
+ .dst_port = pkt->dst_port
+ };
+
+ struct uhd_dpdk_socket *sock = NULL;
+ rte_hash_lookup_data(port->rx_table, &ht_key, (void **) &sock);
+ if (!sock) {
+ status = -ENODEV;
+ goto udp_rx_drop;
+ }
+
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
+ status = rte_ring_enqueue(sock->rx_ring, mbuf);
+ if (status) {
+ pdata->dropped_pkts++;
+ goto udp_rx_drop;
+ }
+ return 0;
+
+udp_rx_drop:
+ rte_pktmbuf_free(mbuf);
+ return status;
+}
+
+int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt)
+{
+ if (pkt->dst_addr != port->ipv4_addr) {
+ rte_pktmbuf_free(mbuf);
+ return -ENODEV;
+ }
+ if (pkt->next_proto_id == 0x11) {
+ return _uhd_dpdk_process_udp(port, mbuf, (struct udp_hdr *) &pkt[1]);
+ }
+ rte_pktmbuf_free(mbuf);
+ return -EINVAL;
+}
+
+static int _uhd_dpdk_fill_ipv4_addr(struct uhd_dpdk_port *port,
+ struct rte_mbuf *mbuf)
+{
+ struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
+ struct ipv4_hdr *ip_hdr = (struct ipv4_hdr *) &eth_hdr[1];
+ if (is_broadcast(port, ip_hdr->dst_addr)) {
+ memset(eth_hdr->d_addr.addr_bytes, 0xff, ETHER_ADDR_LEN);
+ } else {
+ /* Lookup dest_addr */
+ struct uhd_dpdk_arp_entry *entry = NULL;
+ rte_hash_lookup_data(port->arp_table, &ip_hdr->dst_addr, (void **) &entry);
+ if (!entry) {
+ RTE_LOG(ERR, USER1, "TX packet on port %d to addr 0x%08x has no ARP entry\n", port->id, ip_hdr->dst_addr);
+ return -ENODEV;
+ }
+
+ ether_addr_copy(&entry->mac_addr, &eth_hdr->d_addr);
+ }
+ return 0;
+}
+
+static int _uhd_dpdk_send(struct uhd_dpdk_port *port,
+ struct uhd_dpdk_tx_queue *txq,
+ struct rte_ring *q)
+{
+ struct rte_mbuf *buf;
+
+ unsigned int num_tx = rte_ring_count(q);
+ num_tx = (num_tx < UHD_DPDK_TX_BURST_SIZE) ? num_tx : UHD_DPDK_TX_BURST_SIZE;
+ for (unsigned int i = 0; i < num_tx; i++) {
+ int status = rte_ring_dequeue(q, (void **) &buf);
+ if (status) {
+ RTE_LOG(ERR, USER1, "%s: Q Count doesn't match actual\n", __func__);
+ break;
+ }
+ struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(buf, struct ether_hdr *);
+ if (eth_hdr->ether_type == rte_cpu_to_be_16(ETHER_TYPE_IPv4)) {
+ status = _uhd_dpdk_fill_ipv4_addr(port, buf);
+ if (status) {
+ return status;
+ }
+ }
+ status = rte_eth_tx_burst(port->id, 0, &buf, 1); /* Automatically frees mbuf */
+ if (status != 1) {
+ status = rte_ring_enqueue(txq->retry_queue, buf);
+ if (status)
+ RTE_LOG(WARNING, USER1, "%s: Could not re-enqueue pkt %d\n", __func__, i);
+ num_tx = i;
+ rte_pktmbuf_free(buf);
+ break;
+ }
+ }
+
+ return num_tx;
+}
+
+static inline int _uhd_dpdk_restore_bufs(struct uhd_dpdk_port *port,
+ struct uhd_dpdk_tx_queue *q,
+ unsigned int num_bufs)
+{
+ /* Allocate more buffers to replace the sent ones */
+ struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE];
+ int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, num_bufs);
+ if (status) {
+ RTE_LOG(ERR, USER1, "%d %s: Could not restore %u pktmbufs in bulk!\n", status, __func__, num_bufs);
+ }
+
+ /* Enqueue the buffers for the user thread to retrieve */
+ unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) freebufs, num_bufs, NULL);
+ if (enqd != num_bufs) {
+ RTE_LOG(ERR, USER1, "Could not enqueue pktmbufs!\n");
+ return status;
+ }
+
+ return num_bufs;
+}
+
+static inline void _uhd_dpdk_disable_ports(struct uhd_dpdk_thread *t)
+{
+ struct uhd_dpdk_port *port = NULL;
+ LIST_FOREACH(port, &t->port_list, port_entry) {
+ rte_eth_dev_stop(port->id);
+ }
+}
+
+static inline int _uhd_dpdk_driver_cleanup(struct uhd_dpdk_thread *t)
+{
+ /* Close sockets upon request, but reply to other service requests with
+ * errors
+ */
+ struct uhd_dpdk_config_req *sock_req;
+ if (rte_ring_dequeue(t->sock_req_ring, (void **) &sock_req)) {
+ switch (sock_req->req_type) {
+ case UHD_DPDK_SOCK_CLOSE:
+ _uhd_dpdk_sock_release(sock_req);
+ break;
+ default:
+ _uhd_dpdk_config_req_compl(sock_req, -ENODEV);
+ break;
+ }
+ }
+
+ /* Do nothing if there are users remaining */
+ struct uhd_dpdk_port *port = NULL;
+ LIST_FOREACH(port, &t->port_list, port_entry) {
+ /* Check for RX sockets */
+ const void *hash_key;
+ void *hash_sock;
+ uint32_t hash_next = 0;
+ if (rte_hash_iterate(port->rx_table, &hash_key,
+ &hash_sock, &hash_next) != -ENOENT)
+ return -EAGAIN;
+
+ /* Check for TX sockets */
+ struct uhd_dpdk_tx_queue *q = NULL;
+ LIST_FOREACH(q, &port->txq_list, entry) {
+ if (!LIST_EMPTY(&q->tx_list))
+ return -EAGAIN;
+ }
+ }
+
+ /* Now can free memory */
+ LIST_FOREACH(port, &t->port_list, port_entry) {
+ rte_hash_free(port->rx_table);
+
+ struct uhd_dpdk_tx_queue *q = LIST_FIRST(&port->txq_list);
+ while (!LIST_EMPTY(&port->txq_list)) {
+ struct uhd_dpdk_tx_queue *nextq = LIST_NEXT(q, entry);
+ while (!rte_ring_empty(q->queue)) {
+ struct rte_buf *buf = NULL;
+ rte_ring_dequeue(q->queue, (void **) &buf);
+ rte_free(buf);
+ }
+ while (!rte_ring_empty(q->freebufs)) {
+ struct rte_buf *buf = NULL;
+ rte_ring_dequeue(q->freebufs, (void **) &buf);
+ rte_free(buf);
+ }
+ while (!rte_ring_empty(q->retry_queue)) {
+ struct rte_buf *buf = NULL;
+ rte_ring_dequeue(q->retry_queue, (void **) &buf);
+ rte_free(buf);
+ }
+ rte_ring_free(q->queue);
+ rte_ring_free(q->freebufs);
+ rte_ring_free(q->retry_queue);
+ rte_free(q);
+ q = nextq;
+ }
+
+ const void *arp_key;
+ uint32_t arp_key_next = 0;
+ struct uhd_dpdk_arp_entry *arp_entry = NULL;
+ while (rte_hash_iterate(port->arp_table, &arp_key,
+ (void **) &arp_entry, &arp_key_next) >= 0) {
+ rte_free(arp_entry);
+ }
+ rte_hash_free(port->arp_table);
+ }
+
+ return 0;
+}
+
+int _uhd_dpdk_driver_main(void *arg)
+{
+
+ /* Don't currently have use for arguments */
+ if (arg)
+ return -EINVAL;
+
+ /* Check that this is a valid lcore */
+ unsigned int lcore_id = rte_lcore_id();
+ if (lcore_id == LCORE_ID_ANY)
+ return -ENODEV;
+
+ /* Check that this lcore has ports */
+ struct uhd_dpdk_thread *t = &ctx->threads[lcore_id];
+ if (t->id != lcore_id)
+ return -ENODEV;
+
+ RTE_LOG(INFO, USER2, "Thread %d started\n", lcore_id);
+ int status = 0;
+ while (!status) {
+ /* Check for open()/close() requests and service 1 at a time */
+ struct uhd_dpdk_config_req *sock_req;
+ if (rte_ring_dequeue(t->sock_req_ring, (void **) &sock_req) == 0) {
+ /* FIXME: Not checking return vals */
+ switch (sock_req->req_type) {
+ case UHD_DPDK_SOCK_OPEN:
+ _uhd_dpdk_sock_setup(sock_req);
+ break;
+ case UHD_DPDK_SOCK_CLOSE:
+ _uhd_dpdk_sock_release(sock_req);
+ break;
+ case UHD_DPDK_LCORE_TERM:
+ RTE_LOG(INFO, EAL, "Terminating lcore %u\n", lcore_id);
+ status = 1;
+ _uhd_dpdk_config_req_compl(sock_req, 0);
+ break;
+ default:
+ RTE_LOG(ERR, USER2, "Invalid socket request %d\n", sock_req->req_type);
+ break;
+ }
+ }
+ /* For each port, attempt to receive packets and process */
+ struct uhd_dpdk_port *port = NULL;
+ LIST_FOREACH(port, &t->port_list, port_entry) {
+ struct ether_hdr *hdr;
+ char *l2_data;
+ struct rte_mbuf *bufs[UHD_DPDK_RX_BURST_SIZE];
+ const uint16_t num_rx = rte_eth_rx_burst(port->id, 0,
+ bufs, UHD_DPDK_RX_BURST_SIZE);
+ if (unlikely(num_rx == 0)) {
+ continue;
+ }
+
+ for (int buf = 0; buf < num_rx; buf++) {
+ uint64_t ol_flags = bufs[buf]->ol_flags;
+ hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr *);
+ l2_data = (char *) &hdr[1];
+ switch (rte_be_to_cpu_16(hdr->ether_type)) {
+ case ETHER_TYPE_ARP:
+ _uhd_dpdk_process_arp(port, (struct arp_hdr *) l2_data);
+ rte_pktmbuf_free(bufs[buf]);
+ break;
+ case ETHER_TYPE_IPv4:
+ if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */
+ RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf);
+ } else {
+ _uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data);
+ }
+ break;
+ default:
+ rte_pktmbuf_free(bufs[buf]);
+ break;
+ }
+ }
+ }
+ /* For each port's TX queues, do TX */
+ LIST_FOREACH(port, &t->port_list, port_entry) {
+ struct uhd_dpdk_tx_queue *q = NULL;
+ LIST_FOREACH(q, &port->txq_list, entry) {
+ if (!rte_ring_empty(q->retry_queue)) {
+ int num_retry = _uhd_dpdk_send(port, q, q->retry_queue);
+ _uhd_dpdk_restore_bufs(port, q, num_retry);
+ if (!rte_ring_empty(q->retry_queue)) {
+ break;
+ }
+ }
+ if (rte_ring_empty(q->queue)) {
+ continue;
+ }
+ int num_tx = _uhd_dpdk_send(port, q, q->queue);
+ if (num_tx > 0) {
+ _uhd_dpdk_restore_bufs(port, q, num_tx);
+ } else {
+ break;
+ }
+ }
+ }
+ }
+
+ /* Now turn off ports */
+ _uhd_dpdk_disable_ports(t);
+
+ /* Now clean up before exiting */
+ int cleaning = -EAGAIN;
+ while (cleaning == -EAGAIN) {
+ cleaning = _uhd_dpdk_driver_cleanup(t);
+ }
+ return status;
+}
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h
new file mode 100644
index 000000000..b0d3e42cd
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h
@@ -0,0 +1,32 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#ifndef _UHD_DPDK_DRIVER_H_
+#define _UHD_DPDK_DRIVER_H_
+
+#include "uhd_dpdk_ctx.h"
+#include <rte_mbuf.h>
+#include <rte_arp.h>
+#include <rte_udp.h>
+#include <rte_ip.h>
+
+static inline bool is_broadcast(struct uhd_dpdk_port *port, uint32_t dst_ipv4_addr)
+{
+ uint32_t network = port->netmask | ((~port->netmask) & dst_ipv4_addr);
+ return (network == 0xffffffff);
+}
+
+
+int _uhd_dpdk_process_arp(struct uhd_dpdk_port *port, struct arp_hdr *arp_frame);
+int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt);
+int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt);
+int _uhd_dpdk_send_udp(struct uhd_dpdk_port *port,
+ struct uhd_dpdk_socket *sock,
+ struct rte_mbuf *mbuf);
+int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port,
+ uint32_t ip);
+
+int _uhd_dpdk_driver_main(void *arg);
+#endif /* _UHD_DPDK_DRIVER_H_ */
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c
new file mode 100644
index 000000000..3acc3d709
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c
@@ -0,0 +1,306 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#include "uhd_dpdk_fops.h"
+#include "uhd_dpdk_udp.h"
+#include <rte_malloc.h>
+#include <rte_ip.h>
+
+/************************************************
+ * I/O thread ONLY
+ *
+ * TODO: Decide whether to allow blocking on mutex
+ * This would cause the I/O thread to sleep, which isn't desireable
+ * Could throw in a "request completion cleanup" section in I/O thread's
+ * main loop, though. Just keep trying until the requesting thred is woken
+ * up. This would be to handle the case where the thread hadn't finished
+ * setting itself up to wait on the condition variable, but the I/O thread
+ * still got the request.
+ */
+int _uhd_dpdk_config_req_compl(struct uhd_dpdk_config_req *req, int retval)
+{
+ req->retval = retval;
+ int stat = pthread_mutex_trylock(&req->mutex);
+ if (stat) {
+ RTE_LOG(ERR, USER1, "%s: Could not lock req mutex\n", __func__);
+ return stat;
+ }
+ stat = pthread_cond_signal(&req->cond);
+ pthread_mutex_unlock(&req->mutex);
+ if (stat) {
+ RTE_LOG(ERR, USER1, "%s: Could not signal req cond\n", __func__);
+ return stat;
+ }
+ return 0;
+}
+
+int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req)
+{
+ int stat = 0;
+ switch (req->sock_type) {
+ case UHD_DPDK_SOCK_UDP:
+ stat = _uhd_dpdk_udp_setup(req);
+ break;
+ default:
+ stat = -EINVAL;
+ _uhd_dpdk_config_req_compl(req, -EINVAL);
+ }
+ return stat;
+}
+
+int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req)
+{
+ int stat = 0;
+ switch (req->sock_type) {
+ case UHD_DPDK_SOCK_UDP:
+ stat = _uhd_dpdk_udp_release(req);
+ break;
+ default:
+ stat = -EINVAL;
+ _uhd_dpdk_config_req_compl(req, -EINVAL);
+ }
+
+ return stat;
+}
+
+/************************************************
+ * API calls
+ */
+struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid,
+ enum uhd_dpdk_sock_type t, void *sockarg)
+{
+ if (!ctx || (t >= UHD_DPDK_SOCK_TYPE_COUNT)) {
+ return NULL;
+ }
+
+ struct uhd_dpdk_port *port = find_port(portid);
+ if (!port) {
+ return NULL;
+ }
+
+ if (!port->ipv4_addr) {
+ RTE_LOG(WARNING, EAL, "Please set IPv4 address for port %u before opening socket\n", portid);
+ return NULL;
+ }
+
+
+ struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0);
+ if (!req) {
+ return NULL;
+ }
+
+ struct uhd_dpdk_socket *s = (struct uhd_dpdk_socket *) rte_zmalloc(NULL, sizeof(*s), 0);
+ if (!s) {
+ goto sock_open_end;
+ }
+
+ s->port = port;
+ req->sock = s;
+ req->req_type = UHD_DPDK_SOCK_OPEN;
+ req->sock_type = t;
+ req->retval = -ETIMEDOUT;
+ pthread_mutex_init(&req->mutex, NULL);
+ pthread_condattr_t condattr;
+ pthread_condattr_init(&condattr);
+ pthread_condattr_setclock(&condattr, CLOCK_MONOTONIC);
+ pthread_cond_init(&req->cond, &condattr);
+
+ switch (t) {
+ case UHD_DPDK_SOCK_UDP:
+ uhd_dpdk_udp_open(req, sockarg);
+ break;
+ default:
+ break;
+ }
+
+ if (req->retval) {
+ rte_free(s);
+ s = NULL;
+ }
+
+sock_open_end:
+ rte_free(req);
+ return s;
+}
+
+int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock)
+{
+ if (!ctx || !sock)
+ return -EINVAL;
+
+ struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0);
+ if (!req)
+ return -ENOMEM;
+ req->sock = sock;
+ req->req_type = UHD_DPDK_SOCK_CLOSE;
+ req->sock_type = sock->sock_type;
+ req->retval = -ETIMEDOUT;
+ pthread_mutex_init(&req->mutex, NULL);
+ pthread_cond_init(&req->cond, NULL);
+
+ switch (sock->sock_type) {
+ case UHD_DPDK_SOCK_UDP:
+ uhd_dpdk_udp_close(req);
+ break;
+ default:
+ break;
+ }
+
+ if (req->retval) {
+ rte_free(req);
+ return req->retval;
+ }
+
+ rte_free(sock);
+ return 0;
+}
+
+/*
+ * TODO:
+ * Add blocking calls with timeout
+ * Implementation would involve a condition variable, like config reqs
+ * Also would create a cleanup section in I/O main loop (like config reqs)
+ */
+int uhd_dpdk_request_tx_bufs(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
+ unsigned int num_bufs)
+{
+ if (!sock || !bufs || !num_bufs) {
+ return -EINVAL;
+ }
+ *bufs = NULL;
+
+ if (!sock->tx_ring)
+ return -EINVAL;
+
+ unsigned int num_tx = rte_ring_count(sock->rx_ring);
+ num_tx = (num_tx < num_bufs) ? num_tx : num_bufs;
+ if (rte_ring_dequeue_bulk(sock->rx_ring, (void **) bufs, num_tx, NULL) == 0)
+ return -ENOENT;
+ sock->tx_buf_count += num_tx;
+ return num_tx;
+}
+
+int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
+ unsigned int num_bufs)
+{
+ if (!sock || !bufs || !num_bufs)
+ return -EINVAL;
+ if (!sock->tx_ring)
+ return -EINVAL;
+ unsigned int num_tx = rte_ring_free_count(sock->tx_ring);
+ num_tx = (num_tx < num_bufs) ? num_tx : num_bufs;
+ switch (sock->sock_type) {
+ case UHD_DPDK_SOCK_UDP:
+ for (unsigned int i = 0; i < num_tx; i++) {
+ uhd_dpdk_udp_prep(sock, bufs[i]);
+ }
+ break;
+ default:
+ RTE_LOG(ERR, USER1, "%s: Unsupported sock type\n", __func__);
+ return -EINVAL;
+ }
+ int status = rte_ring_enqueue_bulk(sock->tx_ring, (void **) bufs, num_tx, NULL);
+ if (status == 0) {
+ RTE_LOG(ERR, USER1, "Invalid shared usage of TX ring detected\n");
+ return status;
+ }
+ sock->tx_buf_count -= num_tx;
+ return num_tx;
+}
+
+/*
+ * TODO:
+ * Add blocking calls with timeout
+ */
+int uhd_dpdk_recv(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
+ unsigned int num_bufs, unsigned int timeout)
+{
+ if (!sock || !bufs || !num_bufs)
+ return -EINVAL;
+ if (!sock->rx_ring)
+ return -EINVAL;
+ unsigned int num_rx = rte_ring_count(sock->rx_ring);
+ num_rx = (num_rx < num_bufs) ? num_rx : num_bufs;
+ if (num_rx) {
+ unsigned int avail = 0;
+ unsigned int status = rte_ring_dequeue_bulk(sock->rx_ring,
+ (void **) bufs, num_rx, &avail);
+ if (status == 0) {
+ RTE_LOG(ERR, USER1, "Invalid shared usage of RX ring detected\n");
+ RTE_LOG(ERR, USER1, "Requested %u, but %u available\n",
+ num_rx, avail);
+ return -ENOENT;
+ }
+ }
+ return num_rx;
+}
+
+void uhd_dpdk_free_buf(struct rte_mbuf *buf)
+{
+ rte_pktmbuf_free(buf);
+}
+
+void * uhd_dpdk_buf_to_data(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf)
+{
+ if (!sock || !buf)
+ return NULL;
+
+ /* TODO: Support for more types? */
+ switch (sock->sock_type) {
+ case UHD_DPDK_SOCK_UDP:
+ return rte_pktmbuf_mtod_offset(buf, void *, sizeof(struct ether_hdr) +
+ sizeof(struct ipv4_hdr) +
+ sizeof(struct udp_hdr));
+ default:
+ return NULL;
+ }
+}
+
+
+int uhd_dpdk_get_len(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf)
+{
+ if (!sock || !buf)
+ return -EINVAL;
+
+ if (sock->sock_type != UHD_DPDK_SOCK_UDP)
+ return -EINVAL;
+
+ struct udp_hdr *hdr = (struct udp_hdr *) ((uint8_t *) uhd_dpdk_buf_to_data(sock, buf) - sizeof(struct udp_hdr));
+ if (!hdr)
+ return -EINVAL;
+
+ /* Report dgram length - header */
+ return ntohs(hdr->dgram_len) - 8;
+}
+
+int uhd_dpdk_get_src_ipv4(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf,
+ uint32_t *ipv4_addr)
+{
+ if (!sock || !buf || !ipv4_addr)
+ return -EINVAL;
+
+ if (sock->sock_type != UHD_DPDK_SOCK_UDP)
+ return -EINVAL;
+
+ struct ipv4_hdr *hdr = rte_pktmbuf_mtod_offset(buf, struct ipv4_hdr *,
+ sizeof(struct ether_hdr));
+
+ *ipv4_addr = hdr->src_addr;
+ return 0;
+}
+
+int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, uint32_t *count)
+{
+ if (!sock)
+ return -EINVAL;
+ if (sock->sock_type != UHD_DPDK_SOCK_UDP)
+ return -EINVAL;
+ if (!sock->priv)
+ return -ENODEV;
+
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
+ *count = pdata->dropped_pkts;
+ return 0;
+}
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h
new file mode 100644
index 000000000..c07c1913a
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h
@@ -0,0 +1,15 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#ifndef _UHD_DPDK_FOPS_H_
+#define _UHD_DPDK_FOPS_H_
+
+#include "uhd_dpdk_ctx.h"
+
+int _uhd_dpdk_config_req_compl(struct uhd_dpdk_config_req *req, int retval);
+
+int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req);
+int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req);
+#endif /* _UHD_DPDK_FOPS_H_ */
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
new file mode 100644
index 000000000..26cfd43e1
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
@@ -0,0 +1,456 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#include "uhd_dpdk_fops.h"
+#include "uhd_dpdk_udp.h"
+#include "uhd_dpdk_driver.h"
+#include <rte_ring.h>
+#include <rte_malloc.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+#include <arpa/inet.h>
+
+/************************************************
+ * I/O thread ONLY
+ */
+
+static int _alloc_txq(struct uhd_dpdk_port *port, pid_t tid, struct uhd_dpdk_tx_queue **queue)
+{
+ *queue = NULL;
+ struct uhd_dpdk_tx_queue *q = rte_zmalloc(NULL, sizeof(*q), 0);
+ if (!q) {
+ RTE_LOG(ERR, USER1, "%s: Cannot allocate TX queue\n", __func__);
+ return -ENOMEM;
+ }
+ q->tid = tid;
+ LIST_INIT(&q->tx_list);
+
+ char name[32];
+ snprintf(name, sizeof(name), "tx_ring_udp_%u.%u", port->id, tid);
+ q->queue = rte_ring_create(
+ name,
+ UHD_DPDK_TXQ_SIZE,
+ rte_socket_id(),
+ RING_F_SC_DEQ | RING_F_SP_ENQ
+ );
+ snprintf(name, sizeof(name), "buffer_ring_udp_%u.%u", port->id, tid);
+ q->freebufs = rte_ring_create(
+ name,
+ UHD_DPDK_TXQ_SIZE,
+ rte_socket_id(),
+ RING_F_SC_DEQ | RING_F_SP_ENQ
+ );
+ /* Set up retry queue */
+ snprintf(name, sizeof(name), "retry_queue_%u", port->id);
+ q->retry_queue = rte_ring_create(
+ name,
+ UHD_DPDK_TXQ_SIZE,
+ rte_socket_id(),
+ RING_F_SC_DEQ | RING_F_SP_ENQ
+ );
+
+ if (!q->queue || !q->freebufs || !q->retry_queue) {
+ RTE_LOG(ERR, USER1, "%s: Cannot allocate TX rings\n", __func__);
+ if (q->queue)
+ rte_ring_free(q->queue);
+ if (q->freebufs)
+ rte_ring_free(q->freebufs);
+ if (q->retry_queue)
+ rte_ring_free(q->retry_queue);
+ rte_free(q);
+ return -ENOMEM;
+ }
+ struct rte_mbuf *bufs[UHD_DPDK_TXQ_SIZE];
+ unsigned int num_bufs = rte_ring_free_count(q->freebufs);
+ int buf_stat = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, bufs, num_bufs);
+ if (buf_stat) {
+ rte_ring_free(q->freebufs);
+ rte_ring_free(q->queue);
+ rte_ring_free(q->retry_queue);
+ rte_free(q);
+ RTE_LOG(ERR, USER1, "%s: Cannot allocate packet buffers\n", __func__);
+ return -ENOENT;
+ }
+ unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) bufs, num_bufs, NULL);
+ if (enqd != num_bufs) {
+ RTE_LOG(ERR, USER1, "%s: Cannot enqueue freebufs\n", __func__);
+ }
+ LIST_INSERT_HEAD(&port->txq_list, q, entry);
+ *queue = q;
+ return 0;
+}
+
+/* Finish setting up UDP socket (unless ARP needs to be done)
+ * Not multi-thread safe!
+ * This call should only be used by the thread servicing the port
+ * In addition, the code below assumes simplex sockets and unique receive ports
+ * FIXME: May need some way to help clean up abandoned socket requests (refcnt check...)
+ */
+int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req)
+{
+ int retval = 0;
+ struct uhd_dpdk_socket *sock = req->sock;
+ struct uhd_dpdk_udp_priv *pdata = sock->priv;
+ struct uhd_dpdk_port *port = req->sock->port;
+
+ struct uhd_dpdk_ipv4_5tuple ht_key = {
+ .sock_type = UHD_DPDK_SOCK_UDP,
+ .src_ip = 0,
+ .src_port = 0,
+ .dst_ip = 0,
+ .dst_port = pdata->dst_port
+ };
+
+ /* Are we doing RX? */
+ if (sock->rx_ring) {
+ /* Add to rx table */
+ if (pdata->dst_port == 0) {
+ /* Assign unused one in a very slow fashion */
+ for (uint16_t i = 1; i > 0; i++) {
+ ht_key.dst_port = htons(i);
+ if (rte_hash_lookup(port->rx_table, &ht_key) == -ENOENT) {
+ pdata->dst_port = htons(i);
+ break;
+ }
+ }
+ }
+
+ /* Is the port STILL invalid? */
+ if (pdata->dst_port == 0) {
+ RTE_LOG(ERR, USER1, "%s: No available UDP ports\n", __func__);
+ _uhd_dpdk_config_req_compl(req, -EADDRINUSE);
+ return -EADDRINUSE;
+ }
+
+ ht_key.dst_port = pdata->dst_port;
+ if (rte_hash_lookup(port->rx_table, &ht_key) > 0) {
+ RTE_LOG(ERR, USER1, "%s: Cannot add to RX table\n", __func__);
+ _uhd_dpdk_config_req_compl(req, -EADDRINUSE);
+ return -EADDRINUSE;
+ }
+
+ char name[32];
+ snprintf(name, sizeof(name), "rx_ring_udp_%u.%u", port->id, ntohs(pdata->dst_port));
+ sock->rx_ring = rte_ring_create(
+ name,
+ UHD_DPDK_RXQ_SIZE,
+ rte_socket_id(),
+ RING_F_SC_DEQ | RING_F_SP_ENQ
+ );
+ if (!sock->rx_ring) {
+ RTE_LOG(ERR, USER1, "%s: Cannot allocate RX ring\n", __func__);
+ _uhd_dpdk_config_req_compl(req, -ENOMEM);
+ return -ENOMEM;
+ }
+ retval = rte_hash_add_key_data(port->rx_table, &ht_key, sock);
+ if (retval != 0) {
+ RTE_LOG(WARNING, TABLE, "Could not add new RX socket to port %d: %d\n", port->id, retval);
+ rte_ring_free(sock->rx_ring);
+ _uhd_dpdk_config_req_compl(req, retval);
+ return retval;
+ }
+ _uhd_dpdk_config_req_compl(req, 0);
+ }
+
+ /* Are we doing TX? */
+ if (sock->tx_ring) {
+ sock->tx_ring = NULL;
+ struct uhd_dpdk_tx_queue *q = NULL;
+ LIST_FOREACH(q, &port->txq_list, entry) {
+ if (q->tid == sock->tid) {
+ LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry);
+ sock->tx_ring = q->queue;
+ sock->rx_ring = q->freebufs;
+ break;
+ }
+ }
+ if (!sock->tx_ring) {
+ retval = _alloc_txq(port, sock->tid, &q);
+ if (retval) {
+ _uhd_dpdk_config_req_compl(req, retval);
+ return retval;
+ }
+ sock->tx_ring = q->queue;
+ sock->rx_ring = q->freebufs;
+ }
+ /* If a broadcast type, just finish setup and return */
+ if (is_broadcast(port, pdata->dst_ipv4_addr)) {
+ LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry);
+ _uhd_dpdk_config_req_compl(req, 0);
+ return 0;
+ }
+ /* Otherwise... Check for entry in ARP table */
+ struct uhd_dpdk_arp_entry *entry = NULL;
+ int arp_table_stat = rte_hash_lookup_data(port->arp_table, &pdata->dst_ipv4_addr, (void **) &entry);
+ if (entry) {
+ /* Check for null entry */
+ if ((entry->mac_addr.addr_bytes[0] == 0xFF) &&
+ (entry->mac_addr.addr_bytes[1] == 0xFF) &&
+ (entry->mac_addr.addr_bytes[2] == 0xFF) &&
+ (entry->mac_addr.addr_bytes[3] == 0xFF) &&
+ (entry->mac_addr.addr_bytes[4] == 0xFF) &&
+ (entry->mac_addr.addr_bytes[5] == 0xFF)) {
+ arp_table_stat = -ENOENT;
+ }
+ } else {
+ /* No entry -> Add null entry */
+ entry = rte_zmalloc(NULL, sizeof(*entry), 0);
+ if (!entry) {
+ RTE_LOG(ERR, USER1, "%s: Cannot allocate ARP entry\n", __func__);
+ _uhd_dpdk_config_req_compl(req, -ENOMEM);
+ return -ENOMEM;
+ }
+ memset(entry->mac_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN);
+ LIST_INIT(&entry->pending_list);
+
+ if (rte_hash_add_key_data(port->arp_table, &pdata->dst_ipv4_addr, entry) < 0) {
+ rte_free(entry);
+ RTE_LOG(ERR, USER1, "%s: Cannot add entry to ARP table\n", __func__);
+ _uhd_dpdk_config_req_compl(req, -ENOMEM);
+ return -ENOMEM;
+ }
+ }
+
+ /* Was there a valid address? */
+ if (arp_table_stat == -ENOENT) {
+ /* Get valid address and have requestor continue waiting */
+ int arp_stat = 0;
+ do { /* Keep trying to send request if no descriptor */
+ arp_stat = _uhd_dpdk_arp_request(port, pdata->dst_ipv4_addr);
+ } while (arp_stat == -EAGAIN);
+
+ if (arp_stat) {
+ /* Config request errors out */
+ RTE_LOG(ERR, USER1, "%s: Cannot make ARP request\n", __func__);
+ _uhd_dpdk_config_req_compl(req, arp_stat);
+ return arp_stat;
+ }
+ /* Append req to pending list. Will signal later. */
+ LIST_INSERT_HEAD(&entry->pending_list, req, entry);
+ LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry);
+ } else {
+ /* We have a valid address. All good. */
+ LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry);
+ _uhd_dpdk_config_req_compl(req, 0);
+ }
+ }
+ return 0;
+}
+
+int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req)
+{
+ struct uhd_dpdk_socket *sock = req->sock;
+ struct uhd_dpdk_port *port = req->sock->port;
+ struct uhd_dpdk_config_req *conf_req = NULL;
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
+ if (sock->tx_ring) {
+ /* Remove from tx_list */
+ LIST_REMOVE(sock, tx_entry);
+ /* Check for entry in ARP table */
+ struct uhd_dpdk_arp_entry *entry = NULL;
+ rte_hash_lookup_data(port->arp_table, &pdata->dst_ipv4_addr, (void **) &entry);
+ if (entry) {
+ LIST_FOREACH(conf_req, &entry->pending_list, entry) {
+ if (conf_req->sock == sock) {
+ LIST_REMOVE(conf_req, entry);
+ break;
+ }
+ }
+ }
+
+ /* Add outstanding buffers back to TX queue's freebufs */
+ struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE];
+ int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, sock->tx_buf_count);
+ if (status) {
+ RTE_LOG(ERR, USER1, "%d %s: Could not restore %u TX buffers in bulk!\n", status, __func__, sock->tx_buf_count);
+ }
+
+ unsigned int enqd = rte_ring_enqueue_bulk(sock->rx_ring, (void **) freebufs, sock->tx_buf_count, NULL);
+ if (enqd != (unsigned int) sock->tx_buf_count) {
+ RTE_LOG(ERR, USER1, "%s: Could not enqueue TX buffers!\n", __func__);
+ return status;
+ }
+ } else if (sock->rx_ring) {
+ struct uhd_dpdk_ipv4_5tuple ht_key = {
+ .sock_type = UHD_DPDK_SOCK_UDP,
+ .src_ip = 0,
+ .src_port = 0,
+ .dst_ip = 0,
+ .dst_port = pdata->dst_port
+ };
+ rte_hash_del_key(port->rx_table, &ht_key);
+ struct rte_mbuf *mbuf = NULL;
+ while (rte_ring_dequeue(sock->rx_ring, (void **) &mbuf) == 0) {
+ rte_pktmbuf_free(mbuf);
+ }
+ rte_ring_free(sock->rx_ring);
+ }
+
+ _uhd_dpdk_config_req_compl(req, 0);
+ return 0;
+}
+
+/* Configure a socket for UDP
+ * TODO: Make sure EVERYTHING is configured in socket
+ * FIXME: Make all of this part of the user thread, except hash table interaction
+ * and list handling
+ */
+void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req,
+ struct uhd_dpdk_sockarg_udp *arg)
+{
+ if (!req)
+ return;
+
+ if (!arg) {
+ req->retval = -EINVAL;
+ return;
+ }
+
+ struct uhd_dpdk_socket *sock = req->sock;
+ pid_t tid = syscall(__NR_gettid);
+ sock->tid = tid;
+
+ /* Create private data */
+ struct uhd_dpdk_udp_priv *data = (struct uhd_dpdk_udp_priv *) rte_zmalloc(NULL, sizeof(*data), 0);
+ if (!data) {
+ req->retval = -ENOMEM;
+ return;
+ }
+ sock->priv = data;
+
+ data->dst_ipv4_addr = arg->dst_addr;
+ if (arg->is_tx) {
+ data->src_port = arg->local_port;
+ data->dst_port = arg->remote_port;
+ sock->tx_ring = (struct rte_ring *) sock;
+ } else {
+ data->src_port = arg->remote_port;
+ data->dst_port = arg->local_port;
+ sock->rx_ring = (struct rte_ring *) sock;
+ }
+
+ /* TODO: Add support for I/O thread calling (skip locking and sleep) */
+ /* Add to port's config queue */
+ pthread_mutex_lock(&req->mutex);
+ if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) {
+ pthread_mutex_unlock(&req->mutex);
+ rte_free(data);
+ req->retval = -ENOSPC;
+ return;
+ }
+ struct timespec timeout;
+ clock_gettime(CLOCK_MONOTONIC, &timeout);
+ timeout.tv_sec += 1;
+ pthread_cond_timedwait(&req->cond, &req->mutex, &timeout);
+ pthread_mutex_unlock(&req->mutex);
+
+ if (req->retval)
+ rte_free(data);
+}
+
+void uhd_dpdk_udp_close(struct uhd_dpdk_config_req *req)
+{
+ if (!req)
+ return;
+
+ pthread_mutex_lock(&req->mutex);
+ if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) {
+ pthread_mutex_unlock(&req->mutex);
+ rte_free(req->sock->priv);
+ req->retval = -ENOSPC;
+ return;
+ }
+ struct timespec timeout;
+ clock_gettime(CLOCK_MONOTONIC, &timeout);
+ timeout.tv_sec += 1;
+ pthread_cond_timedwait(&req->cond, &req->mutex, &timeout);
+ pthread_mutex_unlock(&req->mutex);
+ rte_free(req->sock->priv);
+}
+
+/*
+ * Note: I/O thread will fill in destination MAC address (doesn't happen here)
+ */
+static void uhd_dpdk_ipv4_prep(struct uhd_dpdk_port *port,
+ struct rte_mbuf *mbuf,
+ uint32_t dst_ipv4_addr,
+ uint8_t proto_id,
+ uint32_t payload_len)
+{
+ struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
+ struct ipv4_hdr *ip_hdr = (struct ipv4_hdr *) &eth_hdr[1];
+
+ ether_addr_copy(&port->mac_addr, &eth_hdr->s_addr);
+ eth_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4);
+
+ ip_hdr->version_ihl = 0x40 | 5;
+ ip_hdr->type_of_service = 0;
+ ip_hdr->total_length = rte_cpu_to_be_16(20 + payload_len);
+ ip_hdr->packet_id = 0;
+ ip_hdr->fragment_offset = rte_cpu_to_be_16(IPV4_HDR_DF_FLAG);
+ ip_hdr->time_to_live = 64;
+ ip_hdr->next_proto_id = proto_id;
+ ip_hdr->hdr_checksum = 0; /* FIXME: Assuming hardware can offload */
+ ip_hdr->src_addr = port->ipv4_addr;
+ ip_hdr->dst_addr = dst_ipv4_addr;
+
+ mbuf->pkt_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len;
+ mbuf->data_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len;
+}
+
+int uhd_dpdk_udp_prep(struct uhd_dpdk_socket *sock,
+ struct rte_mbuf *mbuf)
+{
+ struct ether_hdr *eth_hdr;
+ struct ipv4_hdr *ip_hdr;
+ struct udp_hdr *tx_hdr;
+ struct uhd_dpdk_port *port = sock->port;
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
+
+ if (unlikely(mbuf == NULL || pdata == NULL || port == NULL))
+ return -EINVAL;
+
+ uint32_t udp_data_len = mbuf->data_len;
+ uhd_dpdk_ipv4_prep(port,
+ mbuf,
+ pdata->dst_ipv4_addr,
+ 0x11,
+ 8 + udp_data_len);
+
+ eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
+ ip_hdr = (struct ipv4_hdr *) &eth_hdr[1];
+ tx_hdr = (struct udp_hdr *) &ip_hdr[1];
+
+ tx_hdr->src_port = pdata->src_port;
+ tx_hdr->dst_port = pdata->dst_port;
+ tx_hdr->dgram_len = rte_cpu_to_be_16(8 + udp_data_len);
+ tx_hdr->dgram_cksum = 0;
+
+ return 0;
+}
+
+int uhd_dpdk_udp_get_info(struct uhd_dpdk_socket *sock,
+ struct uhd_dpdk_sockarg_udp *sockarg)
+{
+ if (unlikely(sock == NULL || sockarg == NULL))
+ return -EINVAL;
+ if (sock->sock_type != UHD_DPDK_SOCK_UDP)
+ return -EINVAL;
+
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
+ if (sock->tx_ring) {
+ sockarg->is_tx = true;
+ sockarg->local_port = pdata->src_port;
+ sockarg->remote_port = pdata->dst_port;
+ sockarg->dst_addr = pdata->dst_ipv4_addr;
+ } else {
+ sockarg->is_tx = false;
+ sockarg->local_port = pdata->dst_port;
+ sockarg->remote_port = pdata->src_port;
+ sockarg->dst_addr = 0;
+ }
+ return 0;
+}
+
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h
new file mode 100644
index 000000000..651ae144e
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h
@@ -0,0 +1,30 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#ifndef _UHD_DPDK_UDP_H_
+#define _UHD_DPDK_UDP_H_
+
+#include "uhd_dpdk_ctx.h"
+#include <rte_udp.h>
+
+struct uhd_dpdk_udp_priv {
+ uint16_t src_port;
+ uint16_t dst_port;
+ uint32_t dst_ipv4_addr;
+ uint32_t dropped_pkts;
+ /* TODO: Cache destination address ptr to avoid ARP table lookup cost? */
+ //struct uhd_dpdk_arp_entry *arp_entry;
+};
+
+int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req);
+int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req);
+
+void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req,
+ struct uhd_dpdk_sockarg_udp *arg);
+void uhd_dpdk_udp_close(struct uhd_dpdk_config_req *req);
+
+int uhd_dpdk_udp_prep(struct uhd_dpdk_socket *sock,
+ struct rte_mbuf *mbuf);
+#endif /* _UHD_DPDK_UDP_H_ */