aboutsummaryrefslogtreecommitdiffstats
path: root/host
diff options
context:
space:
mode:
authorAlex Williams <alex.williams@ni.com>2018-03-07 15:24:04 -0800
committerMartin Braun <martin.braun@ettus.com>2018-08-29 15:52:00 -0700
commit2084a5a72df45fbcda82839ea35486f8583227bc (patch)
tree150695c61e0d2ec3eee906da87dd0a9e5546d725 /host
parent3f39388059546d44e4302e098fc241f1a71e6d4e (diff)
downloaduhd-2084a5a72df45fbcda82839ea35486f8583227bc.tar.gz
uhd-2084a5a72df45fbcda82839ea35486f8583227bc.tar.bz2
uhd-2084a5a72df45fbcda82839ea35486f8583227bc.zip
uhd-dpdk: Add DPDK-based sockets-like library
This library makes available a userspace network stack with a socket-like interface for applications (except the sockets pass around pointers to buffers and use the buffers directly--It's sockets + a put/get for buffer management). Supported services are ARP and UDP. Destinations can be unicast or broadcast. Multicast is not currently supported. The implementation has two driver layers. The upper layer runs within the caller's context. The caller will make requests through lockless ring buffers (including socket creation and packet transmission), and the lower layer will implement the requests and provide a response. Currently, the lower layer runs in a separate I/O thread, and the caller will block until it receives a response. The I/O thread's main body is in src/uhd_dpdk_driver.c. You'll find that all I/O thread functions are prefixed by an underscore, and user thread functions do not. src/uhd_dpdk.c is used to initialize uhd-dpdk and bring up the network interfaces. src/uhd_dpdk_fops.c and src/uhd_dpdk_udp.c are for network services. The test is a benchmark of a flow control loop using a certain made-up protocol with credits and sequence number tracking.
Diffstat (limited to 'host')
-rw-r--r--host/cmake/Modules/FindDPDK.cmake32
-rw-r--r--host/lib/CMakeLists.txt2
-rw-r--r--host/lib/include/uhd/transport/uhd-dpdk.h249
-rw-r--r--host/lib/transport/CMakeLists.txt6
-rw-r--r--host/lib/transport/uhd-dpdk/CMakeLists.txt36
-rw-r--r--host/lib/transport/uhd-dpdk/test/Makefile53
-rw-r--r--host/lib/transport/uhd-dpdk/test/test.c303
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk.c363
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h253
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c401
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h32
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c306
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h15
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c456
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h30
15 files changed, 2535 insertions, 2 deletions
diff --git a/host/cmake/Modules/FindDPDK.cmake b/host/cmake/Modules/FindDPDK.cmake
new file mode 100644
index 000000000..4382a8cce
--- /dev/null
+++ b/host/cmake/Modules/FindDPDK.cmake
@@ -0,0 +1,32 @@
+#
+# Copyright 2018 Ettus Research, a National Instruments Company
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# - Find DPDK
+# Find the DPDK includes and client library
+# This module defines
+# DPDK_INCLUDE_DIR, where to find rte_config.h
+# DPDK_LIBRARIES, the libraries needed by a DPDK user
+# DPDK_FOUND, If false, do not try to use DPDK.
+# also defined, but not for general use are
+# DPDK_LIBRARY, where to find the DPDK library.
+
+include(FindPackageHandleStandardArgs)
+
+find_path ( DPDK_INCLUDE_DIR rte_config.h
+ PATHS ENV RTE_INCLUDE
+ PATH_SUFFIXES dpdk
+)
+
+find_library(DPDK_LIBRARY
+ PATHS $ENV{RTE_SDK_DIR}/$ENV{RTE_TARGET}/lib
+)
+
+list(APPEND DPDK_LIBRARIES dpdk)
+
+find_package_handle_standard_args(dpdk
+ DEFAULT_MSG
+ DPDK_INCLUDE_DIR
+ DPDK_LIBRARIES
+)
diff --git a/host/lib/CMakeLists.txt b/host/lib/CMakeLists.txt
index 4461612de..99598570e 100644
--- a/host/lib/CMakeLists.txt
+++ b/host/lib/CMakeLists.txt
@@ -62,6 +62,7 @@ MESSAGE(STATUS "")
FIND_PACKAGE(USB1)
FIND_PACKAGE(GPSD)
FIND_PACKAGE(LIBERIO)
+FIND_PACKAGE(DPDK)
LIBUHD_REGISTER_COMPONENT("LIBERIO" ENABLE_LIBERIO ON "ENABLE_LIBUHD;LIBERIO_FOUND" OFF OFF)
LIBUHD_REGISTER_COMPONENT("USB" ENABLE_USB ON "ENABLE_LIBUHD;LIBUSB_FOUND" OFF OFF)
LIBUHD_REGISTER_COMPONENT("GPSD" ENABLE_GPSD OFF "ENABLE_LIBUHD;ENABLE_GPSD;LIBGPS_FOUND" OFF OFF)
@@ -77,6 +78,7 @@ LIBUHD_REGISTER_COMPONENT("MPMD" ENABLE_MPMD ON "ENABLE_LIBUHD" OFF OFF)
LIBUHD_REGISTER_COMPONENT("N300" ENABLE_N300 ON "ENABLE_LIBUHD;ENABLE_MPMD" OFF OFF)
LIBUHD_REGISTER_COMPONENT("E320" ENABLE_E320 ON "ENABLE_LIBUHD;ENABLE_MPMD" OFF OFF)
LIBUHD_REGISTER_COMPONENT("OctoClock" ENABLE_OCTOCLOCK ON "ENABLE_LIBUHD" OFF OFF)
+LIBUHD_REGISTER_COMPONENT("DPDK" ENABLE_DPDK ON "ENABLE_MPMD;DPDK_FOUND" OFF OFF)
########################################################################
# Include subdirectories (different than add)
diff --git a/host/lib/include/uhd/transport/uhd-dpdk.h b/host/lib/include/uhd/transport/uhd-dpdk.h
new file mode 100644
index 000000000..5f74ee9b4
--- /dev/null
+++ b/host/lib/include/uhd/transport/uhd-dpdk.h
@@ -0,0 +1,249 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#ifndef _UHD_DPDK_H_
+#define _UHD_DPDK_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <rte_mbuf.h>
+
+/* For MAC address */
+struct eth_addr {
+ uint8_t addr[6];
+};
+
+/* Opaque type representing a socket
+ * May NOT be shared between threads
+ */
+struct uhd_dpdk_socket;
+
+/* Only support UDP sockets currently */
+enum uhd_dpdk_sock_type {
+ UHD_DPDK_SOCK_UDP = 0,
+ UHD_DPDK_SOCK_TYPE_COUNT
+};
+
+/**
+ * Init UHD-DPDK environment and bring up ports (link UP).
+ *
+ * Offload capabilities will be used if available
+ *
+ * @param argc passed directly to rte_eal_init()
+ * @param argv passed directly to rte_eal_init()
+ * @param num_ports number of network interfaces to map
+ * @param port_thread_mapping array of num_ports entries specifying which thread
+ * will drive the I/O for a given port (determined by array index)
+ * @param num_mbufs number of packets in each packet buffer pool (multiplied by num_ports)
+ * There is one RX and one TX buffer pool per CPU socket
+ * @param mbuf_cache_size Number of packet buffers to put in core-local cache
+ * @param mtu Maximum frame size
+ *
+ * @return Returns negative error code if there were issues, else 0
+ */
+int uhd_dpdk_init(int argc, char **argv, unsigned int num_ports,
+ int *port_thread_mapping, int num_mbufs, int mbuf_cache_size,
+ int mtu);
+
+/**
+ * @return Returns number of ports registered to DPDK.
+ * Returns negative error value if uhd-dpdk hasn't been init'd
+ */
+int uhd_dpdk_port_count(void);
+
+/**
+ * @return Returns Ethernet MAC address of requested port
+ *
+ * @param portid ID number of network interface
+ */
+struct eth_addr uhd_dpdk_get_eth_addr(unsigned int portid);
+
+/**
+ * Get IPv4 address of requested port
+ *
+ * @param portid ID number of network interface
+ * @param ipv4_addr pointer to uint32_t where ipv4 address is stored
+ * Must be non-NULL
+ * @param netmask pointer to uint32_t where netmask is stored
+ * May be left NULL
+ *
+ * @return Returns
+ * 0 = success
+ * nonzero = failure
+ */
+int uhd_dpdk_get_ipv4_addr(unsigned int portid, uint32_t *ipv4_addr, uint32_t *netmask);
+
+/**
+ * Sets IPv4 address of requested port
+ *
+ * @param portid ID number of network interface
+ * @param ipv4_addr must be in network format
+ * @param netmask must be in network format
+ *
+ * @return Return values:
+ * 0 = success
+ * nonzero = failure
+ */
+int uhd_dpdk_set_ipv4_addr(unsigned int portid, uint32_t ipv4_addr, uint32_t netmask);
+
+/**
+ * Create new socket of type sock_type on port portid
+ * Copies needed info from sockarg
+ * Do NOT share struct uhd_dpdk_socket between threads!
+ *
+ * @param portid ID number of network interface
+ * @param t Type of socket to create (only UDP supported currently)
+ * @param sockarg Pointer to arguments for corresponding socket type
+ *
+ * @return Returns pointer to socket structure on success, else NULL
+ */
+struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid,
+ enum uhd_dpdk_sock_type t, void *sockarg);
+
+/**
+ * Close socket created by uhd_dpdk_sock_open
+ *
+ * Note: Outstanding packet buffers must still be freed by user
+ *
+ * @param sock Socket to close
+ *
+ * @return Returns
+ * 0 = success
+ * nonzero = failure
+ */
+int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock);
+
+/**
+ * Arguments for a UDP socket
+ * All data should be provided in network format
+ */
+struct uhd_dpdk_sockarg_udp {
+ /*! True for TX socket, false for RX socket */
+ bool is_tx;
+ /*! Local udp port. This is dst_port for RX, src_port for TX */
+ uint16_t local_port;
+ /*! Remote udp port. This is dst_port for TX */
+ uint16_t remote_port;
+ /*! IPv4 address for destination (TX) */
+ uint32_t dst_addr;
+};
+
+/**
+ * Brings all ports and threads down in preparation for a clean program exit
+ *
+ * All sockets will need to be closed by the user for a thread to terminate in
+ * this function.
+ */
+int uhd_dpdk_destroy(void);
+
+/**
+ * Requests num_bufs buffers from sock. Places pointers to buffers in bufs table.
+ *
+ * @param sock pointer to socket
+ * @param bufs pointer to array of buffers (to store buffer locations)
+ * @param num_bufs number of buffers requested
+ *
+ * @return Returns number of buffers retrieved or negative error code
+ */
+int uhd_dpdk_request_tx_bufs(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, unsigned int num_bufs);
+
+/**
+ * Enqueues num_bufs buffers in sock TX buffer. Uses pointers to buffers in bufs table.
+ *
+ * @param sock pointer to socket
+ * @param bufs pointer to array of buffers (to retrieve buffer locations)
+ * @param num_bufs number of buffers requested
+ *
+ * @return Returns number of buffers enqueued or negative error code
+ */
+int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, unsigned int num_bufs);
+
+/**
+ * Dequeues num_bufs buffers from sock RX buffer. Uses pointers to buffers in bufs table.
+ *
+ * @param sock pointer to socket
+ * @param bufs pointer to array of buffers (to store buffer locations)
+ * @param num_bufs number of buffers requested
+ * @param timeout Time (in us) to wait for a packet
+ *
+ * @return Returns number of buffers dequeued or negative error code
+ *
+ * NOTE: MUST free buffers with uhd_dpdk_free_buf once finished
+ */
+int uhd_dpdk_recv(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
+ unsigned int num_bufs, unsigned int timeout);
+
+/**
+ * Frees buffer previously received from uhd_dpdk_recv
+ * (or unused ones from uhd_dpdk_request_tx_bufs)
+ *
+ * @param buf pointer to packet buffer
+ */
+void uhd_dpdk_free_buf(struct rte_mbuf *buf);
+
+/**
+ * Returns pointer to start of data segment of packet buffer
+ *
+ * @param sock Socket associated with packet buffer
+ * @param buf pointer to packet buffer
+ */
+void * uhd_dpdk_buf_to_data(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf);
+
+/**
+ * Returns size of data segment of packet buffer (in bytes)
+ *
+ * This is protocol-dependent. A UDP socket will return the UDP payload size.
+ *
+ * @param sock Socket associated with packet buffer
+ * @param buf pointer to packet buffer
+ *
+ * @return Return 0 for success, else failed
+ */
+int uhd_dpdk_get_len(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf);
+
+/**
+ * Get IPv4 address of sender (for UDP RX socket)
+ *
+ * @param sock Socket associated with packet buffer
+ * @param buf pointer to packet buffer
+ * @param ipv4_addr pointer to buffer where ipv4 address will be written
+ *
+ * @return Return 0 for success, else failed
+ */
+int uhd_dpdk_get_src_ipv4(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf,
+ uint32_t *ipv4_addr);
+
+/**
+ * Get info (local port, remote port, dst addr, etc.) for UDP socket
+ *
+ * @param sock Socket to get information from
+ * @param sockarg Pointer to location where information will be stored
+ *
+ * @return Return 0 for success, else failed
+ */
+int uhd_dpdk_udp_get_info(struct uhd_dpdk_socket *sock, struct uhd_dpdk_sockarg_udp *sockarg);
+
+
+/***********************************************
+ * Statistics
+ ***********************************************/
+/**
+ * Get dropped packet count of provided socket
+ *
+ * @param sock Socket to get information from
+ * @param count Pointer to location where information will be stored
+ *
+ * @return Return 0 for success, else failed
+ */
+int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, uint32_t *count);
+
+#ifdef __cplusplus
+}
+#endif
+#endif /* _UHD_DPDK_H_ */
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index 15771697a..7c79bc67c 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -130,8 +130,6 @@ IF(ENABLE_X300)
ENDIF(ENABLE_X300)
IF(ENABLE_LIBERIO)
- MESSAGE(STATUS "")
- MESSAGE(STATUS "liberio support enabled.")
INCLUDE_DIRECTORIES(${LIBERIO_INCLUDE_DIRS})
LIBUHD_APPEND_LIBS(${LIBERIO_LIBRARIES})
LIBUHD_APPEND_SOURCES(
@@ -139,6 +137,10 @@ IF(ENABLE_LIBERIO)
)
ENDIF(ENABLE_LIBERIO)
+IF(ENABLE_DPDK)
+ INCLUDE_SUBDIRECTORY(uhd-dpdk)
+ENDIF(ENABLE_DPDK)
+
# Verbose Debug output for send/recv
SET( UHD_TXRX_DEBUG_PRINTS OFF CACHE BOOL "Use verbose debug output for send/recv" )
OPTION( UHD_TXRX_DEBUG_PRINTS "Use verbose debug output for send/recv" "" )
diff --git a/host/lib/transport/uhd-dpdk/CMakeLists.txt b/host/lib/transport/uhd-dpdk/CMakeLists.txt
new file mode 100644
index 000000000..be683aaf2
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/CMakeLists.txt
@@ -0,0 +1,36 @@
+#
+# Copyright 2018 Ettus Research, a National Instruments Company
+#
+# SPDX-License-Identifier: GPL-3.0
+#
+
+########################################################################
+# Add the subdirectories
+########################################################################
+if(ENABLE_DPDK)
+ if(NOT DEFINED UHD_DPDK_CFLAGS)
+ message(STATUS "")
+ set(UHD_DPDK_CFLAGS "-march=native"
+ CACHE STRING "CFLAGS to use when building uhd-dpdk sources")
+ message(STATUS "DPDK: Using default UHD_DPDK_CFLAGS=" ${UHD_DPDK_CFLAGS})
+ endif(NOT DEFINED UHD_DPDK_CFLAGS)
+
+ include_directories(${CMAKE_CURRENT_SOURCE_DIR})
+
+ LIBUHD_APPEND_SOURCES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_driver.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_fops.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_udp.c
+ )
+ set_source_files_properties(
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_driver.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_fops.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_udp.c
+ PROPERTIES COMPILE_FLAGS ${UHD_DPDK_CFLAGS}
+ )
+ include_directories(${DPDK_INCLUDE_DIR})
+ LIBUHD_APPEND_LIBS(${DPDK_LIBRARIES})
+endif(ENABLE_DPDK)
+
diff --git a/host/lib/transport/uhd-dpdk/test/Makefile b/host/lib/transport/uhd-dpdk/test/Makefile
new file mode 100644
index 000000000..9d6e60372
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/test/Makefile
@@ -0,0 +1,53 @@
+# BSD LICENSE
+#
+# Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overridden by command line or environment
+RTE_TARGET ?= x86_64-native-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# binary name
+APP = test
+
+# all source are stored in SRCS-y
+SRCS-y := test.c
+
+CFLAGS += -O0 -g
+CFLAGS += $(WERROR_FLAGS)
+
+EXTRA_CFLAGS=-I${S}/../include
+EXTRA_LDFLAGS=-L${O}/lib -luhd-dpdk
+
+include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/host/lib/transport/uhd-dpdk/test/test.c b/host/lib/transport/uhd-dpdk/test/test.c
new file mode 100644
index 000000000..c324561de
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/test/test.c
@@ -0,0 +1,303 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+/**
+ * Benchmark program to check performance of 2 simultaneous links
+ */
+
+
+#include <uhd-dpdk.h>
+#include <stdio.h>
+#include <stdbool.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include <errno.h>
+#include <arpa/inet.h>
+
+#define NUM_MBUFS 4095
+#define MBUF_CACHE_SIZE 315
+#define BURST_SIZE 64
+
+#define NUM_PORTS 2
+#define TX_CREDITS 28
+#define RX_CREDITS 64
+#define BENCH_SPP 700
+#define BENCH_IFG 220
+
+
+static uint32_t last_seqno[NUM_PORTS];
+static uint32_t dropped_packets[NUM_PORTS];
+static uint32_t lasts[NUM_PORTS][16], drops[NUM_PORTS][16];
+static uint32_t last_ackno[NUM_PORTS];
+static uint32_t tx_seqno[NUM_PORTS];
+static uint64_t tx_xfer[NUM_PORTS];
+
+static void process_udp(int id, uint32_t *udp_data)
+{
+ if (udp_data[0] != last_seqno[id] + 1) {
+ lasts[id][dropped_packets[id] & 0xf] = last_seqno[id];
+ drops[id][dropped_packets[id] & 0xf] = udp_data[0];
+ dropped_packets[id]++;
+ }
+
+ last_seqno[id] = udp_data[0];
+ last_ackno[id] = udp_data[1];
+}
+
+static void send_ctrl(struct uhd_dpdk_socket *sock, uint32_t run)
+{
+ struct rte_mbuf *mbuf = NULL;
+ uhd_dpdk_request_tx_bufs(sock, &mbuf, 1);
+ if (unlikely(mbuf == NULL))
+ return;
+ uint32_t *tx_data = uhd_dpdk_buf_to_data(sock, mbuf);
+ tx_data[0] = (BENCH_SPP << 16) | (TX_CREDITS << 4) | run; // spp, tx_credits, run
+ tx_data[1] = (RX_CREDITS << 16) | (BENCH_IFG); // credits, ifg
+ mbuf->pkt_len = 8;
+ mbuf->data_len = 8;
+ uhd_dpdk_send(sock, &mbuf, 1);
+}
+
+static void send_udp(struct uhd_dpdk_socket *sock, int id, bool fc_only)
+{
+ struct rte_mbuf *mbuf = NULL;
+ uhd_dpdk_request_tx_bufs(sock, &mbuf, 1);
+ if (unlikely(mbuf == NULL))
+ return;
+ uint32_t *tx_data = uhd_dpdk_buf_to_data(sock, mbuf);
+ tx_data[0] = fc_only ? tx_seqno[id] - 1 : tx_seqno[id];
+ tx_data[1] = last_seqno[id];
+ if (!fc_only) {
+ memset(&tx_data[2], last_seqno[id], 8*BENCH_SPP);
+ tx_xfer[id] += 8*BENCH_SPP;
+ }
+ mbuf->pkt_len = 8 + (fc_only ? 0 : 8*BENCH_SPP);
+ mbuf->data_len = 8 + (fc_only ? 0 : 8*BENCH_SPP);
+
+ uhd_dpdk_send(sock, &mbuf, 1);
+
+ if (!fc_only) {
+ tx_seqno[id]++;
+ }
+}
+
+static void bench(struct uhd_dpdk_socket **tx, struct uhd_dpdk_socket **rx, struct uhd_dpdk_socket **ctrl, uint32_t nb_ports)
+{
+ uint64_t total_xfer[NUM_PORTS];
+ uint32_t id;
+ for (id = 0; id < nb_ports; id++) {
+ tx_seqno[id] = 1;
+ tx_xfer[id] = 0;
+ last_ackno[id] = 0;
+ last_seqno[id] = 0;
+ dropped_packets[id] = 0;
+ total_xfer[id] = 0;
+ }
+ sleep(1);
+ struct timeval bench_start, bench_end;
+ gettimeofday(&bench_start, NULL);
+ for (id = 0; id < nb_ports; id++) {
+ send_ctrl(ctrl[id], 0);
+ for (int pktno = 0; (pktno < TX_CREDITS*3/4); pktno++) {
+ send_udp(tx[id], id, false);
+ }
+ }
+ for (id = 0; id < nb_ports; id++) {
+ send_ctrl(ctrl[id], 1);
+ }
+ /*
+ * The test...
+ */
+ uint64_t total_received = 0;
+ uint32_t consec_no_rx = 0;
+ while (total_received < 1000000 ) { //&& consec_no_rx < 10000) {
+ for (id = 0; id < nb_ports; id++) {
+
+ /* Get burst of RX packets, from first port of pair. */
+ struct rte_mbuf *bufs[BURST_SIZE];
+ const int64_t nb_rx = uhd_dpdk_recv(rx[id], bufs, BURST_SIZE);
+
+ if (unlikely(nb_rx <= 0)) {
+ consec_no_rx++;
+ if (consec_no_rx >= 100000) {
+ uint32_t skt_drops = 0;
+ uhd_dpdk_get_drop_count(rx[id], &skt_drops);
+ printf("TX seq %d, TX ack %d, RX seq %d, %d, drops!\n", tx_seqno[id], last_ackno[id], last_seqno[id], skt_drops);
+ consec_no_rx = 0;
+ break;
+ }
+ continue;
+ } else {
+ consec_no_rx = 0;
+ }
+
+ for (int buf = 0; buf < nb_rx; buf++) {
+ total_xfer[id] += bufs[buf]->pkt_len;
+ uint64_t ol_flags = bufs[buf]->ol_flags;
+ uint32_t *data = (uint32_t *) uhd_dpdk_buf_to_data(rx[id], bufs[buf]);
+ if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* FIXME: Deprecated/changed in later release */
+ printf("Buf %d: Bad IP cksum\n", buf);
+ } else {
+ process_udp(id, data);
+ }
+ }
+
+ /* Free buffers. */
+ for (int buf = 0; buf < nb_rx; buf++)
+ uhd_dpdk_free_buf(bufs[buf]);
+ total_received += nb_rx;
+ }
+
+ for (id = 0; id < nb_ports; id++) {
+ /* TX portion */
+ uint32_t window_end = last_ackno[id] + TX_CREDITS;
+ //uint32_t window_end = last_seqno[port] + TX_CREDITS;
+ if (window_end <= tx_seqno[id]) {
+ if (consec_no_rx == 9999) {
+ send_udp(tx[id], id, true);
+ }
+ //send_udp(tx[id], id, true);
+ ;
+ } else {
+ for (int pktno = 0; (pktno < BURST_SIZE) && (tx_seqno[id] < window_end); pktno++) {
+ send_udp(tx[id], id, false);
+ }
+ }
+ }
+ }
+ for (id = 0; id < nb_ports; id++) {
+ send_ctrl(ctrl[id], 0);
+ }
+ gettimeofday(&bench_end, NULL);
+ printf("Benchmark complete\n\n");
+
+ for (id = 0; id < nb_ports; id++) {
+ printf("\n");
+ printf("Bytes received = %ld\n", total_xfer[id]);
+ printf("Bytes sent = %ld\n", tx_xfer[id]);
+ printf("Time taken = %ld us\n", (bench_end.tv_sec - bench_start.tv_sec)*1000000 + (bench_end.tv_usec - bench_start.tv_usec));
+ double elapsed_time = (bench_end.tv_sec - bench_start.tv_sec)*1000000 + (bench_end.tv_usec - bench_start.tv_usec);
+ elapsed_time *= 1.0e-6;
+ double elapsed_bytes = total_xfer[id];
+ printf("RX Performance = %e Gbps\n", elapsed_bytes*8.0/1.0e9/elapsed_time);
+ elapsed_bytes = tx_xfer[id];
+ printf("TX Performance = %e Gbps\n", elapsed_bytes*8.0/1.0e9/elapsed_time);
+ uint32_t skt_drops = 0;
+ uhd_dpdk_get_drop_count(rx[id], &skt_drops);
+ printf("Dropped %d packets\n", dropped_packets[id]);
+ printf("Socket reports dropped %d packets\n", skt_drops);
+ for (unsigned int i = 0; i < 16; i++) {
+ if (i >= dropped_packets[id])
+ break;
+ printf("Last(%u), Recv(%u)\n", lasts[id][i], drops[id][i]);
+ }
+ //printf("%d missed/dropped packets\n", errors);
+ printf("\n");
+ }
+
+}
+
+int main(int argc, char **argv)
+{
+ int port_thread_mapping[2] = {1, 1};
+ int status = uhd_dpdk_init(argc, argv, 2, &port_thread_mapping[0], NUM_MBUFS, MBUF_CACHE_SIZE);
+ if (status) {
+ printf("%d: Something wrong?\n", status);
+ return status;
+ }
+
+ uint32_t eth_ip = htonl(0xc0a80008);
+ uint32_t eth_mask = htonl(0xffffff00);
+ status = uhd_dpdk_set_ipv4_addr(0, eth_ip, eth_mask);
+ if (status) {
+ printf("Error while setting IP0: %d\n", status);
+ return status;
+ }
+ status = uhd_dpdk_set_ipv4_addr(1, eth_ip, eth_mask);
+ if (status) {
+ printf("Error while setting IP1: %d\n", status);
+ return status;
+ }
+
+ struct uhd_dpdk_socket *eth_rx[2];
+ struct uhd_dpdk_socket *eth_tx[2];
+ struct uhd_dpdk_socket *eth_ctrl[2];
+ struct uhd_dpdk_sockarg_udp sockarg = {
+ .is_tx = false,
+ .local_port = htons(0xBEE7),
+ .remote_port = htons(0xBEE7),
+ .dst_addr = htonl(0xc0a80004)
+ };
+ eth_rx[0] = uhd_dpdk_sock_open(0, UHD_DPDK_SOCK_UDP, &sockarg);
+ if (!eth_rx[0]) {
+ printf("!eth0_rx\n");
+ return -ENODEV;
+ }
+ eth_rx[1] = uhd_dpdk_sock_open(1, UHD_DPDK_SOCK_UDP, &sockarg);
+ if (!eth_rx[1]) {
+ printf("!eth1_rx\n");
+ return -ENODEV;
+ }
+
+ sockarg.is_tx = true;
+ eth_tx[0] = uhd_dpdk_sock_open(0, UHD_DPDK_SOCK_UDP, &sockarg);
+ if (!eth_tx[0]) {
+ printf("!eth0_tx\n");
+ return -ENODEV;
+ }
+ eth_tx[1] = uhd_dpdk_sock_open(1, UHD_DPDK_SOCK_UDP, &sockarg);
+ if (!eth_tx[1]) {
+ printf("!eth1_tx\n");
+ return -ENODEV;
+ }
+
+ sockarg.local_port = htons(0xB4D);
+ sockarg.remote_port = htons(0xB4D);
+ eth_ctrl[0] = uhd_dpdk_sock_open(0, UHD_DPDK_SOCK_UDP, &sockarg);
+ if (!eth_ctrl[0]) {
+ printf("!eth0_ctrl\n");
+ return -ENODEV;
+ }
+ eth_ctrl[1] = uhd_dpdk_sock_open(1, UHD_DPDK_SOCK_UDP, &sockarg);
+ if (!eth_ctrl[1]) {
+ printf("!eth1_ctrl\n");
+ return -ENODEV;
+ }
+
+ bench(eth_tx, eth_rx, eth_ctrl, 2);
+
+ status = uhd_dpdk_sock_close(eth_rx[0]);
+ if (status) {
+ printf("Bad close RX0 %d\n", status);
+ return status;
+ }
+ status = uhd_dpdk_sock_close(eth_rx[1]);
+ if (status) {
+ printf("Bad close RX1 %d\n", status);
+ return status;
+ }
+ status = uhd_dpdk_sock_close(eth_tx[0]);
+ if (status) {
+ printf("Bad close TX0 %d\n", status);
+ return status;
+ }
+ status = uhd_dpdk_sock_close(eth_tx[1]);
+ if (status) {
+ printf("Bad close TX1 %d\n", status);
+ return status;
+ }
+ status = uhd_dpdk_sock_close(eth_ctrl[0]);
+ if (status) {
+ printf("Bad close Ctrl0 %d\n", status);
+ return status;
+ }
+ status = uhd_dpdk_sock_close(eth_ctrl[1]);
+ if (status) {
+ printf("Bad close Ctrl1 %d\n", status);
+ return status;
+ }
+ return status;
+}
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk.c b/host/lib/transport/uhd-dpdk/uhd_dpdk.c
new file mode 100644
index 000000000..2ee74a201
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk.c
@@ -0,0 +1,363 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#include "uhd_dpdk_ctx.h"
+#include "uhd_dpdk_udp.h"
+#include "uhd_dpdk_driver.h"
+#include <stdlib.h>
+#include <rte_errno.h>
+#include <rte_malloc.h>
+#include <rte_log.h>
+
+/* FIXME: Replace with configurable values */
+#define DEFAULT_RING_SIZE 512
+
+/* FIXME: This needs to be protected */
+struct uhd_dpdk_ctx *ctx = NULL;
+
+/**
+ * TODO: Probably should provide way to get access to thread for a given port
+ * UHD's first calling thread will be the master thread
+ * In UHD, maybe check thread, and if it is different, pass work to that thread and optionally wait() on it (some condition variable)
+ */
+
+/* TODO: For nice scheduling options later, make sure to separate RX and TX activity */
+
+
+int uhd_dpdk_port_count(void)
+{
+ if (!ctx)
+ return -ENODEV;
+ return ctx->num_ports;
+}
+
+struct eth_addr uhd_dpdk_get_eth_addr(unsigned int portid)
+{
+ struct eth_addr retval;
+ memset(retval.addr, 0xff, ETHER_ADDR_LEN);
+
+ struct uhd_dpdk_port *p = find_port(portid);
+ if (p) {
+ memcpy(retval.addr, p->mac_addr.addr_bytes, ETHER_ADDR_LEN);
+ }
+ return retval;
+}
+
+int uhd_dpdk_get_ipv4_addr(unsigned int portid, uint32_t *ipv4_addr, uint32_t *netmask)
+{
+ if (!ipv4_addr)
+ return -EINVAL;
+ struct uhd_dpdk_port *p = find_port(portid);
+ if (p) {
+ *ipv4_addr = p->ipv4_addr;
+ if (netmask) {
+ *netmask = p->netmask;
+ }
+ return 0;
+ }
+ return -ENODEV;
+}
+
+int uhd_dpdk_set_ipv4_addr(unsigned int portid, uint32_t ipv4_addr, uint32_t netmask)
+{
+ struct uhd_dpdk_port *p = find_port(portid);
+ if (p) {
+ p->ipv4_addr = ipv4_addr;
+ p->netmask = netmask;
+ return 0;
+ }
+ return -ENODEV;
+}
+
+/*
+ * Initialize a given port using default settings and with the RX buffers
+ * coming from the mbuf_pool passed as a parameter.
+ * FIXME: Starting with assumption of one thread/core per port
+ */
+static inline int uhd_dpdk_port_init(struct uhd_dpdk_port *port,
+ struct rte_mempool *rx_mbuf_pool,
+ unsigned int mtu)
+{
+ int retval;
+
+ /* Check for a valid port */
+ if (port->id >= rte_eth_dev_count())
+ return -ENODEV;
+
+ /* Set up Ethernet device with defaults (1 RX ring, 1 TX ring) */
+ /* FIXME: Check if hw_ip_checksum is possible */
+ struct rte_eth_conf port_conf = {
+ .rxmode = {
+ .max_rx_pkt_len = mtu,
+ .jumbo_frame = 1,
+ .hw_ip_checksum = 1,
+ }
+ };
+ retval = rte_eth_dev_configure(port->id, 1, 1, &port_conf);
+ if (retval != 0)
+ return retval;
+
+ retval = rte_eth_rx_queue_setup(port->id, 0, DEFAULT_RING_SIZE,
+ rte_eth_dev_socket_id(port->id), NULL, rx_mbuf_pool);
+ if (retval < 0)
+ return retval;
+
+ retval = rte_eth_tx_queue_setup(port->id, 0, DEFAULT_RING_SIZE,
+ rte_eth_dev_socket_id(port->id), NULL);
+ if (retval < 0)
+ goto port_init_fail;
+
+ /* Create the hash table for the RX sockets */
+ char name[32];
+ snprintf(name, sizeof(name), "rx_table_%u", port->id);
+ struct rte_hash_parameters hash_params = {
+ .name = name,
+ .entries = UHD_DPDK_MAX_SOCKET_CNT,
+ .key_len = sizeof(struct uhd_dpdk_ipv4_5tuple),
+ .hash_func = NULL,
+ .hash_func_init_val = 0,
+ };
+ port->rx_table = rte_hash_create(&hash_params);
+ if (port->rx_table == NULL) {
+ retval = rte_errno;
+ goto port_init_fail;
+ }
+
+ /* Create ARP table */
+ snprintf(name, sizeof(name), "arp_table_%u", port->id);
+ hash_params.name = name;
+ hash_params.entries = UHD_DPDK_MAX_SOCKET_CNT;
+ hash_params.key_len = sizeof(uint32_t);
+ hash_params.hash_func = NULL;
+ hash_params.hash_func_init_val = 0;
+ port->arp_table = rte_hash_create(&hash_params);
+ if (port->arp_table == NULL) {
+ retval = rte_errno;
+ goto free_rx_table;
+ }
+
+ /* Set up list for TX queues */
+ LIST_INIT(&port->txq_list);
+
+ /* Start the Ethernet port. */
+ retval = rte_eth_dev_start(port->id);
+ if (retval < 0) {
+ goto free_arp_table;
+ }
+
+ /* Display the port MAC address. */
+ rte_eth_macaddr_get(port->id, &port->mac_addr);
+ RTE_LOG(INFO, EAL, "Port %u MAC: %02x %02x %02x %02x %02x %02x\n",
+ (unsigned)port->id,
+ port->mac_addr.addr_bytes[0], port->mac_addr.addr_bytes[1],
+ port->mac_addr.addr_bytes[2], port->mac_addr.addr_bytes[3],
+ port->mac_addr.addr_bytes[4], port->mac_addr.addr_bytes[5]);
+
+ struct rte_eth_link link;
+ rte_eth_link_get(port->id, &link);
+ RTE_LOG(INFO, EAL, "Port %u UP: %d\n", port->id, link.link_status);
+
+ return 0;
+
+free_arp_table:
+ rte_hash_free(port->arp_table);
+free_rx_table:
+ rte_hash_free(port->rx_table);
+port_init_fail:
+ return rte_errno;
+}
+
+static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int id)
+{
+ if (!ctx || !thread)
+ return -EINVAL;
+
+ unsigned int socket_id = rte_lcore_to_socket_id(id);
+ thread->id = id;
+ thread->rx_pktbuf_pool = ctx->rx_pktbuf_pools[socket_id];
+ thread->tx_pktbuf_pool = ctx->tx_pktbuf_pools[socket_id];
+ LIST_INIT(&thread->port_list);
+
+ char name[32];
+ snprintf(name, sizeof(name), "sockreq_ring_%u", id);
+ thread->sock_req_ring = rte_ring_create(
+ name,
+ UHD_DPDK_MAX_PENDING_SOCK_REQS,
+ socket_id,
+ RING_F_SC_DEQ
+ );
+ if (!thread->sock_req_ring)
+ return -ENOMEM;
+ return 0;
+}
+
+
+int uhd_dpdk_init(int argc, char **argv, unsigned int num_ports,
+ int *port_thread_mapping, int num_mbufs, int mbuf_cache_size,
+ int mtu)
+{
+ /* Init context only once */
+ if (ctx)
+ return 1;
+
+ if ((num_ports == 0) || (port_thread_mapping == NULL)) {
+ return -EINVAL;
+ }
+
+ /* Grabs arguments intended for DPDK's EAL */
+ int ret = rte_eal_init(argc, argv);
+ if (ret < 0)
+ rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
+
+ ctx = (struct uhd_dpdk_ctx *) rte_zmalloc("uhd_dpdk_ctx", sizeof(*ctx), rte_socket_id());
+ if (!ctx)
+ return -ENOMEM;
+
+ ctx->num_threads = rte_lcore_count();
+ if (ctx->num_threads <= 1)
+ rte_exit(EXIT_FAILURE, "Error: No worker threads enabled\n");
+
+ /* Check that we have ports to send/receive on */
+ ctx->num_ports = rte_eth_dev_count();
+ if (ctx->num_ports < 1)
+ rte_exit(EXIT_FAILURE, "Error: Found no ports\n");
+ if (ctx->num_ports < num_ports)
+ rte_exit(EXIT_FAILURE, "Error: User requested more ports than available\n");
+
+ /* Get memory for thread and port data structures */
+ ctx->threads = rte_zmalloc("uhd_dpdk_thread", RTE_MAX_LCORE*sizeof(struct uhd_dpdk_thread), 0);
+ if (!ctx->threads)
+ rte_exit(EXIT_FAILURE, "Error: Could not allocate memory for thread data\n");
+ ctx->ports = rte_zmalloc("uhd_dpdk_port", ctx->num_ports*sizeof(struct uhd_dpdk_port), 0);
+ if (!ctx->ports)
+ rte_exit(EXIT_FAILURE, "Error: Could not allocate memory for port data\n");
+
+ /* Initialize the thread data structures */
+ for (int i = rte_get_next_lcore(-1, 1, 0);
+ (i < RTE_MAX_LCORE);
+ i = rte_get_next_lcore(i, 1, 0))
+ {
+ /* Do one mempool of RX/TX per socket */
+ unsigned int socket_id = rte_lcore_to_socket_id(i);
+ /* FIXME Probably want to take into account actual number of ports per socket */
+ if (ctx->tx_pktbuf_pools[socket_id] == NULL) {
+ /* Creates a new mempool in memory to hold the mbufs.
+ * This is done for each CPU socket
+ */
+ const int mbuf_size = mtu + 2048 + RTE_PKTMBUF_HEADROOM;
+ char name[32];
+ snprintf(name, sizeof(name), "rx_mbuf_pool_%u", socket_id);
+ ctx->rx_pktbuf_pools[socket_id] = rte_pktmbuf_pool_create(
+ name,
+ ctx->num_ports*num_mbufs,
+ mbuf_cache_size,
+ 0,
+ mbuf_size,
+ socket_id
+ );
+ snprintf(name, sizeof(name), "tx_mbuf_pool_%u", socket_id);
+ ctx->tx_pktbuf_pools[socket_id] = rte_pktmbuf_pool_create(
+ name,
+ ctx->num_ports*num_mbufs,
+ mbuf_cache_size,
+ 0,
+ mbuf_size,
+ socket_id
+ );
+ if ((ctx->rx_pktbuf_pools[socket_id]== NULL) ||
+ (ctx->tx_pktbuf_pools[socket_id]== NULL))
+ rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
+ }
+
+ if (uhd_dpdk_thread_init(&ctx->threads[i], i) < 0)
+ rte_exit(EXIT_FAILURE, "Error initializing thread %i\n", i);
+ }
+
+ unsigned master_lcore = rte_get_master_lcore();
+
+ /* Assign ports to threads and initialize the port data structures */
+ for (unsigned int i = 0; i < num_ports; i++) {
+ int thread_id = port_thread_mapping[i];
+ if (thread_id < 0)
+ continue;
+ if (((unsigned int) thread_id) == master_lcore)
+ RTE_LOG(WARNING, EAL, "User requested master lcore for port %u\n", i);
+ if (ctx->threads[thread_id].id != (unsigned int) thread_id)
+ rte_exit(EXIT_FAILURE, "Requested inactive lcore %u for port %u\n", (unsigned int) thread_id, i);
+
+ struct uhd_dpdk_port *port = &ctx->ports[i];
+ port->id = i;
+ port->parent = &ctx->threads[thread_id];
+ ctx->threads[thread_id].num_ports++;
+ LIST_INSERT_HEAD(&ctx->threads[thread_id].port_list, port, port_entry);
+
+ /* Initialize port. */
+ if (uhd_dpdk_port_init(port, port->parent->rx_pktbuf_pool, mtu) != 0)
+ rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n",
+ i);
+ }
+
+ RTE_LOG(INFO, EAL, "Init DONE!\n");
+
+ /* FIXME: Create functions to do this */
+ RTE_LOG(INFO, EAL, "Starting I/O threads!\n");
+
+ for (int i = rte_get_next_lcore(-1, 1, 0);
+ (i < RTE_MAX_LCORE);
+ i = rte_get_next_lcore(i, 1, 0))
+ {
+ struct uhd_dpdk_thread *t = &ctx->threads[i];
+ if (!LIST_EMPTY(&t->port_list)) {
+ rte_eal_remote_launch(_uhd_dpdk_driver_main, NULL, ctx->threads[i].id);
+ }
+ }
+ return 0;
+}
+
+/* FIXME: This will be changed once we have functions to handle the threads */
+int uhd_dpdk_destroy(void)
+{
+ if (!ctx)
+ return -ENODEV;
+
+ struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0);
+ if (!req)
+ return -ENOMEM;
+
+ req->req_type = UHD_DPDK_LCORE_TERM;
+
+ for (int i = rte_get_next_lcore(-1, 1, 0);
+ (i < RTE_MAX_LCORE);
+ i = rte_get_next_lcore(i, 1, 0))
+ {
+ struct uhd_dpdk_thread *t = &ctx->threads[i];
+
+ if (LIST_EMPTY(&t->port_list))
+ continue;
+
+ if (rte_eal_get_lcore_state(t->id) == FINISHED)
+ continue;
+
+ pthread_mutex_init(&req->mutex, NULL);
+ pthread_cond_init(&req->cond, NULL);
+ pthread_mutex_lock(&req->mutex);
+ if (rte_ring_enqueue(t->sock_req_ring, req)) {
+ pthread_mutex_unlock(&req->mutex);
+ RTE_LOG(ERR, USER2, "Failed to terminate thread %d\n", i);
+ rte_free(req);
+ return -ENOSPC;
+ }
+ struct timespec timeout = {
+ .tv_sec = 1,
+ .tv_nsec = 0
+ };
+ pthread_cond_timedwait(&req->cond, &req->mutex, &timeout);
+ pthread_mutex_unlock(&req->mutex);
+ }
+
+ rte_free(req);
+ return 0;
+}
+
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h
new file mode 100644
index 000000000..31c9dba0c
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h
@@ -0,0 +1,253 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#ifndef _UHD_DPDK_CTX_H_
+#define _UHD_DPDK_CTX_H_
+
+#include <stdint.h>
+#include <sys/queue.h>
+#include <sys/types.h>
+#include <rte_ethdev.h>
+#include <rte_mbuf.h>
+#include <rte_hash.h>
+#include <rte_eal.h>
+#include <uhd/transport/uhd-dpdk.h>
+//#include <pthread.h>
+
+/* For nice scheduling options later, make sure to separate RX and TX activity */
+
+#define UHD_DPDK_MAX_SOCKET_CNT 1024
+#define UHD_DPDK_MAX_PENDING_SOCK_REQS 16
+#define UHD_DPDK_TXQ_SIZE 64
+#define UHD_DPDK_TX_BURST_SIZE (UHD_DPDK_TXQ_SIZE - 1)
+#define UHD_DPDK_RXQ_SIZE 64
+#define UHD_DPDK_RX_BURST_SIZE (UHD_DPDK_RXQ_SIZE - 1)
+
+struct uhd_dpdk_port;
+
+/**
+ *
+ * All memory allocation for port, rx_ring, and tx_ring owned by I/O thread
+ * Rest owned by user thread
+ *
+ * port: port servicing this socket
+ * tid: thread ID that owns this socket (to be associated with TX queue)
+ * sock_type: Type of socket
+ * priv: Private data, based on sock_type
+ * rx_ring: pointer to individual rx_ring (created during init--Also used as free buffer ring for TX)
+ * tx_ring: pointer to shared tx_ring (with all sockets for this tid)
+ * tx_buf_count: Number of buffers currently outside the rings
+ * tx_entry: List node for TX Queue tracking
+ *
+ * If a user closes a socket without outstanding TX buffers, user must free the
+ * buffers. Otherwise, that memory will be leaked, and usage will grow.
+ */
+struct uhd_dpdk_socket {
+ struct uhd_dpdk_port *port;
+ pid_t tid;
+ enum uhd_dpdk_sock_type sock_type;
+ void *priv;
+ struct rte_ring *rx_ring;
+ struct rte_ring *tx_ring;
+ int tx_buf_count;
+ LIST_ENTRY(uhd_dpdk_socket) tx_entry;
+};
+LIST_HEAD(uhd_dpdk_tx_head, uhd_dpdk_socket);
+
+/************************************************
+ * Configuration
+ ************************************************/
+enum uhd_dpdk_sock_req {
+ UHD_DPDK_SOCK_OPEN = 0,
+ UHD_DPDK_SOCK_CLOSE,
+ UHD_DPDK_LCORE_TERM,
+ UHD_DPDK_SOCK_REQ_COUNT
+};
+
+/**
+ * port: port associated with this request
+ * sock: socket associated with this request
+ * req_type: Open, Close, or terminate lcore
+ * sock_type: Only udp is supported
+ * cond: Used to sleep until socket creation is finished
+ * mutex: associated with cond
+ * entry: List node for requests pending ARP responses
+ * priv: private data
+ * retval: Result of call (needed post-wakeup)
+ */
+struct uhd_dpdk_config_req {
+ struct uhd_dpdk_port *port;
+ struct uhd_dpdk_socket *sock;
+ enum uhd_dpdk_sock_req req_type;
+ enum uhd_dpdk_sock_type sock_type;
+ pthread_cond_t cond;
+ pthread_mutex_t mutex;
+ LIST_ENTRY(uhd_dpdk_config_req) entry;
+ void *priv;
+ int retval;
+};
+LIST_HEAD(uhd_dpdk_config_head, uhd_dpdk_config_req);
+
+/************************************************
+ * RX Table
+ ************************************************/
+struct uhd_dpdk_arp_entry {
+ struct ether_addr mac_addr;
+ struct uhd_dpdk_config_head pending_list; /* Config reqs pending ARP--Thread-unsafe */
+};
+
+struct uhd_dpdk_ipv4_5tuple {
+ enum uhd_dpdk_sock_type sock_type;
+ uint32_t src_ip;
+ uint32_t dst_ip;
+ uint16_t src_port;
+ uint16_t dst_port;
+};
+
+/************************************************
+ * TX Queues
+ *
+ * 1 TX Queue per thread sending through a hardware port
+ * All memory allocation owned by I/O thread
+ *
+ * tid: thread id
+ * queue: TX queue holding threads prepared packets (via send())
+ * retry_queue: queue holding packets that couldn't be sent
+ * freebufs: queue holding empty buffers
+ * tx_list: list of sockets using this queue
+ * entry: list node for port to track TX queues
+ *
+ * queue, retry_queue, and freebufs are single-producer, single-consumer queues
+ * retry_queue wholly-owned by I/O thread
+ * For queue, user thread is producer, I/O thread is consumer
+ * For freebufs, user thread is consumer, I/O thread is consumer
+ *
+ * All queues are same size, and they are shared between all sockets on one
+ * thread (tid is the identifier)
+ * 1. Buffers start in freebufs (user gets buffers from freebufs)
+ * 2. User submits packet to queue
+ * 3. If packet couldn't be sent, it is (re)enqueued on retry_queue
+ ************************************************/
+struct uhd_dpdk_tx_queue {
+ pid_t tid;
+ struct rte_ring *queue;
+ struct rte_ring *retry_queue;
+ struct rte_ring *freebufs;
+ struct uhd_dpdk_tx_head tx_list;
+ LIST_ENTRY(uhd_dpdk_tx_queue) entry;
+};
+LIST_HEAD(uhd_dpdk_txq_head, uhd_dpdk_tx_queue);
+
+/************************************************
+ * Port structure
+ *
+ * All memory allocation owned by I/O thread
+ *
+ * id: hardware port id (for DPDK)
+ * parent: I/O thread servicing this port
+ * mac_addr: MAC address of this port
+ * ipv4_addr: IPv4 address of this port
+ * netmask: Subnet mask of this port
+ * arp_table: ARP cache for this port
+ * rx_table: Mapping of 5-tuple key to sockets for RX
+ * txq_list: List of TX queues associated with this port
+ * port_entry: List node entry for I/O thread to track
+ ************************************************/
+struct uhd_dpdk_port {
+ unsigned int id;
+ struct uhd_dpdk_thread *parent;
+ struct ether_addr mac_addr;
+ uint32_t ipv4_addr; /* FIXME: Check this before allowing a socket!!! */
+ uint32_t netmask;
+ /* Key = IP addr
+ * Value = MAC addr (ptr to uhd_dpdk_arp_entry)
+ */
+ struct rte_hash *arp_table;
+ /* hash map of RX sockets
+ * Key = uhd_dpdk_ipv4_5tuple
+ * Value = uhd_dpdk_socket
+ */
+ struct rte_hash *rx_table;
+ /* doubly-linked list of TX sockets */
+ struct uhd_dpdk_txq_head txq_list;
+ LIST_ENTRY(uhd_dpdk_port) port_entry;
+};
+
+LIST_HEAD(uhd_dpdk_port_head, uhd_dpdk_port);
+
+/************************************************
+ * Thread/lcore-private data structure
+ *
+ * All data owned by global context
+ *
+ * id: lcore id (from DPDK)
+ * rx_pktbuf_pool: memory pool for generating buffers for RX packets
+ * tx_pktbuf_pool: memory pool for generating buffers for TX packets
+ * num_ports: Number of ports this lcore is servicing
+ * port_list: List of ports this lcore is servicing
+ * sock_req_ring: Queue for user threads to submit service requests to the lcore
+ *
+ * sock_req_ring is a multi-producer, single-consumer queue
+ *
+ * For threads that have ports:
+ * Launch individually
+ * For threads without ports:
+ * Do not launch unless user specifically does it themselves.
+ * Should also have master lcore returned to user
+ * REMEMBER: Without args, DPDK creates an lcore for each CPU core!
+ */
+struct uhd_dpdk_thread {
+ unsigned int id;
+ struct rte_mempool *rx_pktbuf_pool;
+ struct rte_mempool *tx_pktbuf_pool;
+ int num_ports;
+ struct uhd_dpdk_port_head port_list;
+ struct rte_ring *sock_req_ring;
+};
+
+
+/************************************************
+ * One global context
+ *
+ * num_threads: Number of DPDK lcores tracked
+ * num_ports: Number of DPDK/NIC ports tracked
+ * threads: Array of all lcores/threads
+ * ports: Array of all DPDK/NIC ports
+ * rx_pktbuf_pools: Array of all packet buffer pools for RX
+ * tx_pktbuf_pools: Array of all packet buffer pools for TX
+ *
+ * The packet buffer pools are memory pools that are associated with a CPU
+ * socket. They will provide storage close to the socket to accommodate NUMA
+ * nodes.
+ ************************************************/
+struct uhd_dpdk_ctx {
+ unsigned int num_threads;
+ unsigned int num_ports;
+ struct uhd_dpdk_thread *threads;
+ struct uhd_dpdk_port *ports;
+ struct rte_mempool *rx_pktbuf_pools[RTE_MAX_NUMA_NODES];
+ struct rte_mempool *tx_pktbuf_pools[RTE_MAX_NUMA_NODES];
+};
+
+extern struct uhd_dpdk_ctx *ctx;
+
+static inline struct uhd_dpdk_port * find_port(unsigned int portid)
+{
+ if (!ctx)
+ return NULL;
+
+ for (unsigned int i = 0; i < ctx->num_threads; i++) {
+ struct uhd_dpdk_thread *t = &ctx->threads[i];
+ struct uhd_dpdk_port *p;
+ LIST_FOREACH(p, &t->port_list, port_entry) {
+ if (p->id == portid) {
+ return p;
+ }
+ }
+ }
+ return NULL;
+}
+
+#endif /* _UHD_DPDK_CTX_H_ */
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
new file mode 100644
index 000000000..0af9cc4e5
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
@@ -0,0 +1,401 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#include "uhd_dpdk_driver.h"
+#include "uhd_dpdk_fops.h"
+#include "uhd_dpdk_udp.h"
+#include <rte_malloc.h>
+#include <rte_mempool.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+
+int _uhd_dpdk_process_arp(struct uhd_dpdk_port *port, struct arp_hdr *arp_frame)
+{
+ uint32_t dest_ip = arp_frame->arp_data.arp_sip;
+ struct ether_addr dest_addr = arp_frame->arp_data.arp_sha;
+
+ struct uhd_dpdk_arp_entry *entry = NULL;
+ rte_hash_lookup_data(port->arp_table, &dest_ip, (void **) &entry);
+ if (!entry) {
+ entry = rte_zmalloc(NULL, sizeof(*entry), 0);
+ if (!entry) {
+ return -ENOMEM;
+ }
+ LIST_INIT(&entry->pending_list);
+ ether_addr_copy(&dest_addr, &entry->mac_addr);
+ if (rte_hash_add_key_data(port->arp_table, &dest_ip, entry) < 0) {
+ rte_free(entry);
+ return -ENOSPC;
+ }
+ } else {
+ struct uhd_dpdk_config_req *req = NULL;
+ ether_addr_copy(&dest_addr, &entry->mac_addr);
+ /* Now wake any config reqs waiting for the ARP */
+ LIST_FOREACH(req, &entry->pending_list, entry) {
+ _uhd_dpdk_config_req_compl(req, 0);
+ }
+ }
+
+ return 0;
+}
+
+int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port, uint32_t ip)
+{
+ struct rte_mbuf *mbuf;
+ struct ether_hdr *hdr;
+ struct arp_hdr *arp_frame;
+
+ mbuf = rte_pktmbuf_alloc(port->parent->tx_pktbuf_pool);
+ if (unlikely(mbuf == NULL)) {
+ RTE_LOG(WARNING, MEMPOOL, "Could not allocate packet buffer for ARP request\n");
+ return -ENOMEM;
+ }
+
+ hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
+ arp_frame = (struct arp_hdr *) &hdr[1];
+
+ memset(hdr->d_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN);
+ ether_addr_copy(&port->mac_addr, &hdr->s_addr);
+ hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP);
+
+ arp_frame->arp_hrd = rte_cpu_to_be_16(ARP_HRD_ETHER);
+ arp_frame->arp_pro = rte_cpu_to_be_16(ETHER_TYPE_IPv4);
+ arp_frame->arp_hln = 6;
+ arp_frame->arp_pln = 4;
+ arp_frame->arp_op = rte_cpu_to_be_16(ARP_OP_REQUEST);
+ ether_addr_copy(&port->mac_addr, &arp_frame->arp_data.arp_sha);
+ arp_frame->arp_data.arp_sip = port->ipv4_addr;
+ memset(arp_frame->arp_data.arp_tha.addr_bytes, 0x00, ETHER_ADDR_LEN);
+ arp_frame->arp_data.arp_tip = ip;
+
+ mbuf->pkt_len = 42;
+ mbuf->data_len = 42;
+ mbuf->ol_flags = PKT_TX_IP_CKSUM;
+
+ if (rte_eth_tx_burst(port->id, 0, &mbuf, 1) != 1) {
+ RTE_LOG(WARNING, RING, "%s: TX descriptor ring is full\n", __func__);
+ rte_pktmbuf_free(mbuf);
+ return -EAGAIN;
+ }
+ return 0;
+}
+
+
+int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt)
+{
+ int status = 0;
+ struct uhd_dpdk_ipv4_5tuple ht_key = {
+ .sock_type = UHD_DPDK_SOCK_UDP,
+ .src_ip = 0,
+ .src_port = 0,
+ .dst_ip = 0,
+ .dst_port = pkt->dst_port
+ };
+
+ struct uhd_dpdk_socket *sock = NULL;
+ rte_hash_lookup_data(port->rx_table, &ht_key, (void **) &sock);
+ if (!sock) {
+ status = -ENODEV;
+ goto udp_rx_drop;
+ }
+
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
+ status = rte_ring_enqueue(sock->rx_ring, mbuf);
+ if (status) {
+ pdata->dropped_pkts++;
+ goto udp_rx_drop;
+ }
+ return 0;
+
+udp_rx_drop:
+ rte_pktmbuf_free(mbuf);
+ return status;
+}
+
+int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt)
+{
+ if (pkt->dst_addr != port->ipv4_addr) {
+ rte_pktmbuf_free(mbuf);
+ return -ENODEV;
+ }
+ if (pkt->next_proto_id == 0x11) {
+ return _uhd_dpdk_process_udp(port, mbuf, (struct udp_hdr *) &pkt[1]);
+ }
+ rte_pktmbuf_free(mbuf);
+ return -EINVAL;
+}
+
+static int _uhd_dpdk_fill_ipv4_addr(struct uhd_dpdk_port *port,
+ struct rte_mbuf *mbuf)
+{
+ struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
+ struct ipv4_hdr *ip_hdr = (struct ipv4_hdr *) &eth_hdr[1];
+ if (is_broadcast(port, ip_hdr->dst_addr)) {
+ memset(eth_hdr->d_addr.addr_bytes, 0xff, ETHER_ADDR_LEN);
+ } else {
+ /* Lookup dest_addr */
+ struct uhd_dpdk_arp_entry *entry = NULL;
+ rte_hash_lookup_data(port->arp_table, &ip_hdr->dst_addr, (void **) &entry);
+ if (!entry) {
+ RTE_LOG(ERR, USER1, "TX packet on port %d to addr 0x%08x has no ARP entry\n", port->id, ip_hdr->dst_addr);
+ return -ENODEV;
+ }
+
+ ether_addr_copy(&entry->mac_addr, &eth_hdr->d_addr);
+ }
+ return 0;
+}
+
+static int _uhd_dpdk_send(struct uhd_dpdk_port *port,
+ struct uhd_dpdk_tx_queue *txq,
+ struct rte_ring *q)
+{
+ struct rte_mbuf *buf;
+
+ unsigned int num_tx = rte_ring_count(q);
+ num_tx = (num_tx < UHD_DPDK_TX_BURST_SIZE) ? num_tx : UHD_DPDK_TX_BURST_SIZE;
+ for (unsigned int i = 0; i < num_tx; i++) {
+ int status = rte_ring_dequeue(q, (void **) &buf);
+ if (status) {
+ RTE_LOG(ERR, USER1, "%s: Q Count doesn't match actual\n", __func__);
+ break;
+ }
+ struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(buf, struct ether_hdr *);
+ if (eth_hdr->ether_type == rte_cpu_to_be_16(ETHER_TYPE_IPv4)) {
+ status = _uhd_dpdk_fill_ipv4_addr(port, buf);
+ if (status) {
+ return status;
+ }
+ }
+ status = rte_eth_tx_burst(port->id, 0, &buf, 1); /* Automatically frees mbuf */
+ if (status != 1) {
+ status = rte_ring_enqueue(txq->retry_queue, buf);
+ if (status)
+ RTE_LOG(WARNING, USER1, "%s: Could not re-enqueue pkt %d\n", __func__, i);
+ num_tx = i;
+ rte_pktmbuf_free(buf);
+ break;
+ }
+ }
+
+ return num_tx;
+}
+
+static inline int _uhd_dpdk_restore_bufs(struct uhd_dpdk_port *port,
+ struct uhd_dpdk_tx_queue *q,
+ unsigned int num_bufs)
+{
+ /* Allocate more buffers to replace the sent ones */
+ struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE];
+ int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, num_bufs);
+ if (status) {
+ RTE_LOG(ERR, USER1, "%d %s: Could not restore %u pktmbufs in bulk!\n", status, __func__, num_bufs);
+ }
+
+ /* Enqueue the buffers for the user thread to retrieve */
+ unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) freebufs, num_bufs, NULL);
+ if (enqd != num_bufs) {
+ RTE_LOG(ERR, USER1, "Could not enqueue pktmbufs!\n");
+ return status;
+ }
+
+ return num_bufs;
+}
+
+static inline void _uhd_dpdk_disable_ports(struct uhd_dpdk_thread *t)
+{
+ struct uhd_dpdk_port *port = NULL;
+ LIST_FOREACH(port, &t->port_list, port_entry) {
+ rte_eth_dev_stop(port->id);
+ }
+}
+
+static inline int _uhd_dpdk_driver_cleanup(struct uhd_dpdk_thread *t)
+{
+ /* Close sockets upon request, but reply to other service requests with
+ * errors
+ */
+ struct uhd_dpdk_config_req *sock_req;
+ if (rte_ring_dequeue(t->sock_req_ring, (void **) &sock_req)) {
+ switch (sock_req->req_type) {
+ case UHD_DPDK_SOCK_CLOSE:
+ _uhd_dpdk_sock_release(sock_req);
+ break;
+ default:
+ _uhd_dpdk_config_req_compl(sock_req, -ENODEV);
+ break;
+ }
+ }
+
+ /* Do nothing if there are users remaining */
+ struct uhd_dpdk_port *port = NULL;
+ LIST_FOREACH(port, &t->port_list, port_entry) {
+ /* Check for RX sockets */
+ const void *hash_key;
+ void *hash_sock;
+ uint32_t hash_next = 0;
+ if (rte_hash_iterate(port->rx_table, &hash_key,
+ &hash_sock, &hash_next) != -ENOENT)
+ return -EAGAIN;
+
+ /* Check for TX sockets */
+ struct uhd_dpdk_tx_queue *q = NULL;
+ LIST_FOREACH(q, &port->txq_list, entry) {
+ if (!LIST_EMPTY(&q->tx_list))
+ return -EAGAIN;
+ }
+ }
+
+ /* Now can free memory */
+ LIST_FOREACH(port, &t->port_list, port_entry) {
+ rte_hash_free(port->rx_table);
+
+ struct uhd_dpdk_tx_queue *q = LIST_FIRST(&port->txq_list);
+ while (!LIST_EMPTY(&port->txq_list)) {
+ struct uhd_dpdk_tx_queue *nextq = LIST_NEXT(q, entry);
+ while (!rte_ring_empty(q->queue)) {
+ struct rte_buf *buf = NULL;
+ rte_ring_dequeue(q->queue, (void **) &buf);
+ rte_free(buf);
+ }
+ while (!rte_ring_empty(q->freebufs)) {
+ struct rte_buf *buf = NULL;
+ rte_ring_dequeue(q->freebufs, (void **) &buf);
+ rte_free(buf);
+ }
+ while (!rte_ring_empty(q->retry_queue)) {
+ struct rte_buf *buf = NULL;
+ rte_ring_dequeue(q->retry_queue, (void **) &buf);
+ rte_free(buf);
+ }
+ rte_ring_free(q->queue);
+ rte_ring_free(q->freebufs);
+ rte_ring_free(q->retry_queue);
+ rte_free(q);
+ q = nextq;
+ }
+
+ const void *arp_key;
+ uint32_t arp_key_next = 0;
+ struct uhd_dpdk_arp_entry *arp_entry = NULL;
+ while (rte_hash_iterate(port->arp_table, &arp_key,
+ (void **) &arp_entry, &arp_key_next) >= 0) {
+ rte_free(arp_entry);
+ }
+ rte_hash_free(port->arp_table);
+ }
+
+ return 0;
+}
+
+int _uhd_dpdk_driver_main(void *arg)
+{
+
+ /* Don't currently have use for arguments */
+ if (arg)
+ return -EINVAL;
+
+ /* Check that this is a valid lcore */
+ unsigned int lcore_id = rte_lcore_id();
+ if (lcore_id == LCORE_ID_ANY)
+ return -ENODEV;
+
+ /* Check that this lcore has ports */
+ struct uhd_dpdk_thread *t = &ctx->threads[lcore_id];
+ if (t->id != lcore_id)
+ return -ENODEV;
+
+ RTE_LOG(INFO, USER2, "Thread %d started\n", lcore_id);
+ int status = 0;
+ while (!status) {
+ /* Check for open()/close() requests and service 1 at a time */
+ struct uhd_dpdk_config_req *sock_req;
+ if (rte_ring_dequeue(t->sock_req_ring, (void **) &sock_req) == 0) {
+ /* FIXME: Not checking return vals */
+ switch (sock_req->req_type) {
+ case UHD_DPDK_SOCK_OPEN:
+ _uhd_dpdk_sock_setup(sock_req);
+ break;
+ case UHD_DPDK_SOCK_CLOSE:
+ _uhd_dpdk_sock_release(sock_req);
+ break;
+ case UHD_DPDK_LCORE_TERM:
+ RTE_LOG(INFO, EAL, "Terminating lcore %u\n", lcore_id);
+ status = 1;
+ _uhd_dpdk_config_req_compl(sock_req, 0);
+ break;
+ default:
+ RTE_LOG(ERR, USER2, "Invalid socket request %d\n", sock_req->req_type);
+ break;
+ }
+ }
+ /* For each port, attempt to receive packets and process */
+ struct uhd_dpdk_port *port = NULL;
+ LIST_FOREACH(port, &t->port_list, port_entry) {
+ struct ether_hdr *hdr;
+ char *l2_data;
+ struct rte_mbuf *bufs[UHD_DPDK_RX_BURST_SIZE];
+ const uint16_t num_rx = rte_eth_rx_burst(port->id, 0,
+ bufs, UHD_DPDK_RX_BURST_SIZE);
+ if (unlikely(num_rx == 0)) {
+ continue;
+ }
+
+ for (int buf = 0; buf < num_rx; buf++) {
+ uint64_t ol_flags = bufs[buf]->ol_flags;
+ hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr *);
+ l2_data = (char *) &hdr[1];
+ switch (rte_be_to_cpu_16(hdr->ether_type)) {
+ case ETHER_TYPE_ARP:
+ _uhd_dpdk_process_arp(port, (struct arp_hdr *) l2_data);
+ rte_pktmbuf_free(bufs[buf]);
+ break;
+ case ETHER_TYPE_IPv4:
+ if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */
+ RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf);
+ } else {
+ _uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data);
+ }
+ break;
+ default:
+ rte_pktmbuf_free(bufs[buf]);
+ break;
+ }
+ }
+ }
+ /* For each port's TX queues, do TX */
+ LIST_FOREACH(port, &t->port_list, port_entry) {
+ struct uhd_dpdk_tx_queue *q = NULL;
+ LIST_FOREACH(q, &port->txq_list, entry) {
+ if (!rte_ring_empty(q->retry_queue)) {
+ int num_retry = _uhd_dpdk_send(port, q, q->retry_queue);
+ _uhd_dpdk_restore_bufs(port, q, num_retry);
+ if (!rte_ring_empty(q->retry_queue)) {
+ break;
+ }
+ }
+ if (rte_ring_empty(q->queue)) {
+ continue;
+ }
+ int num_tx = _uhd_dpdk_send(port, q, q->queue);
+ if (num_tx > 0) {
+ _uhd_dpdk_restore_bufs(port, q, num_tx);
+ } else {
+ break;
+ }
+ }
+ }
+ }
+
+ /* Now turn off ports */
+ _uhd_dpdk_disable_ports(t);
+
+ /* Now clean up before exiting */
+ int cleaning = -EAGAIN;
+ while (cleaning == -EAGAIN) {
+ cleaning = _uhd_dpdk_driver_cleanup(t);
+ }
+ return status;
+}
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h
new file mode 100644
index 000000000..b0d3e42cd
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h
@@ -0,0 +1,32 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#ifndef _UHD_DPDK_DRIVER_H_
+#define _UHD_DPDK_DRIVER_H_
+
+#include "uhd_dpdk_ctx.h"
+#include <rte_mbuf.h>
+#include <rte_arp.h>
+#include <rte_udp.h>
+#include <rte_ip.h>
+
+static inline bool is_broadcast(struct uhd_dpdk_port *port, uint32_t dst_ipv4_addr)
+{
+ uint32_t network = port->netmask | ((~port->netmask) & dst_ipv4_addr);
+ return (network == 0xffffffff);
+}
+
+
+int _uhd_dpdk_process_arp(struct uhd_dpdk_port *port, struct arp_hdr *arp_frame);
+int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt);
+int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt);
+int _uhd_dpdk_send_udp(struct uhd_dpdk_port *port,
+ struct uhd_dpdk_socket *sock,
+ struct rte_mbuf *mbuf);
+int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port,
+ uint32_t ip);
+
+int _uhd_dpdk_driver_main(void *arg);
+#endif /* _UHD_DPDK_DRIVER_H_ */
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c
new file mode 100644
index 000000000..3acc3d709
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c
@@ -0,0 +1,306 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#include "uhd_dpdk_fops.h"
+#include "uhd_dpdk_udp.h"
+#include <rte_malloc.h>
+#include <rte_ip.h>
+
+/************************************************
+ * I/O thread ONLY
+ *
+ * TODO: Decide whether to allow blocking on mutex
+ * This would cause the I/O thread to sleep, which isn't desireable
+ * Could throw in a "request completion cleanup" section in I/O thread's
+ * main loop, though. Just keep trying until the requesting thred is woken
+ * up. This would be to handle the case where the thread hadn't finished
+ * setting itself up to wait on the condition variable, but the I/O thread
+ * still got the request.
+ */
+int _uhd_dpdk_config_req_compl(struct uhd_dpdk_config_req *req, int retval)
+{
+ req->retval = retval;
+ int stat = pthread_mutex_trylock(&req->mutex);
+ if (stat) {
+ RTE_LOG(ERR, USER1, "%s: Could not lock req mutex\n", __func__);
+ return stat;
+ }
+ stat = pthread_cond_signal(&req->cond);
+ pthread_mutex_unlock(&req->mutex);
+ if (stat) {
+ RTE_LOG(ERR, USER1, "%s: Could not signal req cond\n", __func__);
+ return stat;
+ }
+ return 0;
+}
+
+int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req)
+{
+ int stat = 0;
+ switch (req->sock_type) {
+ case UHD_DPDK_SOCK_UDP:
+ stat = _uhd_dpdk_udp_setup(req);
+ break;
+ default:
+ stat = -EINVAL;
+ _uhd_dpdk_config_req_compl(req, -EINVAL);
+ }
+ return stat;
+}
+
+int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req)
+{
+ int stat = 0;
+ switch (req->sock_type) {
+ case UHD_DPDK_SOCK_UDP:
+ stat = _uhd_dpdk_udp_release(req);
+ break;
+ default:
+ stat = -EINVAL;
+ _uhd_dpdk_config_req_compl(req, -EINVAL);
+ }
+
+ return stat;
+}
+
+/************************************************
+ * API calls
+ */
+struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid,
+ enum uhd_dpdk_sock_type t, void *sockarg)
+{
+ if (!ctx || (t >= UHD_DPDK_SOCK_TYPE_COUNT)) {
+ return NULL;
+ }
+
+ struct uhd_dpdk_port *port = find_port(portid);
+ if (!port) {
+ return NULL;
+ }
+
+ if (!port->ipv4_addr) {
+ RTE_LOG(WARNING, EAL, "Please set IPv4 address for port %u before opening socket\n", portid);
+ return NULL;
+ }
+
+
+ struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0);
+ if (!req) {
+ return NULL;
+ }
+
+ struct uhd_dpdk_socket *s = (struct uhd_dpdk_socket *) rte_zmalloc(NULL, sizeof(*s), 0);
+ if (!s) {
+ goto sock_open_end;
+ }
+
+ s->port = port;
+ req->sock = s;
+ req->req_type = UHD_DPDK_SOCK_OPEN;
+ req->sock_type = t;
+ req->retval = -ETIMEDOUT;
+ pthread_mutex_init(&req->mutex, NULL);
+ pthread_condattr_t condattr;
+ pthread_condattr_init(&condattr);
+ pthread_condattr_setclock(&condattr, CLOCK_MONOTONIC);
+ pthread_cond_init(&req->cond, &condattr);
+
+ switch (t) {
+ case UHD_DPDK_SOCK_UDP:
+ uhd_dpdk_udp_open(req, sockarg);
+ break;
+ default:
+ break;
+ }
+
+ if (req->retval) {
+ rte_free(s);
+ s = NULL;
+ }
+
+sock_open_end:
+ rte_free(req);
+ return s;
+}
+
+int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock)
+{
+ if (!ctx || !sock)
+ return -EINVAL;
+
+ struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0);
+ if (!req)
+ return -ENOMEM;
+ req->sock = sock;
+ req->req_type = UHD_DPDK_SOCK_CLOSE;
+ req->sock_type = sock->sock_type;
+ req->retval = -ETIMEDOUT;
+ pthread_mutex_init(&req->mutex, NULL);
+ pthread_cond_init(&req->cond, NULL);
+
+ switch (sock->sock_type) {
+ case UHD_DPDK_SOCK_UDP:
+ uhd_dpdk_udp_close(req);
+ break;
+ default:
+ break;
+ }
+
+ if (req->retval) {
+ rte_free(req);
+ return req->retval;
+ }
+
+ rte_free(sock);
+ return 0;
+}
+
+/*
+ * TODO:
+ * Add blocking calls with timeout
+ * Implementation would involve a condition variable, like config reqs
+ * Also would create a cleanup section in I/O main loop (like config reqs)
+ */
+int uhd_dpdk_request_tx_bufs(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
+ unsigned int num_bufs)
+{
+ if (!sock || !bufs || !num_bufs) {
+ return -EINVAL;
+ }
+ *bufs = NULL;
+
+ if (!sock->tx_ring)
+ return -EINVAL;
+
+ unsigned int num_tx = rte_ring_count(sock->rx_ring);
+ num_tx = (num_tx < num_bufs) ? num_tx : num_bufs;
+ if (rte_ring_dequeue_bulk(sock->rx_ring, (void **) bufs, num_tx, NULL) == 0)
+ return -ENOENT;
+ sock->tx_buf_count += num_tx;
+ return num_tx;
+}
+
+int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
+ unsigned int num_bufs)
+{
+ if (!sock || !bufs || !num_bufs)
+ return -EINVAL;
+ if (!sock->tx_ring)
+ return -EINVAL;
+ unsigned int num_tx = rte_ring_free_count(sock->tx_ring);
+ num_tx = (num_tx < num_bufs) ? num_tx : num_bufs;
+ switch (sock->sock_type) {
+ case UHD_DPDK_SOCK_UDP:
+ for (unsigned int i = 0; i < num_tx; i++) {
+ uhd_dpdk_udp_prep(sock, bufs[i]);
+ }
+ break;
+ default:
+ RTE_LOG(ERR, USER1, "%s: Unsupported sock type\n", __func__);
+ return -EINVAL;
+ }
+ int status = rte_ring_enqueue_bulk(sock->tx_ring, (void **) bufs, num_tx, NULL);
+ if (status == 0) {
+ RTE_LOG(ERR, USER1, "Invalid shared usage of TX ring detected\n");
+ return status;
+ }
+ sock->tx_buf_count -= num_tx;
+ return num_tx;
+}
+
+/*
+ * TODO:
+ * Add blocking calls with timeout
+ */
+int uhd_dpdk_recv(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
+ unsigned int num_bufs, unsigned int timeout)
+{
+ if (!sock || !bufs || !num_bufs)
+ return -EINVAL;
+ if (!sock->rx_ring)
+ return -EINVAL;
+ unsigned int num_rx = rte_ring_count(sock->rx_ring);
+ num_rx = (num_rx < num_bufs) ? num_rx : num_bufs;
+ if (num_rx) {
+ unsigned int avail = 0;
+ unsigned int status = rte_ring_dequeue_bulk(sock->rx_ring,
+ (void **) bufs, num_rx, &avail);
+ if (status == 0) {
+ RTE_LOG(ERR, USER1, "Invalid shared usage of RX ring detected\n");
+ RTE_LOG(ERR, USER1, "Requested %u, but %u available\n",
+ num_rx, avail);
+ return -ENOENT;
+ }
+ }
+ return num_rx;
+}
+
+void uhd_dpdk_free_buf(struct rte_mbuf *buf)
+{
+ rte_pktmbuf_free(buf);
+}
+
+void * uhd_dpdk_buf_to_data(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf)
+{
+ if (!sock || !buf)
+ return NULL;
+
+ /* TODO: Support for more types? */
+ switch (sock->sock_type) {
+ case UHD_DPDK_SOCK_UDP:
+ return rte_pktmbuf_mtod_offset(buf, void *, sizeof(struct ether_hdr) +
+ sizeof(struct ipv4_hdr) +
+ sizeof(struct udp_hdr));
+ default:
+ return NULL;
+ }
+}
+
+
+int uhd_dpdk_get_len(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf)
+{
+ if (!sock || !buf)
+ return -EINVAL;
+
+ if (sock->sock_type != UHD_DPDK_SOCK_UDP)
+ return -EINVAL;
+
+ struct udp_hdr *hdr = (struct udp_hdr *) ((uint8_t *) uhd_dpdk_buf_to_data(sock, buf) - sizeof(struct udp_hdr));
+ if (!hdr)
+ return -EINVAL;
+
+ /* Report dgram length - header */
+ return ntohs(hdr->dgram_len) - 8;
+}
+
+int uhd_dpdk_get_src_ipv4(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf,
+ uint32_t *ipv4_addr)
+{
+ if (!sock || !buf || !ipv4_addr)
+ return -EINVAL;
+
+ if (sock->sock_type != UHD_DPDK_SOCK_UDP)
+ return -EINVAL;
+
+ struct ipv4_hdr *hdr = rte_pktmbuf_mtod_offset(buf, struct ipv4_hdr *,
+ sizeof(struct ether_hdr));
+
+ *ipv4_addr = hdr->src_addr;
+ return 0;
+}
+
+int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, uint32_t *count)
+{
+ if (!sock)
+ return -EINVAL;
+ if (sock->sock_type != UHD_DPDK_SOCK_UDP)
+ return -EINVAL;
+ if (!sock->priv)
+ return -ENODEV;
+
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
+ *count = pdata->dropped_pkts;
+ return 0;
+}
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h
new file mode 100644
index 000000000..c07c1913a
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h
@@ -0,0 +1,15 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#ifndef _UHD_DPDK_FOPS_H_
+#define _UHD_DPDK_FOPS_H_
+
+#include "uhd_dpdk_ctx.h"
+
+int _uhd_dpdk_config_req_compl(struct uhd_dpdk_config_req *req, int retval);
+
+int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req);
+int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req);
+#endif /* _UHD_DPDK_FOPS_H_ */
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
new file mode 100644
index 000000000..26cfd43e1
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
@@ -0,0 +1,456 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#include "uhd_dpdk_fops.h"
+#include "uhd_dpdk_udp.h"
+#include "uhd_dpdk_driver.h"
+#include <rte_ring.h>
+#include <rte_malloc.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+#include <arpa/inet.h>
+
+/************************************************
+ * I/O thread ONLY
+ */
+
+static int _alloc_txq(struct uhd_dpdk_port *port, pid_t tid, struct uhd_dpdk_tx_queue **queue)
+{
+ *queue = NULL;
+ struct uhd_dpdk_tx_queue *q = rte_zmalloc(NULL, sizeof(*q), 0);
+ if (!q) {
+ RTE_LOG(ERR, USER1, "%s: Cannot allocate TX queue\n", __func__);
+ return -ENOMEM;
+ }
+ q->tid = tid;
+ LIST_INIT(&q->tx_list);
+
+ char name[32];
+ snprintf(name, sizeof(name), "tx_ring_udp_%u.%u", port->id, tid);
+ q->queue = rte_ring_create(
+ name,
+ UHD_DPDK_TXQ_SIZE,
+ rte_socket_id(),
+ RING_F_SC_DEQ | RING_F_SP_ENQ
+ );
+ snprintf(name, sizeof(name), "buffer_ring_udp_%u.%u", port->id, tid);
+ q->freebufs = rte_ring_create(
+ name,
+ UHD_DPDK_TXQ_SIZE,
+ rte_socket_id(),
+ RING_F_SC_DEQ | RING_F_SP_ENQ
+ );
+ /* Set up retry queue */
+ snprintf(name, sizeof(name), "retry_queue_%u", port->id);
+ q->retry_queue = rte_ring_create(
+ name,
+ UHD_DPDK_TXQ_SIZE,
+ rte_socket_id(),
+ RING_F_SC_DEQ | RING_F_SP_ENQ
+ );
+
+ if (!q->queue || !q->freebufs || !q->retry_queue) {
+ RTE_LOG(ERR, USER1, "%s: Cannot allocate TX rings\n", __func__);
+ if (q->queue)
+ rte_ring_free(q->queue);
+ if (q->freebufs)
+ rte_ring_free(q->freebufs);
+ if (q->retry_queue)
+ rte_ring_free(q->retry_queue);
+ rte_free(q);
+ return -ENOMEM;
+ }
+ struct rte_mbuf *bufs[UHD_DPDK_TXQ_SIZE];
+ unsigned int num_bufs = rte_ring_free_count(q->freebufs);
+ int buf_stat = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, bufs, num_bufs);
+ if (buf_stat) {
+ rte_ring_free(q->freebufs);
+ rte_ring_free(q->queue);
+ rte_ring_free(q->retry_queue);
+ rte_free(q);
+ RTE_LOG(ERR, USER1, "%s: Cannot allocate packet buffers\n", __func__);
+ return -ENOENT;
+ }
+ unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) bufs, num_bufs, NULL);
+ if (enqd != num_bufs) {
+ RTE_LOG(ERR, USER1, "%s: Cannot enqueue freebufs\n", __func__);
+ }
+ LIST_INSERT_HEAD(&port->txq_list, q, entry);
+ *queue = q;
+ return 0;
+}
+
+/* Finish setting up UDP socket (unless ARP needs to be done)
+ * Not multi-thread safe!
+ * This call should only be used by the thread servicing the port
+ * In addition, the code below assumes simplex sockets and unique receive ports
+ * FIXME: May need some way to help clean up abandoned socket requests (refcnt check...)
+ */
+int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req)
+{
+ int retval = 0;
+ struct uhd_dpdk_socket *sock = req->sock;
+ struct uhd_dpdk_udp_priv *pdata = sock->priv;
+ struct uhd_dpdk_port *port = req->sock->port;
+
+ struct uhd_dpdk_ipv4_5tuple ht_key = {
+ .sock_type = UHD_DPDK_SOCK_UDP,
+ .src_ip = 0,
+ .src_port = 0,
+ .dst_ip = 0,
+ .dst_port = pdata->dst_port
+ };
+
+ /* Are we doing RX? */
+ if (sock->rx_ring) {
+ /* Add to rx table */
+ if (pdata->dst_port == 0) {
+ /* Assign unused one in a very slow fashion */
+ for (uint16_t i = 1; i > 0; i++) {
+ ht_key.dst_port = htons(i);
+ if (rte_hash_lookup(port->rx_table, &ht_key) == -ENOENT) {
+ pdata->dst_port = htons(i);
+ break;
+ }
+ }
+ }
+
+ /* Is the port STILL invalid? */
+ if (pdata->dst_port == 0) {
+ RTE_LOG(ERR, USER1, "%s: No available UDP ports\n", __func__);
+ _uhd_dpdk_config_req_compl(req, -EADDRINUSE);
+ return -EADDRINUSE;
+ }
+
+ ht_key.dst_port = pdata->dst_port;
+ if (rte_hash_lookup(port->rx_table, &ht_key) > 0) {
+ RTE_LOG(ERR, USER1, "%s: Cannot add to RX table\n", __func__);
+ _uhd_dpdk_config_req_compl(req, -EADDRINUSE);
+ return -EADDRINUSE;
+ }
+
+ char name[32];
+ snprintf(name, sizeof(name), "rx_ring_udp_%u.%u", port->id, ntohs(pdata->dst_port));
+ sock->rx_ring = rte_ring_create(
+ name,
+ UHD_DPDK_RXQ_SIZE,
+ rte_socket_id(),
+ RING_F_SC_DEQ | RING_F_SP_ENQ
+ );
+ if (!sock->rx_ring) {
+ RTE_LOG(ERR, USER1, "%s: Cannot allocate RX ring\n", __func__);
+ _uhd_dpdk_config_req_compl(req, -ENOMEM);
+ return -ENOMEM;
+ }
+ retval = rte_hash_add_key_data(port->rx_table, &ht_key, sock);
+ if (retval != 0) {
+ RTE_LOG(WARNING, TABLE, "Could not add new RX socket to port %d: %d\n", port->id, retval);
+ rte_ring_free(sock->rx_ring);
+ _uhd_dpdk_config_req_compl(req, retval);
+ return retval;
+ }
+ _uhd_dpdk_config_req_compl(req, 0);
+ }
+
+ /* Are we doing TX? */
+ if (sock->tx_ring) {
+ sock->tx_ring = NULL;
+ struct uhd_dpdk_tx_queue *q = NULL;
+ LIST_FOREACH(q, &port->txq_list, entry) {
+ if (q->tid == sock->tid) {
+ LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry);
+ sock->tx_ring = q->queue;
+ sock->rx_ring = q->freebufs;
+ break;
+ }
+ }
+ if (!sock->tx_ring) {
+ retval = _alloc_txq(port, sock->tid, &q);
+ if (retval) {
+ _uhd_dpdk_config_req_compl(req, retval);
+ return retval;
+ }
+ sock->tx_ring = q->queue;
+ sock->rx_ring = q->freebufs;
+ }
+ /* If a broadcast type, just finish setup and return */
+ if (is_broadcast(port, pdata->dst_ipv4_addr)) {
+ LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry);
+ _uhd_dpdk_config_req_compl(req, 0);
+ return 0;
+ }
+ /* Otherwise... Check for entry in ARP table */
+ struct uhd_dpdk_arp_entry *entry = NULL;
+ int arp_table_stat = rte_hash_lookup_data(port->arp_table, &pdata->dst_ipv4_addr, (void **) &entry);
+ if (entry) {
+ /* Check for null entry */
+ if ((entry->mac_addr.addr_bytes[0] == 0xFF) &&
+ (entry->mac_addr.addr_bytes[1] == 0xFF) &&
+ (entry->mac_addr.addr_bytes[2] == 0xFF) &&
+ (entry->mac_addr.addr_bytes[3] == 0xFF) &&
+ (entry->mac_addr.addr_bytes[4] == 0xFF) &&
+ (entry->mac_addr.addr_bytes[5] == 0xFF)) {
+ arp_table_stat = -ENOENT;
+ }
+ } else {
+ /* No entry -> Add null entry */
+ entry = rte_zmalloc(NULL, sizeof(*entry), 0);
+ if (!entry) {
+ RTE_LOG(ERR, USER1, "%s: Cannot allocate ARP entry\n", __func__);
+ _uhd_dpdk_config_req_compl(req, -ENOMEM);
+ return -ENOMEM;
+ }
+ memset(entry->mac_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN);
+ LIST_INIT(&entry->pending_list);
+
+ if (rte_hash_add_key_data(port->arp_table, &pdata->dst_ipv4_addr, entry) < 0) {
+ rte_free(entry);
+ RTE_LOG(ERR, USER1, "%s: Cannot add entry to ARP table\n", __func__);
+ _uhd_dpdk_config_req_compl(req, -ENOMEM);
+ return -ENOMEM;
+ }
+ }
+
+ /* Was there a valid address? */
+ if (arp_table_stat == -ENOENT) {
+ /* Get valid address and have requestor continue waiting */
+ int arp_stat = 0;
+ do { /* Keep trying to send request if no descriptor */
+ arp_stat = _uhd_dpdk_arp_request(port, pdata->dst_ipv4_addr);
+ } while (arp_stat == -EAGAIN);
+
+ if (arp_stat) {
+ /* Config request errors out */
+ RTE_LOG(ERR, USER1, "%s: Cannot make ARP request\n", __func__);
+ _uhd_dpdk_config_req_compl(req, arp_stat);
+ return arp_stat;
+ }
+ /* Append req to pending list. Will signal later. */
+ LIST_INSERT_HEAD(&entry->pending_list, req, entry);
+ LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry);
+ } else {
+ /* We have a valid address. All good. */
+ LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry);
+ _uhd_dpdk_config_req_compl(req, 0);
+ }
+ }
+ return 0;
+}
+
+int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req)
+{
+ struct uhd_dpdk_socket *sock = req->sock;
+ struct uhd_dpdk_port *port = req->sock->port;
+ struct uhd_dpdk_config_req *conf_req = NULL;
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
+ if (sock->tx_ring) {
+ /* Remove from tx_list */
+ LIST_REMOVE(sock, tx_entry);
+ /* Check for entry in ARP table */
+ struct uhd_dpdk_arp_entry *entry = NULL;
+ rte_hash_lookup_data(port->arp_table, &pdata->dst_ipv4_addr, (void **) &entry);
+ if (entry) {
+ LIST_FOREACH(conf_req, &entry->pending_list, entry) {
+ if (conf_req->sock == sock) {
+ LIST_REMOVE(conf_req, entry);
+ break;
+ }
+ }
+ }
+
+ /* Add outstanding buffers back to TX queue's freebufs */
+ struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE];
+ int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, sock->tx_buf_count);
+ if (status) {
+ RTE_LOG(ERR, USER1, "%d %s: Could not restore %u TX buffers in bulk!\n", status, __func__, sock->tx_buf_count);
+ }
+
+ unsigned int enqd = rte_ring_enqueue_bulk(sock->rx_ring, (void **) freebufs, sock->tx_buf_count, NULL);
+ if (enqd != (unsigned int) sock->tx_buf_count) {
+ RTE_LOG(ERR, USER1, "%s: Could not enqueue TX buffers!\n", __func__);
+ return status;
+ }
+ } else if (sock->rx_ring) {
+ struct uhd_dpdk_ipv4_5tuple ht_key = {
+ .sock_type = UHD_DPDK_SOCK_UDP,
+ .src_ip = 0,
+ .src_port = 0,
+ .dst_ip = 0,
+ .dst_port = pdata->dst_port
+ };
+ rte_hash_del_key(port->rx_table, &ht_key);
+ struct rte_mbuf *mbuf = NULL;
+ while (rte_ring_dequeue(sock->rx_ring, (void **) &mbuf) == 0) {
+ rte_pktmbuf_free(mbuf);
+ }
+ rte_ring_free(sock->rx_ring);
+ }
+
+ _uhd_dpdk_config_req_compl(req, 0);
+ return 0;
+}
+
+/* Configure a socket for UDP
+ * TODO: Make sure EVERYTHING is configured in socket
+ * FIXME: Make all of this part of the user thread, except hash table interaction
+ * and list handling
+ */
+void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req,
+ struct uhd_dpdk_sockarg_udp *arg)
+{
+ if (!req)
+ return;
+
+ if (!arg) {
+ req->retval = -EINVAL;
+ return;
+ }
+
+ struct uhd_dpdk_socket *sock = req->sock;
+ pid_t tid = syscall(__NR_gettid);
+ sock->tid = tid;
+
+ /* Create private data */
+ struct uhd_dpdk_udp_priv *data = (struct uhd_dpdk_udp_priv *) rte_zmalloc(NULL, sizeof(*data), 0);
+ if (!data) {
+ req->retval = -ENOMEM;
+ return;
+ }
+ sock->priv = data;
+
+ data->dst_ipv4_addr = arg->dst_addr;
+ if (arg->is_tx) {
+ data->src_port = arg->local_port;
+ data->dst_port = arg->remote_port;
+ sock->tx_ring = (struct rte_ring *) sock;
+ } else {
+ data->src_port = arg->remote_port;
+ data->dst_port = arg->local_port;
+ sock->rx_ring = (struct rte_ring *) sock;
+ }
+
+ /* TODO: Add support for I/O thread calling (skip locking and sleep) */
+ /* Add to port's config queue */
+ pthread_mutex_lock(&req->mutex);
+ if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) {
+ pthread_mutex_unlock(&req->mutex);
+ rte_free(data);
+ req->retval = -ENOSPC;
+ return;
+ }
+ struct timespec timeout;
+ clock_gettime(CLOCK_MONOTONIC, &timeout);
+ timeout.tv_sec += 1;
+ pthread_cond_timedwait(&req->cond, &req->mutex, &timeout);
+ pthread_mutex_unlock(&req->mutex);
+
+ if (req->retval)
+ rte_free(data);
+}
+
+void uhd_dpdk_udp_close(struct uhd_dpdk_config_req *req)
+{
+ if (!req)
+ return;
+
+ pthread_mutex_lock(&req->mutex);
+ if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) {
+ pthread_mutex_unlock(&req->mutex);
+ rte_free(req->sock->priv);
+ req->retval = -ENOSPC;
+ return;
+ }
+ struct timespec timeout;
+ clock_gettime(CLOCK_MONOTONIC, &timeout);
+ timeout.tv_sec += 1;
+ pthread_cond_timedwait(&req->cond, &req->mutex, &timeout);
+ pthread_mutex_unlock(&req->mutex);
+ rte_free(req->sock->priv);
+}
+
+/*
+ * Note: I/O thread will fill in destination MAC address (doesn't happen here)
+ */
+static void uhd_dpdk_ipv4_prep(struct uhd_dpdk_port *port,
+ struct rte_mbuf *mbuf,
+ uint32_t dst_ipv4_addr,
+ uint8_t proto_id,
+ uint32_t payload_len)
+{
+ struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
+ struct ipv4_hdr *ip_hdr = (struct ipv4_hdr *) &eth_hdr[1];
+
+ ether_addr_copy(&port->mac_addr, &eth_hdr->s_addr);
+ eth_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4);
+
+ ip_hdr->version_ihl = 0x40 | 5;
+ ip_hdr->type_of_service = 0;
+ ip_hdr->total_length = rte_cpu_to_be_16(20 + payload_len);
+ ip_hdr->packet_id = 0;
+ ip_hdr->fragment_offset = rte_cpu_to_be_16(IPV4_HDR_DF_FLAG);
+ ip_hdr->time_to_live = 64;
+ ip_hdr->next_proto_id = proto_id;
+ ip_hdr->hdr_checksum = 0; /* FIXME: Assuming hardware can offload */
+ ip_hdr->src_addr = port->ipv4_addr;
+ ip_hdr->dst_addr = dst_ipv4_addr;
+
+ mbuf->pkt_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len;
+ mbuf->data_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len;
+}
+
+int uhd_dpdk_udp_prep(struct uhd_dpdk_socket *sock,
+ struct rte_mbuf *mbuf)
+{
+ struct ether_hdr *eth_hdr;
+ struct ipv4_hdr *ip_hdr;
+ struct udp_hdr *tx_hdr;
+ struct uhd_dpdk_port *port = sock->port;
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
+
+ if (unlikely(mbuf == NULL || pdata == NULL || port == NULL))
+ return -EINVAL;
+
+ uint32_t udp_data_len = mbuf->data_len;
+ uhd_dpdk_ipv4_prep(port,
+ mbuf,
+ pdata->dst_ipv4_addr,
+ 0x11,
+ 8 + udp_data_len);
+
+ eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
+ ip_hdr = (struct ipv4_hdr *) &eth_hdr[1];
+ tx_hdr = (struct udp_hdr *) &ip_hdr[1];
+
+ tx_hdr->src_port = pdata->src_port;
+ tx_hdr->dst_port = pdata->dst_port;
+ tx_hdr->dgram_len = rte_cpu_to_be_16(8 + udp_data_len);
+ tx_hdr->dgram_cksum = 0;
+
+ return 0;
+}
+
+int uhd_dpdk_udp_get_info(struct uhd_dpdk_socket *sock,
+ struct uhd_dpdk_sockarg_udp *sockarg)
+{
+ if (unlikely(sock == NULL || sockarg == NULL))
+ return -EINVAL;
+ if (sock->sock_type != UHD_DPDK_SOCK_UDP)
+ return -EINVAL;
+
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
+ if (sock->tx_ring) {
+ sockarg->is_tx = true;
+ sockarg->local_port = pdata->src_port;
+ sockarg->remote_port = pdata->dst_port;
+ sockarg->dst_addr = pdata->dst_ipv4_addr;
+ } else {
+ sockarg->is_tx = false;
+ sockarg->local_port = pdata->dst_port;
+ sockarg->remote_port = pdata->src_port;
+ sockarg->dst_addr = 0;
+ }
+ return 0;
+}
+
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h
new file mode 100644
index 000000000..651ae144e
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h
@@ -0,0 +1,30 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#ifndef _UHD_DPDK_UDP_H_
+#define _UHD_DPDK_UDP_H_
+
+#include "uhd_dpdk_ctx.h"
+#include <rte_udp.h>
+
+struct uhd_dpdk_udp_priv {
+ uint16_t src_port;
+ uint16_t dst_port;
+ uint32_t dst_ipv4_addr;
+ uint32_t dropped_pkts;
+ /* TODO: Cache destination address ptr to avoid ARP table lookup cost? */
+ //struct uhd_dpdk_arp_entry *arp_entry;
+};
+
+int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req);
+int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req);
+
+void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req,
+ struct uhd_dpdk_sockarg_udp *arg);
+void uhd_dpdk_udp_close(struct uhd_dpdk_config_req *req);
+
+int uhd_dpdk_udp_prep(struct uhd_dpdk_socket *sock,
+ struct rte_mbuf *mbuf);
+#endif /* _UHD_DPDK_UDP_H_ */