aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c')
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c456
1 files changed, 456 insertions, 0 deletions
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
new file mode 100644
index 000000000..26cfd43e1
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
@@ -0,0 +1,456 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#include "uhd_dpdk_fops.h"
+#include "uhd_dpdk_udp.h"
+#include "uhd_dpdk_driver.h"
+#include <rte_ring.h>
+#include <rte_malloc.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+#include <arpa/inet.h>
+
+/************************************************
+ * I/O thread ONLY
+ */
+
+static int _alloc_txq(struct uhd_dpdk_port *port, pid_t tid, struct uhd_dpdk_tx_queue **queue)
+{
+ *queue = NULL;
+ struct uhd_dpdk_tx_queue *q = rte_zmalloc(NULL, sizeof(*q), 0);
+ if (!q) {
+ RTE_LOG(ERR, USER1, "%s: Cannot allocate TX queue\n", __func__);
+ return -ENOMEM;
+ }
+ q->tid = tid;
+ LIST_INIT(&q->tx_list);
+
+ char name[32];
+ snprintf(name, sizeof(name), "tx_ring_udp_%u.%u", port->id, tid);
+ q->queue = rte_ring_create(
+ name,
+ UHD_DPDK_TXQ_SIZE,
+ rte_socket_id(),
+ RING_F_SC_DEQ | RING_F_SP_ENQ
+ );
+ snprintf(name, sizeof(name), "buffer_ring_udp_%u.%u", port->id, tid);
+ q->freebufs = rte_ring_create(
+ name,
+ UHD_DPDK_TXQ_SIZE,
+ rte_socket_id(),
+ RING_F_SC_DEQ | RING_F_SP_ENQ
+ );
+ /* Set up retry queue */
+ snprintf(name, sizeof(name), "retry_queue_%u", port->id);
+ q->retry_queue = rte_ring_create(
+ name,
+ UHD_DPDK_TXQ_SIZE,
+ rte_socket_id(),
+ RING_F_SC_DEQ | RING_F_SP_ENQ
+ );
+
+ if (!q->queue || !q->freebufs || !q->retry_queue) {
+ RTE_LOG(ERR, USER1, "%s: Cannot allocate TX rings\n", __func__);
+ if (q->queue)
+ rte_ring_free(q->queue);
+ if (q->freebufs)
+ rte_ring_free(q->freebufs);
+ if (q->retry_queue)
+ rte_ring_free(q->retry_queue);
+ rte_free(q);
+ return -ENOMEM;
+ }
+ struct rte_mbuf *bufs[UHD_DPDK_TXQ_SIZE];
+ unsigned int num_bufs = rte_ring_free_count(q->freebufs);
+ int buf_stat = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, bufs, num_bufs);
+ if (buf_stat) {
+ rte_ring_free(q->freebufs);
+ rte_ring_free(q->queue);
+ rte_ring_free(q->retry_queue);
+ rte_free(q);
+ RTE_LOG(ERR, USER1, "%s: Cannot allocate packet buffers\n", __func__);
+ return -ENOENT;
+ }
+ unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) bufs, num_bufs, NULL);
+ if (enqd != num_bufs) {
+ RTE_LOG(ERR, USER1, "%s: Cannot enqueue freebufs\n", __func__);
+ }
+ LIST_INSERT_HEAD(&port->txq_list, q, entry);
+ *queue = q;
+ return 0;
+}
+
+/* Finish setting up UDP socket (unless ARP needs to be done)
+ * Not multi-thread safe!
+ * This call should only be used by the thread servicing the port
+ * In addition, the code below assumes simplex sockets and unique receive ports
+ * FIXME: May need some way to help clean up abandoned socket requests (refcnt check...)
+ */
+int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req)
+{
+ int retval = 0;
+ struct uhd_dpdk_socket *sock = req->sock;
+ struct uhd_dpdk_udp_priv *pdata = sock->priv;
+ struct uhd_dpdk_port *port = req->sock->port;
+
+ struct uhd_dpdk_ipv4_5tuple ht_key = {
+ .sock_type = UHD_DPDK_SOCK_UDP,
+ .src_ip = 0,
+ .src_port = 0,
+ .dst_ip = 0,
+ .dst_port = pdata->dst_port
+ };
+
+ /* Are we doing RX? */
+ if (sock->rx_ring) {
+ /* Add to rx table */
+ if (pdata->dst_port == 0) {
+ /* Assign unused one in a very slow fashion */
+ for (uint16_t i = 1; i > 0; i++) {
+ ht_key.dst_port = htons(i);
+ if (rte_hash_lookup(port->rx_table, &ht_key) == -ENOENT) {
+ pdata->dst_port = htons(i);
+ break;
+ }
+ }
+ }
+
+ /* Is the port STILL invalid? */
+ if (pdata->dst_port == 0) {
+ RTE_LOG(ERR, USER1, "%s: No available UDP ports\n", __func__);
+ _uhd_dpdk_config_req_compl(req, -EADDRINUSE);
+ return -EADDRINUSE;
+ }
+
+ ht_key.dst_port = pdata->dst_port;
+ if (rte_hash_lookup(port->rx_table, &ht_key) > 0) {
+ RTE_LOG(ERR, USER1, "%s: Cannot add to RX table\n", __func__);
+ _uhd_dpdk_config_req_compl(req, -EADDRINUSE);
+ return -EADDRINUSE;
+ }
+
+ char name[32];
+ snprintf(name, sizeof(name), "rx_ring_udp_%u.%u", port->id, ntohs(pdata->dst_port));
+ sock->rx_ring = rte_ring_create(
+ name,
+ UHD_DPDK_RXQ_SIZE,
+ rte_socket_id(),
+ RING_F_SC_DEQ | RING_F_SP_ENQ
+ );
+ if (!sock->rx_ring) {
+ RTE_LOG(ERR, USER1, "%s: Cannot allocate RX ring\n", __func__);
+ _uhd_dpdk_config_req_compl(req, -ENOMEM);
+ return -ENOMEM;
+ }
+ retval = rte_hash_add_key_data(port->rx_table, &ht_key, sock);
+ if (retval != 0) {
+ RTE_LOG(WARNING, TABLE, "Could not add new RX socket to port %d: %d\n", port->id, retval);
+ rte_ring_free(sock->rx_ring);
+ _uhd_dpdk_config_req_compl(req, retval);
+ return retval;
+ }
+ _uhd_dpdk_config_req_compl(req, 0);
+ }
+
+ /* Are we doing TX? */
+ if (sock->tx_ring) {
+ sock->tx_ring = NULL;
+ struct uhd_dpdk_tx_queue *q = NULL;
+ LIST_FOREACH(q, &port->txq_list, entry) {
+ if (q->tid == sock->tid) {
+ LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry);
+ sock->tx_ring = q->queue;
+ sock->rx_ring = q->freebufs;
+ break;
+ }
+ }
+ if (!sock->tx_ring) {
+ retval = _alloc_txq(port, sock->tid, &q);
+ if (retval) {
+ _uhd_dpdk_config_req_compl(req, retval);
+ return retval;
+ }
+ sock->tx_ring = q->queue;
+ sock->rx_ring = q->freebufs;
+ }
+ /* If a broadcast type, just finish setup and return */
+ if (is_broadcast(port, pdata->dst_ipv4_addr)) {
+ LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry);
+ _uhd_dpdk_config_req_compl(req, 0);
+ return 0;
+ }
+ /* Otherwise... Check for entry in ARP table */
+ struct uhd_dpdk_arp_entry *entry = NULL;
+ int arp_table_stat = rte_hash_lookup_data(port->arp_table, &pdata->dst_ipv4_addr, (void **) &entry);
+ if (entry) {
+ /* Check for null entry */
+ if ((entry->mac_addr.addr_bytes[0] == 0xFF) &&
+ (entry->mac_addr.addr_bytes[1] == 0xFF) &&
+ (entry->mac_addr.addr_bytes[2] == 0xFF) &&
+ (entry->mac_addr.addr_bytes[3] == 0xFF) &&
+ (entry->mac_addr.addr_bytes[4] == 0xFF) &&
+ (entry->mac_addr.addr_bytes[5] == 0xFF)) {
+ arp_table_stat = -ENOENT;
+ }
+ } else {
+ /* No entry -> Add null entry */
+ entry = rte_zmalloc(NULL, sizeof(*entry), 0);
+ if (!entry) {
+ RTE_LOG(ERR, USER1, "%s: Cannot allocate ARP entry\n", __func__);
+ _uhd_dpdk_config_req_compl(req, -ENOMEM);
+ return -ENOMEM;
+ }
+ memset(entry->mac_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN);
+ LIST_INIT(&entry->pending_list);
+
+ if (rte_hash_add_key_data(port->arp_table, &pdata->dst_ipv4_addr, entry) < 0) {
+ rte_free(entry);
+ RTE_LOG(ERR, USER1, "%s: Cannot add entry to ARP table\n", __func__);
+ _uhd_dpdk_config_req_compl(req, -ENOMEM);
+ return -ENOMEM;
+ }
+ }
+
+ /* Was there a valid address? */
+ if (arp_table_stat == -ENOENT) {
+ /* Get valid address and have requestor continue waiting */
+ int arp_stat = 0;
+ do { /* Keep trying to send request if no descriptor */
+ arp_stat = _uhd_dpdk_arp_request(port, pdata->dst_ipv4_addr);
+ } while (arp_stat == -EAGAIN);
+
+ if (arp_stat) {
+ /* Config request errors out */
+ RTE_LOG(ERR, USER1, "%s: Cannot make ARP request\n", __func__);
+ _uhd_dpdk_config_req_compl(req, arp_stat);
+ return arp_stat;
+ }
+ /* Append req to pending list. Will signal later. */
+ LIST_INSERT_HEAD(&entry->pending_list, req, entry);
+ LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry);
+ } else {
+ /* We have a valid address. All good. */
+ LIST_INSERT_HEAD(&q->tx_list, sock, tx_entry);
+ _uhd_dpdk_config_req_compl(req, 0);
+ }
+ }
+ return 0;
+}
+
+int _uhd_dpdk_udp_release(struct uhd_dpdk_config_req *req)
+{
+ struct uhd_dpdk_socket *sock = req->sock;
+ struct uhd_dpdk_port *port = req->sock->port;
+ struct uhd_dpdk_config_req *conf_req = NULL;
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
+ if (sock->tx_ring) {
+ /* Remove from tx_list */
+ LIST_REMOVE(sock, tx_entry);
+ /* Check for entry in ARP table */
+ struct uhd_dpdk_arp_entry *entry = NULL;
+ rte_hash_lookup_data(port->arp_table, &pdata->dst_ipv4_addr, (void **) &entry);
+ if (entry) {
+ LIST_FOREACH(conf_req, &entry->pending_list, entry) {
+ if (conf_req->sock == sock) {
+ LIST_REMOVE(conf_req, entry);
+ break;
+ }
+ }
+ }
+
+ /* Add outstanding buffers back to TX queue's freebufs */
+ struct rte_mbuf *freebufs[UHD_DPDK_TXQ_SIZE];
+ int status = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, freebufs, sock->tx_buf_count);
+ if (status) {
+ RTE_LOG(ERR, USER1, "%d %s: Could not restore %u TX buffers in bulk!\n", status, __func__, sock->tx_buf_count);
+ }
+
+ unsigned int enqd = rte_ring_enqueue_bulk(sock->rx_ring, (void **) freebufs, sock->tx_buf_count, NULL);
+ if (enqd != (unsigned int) sock->tx_buf_count) {
+ RTE_LOG(ERR, USER1, "%s: Could not enqueue TX buffers!\n", __func__);
+ return status;
+ }
+ } else if (sock->rx_ring) {
+ struct uhd_dpdk_ipv4_5tuple ht_key = {
+ .sock_type = UHD_DPDK_SOCK_UDP,
+ .src_ip = 0,
+ .src_port = 0,
+ .dst_ip = 0,
+ .dst_port = pdata->dst_port
+ };
+ rte_hash_del_key(port->rx_table, &ht_key);
+ struct rte_mbuf *mbuf = NULL;
+ while (rte_ring_dequeue(sock->rx_ring, (void **) &mbuf) == 0) {
+ rte_pktmbuf_free(mbuf);
+ }
+ rte_ring_free(sock->rx_ring);
+ }
+
+ _uhd_dpdk_config_req_compl(req, 0);
+ return 0;
+}
+
+/* Configure a socket for UDP
+ * TODO: Make sure EVERYTHING is configured in socket
+ * FIXME: Make all of this part of the user thread, except hash table interaction
+ * and list handling
+ */
+void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req,
+ struct uhd_dpdk_sockarg_udp *arg)
+{
+ if (!req)
+ return;
+
+ if (!arg) {
+ req->retval = -EINVAL;
+ return;
+ }
+
+ struct uhd_dpdk_socket *sock = req->sock;
+ pid_t tid = syscall(__NR_gettid);
+ sock->tid = tid;
+
+ /* Create private data */
+ struct uhd_dpdk_udp_priv *data = (struct uhd_dpdk_udp_priv *) rte_zmalloc(NULL, sizeof(*data), 0);
+ if (!data) {
+ req->retval = -ENOMEM;
+ return;
+ }
+ sock->priv = data;
+
+ data->dst_ipv4_addr = arg->dst_addr;
+ if (arg->is_tx) {
+ data->src_port = arg->local_port;
+ data->dst_port = arg->remote_port;
+ sock->tx_ring = (struct rte_ring *) sock;
+ } else {
+ data->src_port = arg->remote_port;
+ data->dst_port = arg->local_port;
+ sock->rx_ring = (struct rte_ring *) sock;
+ }
+
+ /* TODO: Add support for I/O thread calling (skip locking and sleep) */
+ /* Add to port's config queue */
+ pthread_mutex_lock(&req->mutex);
+ if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) {
+ pthread_mutex_unlock(&req->mutex);
+ rte_free(data);
+ req->retval = -ENOSPC;
+ return;
+ }
+ struct timespec timeout;
+ clock_gettime(CLOCK_MONOTONIC, &timeout);
+ timeout.tv_sec += 1;
+ pthread_cond_timedwait(&req->cond, &req->mutex, &timeout);
+ pthread_mutex_unlock(&req->mutex);
+
+ if (req->retval)
+ rte_free(data);
+}
+
+void uhd_dpdk_udp_close(struct uhd_dpdk_config_req *req)
+{
+ if (!req)
+ return;
+
+ pthread_mutex_lock(&req->mutex);
+ if (rte_ring_enqueue(req->sock->port->parent->sock_req_ring, req)) {
+ pthread_mutex_unlock(&req->mutex);
+ rte_free(req->sock->priv);
+ req->retval = -ENOSPC;
+ return;
+ }
+ struct timespec timeout;
+ clock_gettime(CLOCK_MONOTONIC, &timeout);
+ timeout.tv_sec += 1;
+ pthread_cond_timedwait(&req->cond, &req->mutex, &timeout);
+ pthread_mutex_unlock(&req->mutex);
+ rte_free(req->sock->priv);
+}
+
+/*
+ * Note: I/O thread will fill in destination MAC address (doesn't happen here)
+ */
+static void uhd_dpdk_ipv4_prep(struct uhd_dpdk_port *port,
+ struct rte_mbuf *mbuf,
+ uint32_t dst_ipv4_addr,
+ uint8_t proto_id,
+ uint32_t payload_len)
+{
+ struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
+ struct ipv4_hdr *ip_hdr = (struct ipv4_hdr *) &eth_hdr[1];
+
+ ether_addr_copy(&port->mac_addr, &eth_hdr->s_addr);
+ eth_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4);
+
+ ip_hdr->version_ihl = 0x40 | 5;
+ ip_hdr->type_of_service = 0;
+ ip_hdr->total_length = rte_cpu_to_be_16(20 + payload_len);
+ ip_hdr->packet_id = 0;
+ ip_hdr->fragment_offset = rte_cpu_to_be_16(IPV4_HDR_DF_FLAG);
+ ip_hdr->time_to_live = 64;
+ ip_hdr->next_proto_id = proto_id;
+ ip_hdr->hdr_checksum = 0; /* FIXME: Assuming hardware can offload */
+ ip_hdr->src_addr = port->ipv4_addr;
+ ip_hdr->dst_addr = dst_ipv4_addr;
+
+ mbuf->pkt_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len;
+ mbuf->data_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len;
+}
+
+int uhd_dpdk_udp_prep(struct uhd_dpdk_socket *sock,
+ struct rte_mbuf *mbuf)
+{
+ struct ether_hdr *eth_hdr;
+ struct ipv4_hdr *ip_hdr;
+ struct udp_hdr *tx_hdr;
+ struct uhd_dpdk_port *port = sock->port;
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
+
+ if (unlikely(mbuf == NULL || pdata == NULL || port == NULL))
+ return -EINVAL;
+
+ uint32_t udp_data_len = mbuf->data_len;
+ uhd_dpdk_ipv4_prep(port,
+ mbuf,
+ pdata->dst_ipv4_addr,
+ 0x11,
+ 8 + udp_data_len);
+
+ eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
+ ip_hdr = (struct ipv4_hdr *) &eth_hdr[1];
+ tx_hdr = (struct udp_hdr *) &ip_hdr[1];
+
+ tx_hdr->src_port = pdata->src_port;
+ tx_hdr->dst_port = pdata->dst_port;
+ tx_hdr->dgram_len = rte_cpu_to_be_16(8 + udp_data_len);
+ tx_hdr->dgram_cksum = 0;
+
+ return 0;
+}
+
+int uhd_dpdk_udp_get_info(struct uhd_dpdk_socket *sock,
+ struct uhd_dpdk_sockarg_udp *sockarg)
+{
+ if (unlikely(sock == NULL || sockarg == NULL))
+ return -EINVAL;
+ if (sock->sock_type != UHD_DPDK_SOCK_UDP)
+ return -EINVAL;
+
+ struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) sock->priv;
+ if (sock->tx_ring) {
+ sockarg->is_tx = true;
+ sockarg->local_port = pdata->src_port;
+ sockarg->remote_port = pdata->dst_port;
+ sockarg->dst_addr = pdata->dst_ipv4_addr;
+ } else {
+ sockarg->is_tx = false;
+ sockarg->local_port = pdata->dst_port;
+ sockarg->remote_port = pdata->src_port;
+ sockarg->dst_addr = 0;
+ }
+ return 0;
+}
+