diff options
author | Alex Williams <alex.williams@ni.com> | 2018-08-31 11:35:07 -0700 |
---|---|---|
committer | Brent Stapleton <brent.stapleton@ettus.com> | 2019-01-15 17:14:57 -0800 |
commit | e2195ac505bd423d3d2f973bbe94da1c78296aa6 (patch) | |
tree | 296ffd98c620c4ad3e313cd697891418af26cc94 | |
parent | e2cde21ceb7497dcc1ef25156afa6472fe64f009 (diff) | |
download | uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.tar.gz uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.tar.bz2 uhd-e2195ac505bd423d3d2f973bbe94da1c78296aa6.zip |
transport: Add blocking recv calls to uhd-dpdk
This adds an internal wait queue API to uhd-dpdk. Socket configuration
requests had their blocking calls re-implemented on top of this API, and
it is also used to service requests to wait on RX packets (w/ timeout).
The wait API involves a multi-producer, single-consumer queue per I/O
thread (waiter_ring), with a condition variable used for sleeping. The
data structure is shared between user thread and I/O thread, and because
timeouts make resource release time non-deterministic, we use reference
counting on the shared resource.
One reference is generated by the user thread and passed to the I/O
thread to consume. A user thread that still needs the data after waking
must get() another reference, to postpone the destruction of the
resource until it is done.
Timeouts are based on CLOCK_MONOTONIC. For recv, a timeout of 0
indicates blocking indefinitely, and a negative timeout indicates no
timeout is desired.
Also drop timeout for closing sockets in uhd-dpdk.
The timeout would allow a user thread to pre-empt the I/O thread's
cleanup process. The user thread would free data structures the I/O
thread still needed to function. Since this timeout is superfluous
anyway, let's just get rid of it.
Also add some more input checking and error reporting.
-rw-r--r-- | host/lib/include/uhd/transport/uhd-dpdk.h | 16 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/CMakeLists.txt | 4 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk.c | 72 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h | 45 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c | 299 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c | 135 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h | 8 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c | 158 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h | 11 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_wait.c | 114 | ||||
-rw-r--r-- | host/lib/transport/uhd-dpdk/uhd_dpdk_wait.h | 62 |
11 files changed, 704 insertions, 220 deletions
diff --git a/host/lib/include/uhd/transport/uhd-dpdk.h b/host/lib/include/uhd/transport/uhd-dpdk.h index 68438fe40..8d46912bd 100644 --- a/host/lib/include/uhd/transport/uhd-dpdk.h +++ b/host/lib/include/uhd/transport/uhd-dpdk.h @@ -148,10 +148,11 @@ int uhd_dpdk_destroy(void); * @param sock pointer to socket * @param bufs pointer to array of buffers (to store buffer locations) * @param num_bufs number of buffers requested + * @param timeout Time (in us) to wait for a buffer * * @return Returns number of buffers retrieved or negative error code */ -int uhd_dpdk_request_tx_bufs(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, unsigned int num_bufs); +int uhd_dpdk_request_tx_bufs(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, unsigned int num_bufs, int timeout); /** * Enqueues num_bufs buffers in sock TX buffer. Uses pointers to buffers in bufs table. @@ -241,7 +242,18 @@ int uhd_dpdk_udp_get_info(struct uhd_dpdk_socket *sock, struct uhd_dpdk_sockarg_ * * @return Return 0 for success, else failed */ -int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, uint32_t *count); +int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, size_t *count); + +/** + * Get transferred packet count of provided socket + * Currently only tracks received packets (i.e. for RX) + * + * @param sock Socket to get information from + * @param count Pointer to location where information will be stored + * + * @return Return 0 for success, else failed + */ +int uhd_dpdk_get_xfer_count(struct uhd_dpdk_socket *sock, size_t *count); #ifdef __cplusplus } diff --git a/host/lib/transport/uhd-dpdk/CMakeLists.txt b/host/lib/transport/uhd-dpdk/CMakeLists.txt index be683aaf2..0beb1eed5 100644 --- a/host/lib/transport/uhd-dpdk/CMakeLists.txt +++ b/host/lib/transport/uhd-dpdk/CMakeLists.txt @@ -22,13 +22,15 @@ if(ENABLE_DPDK) ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_driver.c ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_fops.c ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_udp.c + ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_wait.c ) set_source_files_properties( ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk.c ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_driver.c ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_fops.c ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_udp.c - PROPERTIES COMPILE_FLAGS ${UHD_DPDK_CFLAGS} + ${CMAKE_CURRENT_SOURCE_DIR}/uhd_dpdk_wait.c + PROPERTIES COMPILE_FLAGS "${UHD_DPDK_CFLAGS} -D_GNU_SOURCE" ) include_directories(${DPDK_INCLUDE_DIR}) LIBUHD_APPEND_LIBS(${DPDK_LIBRARIES}) diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk.c b/host/lib/transport/uhd-dpdk/uhd_dpdk.c index d6a8a5aa6..1be6b2335 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk.c +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk.c @@ -4,9 +4,11 @@ // SPDX-License-Identifier: GPL-3.0-or-later // #include "uhd_dpdk_ctx.h" +#include "uhd_dpdk_wait.h" #include "uhd_dpdk_udp.h" #include "uhd_dpdk_driver.h" #include <stdlib.h> +#include <sched.h> #include <rte_errno.h> #include <rte_malloc.h> #include <rte_log.h> @@ -25,7 +27,6 @@ struct uhd_dpdk_ctx *ctx = NULL; /* TODO: For nice scheduling options later, make sure to separate RX and TX activity */ - int uhd_dpdk_port_count(void) { if (!ctx) @@ -89,9 +90,11 @@ static inline int uhd_dpdk_port_init(struct uhd_dpdk_port *port, /* Set up Ethernet device with defaults (1 RX ring, 1 TX ring) */ retval = rte_eth_dev_set_mtu(port->id, mtu); if (retval) { + uint16_t actual_mtu; RTE_LOG(WARNING, EAL, "%d: Could not set mtu to %d\n", retval, mtu); - rte_eth_dev_get_mtu(port->id, &mtu); - RTE_LOG(WARNING, EAL, "Current mtu=%d\n", mtu); + rte_eth_dev_get_mtu(port->id, &actual_mtu); + RTE_LOG(WARNING, EAL, "Current mtu=%d\n", actual_mtu); + mtu = actual_mtu; } /* FIXME: Check if hw_ip_checksum is possible */ struct rte_eth_conf port_conf = { @@ -175,19 +178,19 @@ port_init_fail: return rte_errno; } -static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int id) +static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int lcore) { if (!ctx || !thread) return -EINVAL; - unsigned int socket_id = rte_lcore_to_socket_id(id); - thread->id = id; + unsigned int socket_id = rte_lcore_to_socket_id(lcore); + thread->lcore = lcore; thread->rx_pktbuf_pool = ctx->rx_pktbuf_pools[socket_id]; thread->tx_pktbuf_pool = ctx->tx_pktbuf_pools[socket_id]; LIST_INIT(&thread->port_list); char name[32]; - snprintf(name, sizeof(name), "sockreq_ring_%u", id); + snprintf(name, sizeof(name), "sockreq_ring_%u", lcore); thread->sock_req_ring = rte_ring_create( name, UHD_DPDK_MAX_PENDING_SOCK_REQS, @@ -196,6 +199,15 @@ static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int id) ); if (!thread->sock_req_ring) return -ENOMEM; + snprintf(name, sizeof(name), "waiter_ring_%u", lcore); + thread->waiter_ring = rte_ring_create( + name, + UHD_DPDK_MAX_WAITERS, + socket_id, + RING_F_SC_DEQ + ); + if (!thread->waiter_ring) + return -ENOMEM; return 0; } @@ -290,7 +302,7 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports, continue; if (((unsigned int) thread_id) == master_lcore) RTE_LOG(WARNING, EAL, "User requested master lcore for port %u\n", i); - if (ctx->threads[thread_id].id != (unsigned int) thread_id) + if (ctx->threads[thread_id].lcore != (unsigned int) thread_id) rte_exit(EXIT_FAILURE, "Requested inactive lcore %u for port %u\n", (unsigned int) thread_id, i); struct uhd_dpdk_port *port = &ctx->ports[i]; @@ -310,14 +322,33 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports, /* FIXME: Create functions to do this */ RTE_LOG(INFO, EAL, "Starting I/O threads!\n"); + cpu_set_t io_cpuset; + CPU_ZERO(&io_cpuset); for (int i = rte_get_next_lcore(-1, 1, 0); (i < RTE_MAX_LCORE); i = rte_get_next_lcore(i, 1, 0)) { struct uhd_dpdk_thread *t = &ctx->threads[i]; if (!LIST_EMPTY(&t->port_list)) { - rte_eal_remote_launch(_uhd_dpdk_driver_main, NULL, ctx->threads[i].id); - } + rte_eal_remote_launch(_uhd_dpdk_driver_main, NULL, ctx->threads[i].lcore); + struct uhd_dpdk_wait_req *waiter = uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_SIMPLE); + if (!waiter) { + rte_exit(EXIT_FAILURE, "%s: Failed to get wait request\n", __func__); + } + uhd_dpdk_waiter_get(waiter); + uhd_dpdk_waiter_wait(waiter, -1, &ctx->threads[i]); + uhd_dpdk_waiter_put(waiter); + CPU_OR(&io_cpuset, &io_cpuset, &t->cpu_affinity); + } + } + cpu_set_t user_cpuset; + CPU_ZERO(&user_cpuset); + for (int i = 0; i < CPU_SETSIZE; i++) { + CPU_SET(i, &user_cpuset); + } + CPU_XOR(&user_cpuset, &user_cpuset, &io_cpuset); + if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &user_cpuset)) { + rte_exit(EXIT_FAILURE, "%s: Failed to set CPU affinity\n", __func__); } return 0; } @@ -332,6 +363,12 @@ int uhd_dpdk_destroy(void) if (!req) return -ENOMEM; + req->waiter = uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_SIMPLE); + if (!req->waiter) { + rte_free(req); + return -ENOMEM; + } + req->req_type = UHD_DPDK_LCORE_TERM; for (int i = rte_get_next_lcore(-1, 1, 0); @@ -343,26 +380,19 @@ int uhd_dpdk_destroy(void) if (LIST_EMPTY(&t->port_list)) continue; - if (rte_eal_get_lcore_state(t->id) == FINISHED) + if (rte_eal_get_lcore_state(t->lcore) == FINISHED) continue; - pthread_mutex_init(&req->mutex, NULL); - pthread_cond_init(&req->cond, NULL); - pthread_mutex_lock(&req->mutex); if (rte_ring_enqueue(t->sock_req_ring, req)) { - pthread_mutex_unlock(&req->mutex); RTE_LOG(ERR, USER2, "Failed to terminate thread %d\n", i); + rte_free(req->waiter); rte_free(req); return -ENOSPC; } - struct timespec timeout = { - .tv_sec = 1, - .tv_nsec = 0 - }; - pthread_cond_timedwait(&req->cond, &req->mutex, &timeout); - pthread_mutex_unlock(&req->mutex); + uhd_dpdk_config_req_submit(req, 1, t); } + uhd_dpdk_waiter_put(req->waiter); rte_free(req); return 0; } diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h index c6afab85d..ca0fa77b3 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h @@ -13,6 +13,7 @@ #include <rte_mbuf.h> #include <rte_hash.h> #include <rte_eal.h> +#include <rte_atomic.h> #include <uhd/transport/uhd-dpdk.h> //#include <pthread.h> @@ -20,16 +21,18 @@ #define UHD_DPDK_MAX_SOCKET_CNT 1024 #define UHD_DPDK_MAX_PENDING_SOCK_REQS 16 +#define UHD_DPDK_MAX_WAITERS UHD_DPDK_MAX_SOCKET_CNT #define UHD_DPDK_TXQ_SIZE 64 #define UHD_DPDK_TX_BURST_SIZE (UHD_DPDK_TXQ_SIZE - 1) #define UHD_DPDK_RXQ_SIZE 64 #define UHD_DPDK_RX_BURST_SIZE (UHD_DPDK_RXQ_SIZE - 1) struct uhd_dpdk_port; +struct uhd_dpdk_tx_queue; /** * - * All memory allocation for port, rx_ring, and tx_ring owned by I/O thread + * All memory allocation for port, rx_ring, and tx_queue owned by I/O thread * Rest owned by user thread * * port: port servicing this socket @@ -37,7 +40,7 @@ struct uhd_dpdk_port; * sock_type: Type of socket * priv: Private data, based on sock_type * rx_ring: pointer to individual rx_ring (created during init--Also used as free buffer ring for TX) - * tx_ring: pointer to shared tx_ring (with all sockets for this tid) + * tx_queue: pointer to tx queue structure * tx_buf_count: Number of buffers currently outside the rings * tx_entry: List node for TX Queue tracking * @@ -46,19 +49,21 @@ struct uhd_dpdk_port; */ struct uhd_dpdk_socket { struct uhd_dpdk_port *port; - pid_t tid; + pthread_t tid; enum uhd_dpdk_sock_type sock_type; void *priv; struct rte_ring *rx_ring; - struct rte_ring *tx_ring; + struct uhd_dpdk_tx_queue *tx_queue; int tx_buf_count; LIST_ENTRY(uhd_dpdk_socket) tx_entry; }; LIST_HEAD(uhd_dpdk_tx_head, uhd_dpdk_socket); /************************************************ - * Configuration + * Configuration and Blocking ************************************************/ +struct uhd_dpdk_wait_req; + enum uhd_dpdk_sock_req { UHD_DPDK_SOCK_OPEN = 0, UHD_DPDK_SOCK_CLOSE, @@ -70,20 +75,20 @@ enum uhd_dpdk_sock_req { * port: port associated with this request * sock: socket associated with this request * req_type: Open, Close, or terminate lcore - * sock_type: Only udp is supported * cond: Used to sleep until socket creation is finished * mutex: associated with cond * entry: List node for requests pending ARP responses * priv: private data * retval: Result of call (needed post-wakeup) + * + * config_reqs are assumed not to time out + * The interaction with wait_reqs currently makes this impossible to do safely */ struct uhd_dpdk_config_req { struct uhd_dpdk_port *port; struct uhd_dpdk_socket *sock; enum uhd_dpdk_sock_req req_type; - enum uhd_dpdk_sock_type sock_type; - pthread_cond_t cond; - pthread_mutex_t mutex; + struct uhd_dpdk_wait_req *waiter; LIST_ENTRY(uhd_dpdk_config_req) entry; void *priv; int retval; @@ -107,24 +112,25 @@ struct uhd_dpdk_ipv4_5tuple { }; /** - * Used for blocking calls to RX + * Entry for RX table + * req used for blocking calls to RX */ -struct uhd_dpdk_sock_cond { +struct uhd_dpdk_rx_entry { struct uhd_dpdk_socket *sock; - pthread_cond_t cond; - pthread_mutex_t mutex; + struct uhd_dpdk_wait_req *waiter; }; /************************************************ * TX Queues * - * 1 TX Queue per thread sending through a hardware port + * 1 TX Queue per socket sending through a hardware port * All memory allocation owned by I/O thread * * tid: thread id * queue: TX queue holding threads prepared packets (via send()) * retry_queue: queue holding packets that couldn't be sent * freebufs: queue holding empty buffers + * waiter: Request to wait for a free buffer * tx_list: list of sockets using this queue * entry: list node for port to track TX queues * @@ -133,17 +139,17 @@ struct uhd_dpdk_sock_cond { * For queue, user thread is producer, I/O thread is consumer * For freebufs, user thread is consumer, I/O thread is consumer * - * All queues are same size, and they are shared between all sockets on one - * thread (tid is the identifier) + * All queues are same size * 1. Buffers start in freebufs (user gets buffers from freebufs) * 2. User submits packet to queue * 3. If packet couldn't be sent, it is (re)enqueued on retry_queue ************************************************/ struct uhd_dpdk_tx_queue { - pid_t tid; + pthread_t tid; struct rte_ring *queue; struct rte_ring *retry_queue; struct rte_ring *freebufs; + struct uhd_dpdk_wait_req *waiter; struct uhd_dpdk_tx_head tx_list; LIST_ENTRY(uhd_dpdk_tx_queue) entry; }; @@ -199,6 +205,7 @@ LIST_HEAD(uhd_dpdk_port_head, uhd_dpdk_port); * sock_req_ring: Queue for user threads to submit service requests to the lcore * * sock_req_ring is a multi-producer, single-consumer queue + * It must NOT BE ACCESSED SIMULTANEOUSLY by two threads not using SCHED_OTHER(cfs) * * For threads that have ports: * Launch individually @@ -208,12 +215,14 @@ LIST_HEAD(uhd_dpdk_port_head, uhd_dpdk_port); * REMEMBER: Without args, DPDK creates an lcore for each CPU core! */ struct uhd_dpdk_thread { - unsigned int id; + unsigned int lcore; + cpu_set_t cpu_affinity; struct rte_mempool *rx_pktbuf_pool; struct rte_mempool *tx_pktbuf_pool; int num_ports; struct uhd_dpdk_port_head port_list; struct rte_ring *sock_req_ring; + struct rte_ring *waiter_ring; }; diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c index 8388359e7..f603f1f8f 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c @@ -6,6 +6,7 @@ #include "uhd_dpdk_driver.h" #include "uhd_dpdk_fops.h" #include "uhd_dpdk_udp.h" +#include "uhd_dpdk_wait.h" #include <rte_malloc.h> #include <rte_mempool.h> #include <arpa/inet.h> @@ -145,19 +146,25 @@ int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, str .dst_port = pkt->dst_port }; - struct uhd_dpdk_socket *sock = NULL; - rte_hash_lookup_data(port->rx_table, &ht_key, (void **) &sock); - if (!sock) { + struct uhd_dpdk_rx_entry *entry = NULL; + rte_hash_lookup_data(port->rx_table, &ht_key, (void **) &entry); + if (!entry) { status = -ENODEV; + //RTE_LOG(WARNING, USER1, "%s: Dropping packet to UDP port %d\n", __func__, ntohs(pkt->dst_port)); goto udp_rx_drop; } - struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; - status = rte_ring_enqueue(sock->rx_ring, mbuf); + struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) entry->sock->priv; + status = rte_ring_enqueue(entry->sock->rx_ring, mbuf); + if (entry->waiter) { + _uhd_dpdk_waiter_wake(entry->waiter, port->parent); + entry->waiter = NULL; + } if (status) { pdata->dropped_pkts++; goto udp_rx_drop; } + pdata->xferd_pkts++; return 0; udp_rx_drop: @@ -220,6 +227,7 @@ static int _uhd_dpdk_send(struct uhd_dpdk_port *port, return status; } } + status = rte_eth_tx_burst(port->id, 0, &buf, 1); /* Automatically frees mbuf */ if (status != 1) { status = rte_ring_enqueue(txq->retry_queue, buf); @@ -247,6 +255,10 @@ static inline int _uhd_dpdk_restore_bufs(struct uhd_dpdk_port *port, /* Enqueue the buffers for the user thread to retrieve */ unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) freebufs, num_bufs, NULL); + if (q->waiter && rte_ring_count(q->freebufs) > 0) { + _uhd_dpdk_waiter_wake(q->waiter, port->parent); + q->waiter = NULL; + } if (enqd != num_bufs) { RTE_LOG(ERR, USER1, "Could not enqueue pktmbufs!\n"); return status; @@ -299,7 +311,19 @@ static inline int _uhd_dpdk_driver_cleanup(struct uhd_dpdk_thread *t) } } - /* Now can free memory */ + /* Now clean up waiters + * TODO: Determine if better to wake threads + */ + int num_waiters = rte_ring_count(t->waiter_ring); + for (int i = 0; i < num_waiters; i++) { + struct uhd_dpdk_wait_req *req = NULL; + rte_ring_dequeue(t->waiter_ring, (void **) &req); + uhd_dpdk_waiter_put(req); + } + if (rte_ring_count(t->waiter_ring)) + return -EAGAIN; + + /* Now can free memory, except sock_req_ring and waiter_ring */ LIST_FOREACH(port, &t->port_list, port_entry) { rte_hash_free(port->rx_table); @@ -341,29 +365,12 @@ static inline int _uhd_dpdk_driver_cleanup(struct uhd_dpdk_thread *t) return 0; } -int _uhd_dpdk_driver_main(void *arg) +static inline int _uhd_dpdk_service_config_req(struct rte_ring *sock_req_ring) { - - /* Don't currently have use for arguments */ - if (arg) - return -EINVAL; - - /* Check that this is a valid lcore */ - unsigned int lcore_id = rte_lcore_id(); - if (lcore_id == LCORE_ID_ANY) - return -ENODEV; - - /* Check that this lcore has ports */ - struct uhd_dpdk_thread *t = &ctx->threads[lcore_id]; - if (t->id != lcore_id) - return -ENODEV; - - RTE_LOG(INFO, USER2, "Thread %d started\n", lcore_id); int status = 0; - while (!status) { - /* Check for open()/close() requests and service 1 at a time */ - struct uhd_dpdk_config_req *sock_req; - if (rte_ring_dequeue(t->sock_req_ring, (void **) &sock_req) == 0) { + struct uhd_dpdk_config_req *sock_req; + if (rte_ring_dequeue(sock_req_ring, (void **) &sock_req) == 0) { + if (sock_req) { /* FIXME: Not checking return vals */ switch (sock_req->req_type) { case UHD_DPDK_SOCK_OPEN: @@ -373,7 +380,7 @@ int _uhd_dpdk_driver_main(void *arg) _uhd_dpdk_sock_release(sock_req); break; case UHD_DPDK_LCORE_TERM: - RTE_LOG(INFO, EAL, "Terminating lcore %u\n", lcore_id); + RTE_LOG(INFO, EAL, "Terminating lcore %u\n", rte_lcore_id()); status = 1; _uhd_dpdk_config_req_compl(sock_req, 0); break; @@ -381,61 +388,207 @@ int _uhd_dpdk_driver_main(void *arg) RTE_LOG(ERR, USER2, "Invalid socket request %d\n", sock_req->req_type); break; } + } else { + RTE_LOG(ERR, USER1, "%s: NULL service request received\n", __func__); + } + } + return status; +} + +/* Do a burst of RX on port */ +static inline void _uhd_dpdk_rx_burst(struct uhd_dpdk_port *port) +{ + struct ether_hdr *hdr; + char *l2_data; + struct rte_mbuf *bufs[UHD_DPDK_RX_BURST_SIZE]; + const uint16_t num_rx = rte_eth_rx_burst(port->id, 0, + bufs, UHD_DPDK_RX_BURST_SIZE); + if (unlikely(num_rx == 0)) { + return; + } + + for (int buf = 0; buf < num_rx; buf++) { + uint64_t ol_flags = bufs[buf]->ol_flags; + hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr *); + l2_data = (char *) &hdr[1]; + switch (rte_be_to_cpu_16(hdr->ether_type)) { + case ETHER_TYPE_ARP: + _uhd_dpdk_process_arp(port, (struct arp_hdr *) l2_data); + rte_pktmbuf_free(bufs[buf]); + break; + case ETHER_TYPE_IPv4: + if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */ + RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf); + } else { + _uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data); + } + break; + default: + rte_pktmbuf_free(bufs[buf]); + break; + } + } +} + +/* Do a burst of TX on port's tx q */ +static inline int _uhd_dpdk_tx_burst(struct uhd_dpdk_port *port, + struct uhd_dpdk_tx_queue *q) +{ + if (!rte_ring_empty(q->retry_queue)) { + int num_retry = _uhd_dpdk_send(port, q, q->retry_queue); + _uhd_dpdk_restore_bufs(port, q, num_retry); + if (!rte_ring_empty(q->retry_queue)) { + return -EAGAIN; } + } + if (rte_ring_empty(q->queue)) { + return 0; + } + int num_tx = _uhd_dpdk_send(port, q, q->queue); + if (num_tx > 0) { + _uhd_dpdk_restore_bufs(port, q, num_tx); + return 0; + } else { + return num_tx; + } +} + +/* Process threads requesting to block on RX */ +static inline void _uhd_dpdk_process_rx_wait(struct uhd_dpdk_thread *t, + struct uhd_dpdk_wait_req *req) +{ + struct uhd_dpdk_socket *sock = req->sock; + if (!sock) + goto rx_wait_skip; + if (!sock->port) + goto rx_wait_skip; + if (!sock->port->rx_table) + goto rx_wait_skip; + + if (!rte_ring_empty(sock->rx_ring)) + goto rx_wait_skip; + + struct uhd_dpdk_ipv4_5tuple ht_key; + if (_uhd_dpdk_sock_rx_key(sock, &ht_key)) + goto rx_wait_skip; + + struct uhd_dpdk_rx_entry *entry = NULL; + rte_hash_lookup_data(sock->port->rx_table, &ht_key, (void **) &entry); + entry->waiter = req; + return; + +rx_wait_skip: + _uhd_dpdk_waiter_wake(req, t); +} + +/* Process threads requesting to block on TX bufs*/ +static inline void _uhd_dpdk_process_tx_buf_wait(struct uhd_dpdk_thread *t, + struct uhd_dpdk_wait_req *req) +{ + struct uhd_dpdk_socket *sock = req->sock; + if (!sock) + goto tx_wait_skip; + if (!sock->port) + goto tx_wait_skip; + if (!sock->tx_queue) + goto tx_wait_skip; + + struct uhd_dpdk_tx_queue *q = sock->tx_queue; + if (!q->freebufs || !q->retry_queue || !q->queue) + goto tx_wait_skip; + + if (!rte_ring_empty(q->freebufs)) + goto tx_wait_skip; + + sock->tx_queue->waiter = req; + + // Attempt to restore bufs only if failed before + unsigned int num_bufs = sock->tx_buf_count + rte_ring_count(q->queue) + + rte_ring_count(q->retry_queue); + unsigned int max_bufs = rte_ring_get_size(q->freebufs); + if (num_bufs < max_bufs) { + _uhd_dpdk_restore_bufs(sock->port, q, max_bufs - num_bufs); + } + return; + +tx_wait_skip: + _uhd_dpdk_waiter_wake(req, t); +} + +/* Process threads making requests to wait */ +static inline void _uhd_dpdk_process_waiters(struct uhd_dpdk_thread *t) +{ + int num_waiters = rte_ring_count(t->waiter_ring); + num_waiters = (num_waiters > UHD_DPDK_MAX_PENDING_SOCK_REQS) ? + UHD_DPDK_MAX_PENDING_SOCK_REQS : + num_waiters; + for (int i = 0; i < num_waiters; i++) { + /* Dequeue */ + struct uhd_dpdk_wait_req *req = NULL; + if (rte_ring_dequeue(t->waiter_ring, (void **) &req)) + break; + switch (req->reason) { + case UHD_DPDK_WAIT_SIMPLE: + _uhd_dpdk_waiter_wake(req, t); + break; + case UHD_DPDK_WAIT_RX: + _uhd_dpdk_process_rx_wait(t, req); + break; + default: + RTE_LOG(ERR, USER2, "Invalid reason associated with wait request\n"); + _uhd_dpdk_waiter_wake(req, t); + break; + } + } +} + +int _uhd_dpdk_driver_main(void *arg) +{ + + /* Don't currently have use for arguments */ + if (arg) + return -EINVAL; + + /* Check that this is a valid lcore */ + unsigned int lcore_id = rte_lcore_id(); + if (lcore_id == LCORE_ID_ANY) + return -ENODEV; + + /* Check that this lcore has ports */ + struct uhd_dpdk_thread *t = &ctx->threads[lcore_id]; + if (t->lcore != lcore_id) + return -ENODEV; + + pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), + &t->cpu_affinity); + char name[16]; + snprintf(name, sizeof(name), "dpdk-io_%u", lcore_id); + pthread_setname_np(pthread_self(), name); + + RTE_LOG(INFO, USER2, "Thread %d started\n", lcore_id); + int status = 0; + while (!status) { + /* Check for open()/close()/term() requests and service 1 at a time */ + status = _uhd_dpdk_service_config_req(t->sock_req_ring); /* For each port, attempt to receive packets and process */ struct uhd_dpdk_port *port = NULL; LIST_FOREACH(port, &t->port_list, port_entry) { - struct ether_hdr *hdr; - char *l2_data; - struct rte_mbuf *bufs[UHD_DPDK_RX_BURST_SIZE]; - const uint16_t num_rx = rte_eth_rx_burst(port->id, 0, - bufs, UHD_DPDK_RX_BURST_SIZE); - if (unlikely(num_rx == 0)) { - continue; - } - - for (int buf = 0; buf < num_rx; buf++) { - uint64_t ol_flags = bufs[buf]->ol_flags; - hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr *); - l2_data = (char *) &hdr[1]; - switch (rte_be_to_cpu_16(hdr->ether_type)) { - case ETHER_TYPE_ARP: - _uhd_dpdk_process_arp(port, (struct arp_hdr *) l2_data); - rte_pktmbuf_free(bufs[buf]); - break; - case ETHER_TYPE_IPv4: - if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */ - RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf); - } else { - _uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data); - } - break; - default: - rte_pktmbuf_free(bufs[buf]); - break; - } - } + _uhd_dpdk_rx_burst(port); } + + /* TODO: Handle waiter_ring + * Also use it for config_req wake retries + * Also take care of RX table with new struct w/ waiter + * (construction, adding, destruction) + */ + _uhd_dpdk_process_waiters(t); + /* For each port's TX queues, do TX */ LIST_FOREACH(port, &t->port_list, port_entry) { struct uhd_dpdk_tx_queue *q = NULL; LIST_FOREACH(q, &port->txq_list, entry) { - if (!rte_ring_empty(q->retry_queue)) { - int num_retry = _uhd_dpdk_send(port, q, q->retry_queue); - _uhd_dpdk_restore_bufs(port, q, num_retry); - if (!rte_ring_empty(q->retry_queue)) { - break; - } - } - if (rte_ring_empty(q->queue)) { - continue; - } - int num_tx = _uhd_dpdk_send(port, q, q->queue); - if (num_tx > 0) { - _uhd_dpdk_restore_bufs(port, q, num_tx); - } else { + if (_uhd_dpdk_tx_burst(port, q)) break; - } } } } diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c index 309e5e643..605f01de3 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.c @@ -5,6 +5,7 @@ // #include "uhd_dpdk_fops.h" #include "uhd_dpdk_udp.h" +#include "uhd_dpdk_wait.h" #include <rte_malloc.h> #include <rte_ip.h> @@ -22,24 +23,14 @@ int _uhd_dpdk_config_req_compl(struct uhd_dpdk_config_req *req, int retval) { req->retval = retval; - int stat = pthread_mutex_trylock(&req->mutex); - if (stat) { - RTE_LOG(ERR, USER1, "%s: Could not lock req mutex\n", __func__); - return stat; - } - stat = pthread_cond_signal(&req->cond); - pthread_mutex_unlock(&req->mutex); - if (stat) { - RTE_LOG(ERR, USER1, "%s: Could not signal req cond\n", __func__); - return stat; - } - return 0; + int stat = _uhd_dpdk_waiter_wake(req->waiter, req->sock->port->parent); + return stat; } int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req) { int stat = 0; - switch (req->sock_type) { + switch (req->sock->sock_type) { case UHD_DPDK_SOCK_UDP: stat = _uhd_dpdk_udp_setup(req); break; @@ -53,7 +44,7 @@ int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req) int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req) { int stat = 0; - switch (req->sock_type) { + switch (req->sock->sock_type) { case UHD_DPDK_SOCK_UDP: stat = _uhd_dpdk_udp_release(req); break; @@ -65,6 +56,22 @@ int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req) return stat; } +int _uhd_dpdk_sock_rx_key(struct uhd_dpdk_socket *sock, + struct uhd_dpdk_ipv4_5tuple *key) +{ + int stat = 0; + if (!key) + return -EINVAL; + + switch (sock->sock_type) { + case UHD_DPDK_SOCK_UDP: + stat = _uhd_dpdk_udp_rx_key(sock, key); + break; + default: + stat = -EINVAL; + } + return stat; +} /************************************************ * API calls */ @@ -85,12 +92,17 @@ struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid, return NULL; } - struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0); if (!req) { return NULL; } + req->waiter = uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_SIMPLE); + if (!req->waiter) { + req->retval = -ENOMEM; + goto sock_open_end; + } + struct uhd_dpdk_socket *s = (struct uhd_dpdk_socket *) rte_zmalloc(NULL, sizeof(*s), 0); if (!s) { goto sock_open_end; @@ -99,13 +111,8 @@ struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid, s->port = port; req->sock = s; req->req_type = UHD_DPDK_SOCK_OPEN; - req->sock_type = t; + req->sock->sock_type = t; req->retval = -ETIMEDOUT; - pthread_mutex_init(&req->mutex, NULL); - pthread_condattr_t condattr; - pthread_condattr_init(&condattr); - pthread_condattr_setclock(&condattr, CLOCK_MONOTONIC); - pthread_cond_init(&req->cond, &condattr); switch (t) { case UHD_DPDK_SOCK_UDP: @@ -121,6 +128,8 @@ struct uhd_dpdk_socket* uhd_dpdk_sock_open(unsigned int portid, } sock_open_end: + if (req->waiter) + uhd_dpdk_waiter_put(req->waiter); rte_free(req); return s; } @@ -133,12 +142,15 @@ int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock) struct uhd_dpdk_config_req *req = (struct uhd_dpdk_config_req *) rte_zmalloc(NULL, sizeof(*req), 0); if (!req) return -ENOMEM; + + req->waiter = uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_SIMPLE); + if (!req->waiter) { + rte_free(req); + return -ENOMEM; + } req->sock = sock; req->req_type = UHD_DPDK_SOCK_CLOSE; - req->sock_type = sock->sock_type; req->retval = -ETIMEDOUT; - pthread_mutex_init(&req->mutex, NULL); - pthread_cond_init(&req->cond, NULL); switch (sock->sock_type) { case UHD_DPDK_SOCK_UDP: @@ -148,6 +160,8 @@ int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock) break; } + uhd_dpdk_waiter_put(req->waiter); + if (req->retval) { rte_free(req); return req->retval; @@ -157,26 +171,34 @@ int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock) return 0; } -/* - * TODO: - * Add blocking calls with timeout - * Implementation would involve a condition variable, like config reqs - * Also would create a cleanup section in I/O main loop (like config reqs) - */ int uhd_dpdk_request_tx_bufs(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, - unsigned int num_bufs) + unsigned int num_bufs, int timeout) { if (!sock || !bufs || !num_bufs) { return -EINVAL; } *bufs = NULL; - if (!sock->tx_ring) + if (!sock->tx_queue) return -EINVAL; - unsigned int num_tx = rte_ring_count(sock->rx_ring); + if (!sock->tx_queue->freebufs) + return -EINVAL; + + struct rte_ring *freebufs = sock->tx_queue->freebufs; + unsigned int num_tx = rte_ring_count(freebufs); + if (timeout != 0 && num_tx == 0) { + struct uhd_dpdk_wait_req *req = + uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_TX_BUF); + req->sock = sock; + uhd_dpdk_waiter_wait(req, timeout, sock->port->parent); + uhd_dpdk_waiter_put(req); + num_tx = rte_ring_count(freebufs); + if (!num_tx) + return -ETIMEDOUT; + } num_tx = (num_tx < num_bufs) ? num_tx : num_bufs; - if (rte_ring_dequeue_bulk(sock->rx_ring, (void **) bufs, num_tx, NULL) == 0) + if (rte_ring_dequeue_bulk(freebufs, (void **) bufs, num_tx, NULL) == 0) return -ENOENT; sock->tx_buf_count += num_tx; return num_tx; @@ -187,9 +209,12 @@ int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, { if (!sock || !bufs || !num_bufs) return -EINVAL; - if (!sock->tx_ring) + if (!sock->tx_queue) return -EINVAL; - unsigned int num_tx = rte_ring_free_count(sock->tx_ring); + if (!sock->tx_queue->queue) + return -EINVAL; + struct rte_ring *tx_ring = sock->tx_queue->queue; + unsigned int num_tx = rte_ring_free_count(tx_ring); num_tx = (num_tx < num_bufs) ? num_tx : num_bufs; switch (sock->sock_type) { case UHD_DPDK_SOCK_UDP: @@ -201,7 +226,7 @@ int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, RTE_LOG(ERR, USER1, "%s: Unsupported sock type\n", __func__); return -EINVAL; } - int status = rte_ring_enqueue_bulk(sock->tx_ring, (void **) bufs, num_tx, NULL); + int status = rte_ring_enqueue_bulk(tx_ring, (void **) bufs, num_tx, NULL); if (status == 0) { RTE_LOG(ERR, USER1, "Invalid shared usage of TX ring detected\n"); return status; @@ -210,10 +235,6 @@ int uhd_dpdk_send(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, return num_tx; } -/* - * TODO: - * Add blocking calls with timeout - */ int uhd_dpdk_recv(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, unsigned int num_bufs, int timeout) { @@ -221,11 +242,21 @@ int uhd_dpdk_recv(struct uhd_dpdk_socket *sock, struct rte_mbuf **bufs, return -EINVAL; if (!sock->rx_ring) return -EINVAL; + unsigned int num_rx = rte_ring_count(sock->rx_ring); + if (timeout != 0 && num_rx == 0) { + struct uhd_dpdk_wait_req *req = + uhd_dpdk_waiter_alloc(UHD_DPDK_WAIT_RX); + req->sock = sock; + uhd_dpdk_waiter_wait(req, timeout, sock->port->parent); + uhd_dpdk_waiter_put(req); + num_rx = rte_ring_count(sock->rx_ring); + if (!num_rx) + return -ETIMEDOUT; + } + num_rx = (num_rx < num_bufs) ? num_rx : num_bufs; - /* if ((timeout > 0) && (num_rx != num_bufs)) { - // Wait for enough bufs - } else*/ if (num_rx) { + if (num_rx) { unsigned int avail = 0; unsigned int status = rte_ring_dequeue_bulk(sock->rx_ring, (void **) bufs, num_rx, &avail); @@ -293,7 +324,7 @@ int uhd_dpdk_get_src_ipv4(struct uhd_dpdk_socket *sock, struct rte_mbuf *buf, return 0; } -int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, uint32_t *count) +int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, size_t *count) { if (!sock) return -EINVAL; @@ -306,3 +337,17 @@ int uhd_dpdk_get_drop_count(struct uhd_dpdk_socket *sock, uint32_t *count) *count = pdata->dropped_pkts; return 0; } + +int uhd_dpdk_get_xfer_count(struct uhd_dpdk_socket *sock, size_t *count) +{ + if (!sock) + return -EINVAL; + if (sock->sock_type != UHD_DPDK_SOCK_UDP) + return -EINVAL; + if (!sock->priv) + return -ENODEV; + + struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; + *count = pdata->xferd_pkts; + return 0; +} diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h index c07c1913a..66adb0f54 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_fops.h @@ -12,4 +12,12 @@ int _uhd_dpdk_config_req_compl(struct uhd_dpdk_config_req *req, int retval); int _uhd_dpdk_sock_setup(struct uhd_dpdk_config_req *req); int _uhd_dpdk_sock_release(struct uhd_dpdk_config_req *req); + +/* + * Get key for RX table corresponding to this socket + * + * This is primarily used to get access to the waiter entry + */ +int _uhd_dpdk_sock_rx_key(struct uhd_dpdk_socket *sock, + struct uhd_dpdk_ipv4_5tuple *key); #endif /* _UHD_DPDK_FOPS_H_ */ diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c index 26cfd43e1..6ea77b930 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c @@ -6,17 +6,20 @@ #include "uhd_dpdk_fops.h" #include "uhd_dpdk_udp.h" #include "uhd_dpdk_driver.h" +#include "uhd_dpdk_wait.h" #include <rte_ring.h> #include <rte_malloc.h> #include <unistd.h> #include <sys/syscall.h> #include <arpa/inet.h> +#define MAX_UDP_PORT 65535 + /************************************************ * I/O thread ONLY */ -static int _alloc_txq(struct uhd_dpdk_port *port, pid_t tid, struct uhd_dpdk_tx_queue **queue) +static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk_tx_queue **queue) { *queue = NULL; struct uhd_dpdk_tx_queue *q = rte_zmalloc(NULL, sizeof(*q), 0); @@ -108,7 +111,7 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) /* Add to rx table */ if (pdata->dst_port == 0) { /* Assign unused one in a very slow fashion */ - for (uint16_t i = 1; i > 0; i++) { + for (uint16_t i = MAX_UDP_PORT; i > 0; i--) { ht_key.dst_port = htons(i); if (rte_hash_lookup(port->rx_table, &ht_key) == -ENOENT) { pdata->dst_port = htons(i); @@ -144,9 +147,22 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) _uhd_dpdk_config_req_compl(req, -ENOMEM); return -ENOMEM; } - retval = rte_hash_add_key_data(port->rx_table, &ht_key, sock); + + struct uhd_dpdk_rx_entry *entry = (struct uhd_dpdk_rx_entry *) + rte_zmalloc(NULL, sizeof(*entry), 0); + if (!entry) { + rte_ring_free(sock->rx_ring); + RTE_LOG(ERR, USER1, "%s: Cannot create RX entry\n", __func__); + _uhd_dpdk_config_req_compl(req, -ENOMEM); + return -ENOMEM; + } + entry->sock = sock; + entry->waiter = NULL; + + retval = rte_hash_add_key_data(port->rx_table, &ht_key, entry); if (retval != 0) { RTE_LOG(WARNING, TABLE, "Could not add new RX socket to port %d: %d\n", port->id, retval); + rte_free(entry); rte_ring_free(sock->rx_ring); _uhd_dpdk_config_req_compl(req, retval); return retval; @@ -155,25 +171,25 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) } /* Are we doing TX? */ - if (sock->tx_ring) { - sock->tx_ring = NULL; + if (sock->tx_queue) { + sock->tx_queue = NULL; struct uhd_dpdk_tx_queue *q = NULL; - LIST_FOREACH(q, &port->txq_list, entry) { - if (q->tid == sock->tid) { - LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry); - sock->tx_ring = q->queue; - sock->rx_ring = q->freebufs; - break; - } - } - if (!sock->tx_ring) { + // FIXME Not sharing txq across all thread's sockets for now + //LIST_FOREACH(q, &port->txq_list, entry) { + // if (pthread_equal(q->tid, sock->tid)) { + // LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry); + // sock->tx_ring = q->queue; + // sock->rx_ring = q->freebufs; + // break; + // } + //} + if (!sock->tx_queue) { retval = _alloc_txq(port, sock->tid, &q); if (retval) { _uhd_dpdk_config_req_compl(req, retval); return retval; } - sock->tx_ring = q->queue; - sock->rx_ring = q->freebufs; + sock->tx_queue = q; } /* If a broadcast type, just finish setup and return */ if (is_broadcast(port, pdata->dst_ipv4_addr)) { @@ -242,10 +258,23 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req) { struct uhd_dpdk_socket *sock = req->sock; + if (req->sock == NULL) { + RTE_LOG(ERR, USER1, "%s: no sock in req\n", __func__); + return -EINVAL; + } struct uhd_dpdk_port *port = req->sock->port; struct uhd_dpdk_config_req *conf_req = NULL; struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; - if (sock->tx_ring) { + if (pdata == NULL) { + RTE_LOG(ERR, USER1, "%s: no pdata in sock\n", __func__); + return -EINVAL; + } + if (sock->tx_queue) { + // FIXME not sharing buffers anymore + LIST_REMOVE(sock->tx_queue, entry); + rte_ring_free(sock->tx_queue->queue); + rte_ring_free(sock->tx_queue->retry_queue); + /* Remove from tx_list */ LIST_REMOVE(sock, tx_entry); /* Check for entry in ARP table */ @@ -260,18 +289,32 @@ int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req) } } - /* Add outstanding buffers back to TX queue's freebufs */ - struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE]; - int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, sock->tx_buf_count); - if (status) { - RTE_LOG(ERR, USER1, "%d %s: Could not restore %u TX buffers in bulk!\n", status, __func__, sock->tx_buf_count); + // FIXME not sharing buffers anymore + // Remove outstanding buffers from TX queue's freebufs */ + unsigned int bufs = rte_ring_count(sock->tx_queue->freebufs); + for (unsigned int i = 0; i < bufs; i++) { + struct rte_mbuf *buf = NULL; + if (rte_ring_dequeue(sock->tx_queue->freebufs, (void **) &buf)) { + RTE_LOG(ERR, USER1, "%s: Could not dequeue freebufs\n", __func__); + } else if (buf) { + rte_pktmbuf_free(buf); + } } + rte_ring_free(sock->tx_queue->freebufs); + rte_free(sock->tx_queue); - unsigned int enqd = rte_ring_enqueue_bulk(sock->rx_ring, (void **) freebufs, sock->tx_buf_count, NULL); - if (enqd != (unsigned int) sock->tx_buf_count) { - RTE_LOG(ERR, USER1, "%s: Could not enqueue TX buffers!\n", __func__); - return status; - } + /* Add outstanding buffers back to TX queue's freebufs */ + //struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE]; + //int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, sock->tx_buf_count); + //if (status) { + // RTE_LOG(ERR, USER1, "%d %s: Could not restore %u TX buffers in bulk!\n", status, __func__, sock->tx_buf_count); + //} + + //unsigned int enqd = rte_ring_enqueue_bulk(sock->rx_ring, (void **) freebufs, sock->tx_buf_count, NULL); + //if (enqd != (unsigned int) sock->tx_buf_count) { + // RTE_LOG(ERR, USER1, "%s: Could not enqueue TX buffers!\n", __func__); + // return status; + //} } else if (sock->rx_ring) { struct uhd_dpdk_ipv4_5tuple ht_key = { .sock_type = UHD_DPDK_SOCK_UDP, @@ -280,6 +323,13 @@ int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req) .dst_ip = 0, .dst_port = pdata->dst_port }; + struct uhd_dpdk_rx_entry *entry = NULL; + rte_hash_lookup_data(port->rx_table, &ht_key, (void **) &entry); + if (entry) { + if (entry->waiter) + uhd_dpdk_waiter_put(entry->waiter); + rte_free(entry); + } rte_hash_del_key(port->rx_table, &ht_key); struct rte_mbuf *mbuf = NULL; while (rte_ring_dequeue(sock->rx_ring, (void **) &mbuf) == 0) { @@ -292,10 +342,21 @@ int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req) return 0; } +int _uhd_dpdk_udp_rx_key(struct uhd_dpdk_socket *sock, + struct uhd_dpdk_ipv4_5tuple *key) +{ + struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; + if (!pdata) + return -EINVAL; + key->sock_type = UHD_DPDK_SOCK_UDP; + key->src_ip = 0; + key->src_port = 0; + key->dst_ip = 0; + key->dst_port = pdata->dst_port; + return 0; +} + /* Configure a socket for UDP - * TODO: Make sure EVERYTHING is configured in socket - * FIXME: Make all of this part of the user thread, except hash table interaction - * and list handling */ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req, struct uhd_dpdk_sockarg_udp *arg) @@ -309,8 +370,7 @@ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req, } struct uhd_dpdk_socket *sock = req->sock; - pid_t tid = syscall(__NR_gettid); - sock->tid = tid; + sock->tid = pthread_self(); /* Create private data */ struct uhd_dpdk_udp_priv *data = (struct uhd_dpdk_udp_priv *) rte_zmalloc(NULL, sizeof(*data), 0); @@ -324,7 +384,7 @@ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req, if (arg->is_tx) { data->src_port = arg->local_port; data->dst_port = arg->remote_port; - sock->tx_ring = (struct rte_ring *) sock; + sock->tx_queue = (struct uhd_dpdk_tx_queue *) sock; } else { data->src_port = arg->remote_port; data->dst_port = arg->local_port; @@ -333,19 +393,10 @@ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req, /* TODO: Add support for I/O thread calling (skip locking and sleep) */ /* Add to port's config queue */ - pthread_mutex_lock(&req->mutex); - if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) { - pthread_mutex_unlock(&req->mutex); - rte_free(data); - req->retval = -ENOSPC; - return; - } - struct timespec timeout; - clock_gettime(CLOCK_MONOTONIC, &timeout); - timeout.tv_sec += 1; - pthread_cond_timedwait(&req->cond, &req->mutex, &timeout); - pthread_mutex_unlock(&req->mutex); - + int status = uhd_dpdk_config_req_submit(req, -1, sock->port->parent); + if (status) + req->retval = status; + if (req->retval) rte_free(data); } @@ -355,18 +406,7 @@ void uhd_dpdk_udp_close(struct uhd_dpdk_config_req *req) if (!req) return; - pthread_mutex_lock(&req->mutex); - if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) { - pthread_mutex_unlock(&req->mutex); - rte_free(req->sock->priv); - req->retval = -ENOSPC; - return; - } - struct timespec timeout; - clock_gettime(CLOCK_MONOTONIC, &timeout); - timeout.tv_sec += 1; - pthread_cond_timedwait(&req->cond, &req->mutex, &timeout); - pthread_mutex_unlock(&req->mutex); + uhd_dpdk_config_req_submit(req, -1, req->sock->port->parent); rte_free(req->sock->priv); } @@ -440,7 +480,7 @@ int uhd_dpdk_udp_get_info(struct uhd_dpdk_socket *sock, return -EINVAL; struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv; - if (sock->tx_ring) { + if (sock->tx_queue) { sockarg->is_tx = true; sockarg->local_port = pdata->src_port; sockarg->remote_port = pdata->dst_port; diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h index 651ae144e..39fcb8597 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h @@ -13,7 +13,8 @@ struct uhd_dpdk_udp_priv { uint16_t src_port; uint16_t dst_port; uint32_t dst_ipv4_addr; - uint32_t dropped_pkts; + size_t dropped_pkts; + size_t xferd_pkts; /* TODO: Cache destination address ptr to avoid ARP table lookup cost? */ //struct uhd_dpdk_arp_entry *arp_entry; }; @@ -27,4 +28,12 @@ void uhd_dpdk_udp_close(struct uhd_dpdk_config_req *req); int uhd_dpdk_udp_prep(struct uhd_dpdk_socket *sock, struct rte_mbuf *mbuf); + +/* + * Get key for RX table corresponding to this socket + * + * This is primarily used to get access to the waiter entry + */ +int _uhd_dpdk_udp_rx_key(struct uhd_dpdk_socket *sock, + struct uhd_dpdk_ipv4_5tuple *key); #endif /* _UHD_DPDK_UDP_H_ */ diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_wait.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_wait.c new file mode 100644 index 000000000..c00eaa3c4 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_wait.c @@ -0,0 +1,114 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#include "uhd_dpdk_wait.h" + +int _uhd_dpdk_waiter_wake(struct uhd_dpdk_wait_req *req, + struct uhd_dpdk_thread *t) +{ + int stat = pthread_mutex_trylock(&req->mutex); + if (stat) { + if (rte_ring_full(t->waiter_ring)) { + RTE_LOG(ERR, USER2, "%s: Could not lock req mutex\n", __func__); + return -ENOBUFS; + } else { + req->reason = UHD_DPDK_WAIT_SIMPLE; + rte_ring_enqueue(t->waiter_ring, req); + return -EAGAIN; + } + } + stat = pthread_cond_signal(&req->cond); + if (stat) + RTE_LOG(ERR, USER2, "%s: Could not signal req cond\n", __func__); + pthread_mutex_unlock(&req->mutex); + uhd_dpdk_waiter_put(req); + return stat; +} + +struct uhd_dpdk_wait_req *uhd_dpdk_waiter_alloc(enum uhd_dpdk_wait_type reason) +{ + struct uhd_dpdk_wait_req *req; + req = (struct uhd_dpdk_wait_req *) rte_zmalloc(NULL, sizeof(*req), 0); + if (!req) + return NULL; + + pthread_mutex_init(&req->mutex, NULL); + pthread_condattr_t condattr; + pthread_condattr_init(&condattr); + pthread_condattr_setclock(&condattr, CLOCK_MONOTONIC); + pthread_cond_init(&req->cond, &condattr); + rte_atomic32_set(&req->refcnt, 1); + req->reason = reason; + return req; +} + +static inline void uhd_dpdk_waiter_prepare(struct uhd_dpdk_wait_req *req) +{ + pthread_mutex_lock(&req->mutex); + /* Get a reference here, to be consumed by other thread (handshake) */ + uhd_dpdk_waiter_get(req); +} + +static inline int uhd_dpdk_waiter_submit(struct uhd_dpdk_wait_req *req, + int timeout) +{ + int retval = 0; + if (timeout < 0) { + retval = pthread_cond_wait(&req->cond, &req->mutex); + } else { + struct timespec timeout_spec; + clock_gettime(CLOCK_MONOTONIC, &timeout_spec); + timeout_spec.tv_sec += timeout/1000000; + timeout_spec.tv_nsec += (timeout % 1000000)*1000; + if (timeout_spec.tv_nsec > 1000000000) { + timeout_spec.tv_sec++; + timeout_spec.tv_nsec -= 1000000000; + } + retval = pthread_cond_timedwait(&req->cond, &req->mutex, &timeout_spec); + } + return retval; +} + +int uhd_dpdk_waiter_wait(struct uhd_dpdk_wait_req *req, int timeout, + struct uhd_dpdk_thread *t) +{ + int ret; + if (!req || !t) + return -EINVAL; + + uhd_dpdk_waiter_prepare(req); + + ret = rte_ring_enqueue(t->waiter_ring, req); + if (ret) { + uhd_dpdk_waiter_put(req); + pthread_mutex_unlock(&req->mutex); + return ret; + } + + uhd_dpdk_waiter_submit(req, timeout); + pthread_mutex_unlock(&req->mutex); + return 0; +} + +int uhd_dpdk_config_req_submit(struct uhd_dpdk_config_req *req, + int timeout, struct uhd_dpdk_thread *t) +{ + int ret; + if (!req || !t) + return -EINVAL; + + uhd_dpdk_waiter_prepare(req->waiter); + + ret = rte_ring_enqueue(t->sock_req_ring, req); + if (ret) { + uhd_dpdk_waiter_put(req->waiter); + pthread_mutex_unlock(&req->waiter->mutex); + return ret; + } + + uhd_dpdk_waiter_submit(req->waiter, timeout); + pthread_mutex_unlock(&req->waiter->mutex); + return 0; +} diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_wait.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_wait.h new file mode 100644 index 000000000..465608810 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_wait.h @@ -0,0 +1,62 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _UHD_DPDK_WAIT_H_ +#define _UHD_DPDK_WAIT_H_ + +#include "uhd_dpdk_ctx.h" +#include <rte_malloc.h> + +enum uhd_dpdk_wait_type { + UHD_DPDK_WAIT_SIMPLE, + UHD_DPDK_WAIT_RX, + UHD_DPDK_WAIT_TX_BUF, + UHD_DPDK_WAIT_TYPE_COUNT +}; + +struct uhd_dpdk_wait_req { + enum uhd_dpdk_wait_type reason; + struct uhd_dpdk_socket *sock; + pthread_cond_t cond; + pthread_mutex_t mutex; + rte_atomic32_t refcnt; /* free resources only when refcnt = 0 */ +}; + +static inline void uhd_dpdk_waiter_put(struct uhd_dpdk_wait_req *req) +{ + if (rte_atomic32_dec_and_test(&req->refcnt)) { + rte_free(req); + } +} + +static inline void uhd_dpdk_waiter_get(struct uhd_dpdk_wait_req *req) +{ + rte_atomic32_inc(&req->refcnt); +} + +/* + * Attempt to wake thread + * Re-enqueue waiter to thread's waiter_queue if fail + */ +int _uhd_dpdk_waiter_wake(struct uhd_dpdk_wait_req *req, + struct uhd_dpdk_thread *t); + +/* + * Allocates wait request and sets refcnt to 1 + */ +struct uhd_dpdk_wait_req *uhd_dpdk_waiter_alloc(enum uhd_dpdk_wait_type reason); + +/* + * Block and send wait request to thread t + */ +int uhd_dpdk_waiter_wait(struct uhd_dpdk_wait_req *req, int timeout, + struct uhd_dpdk_thread *t); + +/* + * Block and submit config request to thread t + */ +int uhd_dpdk_config_req_submit(struct uhd_dpdk_config_req *req, + int timeout, struct uhd_dpdk_thread *t); +#endif /* _UHD_DPDK_WAIT_H_ */ |