author:    Alex Williams <alex.williams@ni.com>  2018-08-31 11:35:07 -0700
committer: Brent Stapleton <brent.stapleton@ettus.com>  2019-01-15 17:14:57 -0800
commit:    e2195ac505bd423d3d2f973bbe94da1c78296aa6 (patch)
tree:      296ffd98c620c4ad3e313cd697891418af26cc94 /host/lib/transport/uhd-dpdk/uhd_dpdk.c
parent:    e2cde21ceb7497dcc1ef25156afa6472fe64f009 (diff)
transport: Add blocking recv calls to uhd-dpdk
This adds an internal wait queue API to uhd-dpdk. The blocking calls behind
socket configuration requests are re-implemented on top of this API, and it
also services requests to wait on RX packets (with a timeout).
The wait API involves a multi-producer, single-consumer queue per I/O
thread (waiter_ring), with a condition variable used for sleeping. The
data structure is shared between the user thread and the I/O thread, and
because timeouts make resource release time non-deterministic, we use
reference counting on the shared resource.
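As a rough illustration of the shape of that shared structure, here is a minimal sketch. Apart from uhd_dpdk_wait_req, waiter_ring, UHD_DPDK_WAIT_SIMPLE, and RING_F_SC_DEQ, which appear in the diff below, the field names and types are assumptions for illustration, not the actual uhd_dpdk_wait.h definitions:

```c
#include <pthread.h>
#include <rte_atomic.h>

/* Sketch only: a reference-counted wait request shared between a user
 * thread and an I/O thread. Field names and types are illustrative. */
enum uhd_dpdk_wait_type {
    UHD_DPDK_WAIT_SIMPLE,   /* value used in the patch below */
    UHD_DPDK_WAIT_RX        /* assumed: wait for RX packets */
};

struct uhd_dpdk_wait_req {
    enum uhd_dpdk_wait_type reason;
    pthread_mutex_t mutex;  /* protects the completion state */
    pthread_cond_t  cond;   /* user thread sleeps here */
    rte_atomic32_t  refcnt; /* last uhd_dpdk_waiter_put() frees the request */
};
```

The per-thread waiter_ring itself is an rte_ring created with RING_F_SC_DEQ (single consumer: the I/O thread), exactly like the existing sock_req_ring; that creation is visible in the diff below.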
One reference is generated by the user thread and passed to the I/O
thread to consume. A user thread that still needs the data after waking
must get() another reference, to postpone the destruction of the
resource until it is done.
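The thread-startup change in the diff below shows this hand-off in miniature; condensed into a helper, with result inspection elided, the user-thread side looks roughly like this (a sketch, not a new uhd-dpdk API):

```c
/* Sketch of the reference hand-off around uhd_dpdk_waiter_wait(), following
 * the pattern used in the patch's init path. "t" is the target I/O thread. */
static int wait_on_io_thread(struct uhd_dpdk_thread *t)
{
    struct uhd_dpdk_wait_req *waiter = uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_SIMPLE);
    if (!waiter)
        return -ENOMEM;

    /* The reference from the alloc is conceptually handed to the I/O thread;
     * take a second one so the request stays valid after we wake up. */
    uhd_dpdk_waiter_get(waiter);
    uhd_dpdk_waiter_wait(waiter, -1, t);   /* -1: wait with no timeout */

    /* ... a caller that still needs the request's data would read it here ... */

    uhd_dpdk_waiter_put(waiter);           /* drop our reference; the last put frees it */
    return 0;
}
```

This is essentially what uhd_dpdk_init() now does to wait for each I/O thread to come up before it restricts the user thread's CPU affinity.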
Timeouts are based on CLOCK_MONOTONIC. For recv, a timeout of 0 blocks
indefinitely, and a negative timeout likewise indicates that no timeout is
desired.
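A back-of-the-envelope sketch of how a CLOCK_MONOTONIC timeout like this can be layered on a condition variable; this is illustrative only (the helper name, the done flag, and the microsecond timeout unit are assumptions, not the uhd-dpdk implementation), and it assumes the condvar was created with pthread_condattr_setclock(&attr, CLOCK_MONOTONIC):

```c
#include <errno.h>
#include <pthread.h>
#include <time.h>

/* Illustrative helper: sleep on a condvar until *done is set or the monotonic
 * deadline passes. timeout_us <= 0 means no timeout is desired (block until
 * signaled), matching the recv semantics described above. The mutex must be
 * held on entry, and the condvar must use CLOCK_MONOTONIC. */
static int wait_done(pthread_cond_t *cond, pthread_mutex_t *mutex,
                     const int *done, long timeout_us)
{
    if (timeout_us <= 0) {
        while (!*done)
            pthread_cond_wait(cond, mutex);
        return 0;
    }

    struct timespec deadline;
    clock_gettime(CLOCK_MONOTONIC, &deadline);
    deadline.tv_sec  += timeout_us / 1000000;
    deadline.tv_nsec += (timeout_us % 1000000) * 1000;
    if (deadline.tv_nsec >= 1000000000L) {   /* normalize nanoseconds */
        deadline.tv_sec  += 1;
        deadline.tv_nsec -= 1000000000L;
    }

    int rc = 0;
    while (!*done && rc != ETIMEDOUT)
        rc = pthread_cond_timedwait(cond, mutex, &deadline);
    return (!*done && rc == ETIMEDOUT) ? -ETIMEDOUT : 0;
}
```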
Also drop the timeout for closing sockets in uhd-dpdk. That timeout let a
user thread pre-empt the I/O thread's cleanup process and free data
structures the I/O thread still needed to function. Since the timeout is
superfluous anyway, let's just get rid of it.
Also add some more input checking and error reporting.
Diffstat (limited to 'host/lib/transport/uhd-dpdk/uhd_dpdk.c')
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk.c | 72
1 file changed, 51 insertions(+), 21 deletions(-)
```diff
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk.c b/host/lib/transport/uhd-dpdk/uhd_dpdk.c
index d6a8a5aa6..1be6b2335 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk.c
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk.c
@@ -4,9 +4,11 @@
 // SPDX-License-Identifier: GPL-3.0-or-later
 //
 #include "uhd_dpdk_ctx.h"
+#include "uhd_dpdk_wait.h"
 #include "uhd_dpdk_udp.h"
 #include "uhd_dpdk_driver.h"
 #include <stdlib.h>
+#include <sched.h>
 #include <rte_errno.h>
 #include <rte_malloc.h>
 #include <rte_log.h>
@@ -25,7 +27,6 @@
 struct uhd_dpdk_ctx *ctx = NULL;
 
 /* TODO: For nice scheduling options later, make sure to separate RX and TX activity */
-
 int uhd_dpdk_port_count(void)
 {
     if (!ctx)
@@ -89,9 +90,11 @@ static inline int uhd_dpdk_port_init(struct uhd_dpdk_port *port,
     /* Set up Ethernet device with defaults (1 RX ring, 1 TX ring) */
     retval = rte_eth_dev_set_mtu(port->id, mtu);
     if (retval) {
+        uint16_t actual_mtu;
         RTE_LOG(WARNING, EAL, "%d: Could not set mtu to %d\n", retval, mtu);
-        rte_eth_dev_get_mtu(port->id, &mtu);
-        RTE_LOG(WARNING, EAL, "Current mtu=%d\n", mtu);
+        rte_eth_dev_get_mtu(port->id, &actual_mtu);
+        RTE_LOG(WARNING, EAL, "Current mtu=%d\n", actual_mtu);
+        mtu = actual_mtu;
     }
     /* FIXME: Check if hw_ip_checksum is possible */
     struct rte_eth_conf port_conf = {
@@ -175,19 +178,19 @@ port_init_fail:
     return rte_errno;
 }
 
-static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int id)
+static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int lcore)
 {
     if (!ctx || !thread)
         return -EINVAL;
 
-    unsigned int socket_id = rte_lcore_to_socket_id(id);
-    thread->id = id;
+    unsigned int socket_id = rte_lcore_to_socket_id(lcore);
+    thread->lcore = lcore;
     thread->rx_pktbuf_pool = ctx->rx_pktbuf_pools[socket_id];
     thread->tx_pktbuf_pool = ctx->tx_pktbuf_pools[socket_id];
     LIST_INIT(&thread->port_list);
 
     char name[32];
-    snprintf(name, sizeof(name), "sockreq_ring_%u", id);
+    snprintf(name, sizeof(name), "sockreq_ring_%u", lcore);
     thread->sock_req_ring = rte_ring_create(
         name,
         UHD_DPDK_MAX_PENDING_SOCK_REQS,
@@ -196,6 +199,15 @@ static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int id)
     );
     if (!thread->sock_req_ring)
         return -ENOMEM;
+    snprintf(name, sizeof(name), "waiter_ring_%u", lcore);
+    thread->waiter_ring = rte_ring_create(
+        name,
+        UHD_DPDK_MAX_WAITERS,
+        socket_id,
+        RING_F_SC_DEQ
+    );
+    if (!thread->waiter_ring)
+        return -ENOMEM;
     return 0;
 }
 
@@ -290,7 +302,7 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports,
             continue;
         if (((unsigned int) thread_id) == master_lcore)
             RTE_LOG(WARNING, EAL, "User requested master lcore for port %u\n", i);
-        if (ctx->threads[thread_id].id != (unsigned int) thread_id)
+        if (ctx->threads[thread_id].lcore != (unsigned int) thread_id)
             rte_exit(EXIT_FAILURE, "Requested inactive lcore %u for port %u\n",
                      (unsigned int) thread_id, i);
         struct uhd_dpdk_port *port = &ctx->ports[i];
@@ -310,14 +322,33 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports,
 
     /* FIXME: Create functions to do this */
     RTE_LOG(INFO, EAL, "Starting I/O threads!\n");
+    cpu_set_t io_cpuset;
+    CPU_ZERO(&io_cpuset);
     for (int i = rte_get_next_lcore(-1, 1, 0);
         (i < RTE_MAX_LCORE);
         i = rte_get_next_lcore(i, 1, 0)) {
         struct uhd_dpdk_thread *t = &ctx->threads[i];
         if (!LIST_EMPTY(&t->port_list)) {
-            rte_eal_remote_launch(_uhd_dpdk_driver_main, NULL, ctx->threads[i].id);
-        }
+            rte_eal_remote_launch(_uhd_dpdk_driver_main, NULL, ctx->threads[i].lcore);
+            struct uhd_dpdk_wait_req *waiter =
+                uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_SIMPLE);
+            if (!waiter) {
+                rte_exit(EXIT_FAILURE, "%s: Failed to get wait request\n", __func__);
+            }
+            uhd_dpdk_waiter_get(waiter);
+            uhd_dpdk_waiter_wait(waiter, -1, &ctx->threads[i]);
+            uhd_dpdk_waiter_put(waiter);
+            CPU_OR(&io_cpuset, &io_cpuset, &t->cpu_affinity);
+        }
+    }
+    cpu_set_t user_cpuset;
+    CPU_ZERO(&user_cpuset);
+    for (int i = 0; i < CPU_SETSIZE; i++) {
+        CPU_SET(i, &user_cpuset);
+    }
+    CPU_XOR(&user_cpuset, &user_cpuset, &io_cpuset);
+    if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &user_cpuset)) {
+        rte_exit(EXIT_FAILURE, "%s: Failed to set CPU affinity\n", __func__);
     }
-
     return 0;
 }
@@ -332,6 +363,12 @@ int uhd_dpdk_destroy(void)
     if (!req)
         return -ENOMEM;
 
+    req->waiter = uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_SIMPLE);
+    if (!req->waiter) {
+        rte_free(req);
+        return -ENOMEM;
+    }
+
     req->req_type = UHD_DPDK_LCORE_TERM;
 
     for (int i = rte_get_next_lcore(-1, 1, 0);
@@ -343,26 +380,19 @@ int uhd_dpdk_destroy(void)
         if (LIST_EMPTY(&t->port_list))
             continue;
 
-        if (rte_eal_get_lcore_state(t->id) == FINISHED)
+        if (rte_eal_get_lcore_state(t->lcore) == FINISHED)
            continue;
 
-        pthread_mutex_init(&req->mutex, NULL);
-        pthread_cond_init(&req->cond, NULL);
-        pthread_mutex_lock(&req->mutex);
         if (rte_ring_enqueue(t->sock_req_ring, req)) {
-            pthread_mutex_unlock(&req->mutex);
             RTE_LOG(ERR, USER2, "Failed to terminate thread %d\n", i);
+            rte_free(req->waiter);
             rte_free(req);
             return -ENOSPC;
         }
 
-        struct timespec timeout = {
-            .tv_sec = 1,
-            .tv_nsec = 0
-        };
-        pthread_cond_timedwait(&req->cond, &req->mutex, &timeout);
-        pthread_mutex_unlock(&req->mutex);
+        uhd_dpdk_config_req_submit(req, 1, t);
     }
 
+    uhd_dpdk_waiter_put(req->waiter);
     rte_free(req);
     return 0;
 }
```