aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
authorAlex Williams <alex.williams@ni.com>2018-08-31 11:35:07 -0700
committerBrent Stapleton <brent.stapleton@ettus.com>2019-01-15 17:14:57 -0800
commite2195ac505bd423d3d2f973bbe94da1c78296aa6 (patch)
tree296ffd98c620c4ad3e313cd697891418af26cc94 /host/lib/transport
parente2cde21ceb7497dcc1ef25156afa6472fe64f009 (diff)
downloaduhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.tar.gz
uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.tar.bz2
uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.zip
transport: Add blocking recv calls to uhd-dpdk
This adds an internal wait queue API to uhd-dpdk. Socket configuration requests had their blocking calls re-implemented on top of this API, and it is also used to service requests to wait on RX packets (w/ timeout). The wait API involves a multi-producer, single-consumer queue per I/O thread (waiter_ring), with a condition variable used for sleeping. The data structure is shared between user thread and I/O thread, and because timeouts make resource release time non-deterministic, we use reference counting on the shared resource. One reference is generated by the user thread and passed to the I/O thread to consume. A user thread that still needs the data after waking must get() another reference, to postpone the destruction of the resource until it is done. Timeouts are based on CLOCK_MONOTONIC. For recv, a timeout of 0 indicates blocking indefinitely, and a negative timeout indicates no timeout is desired. Also drop timeout for closing sockets in uhd-dpdk. The timeout would allow a user thread to pre-empt the I/O thread's cleanup process. The user thread would free data structures the I/O thread still needed to function. Since this timeout is superfluous anyway, let's just get rid of it. Also add some more input checking and error reporting.
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/uhd-dpdk/CMakeLists.txt4
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk.c72
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h45
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c299
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c135
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h8
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c158
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h11
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_wait.c114
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_wait.h62
10 files changed, 690 insertions, 218 deletions
diff --git a/host/lib/transport/uhd-dpdk/CMakeLists.txt b/host/lib/transport/uhd-dpdk/CMakeLists.txt
index be683aaf2..0beb1eed5 100644
--- a/host/lib/transport/uhd-dpdk/CMakeLists.txt
+++ b/host/lib/transport/uhd-dpdk/CMakeLists.txt
@@ -22,13 +22,15 @@ if(ENABLE_DPDK)
${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_driver.c
${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_fops.c
${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_udp.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_wait.c
)
set_source_files_properties(
${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk.c
${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_driver.c
${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_fops.c
${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_udp.c
- PROPERTIES COMPILE_FLAGS ${UHD_DPDK_CFLAGS}
+ ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_wait.c
+ PROPERTIES COMPILE_FLAGS "${UHD_DPDK_CFLAGS} -D_GNU_SOURCE"
)
include_directories(${DPDK_INCLUDE_DIR})
LIBUHD_APPEND_LIBS(${DPDK_LIBRARIES})
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk.c b/host/lib/transport/uhd-dpdk/uhd_dpdk.c
index d6a8a5aa6..1be6b2335 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk.c
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk.c
@@ -4,9 +4,11 @@
// SPDX-License-Identifier: GPL-3.0-or-later
//
#include "uhd_dpdk_ctx.h"
+#include "uhd_dpdk_wait.h"
#include "uhd_dpdk_udp.h"
#include "uhd_dpdk_driver.h"
#include <stdlib.h>
+#include <sched.h>
#include <rte_errno.h>
#include <rte_malloc.h>
#include <rte_log.h>
@@ -25,7 +27,6 @@ struct uhd_dpdk_ctx *ctx = NULL;
/* TODO: For nice scheduling options later, make sure to separate RX and TX activity */
-
int uhd_dpdk_port_count(void)
{
if (!ctx)
@@ -89,9 +90,11 @@ static inline int uhd_dpdk_port_init(struct uhd_dpdk_port *port,
/* Set up Ethernet device with defaults (1 RX ring, 1 TX ring) */
retval = rte_eth_dev_set_mtu(port->id, mtu);
if (retval) {
+ uint16_t actual_mtu;
RTE_LOG(WARNING, EAL, "%d: Could not set mtu to %d\n", retval, mtu);
- rte_eth_dev_get_mtu(port->id, &mtu);
- RTE_LOG(WARNING, EAL, "Current mtu=%d\n", mtu);
+ rte_eth_dev_get_mtu(port->id, &actual_mtu);
+ RTE_LOG(WARNING, EAL, "Current mtu=%d\n", actual_mtu);
+ mtu = actual_mtu;
}
/* FIXME: Check if hw_ip_checksum is possible */
struct rte_eth_conf port_conf = {
@@ -175,19 +178,19 @@ port_init_fail:
return rte_errno;
}
-static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int id)
+static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int lcore)
{
if (!ctx || !thread)
return -EINVAL;
- unsigned int socket_id = rte_lcore_to_socket_id(id);
- thread->id = id;
+ unsigned int socket_id = rte_lcore_to_socket_id(lcore);
+ thread->lcore = lcore;
thread->rx_pktbuf_pool = ctx->rx_pktbuf_pools[socket_id];
thread->tx_pktbuf_pool = ctx->tx_pktbuf_pools[socket_id];
LIST_INIT(&thread->port_list);
char name[32];
- snprintf(name, sizeof(name), "sockreq_ring_%u", id);
+ snprintf(name, sizeof(name), "sockreq_ring_%u", lcore);
thread->sock_req_ring = rte_ring_create(
name,
UHD_DPDK_MAX_PENDING_SOCK_REQS,
@@ -196,6 +199,15 @@ static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int id)
);
if (!thread->sock_req_ring)
return -ENOMEM;
+ snprintf(name, sizeof(name), "waiter_ring_%u", lcore);
+ thread->waiter_ring = rte_ring_create(
+ name,
+ UHD_DPDK_MAX_WAITERS,
+ socket_id,
+ RING_F_SC_DEQ
+ );
+ if (!thread->waiter_ring)
+ return -ENOMEM;
return 0;
}
@@ -290,7 +302,7 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports,
continue;
if (((unsigned int) thread_id) == master_lcore)
RTE_LOG(WARNING, EAL, "User requested master lcore for port %u\n", i);
- if (ctx->threads[thread_id].id != (unsigned int) thread_id)
+ if (ctx->threads[thread_id].lcore != (unsigned int) thread_id)
rte_exit(EXIT_FAILURE, "Requested inactive lcore %u for port %u\n", (unsigned int) thread_id, i);
struct uhd_dpdk_port *port = &ctx->ports[i];
@@ -310,14 +322,33 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports,
/* FIXME: Create functions to do this */
RTE_LOG(INFO, EAL, "Starting I/O threads!\n");
+ cpu_set_t io_cpuset;
+ CPU_ZERO(&io_cpuset);
for (int i = rte_get_next_lcore(-1, 1, 0);
(i < RTE_MAX_LCORE);
i = rte_get_next_lcore(i, 1, 0))
{
struct uhd_dpdk_thread *t = &ctx->threads[i];
if (!LIST_EMPTY(&t->port_list)) {
- rte_eal_remote_launch(_uhd_dpdk_driver_main, NULL, ctx->threads[i].id);
- }
+ rte_eal_remote_launch(_uhd_dpdk_driver_main, NULL, ctx->threads[i].lcore);
+ struct uhd_dpdk_wait_req *waiter = uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_SIMPLE);
+ if (!waiter) {
+ rte_exit(EXIT_FAILURE, "%s: Failed to get wait request\n", __func__);
+ }
+ uhd_dpdk_waiter_get(waiter);
+ uhd_dpdk_waiter_wait(waiter, -1, &ctx->threads[i]);
+ uhd_dpdk_waiter_put(waiter);
+ CPU_OR(&io_cpuset, &io_cpuset, &t->cpu_affinity);
+ }
+ }
+ cpu_set_t user_cpuset;
+ CPU_ZERO(&user_cpuset);
+ for (int i = 0; i < CPU_SETSIZE; i++) {
+ CPU_SET(i, &user_cpuset);
+ }
+ CPU_XOR(&user_cpuset, &user_cpuset, &io_cpuset);
+ if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &user_cpuset)) {
+ rte_exit(EXIT_FAILURE, "%s: Failed to set CPU affinity\n", __func__);
}
return 0;
}
@@ -332,6 +363,12 @@ int uhd_dpdk_destroy(void)
if (!req)
return -ENOMEM;
+ req->waiter = uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_SIMPLE);
+ if (!req->waiter) {
+ rte_free(req);
+ return -ENOMEM;
+ }
+
req->req_type = UHD_DPDK_LCORE_TERM;
for (int i = rte_get_next_lcore(-1, 1, 0);
@@ -343,26 +380,19 @@ int uhd_dpdk_destroy(void)
if (LIST_EMPTY(&t->port_list))
continue;
- if (rte_eal_get_lcore_state(t->id) == FINISHED)
+ if (rte_eal_get_lcore_state(t->lcore) == FINISHED)
continue;
- pthread_mutex_init(&req->mutex, NULL);
- pthread_cond_init(&req->cond, NULL);
- pthread_mutex_lock(&req->mutex);
if (rte_ring_enqueue(t->sock_req_ring, req)) {
- pthread_mutex_unlock(&req->mutex);
RTE_LOG(ERR, USER2, "Failed to terminate thread %d\n", i);
+ rte_free(req->waiter);
rte_free(req);
return -ENOSPC;
}
- struct timespec timeout = {
- .tv_sec = 1,
- .tv_nsec = 0
- };
- pthread_cond_timedwait(&req->cond, &req->mutex, &timeout);
- pthread_mutex_unlock(&req->mutex);
+ uhd_dpdk_config_req_submit(req, 1, t);
}
+ uhd_dpdk_waiter_put(req->waiter);
rte_free(req);
return 0;
}
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h
index c6afab85d..ca0fa77b3 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h
@@ -13,6 +13,7 @@
#include <rte_mbuf.h>
#include <rte_hash.h>
#include <rte_eal.h>
+#include <rte_atomic.h>
#include <uhd/transport/uhd-dpdk.h>
//#include <pthread.h>
@@ -20,16 +21,18 @@
#define UHD_DPDK_MAX_SOCKET_CNT 1024
#define UHD_DPDK_MAX_PENDING_SOCK_REQS 16
+#define UHD_DPDK_MAX_WAITERS UHD_DPDK_MAX_SOCKET_CNT
#define UHD_DPDK_TXQ_SIZE 64
#define UHD_DPDK_TX_BURST_SIZE (UHD_DPDK_TXQ_SIZE - 1)
#define UHD_DPDK_RXQ_SIZE 64
#define UHD_DPDK_RX_BURST_SIZE (UHD_DPDK_RXQ_SIZE - 1)
struct uhd_dpdk_port;
+struct uhd_dpdk_tx_queue;
/**
*
- * All memory allocation for port, rx_ring, and tx_ring owned by I/O thread
+ * All memory allocation for port, rx_ring, and tx_queue owned by I/O thread
* Rest owned by user thread
*
* port: port servicing this socket
@@ -37,7 +40,7 @@ struct uhd_dpdk_port;
* sock_type: Type of socket
* priv: Private data, based on sock_type
* rx_ring: pointer to individual rx_ring (created during init--Also used as free buffer ring for TX)
- * tx_ring: pointer to shared tx_ring (with all sockets for this tid)
+ * tx_queue: pointer to tx queue structure
* tx_buf_count: Number of buffers currently outside the rings
* tx_entry: List node for TX Queue tracking
*
@@ -46,19 +49,21 @@ struct uhd_dpdk_port;
*/
struct uhd_dpdk_socket {
struct uhd_dpdk_port *port;
- pid_t tid;
+ pthread_t tid;
enum uhd_dpdk_sock_type sock_type;
void *priv;
struct rte_ring *rx_ring;
- struct rte_ring *tx_ring;
+ struct uhd_dpdk_tx_queue *tx_queue;
int tx_buf_count;
LIST_ENTRY(uhd_dpdk_socket) tx_entry;
};
LIST_HEAD(uhd_dpdk_tx_head, uhd_dpdk_socket);
/************************************************
- * Configuration
+ * Configuration and Blocking
************************************************/
+struct uhd_dpdk_wait_req;
+
enum uhd_dpdk_sock_req {
UHD_DPDK_SOCK_OPEN = 0,
UHD_DPDK_SOCK_CLOSE,
@@ -70,20 +75,20 @@ enum uhd_dpdk_sock_req {
* port: port associated with this request
* sock: socket associated with this request
* req_type: Open, Close, or terminate lcore
- * sock_type: Only udp is supported
* cond: Used to sleep until socket creation is finished
* mutex: associated with cond
* entry: List node for requests pending ARP responses
* priv: private data
* retval: Result of call (needed post-wakeup)
+ *
+ * config_reqs are assumed not to time out
+ * The interaction with wait_reqs currently makes this impossible to do safely
*/
struct uhd_dpdk_config_req {
struct uhd_dpdk_port *port;
struct uhd_dpdk_socket *sock;
enum uhd_dpdk_sock_req req_type;
- enum uhd_dpdk_sock_type sock_type;
- pthread_cond_t cond;
- pthread_mutex_t mutex;
+ struct uhd_dpdk_wait_req *waiter;
LIST_ENTRY(uhd_dpdk_config_req) entry;
void *priv;
int retval;
@@ -107,24 +112,25 @@ struct uhd_dpdk_ipv4_5tuple {
};
/**
- * Used for blocking calls to RX
+ * Entry for RX table
+ * req used for blocking calls to RX
*/
-struct uhd_dpdk_sock_cond {
+struct uhd_dpdk_rx_entry {
struct uhd_dpdk_socket *sock;
- pthread_cond_t cond;
- pthread_mutex_t mutex;
+ struct uhd_dpdk_wait_req *waiter;
};
/************************************************
* TX Queues
*
- * 1 TX Queue per thread sending through a hardware port
+ * 1 TX Queue per socket sending through a hardware port
* All memory allocation owned by I/O thread
*
* tid: thread id
* queue: TX queue holding threads prepared packets (via send())
* retry_queue: queue holding packets that couldn't be sent
* freebufs: queue holding empty buffers
+ * waiter: Request to wait for a free buffer
* tx_list: list of sockets using this queue
* entry: list node for port to track TX queues
*
@@ -133,17 +139,17 @@ struct uhd_dpdk_sock_cond {
* For queue, user thread is producer, I/O thread is consumer
* For freebufs, user thread is consumer, I/O thread is consumer
*
- * All queues are same size, and they are shared between all sockets on one
- * thread (tid is the identifier)
+ * All queues are same size
* 1. Buffers start in freebufs (user gets buffers from freebufs)
* 2. User submits packet to queue
* 3. If packet couldn't be sent, it is (re)enqueued on retry_queue
************************************************/
struct uhd_dpdk_tx_queue {
- pid_t tid;
+ pthread_t tid;
struct rte_ring *queue;
struct rte_ring *retry_queue;
struct rte_ring *freebufs;
+ struct uhd_dpdk_wait_req *waiter;
struct uhd_dpdk_tx_head tx_list;
LIST_ENTRY(uhd_dpdk_tx_queue) entry;
};
@@ -199,6 +205,7 @@ LIST_HEAD(uhd_dpdk_port_head, uhd_dpdk_port);
* sock_req_ring: Queue for user threads to submit service requests to the lcore
*
* sock_req_ring is a multi-producer, single-consumer queue
+ * It must NOT BE ACCESSED SIMULTANEOUSLY by two threads not using SCHED_OTHER(cfs)
*
* For threads that have ports:
* Launch individually
@@ -208,12 +215,14 @@ LIST_HEAD(uhd_dpdk_port_head, uhd_dpdk_port);
* REMEMBER: Without args, DPDK creates an lcore for each CPU core!
*/
struct uhd_dpdk_thread {
- unsigned int id;
+ unsigned int lcore;
+ cpu_set_t cpu_affinity;
struct rte_mempool *rx_pktbuf_pool;
struct rte_mempool *tx_pktbuf_pool;
int num_ports;
struct uhd_dpdk_port_head port_list;
struct rte_ring *sock_req_ring;
+ struct rte_ring *waiter_ring;
};
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
index 8388359e7..f603f1f8f 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
@@ -6,6 +6,7 @@
#include "uhd_dpdk_driver.h"
#include "uhd_dpdk_fops.h"
#include "uhd_dpdk_udp.h"
+#include "uhd_dpdk_wait.h"
#include <rte_malloc.h>
#include <rte_mempool.h>
#include <arpa/inet.h>
@@ -145,19 +146,25 @@ int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, str
.dst_port = pkt->dst_port
};
- struct uhd_dpdk_socket *sock = NULL;
- rte_hash_lookup_data(port->rx_table, &ht_key, (void **) &sock);
- if (!sock) {
+ struct uhd_dpdk_rx_entry *entry = NULL;
+ rte_hash_lookup_data(port->rx_table, &ht_key, (void **) &entry);
+ if (!entry) {
status = -ENODEV;
+ //RTE_LOG(WARNING, USER1, "%s: Dropping packet to UDP port %d\n", __func__, ntohs(pkt->dst_port));
goto udp_rx_drop;
}
- struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
- status = rte_ring_enqueue(sock->rx_ring, mbuf);
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) entry->sock->priv;
+ status = rte_ring_enqueue(entry->sock->rx_ring, mbuf);
+ if (entry->waiter) {
+ _uhd_dpdk_waiter_wake(entry->waiter, port->parent);
+ entry->waiter = NULL;
+ }
if (status) {
pdata->dropped_pkts++;
goto udp_rx_drop;
}
+ pdata->xferd_pkts++;
return 0;
udp_rx_drop:
@@ -220,6 +227,7 @@ static int _uhd_dpdk_send(struct uhd_dpdk_port *port,
return status;
}
}
+
status = rte_eth_tx_burst(port->id, 0, &buf, 1); /* Automatically frees mbuf */
if (status != 1) {
status = rte_ring_enqueue(txq->retry_queue, buf);
@@ -247,6 +255,10 @@ static inline int _uhd_dpdk_restore_bufs(struct uhd_dpdk_port *port,
/* Enqueue the buffers for the user thread to retrieve */
unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) freebufs, num_bufs, NULL);
+ if (q->waiter && rte_ring_count(q->freebufs) > 0) {
+ _uhd_dpdk_waiter_wake(q->waiter, port->parent);
+ q->waiter = NULL;
+ }
if (enqd != num_bufs) {
RTE_LOG(ERR, USER1, "Could not enqueue pktmbufs!\n");
return status;
@@ -299,7 +311,19 @@ static inline int _uhd_dpdk_driver_cleanup(struct uhd_dpdk_thread *t)
}
}
- /* Now can free memory */
+ /* Now clean up waiters
+ * TODO: Determine if better to wake threads
+ */
+ int num_waiters = rte_ring_count(t->waiter_ring);
+ for (int i = 0; i < num_waiters; i++) {
+ struct uhd_dpdk_wait_req *req = NULL;
+ rte_ring_dequeue(t->waiter_ring, (void **) &req);
+ uhd_dpdk_waiter_put(req);
+ }
+ if (rte_ring_count(t->waiter_ring))
+ return -EAGAIN;
+
+ /* Now can free memory, except sock_req_ring and waiter_ring */
LIST_FOREACH(port, &t->port_list, port_entry) {
rte_hash_free(port->rx_table);
@@ -341,29 +365,12 @@ static inline int _uhd_dpdk_driver_cleanup(struct uhd_dpdk_thread *t)
return 0;
}
-int _uhd_dpdk_driver_main(void *arg)
+static inline int _uhd_dpdk_service_config_req(struct rte_ring *sock_req_ring)
{
-
- /* Don't currently have use for arguments */
- if (arg)
- return -EINVAL;
-
- /* Check that this is a valid lcore */
- unsigned int lcore_id = rte_lcore_id();
- if (lcore_id == LCORE_ID_ANY)
- return -ENODEV;
-
- /* Check that this lcore has ports */
- struct uhd_dpdk_thread *t = &ctx->threads[lcore_id];
- if (t->id != lcore_id)
- return -ENODEV;
-
- RTE_LOG(INFO, USER2, "Thread %d started\n", lcore_id);
int status = 0;
- while (!status) {
- /* Check for open()/close() requests and service 1 at a time */
- struct uhd_dpdk_config_req *sock_req;
- if (rte_ring_dequeue(t->sock_req_ring, (void **) &sock_req) == 0) {
+ struct uhd_dpdk_config_req *sock_req;
+ if (rte_ring_dequeue(sock_req_ring, (void **) &sock_req) == 0) {
+ if (sock_req) {
/* FIXME: Not checking return vals */
switch (sock_req->req_type) {
case UHD_DPDK_SOCK_OPEN:
@@ -373,7 +380,7 @@ int _uhd_dpdk_driver_main(void *arg)
_uhd_dpdk_sock_release(sock_req);
break;
case UHD_DPDK_LCORE_TERM:
- RTE_LOG(INFO, EAL, "Terminating lcore %u\n", lcore_id);
+ RTE_LOG(INFO, EAL, "Terminating lcore %u\n", rte_lcore_id());
status = 1;
_uhd_dpdk_config_req_compl(sock_req, 0);
break;
@@ -381,61 +388,207 @@ int _uhd_dpdk_driver_main(void *arg)
RTE_LOG(ERR, USER2, "Invalid socket request %d\n", sock_req->req_type);
break;
}
+ } else {
+ RTE_LOG(ERR, USER1, "%s: NULL service request received\n", __func__);
+ }
+ }
+ return status;
+}
+
+/* Do a burst of RX on port */
+static inline void _uhd_dpdk_rx_burst(struct uhd_dpdk_port *port)
+{
+ struct ether_hdr *hdr;
+ char *l2_data;
+ struct rte_mbuf *bufs[UHD_DPDK_RX_BURST_SIZE];
+ const uint16_t num_rx = rte_eth_rx_burst(port->id, 0,
+ bufs, UHD_DPDK_RX_BURST_SIZE);
+ if (unlikely(num_rx == 0)) {
+ return;
+ }
+
+ for (int buf = 0; buf < num_rx; buf++) {
+ uint64_t ol_flags = bufs[buf]->ol_flags;
+ hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr *);
+ l2_data = (char *) &hdr[1];
+ switch (rte_be_to_cpu_16(hdr->ether_type)) {
+ case ETHER_TYPE_ARP:
+ _uhd_dpdk_process_arp(port, (struct arp_hdr *) l2_data);
+ rte_pktmbuf_free(bufs[buf]);
+ break;
+ case ETHER_TYPE_IPv4:
+ if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */
+ RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf);
+ } else {
+ _uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data);
+ }
+ break;
+ default:
+ rte_pktmbuf_free(bufs[buf]);
+ break;
+ }
+ }
+}
+
+/* Do a burst of TX on port's tx q */
+static inline int _uhd_dpdk_tx_burst(struct uhd_dpdk_port *port,
+ struct uhd_dpdk_tx_queue *q)
+{
+ if (!rte_ring_empty(q->retry_queue)) {
+ int num_retry = _uhd_dpdk_send(port, q, q->retry_queue);
+ _uhd_dpdk_restore_bufs(port, q, num_retry);
+ if (!rte_ring_empty(q->retry_queue)) {
+ return -EAGAIN;
}
+ }
+ if (rte_ring_empty(q->queue)) {
+ return 0;
+ }
+ int num_tx = _uhd_dpdk_send(port, q, q->queue);
+ if (num_tx > 0) {
+ _uhd_dpdk_restore_bufs(port, q, num_tx);
+ return 0;
+ } else {
+ return num_tx;
+ }
+}
+
+/* Process threads requesting to block on RX */
+static inline void _uhd_dpdk_process_rx_wait(struct uhd_dpdk_thread *t,
+ struct uhd_dpdk_wait_req *req)
+{
+ struct uhd_dpdk_socket *sock = req->sock;
+ if (!sock)
+ goto rx_wait_skip;
+ if (!sock->port)
+ goto rx_wait_skip;
+ if (!sock->port->rx_table)
+ goto rx_wait_skip;
+
+ if (!rte_ring_empty(sock->rx_ring))
+ goto rx_wait_skip;
+
+ struct uhd_dpdk_ipv4_5tuple ht_key;
+ if (_uhd_dpdk_sock_rx_key(sock, &ht_key))
+ goto rx_wait_skip;
+
+ struct uhd_dpdk_rx_entry *entry = NULL;
+ rte_hash_lookup_data(sock->port->rx_table, &ht_key, (void **) &entry);
+ entry->waiter = req;
+ return;
+
+rx_wait_skip:
+ _uhd_dpdk_waiter_wake(req, t);
+}
+
+/* Process threads requesting to block on TX bufs*/
+static inline void _uhd_dpdk_process_tx_buf_wait(struct uhd_dpdk_thread *t,
+ struct uhd_dpdk_wait_req *req)
+{
+ struct uhd_dpdk_socket *sock = req->sock;
+ if (!sock)
+ goto tx_wait_skip;
+ if (!sock->port)
+ goto tx_wait_skip;
+ if (!sock->tx_queue)
+ goto tx_wait_skip;
+
+ struct uhd_dpdk_tx_queue *q = sock->tx_queue;
+ if (!q->freebufs || !q->retry_queue || !q->queue)
+ goto tx_wait_skip;
+
+ if (!rte_ring_empty(q->freebufs))
+ goto tx_wait_skip;
+
+ sock->tx_queue->waiter = req;
+
+ // Attempt to restore bufs only if failed before
+ unsigned int num_bufs = sock->tx_buf_count + rte_ring_count(q->queue) +
+ rte_ring_count(q->retry_queue);
+ unsigned int max_bufs = rte_ring_get_size(q->freebufs);
+ if (num_bufs < max_bufs) {
+ _uhd_dpdk_restore_bufs(sock->port, q, max_bufs - num_bufs);
+ }
+ return;
+
+tx_wait_skip:
+ _uhd_dpdk_waiter_wake(req, t);
+}
+
+/* Process threads making requests to wait */
+static inline void _uhd_dpdk_process_waiters(struct uhd_dpdk_thread *t)
+{
+ int num_waiters = rte_ring_count(t->waiter_ring);
+ num_waiters = (num_waiters > UHD_DPDK_MAX_PENDING_SOCK_REQS) ?
+ UHD_DPDK_MAX_PENDING_SOCK_REQS :
+ num_waiters;
+ for (int i = 0; i < num_waiters; i++) {
+ /* Dequeue */
+ struct uhd_dpdk_wait_req *req = NULL;
+ if (rte_ring_dequeue(t->waiter_ring, (void **) &req))
+ break;
+ switch (req->reason) {
+ case UHD_DPDK_WAIT_SIMPLE:
+ _uhd_dpdk_waiter_wake(req, t);
+ break;
+ case UHD_DPDK_WAIT_RX:
+ _uhd_dpdk_process_rx_wait(t, req);
+ break;
+ default:
+ RTE_LOG(ERR, USER2, "Invalid reason associated with wait request\n");
+ _uhd_dpdk_waiter_wake(req, t);
+ break;
+ }
+ }
+}
+
+int _uhd_dpdk_driver_main(void *arg)
+{
+
+ /* Don't currently have use for arguments */
+ if (arg)
+ return -EINVAL;
+
+ /* Check that this is a valid lcore */
+ unsigned int lcore_id = rte_lcore_id();
+ if (lcore_id == LCORE_ID_ANY)
+ return -ENODEV;
+
+ /* Check that this lcore has ports */
+ struct uhd_dpdk_thread *t = &ctx->threads[lcore_id];
+ if (t->lcore != lcore_id)
+ return -ENODEV;
+
+ pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t),
+ &t->cpu_affinity);
+ char name[16];
+ snprintf(name, sizeof(name), "dpdk-io_%u", lcore_id);
+ pthread_setname_np(pthread_self(), name);
+
+ RTE_LOG(INFO, USER2, "Thread %d started\n", lcore_id);
+ int status = 0;
+ while (!status) {
+ /* Check for open()/close()/term() requests and service 1 at a time */
+ status = _uhd_dpdk_service_config_req(t->sock_req_ring);
/* For each port, attempt to receive packets and process */
struct uhd_dpdk_port *port = NULL;
LIST_FOREACH(port, &t->port_list, port_entry) {
- struct ether_hdr *hdr;
- char *l2_data;
- struct rte_mbuf *bufs[UHD_DPDK_RX_BURST_SIZE];
- const uint16_t num_rx = rte_eth_rx_burst(port->id, 0,
- bufs, UHD_DPDK_RX_BURST_SIZE);
- if (unlikely(num_rx == 0)) {
- continue;
- }
-
- for (int buf = 0; buf < num_rx; buf++) {
- uint64_t ol_flags = bufs[buf]->ol_flags;
- hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr *);
- l2_data = (char *) &hdr[1];
- switch (rte_be_to_cpu_16(hdr->ether_type)) {
- case ETHER_TYPE_ARP:
- _uhd_dpdk_process_arp(port, (struct arp_hdr *) l2_data);
- rte_pktmbuf_free(bufs[buf]);
- break;
- case ETHER_TYPE_IPv4:
- if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */
- RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf);
- } else {
- _uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data);
- }
- break;
- default:
- rte_pktmbuf_free(bufs[buf]);
- break;
- }
- }
+ _uhd_dpdk_rx_burst(port);
}
+
+ /* TODO: Handle waiter_ring
+ * Also use it for config_req wake retries
+ * Also take care of RX table with new struct w/ waiter
+ * (construction, adding, destruction)
+ */
+ _uhd_dpdk_process_waiters(t);
+
/* For each port's TX queues, do TX */
LIST_FOREACH(port, &t->port_list, port_entry) {
struct uhd_dpdk_tx_queue *q = NULL;
LIST_FOREACH(q, &port->txq_list, entry) {
- if (!rte_ring_empty(q->retry_queue)) {
- int num_retry = _uhd_dpdk_send(port, q, q->retry_queue);
- _uhd_dpdk_restore_bufs(port, q, num_retry);
- if (!rte_ring_empty(q->retry_queue)) {
- break;
- }
- }
- if (rte_ring_empty(q->queue)) {
- continue;
- }
- int num_tx = _uhd_dpdk_send(port, q, q->queue);
- if (num_tx > 0) {
- _uhd_dpdk_restore_bufs(port, q, num_tx);
- } else {
+ if (_uhd_dpdk_tx_burst(port, q))
break;
- }
}
}
}
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c
index 309e5e643..605f01de3 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c
@@ -5,6 +5,7 @@
//
#include "uhd_dpdk_fops.h"
#include "uhd_dpdk_udp.h"
+#include "uhd_dpdk_wait.h"
#include <rte_malloc.h>
#include <rte_ip.h>
@@ -22,24 +23,14 @@
int _uhd_dpdk_config_req_compl(struct uhd_dpdk_config_req *req, int retval)
{
req->retval = retval;
- int stat = pthread_mutex_trylock(&req->mutex);
- if (stat) {
- RTE_LOG(ERR, USER1, "%s: Could not lock req mutex\n", __func__);
- return stat;
- }
- stat = pthread_cond_signal(&req->cond);
- pthread_mutex_unlock(&req->mutex);
- if (stat) {
- RTE_LOG(ERR, USER1, "%s: Could not signal req cond\n", __func__);
- return stat;
- }
- return 0;
+ int stat = _uhd_dpdk_waiter_wake(req->waiter, req->sock->port->parent);
+ return stat;
}
int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req)
{
int stat = 0;
- switch (req->sock_type) {
+ switch (req->sock->sock_type) {
case UHD_DPDK_SOCK_UDP:
stat = _uhd_dpdk_udp_setup(req);
break;
@@ -53,7 +44,7 @@ int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req)
int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req)
{
int stat = 0;
- switch (req->sock_type) {
+ switch (req->sock->sock_type) {
case UHD_DPDK_SOCK_UDP:
stat = _uhd_dpdk_udp_release(req);
break;
@@ -65,6 +56,22 @@ int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req)
return stat;
}
+int _uhd_dpdk_sock_rx_key(struct uhd_dpdk_socket *sock,
+ struct uhd_dpdk_ipv4_5tuple *key)
+{
+ int stat = 0;
+ if (!key)
+ return -EINVAL;
+
+ switch (sock->sock_type) {
+ case UHD_DPDK_SOCK_UDP:
+ stat = _uhd_dpdk_udp_rx_key(sock, key);
+ break;
+ default:
+ stat = -EINVAL;
+ }
+ return stat;
+}
/************************************************
* API calls
*/
@@ -85,12 +92,17 @@ struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid,
return NULL;
}
-
struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0);
if (!req) {
return NULL;
}
+ req->waiter = uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_SIMPLE);
+ if (!req->waiter) {
+ req->retval = -ENOMEM;
+ goto sock_open_end;
+ }
+
struct uhd_dpdk_socket *s = (struct uhd_dpdk_socket *) rte_zmalloc(NULL, sizeof(*s), 0);
if (!s) {
goto sock_open_end;
@@ -99,13 +111,8 @@ struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid,
s->port = port;
req->sock = s;
req->req_type = UHD_DPDK_SOCK_OPEN;
- req->sock_type = t;
+ req->sock->sock_type = t;
req->retval = -ETIMEDOUT;
- pthread_mutex_init(&req->mutex, NULL);
- pthread_condattr_t condattr;
- pthread_condattr_init(&condattr);
- pthread_condattr_setclock(&condattr, CLOCK_MONOTONIC);
- pthread_cond_init(&req->cond, &condattr);
switch (t) {
case UHD_DPDK_SOCK_UDP:
@@ -121,6 +128,8 @@ struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid,
}
sock_open_end:
+ if (req->waiter)
+ uhd_dpdk_waiter_put(req->waiter);
rte_free(req);
return s;
}
@@ -133,12 +142,15 @@ int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock)
struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0);
if (!req)
return -ENOMEM;
+
+ req->waiter = uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_SIMPLE);
+ if (!req->waiter) {
+ rte_free(req);
+ return -ENOMEM;
+ }
req->sock = sock;
req->req_type = UHD_DPDK_SOCK_CLOSE;
- req->sock_type = sock->sock_type;
req->retval = -ETIMEDOUT;
- pthread_mutex_init(&req->mutex, NULL);
- pthread_cond_init(&req->cond, NULL);
switch (sock->sock_type) {
case UHD_DPDK_SOCK_UDP:
@@ -148,6 +160,8 @@ int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock)
break;
}
+ uhd_dpdk_waiter_put(req->waiter);
+
if (req->retval) {
rte_free(req);
return req->retval;
@@ -157,26 +171,34 @@ int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock)
return 0;
}
-/*
- * TODO:
- * Add blocking calls with timeout
- * Implementation would involve a condition variable, like config reqs
- * Also would create a cleanup section in I/O main loop (like config reqs)
- */
int uhd_dpdk_request_tx_bufs(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
- unsigned int num_bufs)
+ unsigned int num_bufs, int timeout)
{
if (!sock || !bufs || !num_bufs) {
return -EINVAL;
}
*bufs = NULL;
- if (!sock->tx_ring)
+ if (!sock->tx_queue)
return -EINVAL;
- unsigned int num_tx = rte_ring_count(sock->rx_ring);
+ if (!sock->tx_queue->freebufs)
+ return -EINVAL;
+
+ struct rte_ring *freebufs = sock->tx_queue->freebufs;
+ unsigned int num_tx = rte_ring_count(freebufs);
+ if (timeout != 0 && num_tx == 0) {
+ struct uhd_dpdk_wait_req *req =
+ uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_TX_BUF);
+ req->sock = sock;
+ uhd_dpdk_waiter_wait(req, timeout, sock->port->parent);
+ uhd_dpdk_waiter_put(req);
+ num_tx = rte_ring_count(freebufs);
+ if (!num_tx)
+ return -ETIMEDOUT;
+ }
num_tx = (num_tx < num_bufs) ? num_tx : num_bufs;
- if (rte_ring_dequeue_bulk(sock->rx_ring, (void **) bufs, num_tx, NULL) == 0)
+ if (rte_ring_dequeue_bulk(freebufs, (void **) bufs, num_tx, NULL) == 0)
return -ENOENT;
sock->tx_buf_count += num_tx;
return num_tx;
@@ -187,9 +209,12 @@ int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
{
if (!sock || !bufs || !num_bufs)
return -EINVAL;
- if (!sock->tx_ring)
+ if (!sock->tx_queue)
return -EINVAL;
- unsigned int num_tx = rte_ring_free_count(sock->tx_ring);
+ if (!sock->tx_queue->queue)
+ return -EINVAL;
+ struct rte_ring *tx_ring = sock->tx_queue->queue;
+ unsigned int num_tx = rte_ring_free_count(tx_ring);
num_tx = (num_tx < num_bufs) ? num_tx : num_bufs;
switch (sock->sock_type) {
case UHD_DPDK_SOCK_UDP:
@@ -201,7 +226,7 @@ int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
RTE_LOG(ERR, USER1, "%s: Unsupported sock type\n", __func__);
return -EINVAL;
}
- int status = rte_ring_enqueue_bulk(sock->tx_ring, (void **) bufs, num_tx, NULL);
+ int status = rte_ring_enqueue_bulk(tx_ring, (void **) bufs, num_tx, NULL);
if (status == 0) {
RTE_LOG(ERR, USER1, "Invalid shared usage of TX ring detected\n");
return status;
@@ -210,10 +235,6 @@ int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
return num_tx;
}
-/*
- * TODO:
- * Add blocking calls with timeout
- */
int uhd_dpdk_recv(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
unsigned int num_bufs, int timeout)
{
@@ -221,11 +242,21 @@ int uhd_dpdk_recv(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs,
return -EINVAL;
if (!sock->rx_ring)
return -EINVAL;
+
unsigned int num_rx = rte_ring_count(sock->rx_ring);
+ if (timeout != 0 && num_rx == 0) {
+ struct uhd_dpdk_wait_req *req =
+ uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_RX);
+ req->sock = sock;
+ uhd_dpdk_waiter_wait(req, timeout, sock->port->parent);
+ uhd_dpdk_waiter_put(req);
+ num_rx = rte_ring_count(sock->rx_ring);
+ if (!num_rx)
+ return -ETIMEDOUT;
+ }
+
num_rx = (num_rx < num_bufs) ? num_rx : num_bufs;
- /* if ((timeout > 0) && (num_rx != num_bufs)) {
- // Wait for enough bufs
- } else*/ if (num_rx) {
+ if (num_rx) {
unsigned int avail = 0;
unsigned int status = rte_ring_dequeue_bulk(sock->rx_ring,
(void **) bufs, num_rx, &avail);
@@ -293,7 +324,7 @@ int uhd_dpdk_get_src_ipv4(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf,
return 0;
}
-int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, uint32_t *count)
+int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, size_t *count)
{
if (!sock)
return -EINVAL;
@@ -306,3 +337,17 @@ int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, uint32_t *count)
*count = pdata->dropped_pkts;
return 0;
}
+
+int uhd_dpdk_get_xfer_count(struct uhd_dpdk_socket *sock, size_t *count)
+{
+ if (!sock)
+ return -EINVAL;
+ if (sock->sock_type != UHD_DPDK_SOCK_UDP)
+ return -EINVAL;
+ if (!sock->priv)
+ return -ENODEV;
+
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
+ *count = pdata->xferd_pkts;
+ return 0;
+}
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h
index c07c1913a..66adb0f54 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h
@@ -12,4 +12,12 @@ int _uhd_dpdk_config_req_compl(struct uhd_dpdk_config_req *req, int retval);
int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req);
int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req);
+
+/*
+ * Get key for RX table corresponding to this socket
+ *
+ * This is primarily used to get access to the waiter entry
+ */
+int _uhd_dpdk_sock_rx_key(struct uhd_dpdk_socket *sock,
+ struct uhd_dpdk_ipv4_5tuple *key);
#endif /* _UHD_DPDK_FOPS_H_ */
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
index 26cfd43e1..6ea77b930 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
@@ -6,17 +6,20 @@
#include "uhd_dpdk_fops.h"
#include "uhd_dpdk_udp.h"
#include "uhd_dpdk_driver.h"
+#include "uhd_dpdk_wait.h"
#include <rte_ring.h>
#include <rte_malloc.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <arpa/inet.h>
+#define MAX_UDP_PORT 65535
+
/************************************************
* I/O thread ONLY
*/
-static int _alloc_txq(struct uhd_dpdk_port *port, pid_t tid, struct uhd_dpdk_tx_queue **queue)
+static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk_tx_queue **queue)
{
*queue = NULL;
struct uhd_dpdk_tx_queue *q = rte_zmalloc(NULL, sizeof(*q), 0);
@@ -108,7 +111,7 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req)
/* Add to rx table */
if (pdata->dst_port == 0) {
/* Assign unused one in a very slow fashion */
- for (uint16_t i = 1; i > 0; i++) {
+ for (uint16_t i = MAX_UDP_PORT; i > 0; i--) {
ht_key.dst_port = htons(i);
if (rte_hash_lookup(port->rx_table, &ht_key) == -ENOENT) {
pdata->dst_port = htons(i);
@@ -144,9 +147,22 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req)
_uhd_dpdk_config_req_compl(req, -ENOMEM);
return -ENOMEM;
}
- retval = rte_hash_add_key_data(port->rx_table, &ht_key, sock);
+
+ struct uhd_dpdk_rx_entry *entry = (struct uhd_dpdk_rx_entry *)
+ rte_zmalloc(NULL, sizeof(*entry), 0);
+ if (!entry) {
+ rte_ring_free(sock->rx_ring);
+ RTE_LOG(ERR, USER1, "%s: Cannot create RX entry\n", __func__);
+ _uhd_dpdk_config_req_compl(req, -ENOMEM);
+ return -ENOMEM;
+ }
+ entry->sock = sock;
+ entry->waiter = NULL;
+
+ retval = rte_hash_add_key_data(port->rx_table, &ht_key, entry);
if (retval != 0) {
RTE_LOG(WARNING, TABLE, "Could not add new RX socket to port %d: %d\n", port->id, retval);
+ rte_free(entry);
rte_ring_free(sock->rx_ring);
_uhd_dpdk_config_req_compl(req, retval);
return retval;
@@ -155,25 +171,25 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req)
}
/* Are we doing TX? */
- if (sock->tx_ring) {
- sock->tx_ring = NULL;
+ if (sock->tx_queue) {
+ sock->tx_queue = NULL;
struct uhd_dpdk_tx_queue *q = NULL;
- LIST_FOREACH(q, &port->txq_list, entry) {
- if (q->tid == sock->tid) {
- LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry);
- sock->tx_ring = q->queue;
- sock->rx_ring = q->freebufs;
- break;
- }
- }
- if (!sock->tx_ring) {
+ // FIXME Not sharing txq across all thread's sockets for now
+ //LIST_FOREACH(q, &port->txq_list, entry) {
+ // if (pthread_equal(q->tid, sock->tid)) {
+ // LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry);
+ // sock->tx_ring = q->queue;
+ // sock->rx_ring = q->freebufs;
+ // break;
+ // }
+ //}
+ if (!sock->tx_queue) {
retval = _alloc_txq(port, sock->tid, &q);
if (retval) {
_uhd_dpdk_config_req_compl(req, retval);
return retval;
}
- sock->tx_ring = q->queue;
- sock->rx_ring = q->freebufs;
+ sock->tx_queue = q;
}
/* If a broadcast type, just finish setup and return */
if (is_broadcast(port, pdata->dst_ipv4_addr)) {
@@ -242,10 +258,23 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req)
int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req)
{
struct uhd_dpdk_socket *sock = req->sock;
+ if (req->sock == NULL) {
+ RTE_LOG(ERR, USER1, "%s: no sock in req\n", __func__);
+ return -EINVAL;
+ }
struct uhd_dpdk_port *port = req->sock->port;
struct uhd_dpdk_config_req *conf_req = NULL;
struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
- if (sock->tx_ring) {
+ if (pdata == NULL) {
+ RTE_LOG(ERR, USER1, "%s: no pdata in sock\n", __func__);
+ return -EINVAL;
+ }
+ if (sock->tx_queue) {
+ // FIXME not sharing buffers anymore
+ LIST_REMOVE(sock->tx_queue, entry);
+ rte_ring_free(sock->tx_queue->queue);
+ rte_ring_free(sock->tx_queue->retry_queue);
+
/* Remove from tx_list */
LIST_REMOVE(sock, tx_entry);
/* Check for entry in ARP table */
@@ -260,18 +289,32 @@ int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req)
}
}
- /* Add outstanding buffers back to TX queue's freebufs */
- struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE];
- int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, sock->tx_buf_count);
- if (status) {
- RTE_LOG(ERR, USER1, "%d %s: Could not restore %u TX buffers in bulk!\n", status, __func__, sock->tx_buf_count);
+ // FIXME not sharing buffers anymore
+ // Remove outstanding buffers from TX queue's freebufs */
+ unsigned int bufs = rte_ring_count(sock->tx_queue->freebufs);
+ for (unsigned int i = 0; i < bufs; i++) {
+ struct rte_mbuf *buf = NULL;
+ if (rte_ring_dequeue(sock->tx_queue->freebufs, (void **) &buf)) {
+ RTE_LOG(ERR, USER1, "%s: Could not dequeue freebufs\n", __func__);
+ } else if (buf) {
+ rte_pktmbuf_free(buf);
+ }
}
+ rte_ring_free(sock->tx_queue->freebufs);
+ rte_free(sock->tx_queue);
- unsigned int enqd = rte_ring_enqueue_bulk(sock->rx_ring, (void **) freebufs, sock->tx_buf_count, NULL);
- if (enqd != (unsigned int) sock->tx_buf_count) {
- RTE_LOG(ERR, USER1, "%s: Could not enqueue TX buffers!\n", __func__);
- return status;
- }
+ /* Add outstanding buffers back to TX queue's freebufs */
+ //struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE];
+ //int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, sock->tx_buf_count);
+ //if (status) {
+ // RTE_LOG(ERR, USER1, "%d %s: Could not restore %u TX buffers in bulk!\n", status, __func__, sock->tx_buf_count);
+ //}
+
+ //unsigned int enqd = rte_ring_enqueue_bulk(sock->rx_ring, (void **) freebufs, sock->tx_buf_count, NULL);
+ //if (enqd != (unsigned int) sock->tx_buf_count) {
+ // RTE_LOG(ERR, USER1, "%s: Could not enqueue TX buffers!\n", __func__);
+ // return status;
+ //}
} else if (sock->rx_ring) {
struct uhd_dpdk_ipv4_5tuple ht_key = {
.sock_type = UHD_DPDK_SOCK_UDP,
@@ -280,6 +323,13 @@ int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req)
.dst_ip = 0,
.dst_port = pdata->dst_port
};
+ struct uhd_dpdk_rx_entry *entry = NULL;
+ rte_hash_lookup_data(port->rx_table, &ht_key, (void **) &entry);
+ if (entry) {
+ if (entry->waiter)
+ uhd_dpdk_waiter_put(entry->waiter);
+ rte_free(entry);
+ }
rte_hash_del_key(port->rx_table, &ht_key);
struct rte_mbuf *mbuf = NULL;
while (rte_ring_dequeue(sock->rx_ring, (void **) &mbuf) == 0) {
@@ -292,10 +342,21 @@ int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req)
return 0;
}
+int _uhd_dpdk_udp_rx_key(struct uhd_dpdk_socket *sock,
+ struct uhd_dpdk_ipv4_5tuple *key)
+{
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
+ if (!pdata)
+ return -EINVAL;
+ key->sock_type = UHD_DPDK_SOCK_UDP;
+ key->src_ip = 0;
+ key->src_port = 0;
+ key->dst_ip = 0;
+ key->dst_port = pdata->dst_port;
+ return 0;
+}
+
/* Configure a socket for UDP
- * TODO: Make sure EVERYTHING is configured in socket
- * FIXME: Make all of this part of the user thread, except hash table interaction
- * and list handling
*/
void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req,
struct uhd_dpdk_sockarg_udp *arg)
@@ -309,8 +370,7 @@ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req,
}
struct uhd_dpdk_socket *sock = req->sock;
- pid_t tid = syscall(__NR_gettid);
- sock->tid = tid;
+ sock->tid = pthread_self();
/* Create private data */
struct uhd_dpdk_udp_priv *data = (struct uhd_dpdk_udp_priv *) rte_zmalloc(NULL, sizeof(*data), 0);
@@ -324,7 +384,7 @@ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req,
if (arg->is_tx) {
data->src_port = arg->local_port;
data->dst_port = arg->remote_port;
- sock->tx_ring = (struct rte_ring *) sock;
+ sock->tx_queue = (struct uhd_dpdk_tx_queue *) sock;
} else {
data->src_port = arg->remote_port;
data->dst_port = arg->local_port;
@@ -333,19 +393,10 @@ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req,
/* TODO: Add support for I/O thread calling (skip locking and sleep) */
/* Add to port's config queue */
- pthread_mutex_lock(&req->mutex);
- if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) {
- pthread_mutex_unlock(&req->mutex);
- rte_free(data);
- req->retval = -ENOSPC;
- return;
- }
- struct timespec timeout;
- clock_gettime(CLOCK_MONOTONIC, &timeout);
- timeout.tv_sec += 1;
- pthread_cond_timedwait(&req->cond, &req->mutex, &timeout);
- pthread_mutex_unlock(&req->mutex);
-
+ int status = uhd_dpdk_config_req_submit(req, -1, sock->port->parent);
+ if (status)
+ req->retval = status;
+
if (req->retval)
rte_free(data);
}
@@ -355,18 +406,7 @@ void uhd_dpdk_udp_close(struct uhd_dpdk_config_req *req)
if (!req)
return;
- pthread_mutex_lock(&req->mutex);
- if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) {
- pthread_mutex_unlock(&req->mutex);
- rte_free(req->sock->priv);
- req->retval = -ENOSPC;
- return;
- }
- struct timespec timeout;
- clock_gettime(CLOCK_MONOTONIC, &timeout);
- timeout.tv_sec += 1;
- pthread_cond_timedwait(&req->cond, &req->mutex, &timeout);
- pthread_mutex_unlock(&req->mutex);
+ uhd_dpdk_config_req_submit(req, -1, req->sock->port->parent);
rte_free(req->sock->priv);
}
@@ -440,7 +480,7 @@ int uhd_dpdk_udp_get_info(struct uhd_dpdk_socket *sock,
return -EINVAL;
struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
- if (sock->tx_ring) {
+ if (sock->tx_queue) {
sockarg->is_tx = true;
sockarg->local_port = pdata->src_port;
sockarg->remote_port = pdata->dst_port;
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h
index 651ae144e..39fcb8597 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h
@@ -13,7 +13,8 @@ struct uhd_dpdk_udp_priv {
uint16_t src_port;
uint16_t dst_port;
uint32_t dst_ipv4_addr;
- uint32_t dropped_pkts;
+ size_t dropped_pkts;
+ size_t xferd_pkts;
/* TODO: Cache destination address ptr to avoid ARP table lookup cost? */
//struct uhd_dpdk_arp_entry *arp_entry;
};
@@ -27,4 +28,12 @@ void uhd_dpdk_udp_close(struct uhd_dpdk_config_req *req);
int uhd_dpdk_udp_prep(struct uhd_dpdk_socket *sock,
struct rte_mbuf *mbuf);
+
+/*
+ * Get key for RX table corresponding to this socket
+ *
+ * This is primarily used to get access to the waiter entry
+ */
+int _uhd_dpdk_udp_rx_key(struct uhd_dpdk_socket *sock,
+ struct uhd_dpdk_ipv4_5tuple *key);
#endif /* _UHD_DPDK_UDP_H_ */
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_wait.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_wait.c
new file mode 100644
index 000000000..c00eaa3c4
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_wait.c
@@ -0,0 +1,114 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#include "uhd_dpdk_wait.h"
+
+int _uhd_dpdk_waiter_wake(struct uhd_dpdk_wait_req *req,
+ struct uhd_dpdk_thread *t)
+{
+ int stat = pthread_mutex_trylock(&req->mutex);
+ if (stat) {
+ if (rte_ring_full(t->waiter_ring)) {
+ RTE_LOG(ERR, USER2, "%s: Could not lock req mutex\n", __func__);
+ return -ENOBUFS;
+ } else {
+ req->reason = UHD_DPDK_WAIT_SIMPLE;
+ rte_ring_enqueue(t->waiter_ring, req);
+ return -EAGAIN;
+ }
+ }
+ stat = pthread_cond_signal(&req->cond);
+ if (stat)
+ RTE_LOG(ERR, USER2, "%s: Could not signal req cond\n", __func__);
+ pthread_mutex_unlock(&req->mutex);
+ uhd_dpdk_waiter_put(req);
+ return stat;
+}
+
+struct uhd_dpdk_wait_req *uhd_dpdk_waiter_alloc(enum uhd_dpdk_wait_type reason)
+{
+ struct uhd_dpdk_wait_req *req;
+ req = (struct uhd_dpdk_wait_req *) rte_zmalloc(NULL, sizeof(*req), 0);
+ if (!req)
+ return NULL;
+
+ pthread_mutex_init(&req->mutex, NULL);
+ pthread_condattr_t condattr;
+ pthread_condattr_init(&condattr);
+ pthread_condattr_setclock(&condattr, CLOCK_MONOTONIC);
+ pthread_cond_init(&req->cond, &condattr);
+ rte_atomic32_set(&req->refcnt, 1);
+ req->reason = reason;
+ return req;
+}
+
+static inline void uhd_dpdk_waiter_prepare(struct uhd_dpdk_wait_req *req)
+{
+ pthread_mutex_lock(&req->mutex);
+ /* Get a reference here, to be consumed by other thread (handshake) */
+ uhd_dpdk_waiter_get(req);
+}
+
+static inline int uhd_dpdk_waiter_submit(struct uhd_dpdk_wait_req *req,
+ int timeout)
+{
+ int retval = 0;
+ if (timeout < 0) {
+ retval = pthread_cond_wait(&req->cond, &req->mutex);
+ } else {
+ struct timespec timeout_spec;
+ clock_gettime(CLOCK_MONOTONIC, &timeout_spec);
+ timeout_spec.tv_sec += timeout/1000000;
+ timeout_spec.tv_nsec += (timeout % 1000000)*1000;
+ if (timeout_spec.tv_nsec > 1000000000) {
+ timeout_spec.tv_sec++;
+ timeout_spec.tv_nsec -= 1000000000;
+ }
+ retval = pthread_cond_timedwait(&req->cond, &req->mutex, &timeout_spec);
+ }
+ return retval;
+}
+
+int uhd_dpdk_waiter_wait(struct uhd_dpdk_wait_req *req, int timeout,
+ struct uhd_dpdk_thread *t)
+{
+ int ret;
+ if (!req || !t)
+ return -EINVAL;
+
+ uhd_dpdk_waiter_prepare(req);
+
+ ret = rte_ring_enqueue(t->waiter_ring, req);
+ if (ret) {
+ uhd_dpdk_waiter_put(req);
+ pthread_mutex_unlock(&req->mutex);
+ return ret;
+ }
+
+ uhd_dpdk_waiter_submit(req, timeout);
+ pthread_mutex_unlock(&req->mutex);
+ return 0;
+}
+
+int uhd_dpdk_config_req_submit(struct uhd_dpdk_config_req *req,
+ int timeout, struct uhd_dpdk_thread *t)
+{
+ int ret;
+ if (!req || !t)
+ return -EINVAL;
+
+ uhd_dpdk_waiter_prepare(req->waiter);
+
+ ret = rte_ring_enqueue(t->sock_req_ring, req);
+ if (ret) {
+ uhd_dpdk_waiter_put(req->waiter);
+ pthread_mutex_unlock(&req->waiter->mutex);
+ return ret;
+ }
+
+ uhd_dpdk_waiter_submit(req->waiter, timeout);
+ pthread_mutex_unlock(&req->waiter->mutex);
+ return 0;
+}
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_wait.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_wait.h
new file mode 100644
index 000000000..465608810
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_wait.h
@@ -0,0 +1,62 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#ifndef _UHD_DPDK_WAIT_H_
+#define _UHD_DPDK_WAIT_H_
+
+#include "uhd_dpdk_ctx.h"
+#include <rte_malloc.h>
+
+enum uhd_dpdk_wait_type {
+ UHD_DPDK_WAIT_SIMPLE,
+ UHD_DPDK_WAIT_RX,
+ UHD_DPDK_WAIT_TX_BUF,
+ UHD_DPDK_WAIT_TYPE_COUNT
+};
+
+struct uhd_dpdk_wait_req {
+ enum uhd_dpdk_wait_type reason;
+ struct uhd_dpdk_socket *sock;
+ pthread_cond_t cond;
+ pthread_mutex_t mutex;
+ rte_atomic32_t refcnt; /* free resources only when refcnt = 0 */
+};
+
+static inline void uhd_dpdk_waiter_put(struct uhd_dpdk_wait_req *req)
+{
+ if (rte_atomic32_dec_and_test(&req->refcnt)) {
+ rte_free(req);
+ }
+}
+
+static inline void uhd_dpdk_waiter_get(struct uhd_dpdk_wait_req *req)
+{
+ rte_atomic32_inc(&req->refcnt);
+}
+
+/*
+ * Attempt to wake thread
+ * Re-enqueue waiter to thread's waiter_queue if fail
+ */
+int _uhd_dpdk_waiter_wake(struct uhd_dpdk_wait_req *req,
+ struct uhd_dpdk_thread *t);
+
+/*
+ * Allocates wait request and sets refcnt to 1
+ */
+struct uhd_dpdk_wait_req *uhd_dpdk_waiter_alloc(enum uhd_dpdk_wait_type reason);
+
+/*
+ * Block and send wait request to thread t
+ */
+int uhd_dpdk_waiter_wait(struct uhd_dpdk_wait_req *req, int timeout,
+ struct uhd_dpdk_thread *t);
+
+/*
+ * Block and submit config request to thread t
+ */
+int uhd_dpdk_config_req_submit(struct uhd_dpdk_config_req *req,
+ int timeout, struct uhd_dpdk_thread *t);
+#endif /* _UHD_DPDK_WAIT_H_ */