From ad2720d7188aece35e8aa4c65118a33b8b9ae690 Mon Sep 17 00:00:00 2001 From: Alex Williams Date: Tue, 25 Sep 2018 14:49:45 -0700 Subject: mpmd,transport,prefs: Add xport_mgr for dpdk_zero_copy Add configuration sections to the UHD config file for NIC entries. Keys are based on MAC addresses, and the entries beneath the section describe which CPU and I/O thread to use for the NIC and its IPv4 address. Make ring sizes configurable for uhd-dpdk. Ring size is now an argument for packet buffers. Note that the maximum number of available buffers is still determined at init! Add ability to receive broadcasts to uhd-dpdk. This is controllable by a boolean in the sockarg during socket creation. dpdk_zero_copy will filter broadcast packets out. Add dpdk_simple transport (to mirror udp_simple). This transport allows receiving from broadcast addresses, but it only permits one outstanding buffer at a time. Fix IP checksum handling in UHD-DPDK. TX checksums were not being calculated in the NIC, and in RX, the check for IP checksums allowed values of zero (reported as none). Now packets with bad IP checksums will be dropped. --- host/lib/transport/uhd-dpdk/uhd_dpdk.c | 49 ++++++++++++++---- host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h | 13 ++--- host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c | 19 ++++--- host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h | 3 +- host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c | 74 ++++++++++++++++++--------- host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h | 1 + 6 files changed, 108 insertions(+), 51 deletions(-) (limited to 'host/lib/transport/uhd-dpdk') diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk.c b/host/lib/transport/uhd-dpdk/uhd_dpdk.c index 1be6b2335..09897eb11 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk.c +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk.c @@ -34,6 +34,20 @@ int uhd_dpdk_port_count(void) return ctx->num_ports; } +int uhd_dpdk_port_link_status(unsigned int portid) +{ + if (!ctx) + return -ENODEV; + + struct uhd_dpdk_port *p = find_port(portid); + if (p) { + struct rte_eth_link link; + rte_eth_link_get_nowait(p->id, &link); + return link.link_status; + } + return -ENODEV; +} + struct eth_addr uhd_dpdk_get_eth_addr(unsigned int portid) { struct eth_addr retval; @@ -43,6 +57,7 @@ struct eth_addr uhd_dpdk_get_eth_addr(unsigned int portid) if (p) { memcpy(retval.addr, p->mac_addr.addr_bytes, ETHER_ADDR_LEN); } + return retval; } @@ -211,19 +226,12 @@ static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int lco return 0; } - -int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports, - int *port_thread_mapping, int num_mbufs, int mbuf_cache_size, - int mtu) +int uhd_dpdk_init(int argc, const char **argv) { /* Init context only once */ if (ctx) return 1; - if ((num_ports == 0) || (port_thread_mapping == NULL)) { - return -EINVAL; - } - /* Grabs arguments intended for DPDK's EAL */ int ret = rte_eal_init(argc, (char **) argv); if (ret < 0) @@ -241,8 +249,6 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports, ctx->num_ports = rte_eth_dev_count(); if (ctx->num_ports < 1) rte_exit(EXIT_FAILURE, "Error: Found no ports\n"); - if (ctx->num_ports < num_ports) - rte_exit(EXIT_FAILURE, "Error: User requested more ports than available\n"); /* Get memory for thread and port data structures */ ctx->threads = rte_zmalloc("uhd_dpdk_thread", RTE_MAX_LCORE*sizeof(struct uhd_dpdk_thread), 0); @@ -252,6 +258,28 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports, if (!ctx->ports) rte_exit(EXIT_FAILURE, "Error: Could not allocate memory for port data\n"); + for (size_t i = 0; i < ctx->num_ports; i++) { + struct uhd_dpdk_port *port = &ctx->ports[i]; + port->id = i; + rte_eth_macaddr_get(port->id, &port->mac_addr); + } + + return 0; +} + +int uhd_dpdk_start(unsigned int num_ports, int *port_thread_mapping, + int num_mbufs, int mbuf_cache_size, int mtu) +{ + if (!ctx) + return -EIO; + + if ((num_ports == 0) || (port_thread_mapping == NULL)) { + return -EINVAL; + } + + if (ctx->num_ports < num_ports) + rte_exit(EXIT_FAILURE, "Error: User requested more ports than available\n"); + /* Initialize the thread data structures */ for (int i = rte_get_next_lcore(-1, 1, 0); (i < RTE_MAX_LCORE); @@ -306,7 +334,6 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports, rte_exit(EXIT_FAILURE, "Requested inactive lcore %u for port %u\n", (unsigned int) thread_id, i); struct uhd_dpdk_port *port = &ctx->ports[i]; - port->id = i; port->parent = &ctx->threads[thread_id]; ctx->threads[thread_id].num_ports++; LIST_INSERT_HEAD(&ctx->threads[thread_id].port_list, port, port_entry); diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h index d497e5d5a..6f43ae1cf 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h @@ -24,7 +24,7 @@ #define UHD_DPDK_MAX_WAITERS UHD_DPDK_MAX_SOCKET_CNT #define UHD_DPDK_TXQ_SIZE 64 #define UHD_DPDK_TX_BURST_SIZE (UHD_DPDK_TXQ_SIZE - 1) -#define UHD_DPDK_RXQ_SIZE 64 +#define UHD_DPDK_RXQ_SIZE 128 #define UHD_DPDK_RX_BURST_SIZE (UHD_DPDK_RXQ_SIZE - 1) struct uhd_dpdk_port; @@ -256,13 +256,10 @@ static inline struct uhd_dpdk_port * find_port(unsigned int portid) if (!ctx) return NULL; - for (unsigned int i = 0; i < ctx->num_threads; i++) { - struct uhd_dpdk_thread *t = &ctx->threads[i]; - struct uhd_dpdk_port *p; - LIST_FOREACH(p, &t->port_list, port_entry) { - if (p->id == portid) { - return p; - } + for (unsigned int i = 0; i < ctx->num_ports; i++) { + struct uhd_dpdk_port *p = &ctx->ports[i]; + if (p->id == portid) { + return p; } } return NULL; diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c index f603f1f8f..61ed836df 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c @@ -43,7 +43,6 @@ int _uhd_dpdk_arp_reply(struct uhd_dpdk_port *port, struct arp_hdr *arp_req) mbuf->pkt_len = 42; mbuf->data_len = 42; - mbuf->ol_flags = PKT_TX_IP_CKSUM; if (rte_eth_tx_burst(port->id, 0, &mbuf, 1) != 1) { RTE_LOG(WARNING, RING, "%s: TX descriptor ring is full\n", __func__); @@ -125,7 +124,6 @@ int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port, uint32_t ip) mbuf->pkt_len = 42; mbuf->data_len = 42; - mbuf->ol_flags = PKT_TX_IP_CKSUM; if (rte_eth_tx_burst(port->id, 0, &mbuf, 1) != 1) { RTE_LOG(WARNING, RING, "%s: TX descriptor ring is full\n", __func__); @@ -135,7 +133,8 @@ int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port, uint32_t ip) return 0; } -int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt) +int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, + struct udp_hdr *pkt, bool bcast) { int status = 0; struct uhd_dpdk_ipv4_5tuple ht_key = { @@ -155,6 +154,10 @@ int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, str } struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) entry->sock->priv; + if (bcast && pdata->filter_bcast) { + // Filter broadcast packets if not listening + goto udp_rx_drop; + } status = rte_ring_enqueue(entry->sock->rx_ring, mbuf); if (entry->waiter) { _uhd_dpdk_waiter_wake(entry->waiter, port->parent); @@ -172,14 +175,16 @@ udp_rx_drop: return status; } -int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt) +int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, + struct ipv4_hdr *pkt) { - if (pkt->dst_addr != port->ipv4_addr) { + bool bcast = is_broadcast(port, pkt->dst_addr); + if (pkt->dst_addr != port->ipv4_addr && !bcast) { rte_pktmbuf_free(mbuf); return -ENODEV; } if (pkt->next_proto_id == 0x11) { - return _uhd_dpdk_process_udp(port, mbuf, (struct udp_hdr *) &pkt[1]); + return _uhd_dpdk_process_udp(port, mbuf, (struct udp_hdr *) &pkt[1], bcast); } rte_pktmbuf_free(mbuf); return -EINVAL; @@ -417,7 +422,7 @@ static inline void _uhd_dpdk_rx_burst(struct uhd_dpdk_port *port) rte_pktmbuf_free(bufs[buf]); break; case ETHER_TYPE_IPv4: - if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */ + if (ol_flags & PKT_RX_IP_CKSUM_BAD) { RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf); } else { _uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data); diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h index b0d3e42cd..f94a678ba 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h @@ -20,7 +20,8 @@ static inline bool is_broadcast(struct uhd_dpdk_port *port, uint32_t dst_ipv4_ad int _uhd_dpdk_process_arp(struct uhd_dpdk_port *port, struct arp_hdr *arp_frame); -int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt); +int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, + struct udp_hdr *pkt, bool bcast); int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt); int _uhd_dpdk_send_udp(struct uhd_dpdk_port *port, struct uhd_dpdk_socket *sock, diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c index 6ea77b930..4fc375b77 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c @@ -19,7 +19,8 @@ * I/O thread ONLY */ -static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk_tx_queue **queue) +static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, + struct uhd_dpdk_tx_queue **queue, size_t num_bufs) { *queue = NULL; struct uhd_dpdk_tx_queue *q = rte_zmalloc(NULL, sizeof(*q), 0); @@ -31,25 +32,25 @@ static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk LIST_INIT(&q->tx_list); char name[32]; - snprintf(name, sizeof(name), "tx_ring_udp_%u.%u", port->id, tid); + snprintf(name, sizeof(name), "tx_q%u.%0lx", port->id, (unsigned long) q); q->queue = rte_ring_create( name, - UHD_DPDK_TXQ_SIZE, + num_bufs, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ ); - snprintf(name, sizeof(name), "buffer_ring_udp_%u.%u", port->id, tid); + snprintf(name, sizeof(name), "free_q%u.%0lx", port->id, (unsigned long) q); q->freebufs = rte_ring_create( name, - UHD_DPDK_TXQ_SIZE, + num_bufs, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ ); /* Set up retry queue */ - snprintf(name, sizeof(name), "retry_queue_%u", port->id); + snprintf(name, sizeof(name), "redo_q%u.%0lx", port->id, (unsigned long) q); q->retry_queue = rte_ring_create( name, - UHD_DPDK_TXQ_SIZE, + num_bufs, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ ); @@ -65,24 +66,39 @@ static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk rte_free(q); return -ENOMEM; } - struct rte_mbuf *bufs[UHD_DPDK_TXQ_SIZE]; - unsigned int num_bufs = rte_ring_free_count(q->freebufs); - int buf_stat = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, bufs, num_bufs); - if (buf_stat) { - rte_ring_free(q->freebufs); - rte_ring_free(q->queue); - rte_ring_free(q->retry_queue); - rte_free(q); - RTE_LOG(ERR, USER1, "%s: Cannot allocate packet buffers\n", __func__); - return -ENOENT; - } - unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) bufs, num_bufs, NULL); - if (enqd != num_bufs) { - RTE_LOG(ERR, USER1, "%s: Cannot enqueue freebufs\n", __func__); - } + + do { + struct rte_mbuf *bufs[UHD_DPDK_TXQ_SIZE]; + num_bufs = rte_ring_free_count(q->freebufs); + if (num_bufs > 0) { + num_bufs = num_bufs > UHD_DPDK_TXQ_SIZE ? UHD_DPDK_TXQ_SIZE : num_bufs; + int buf_stat = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, bufs, num_bufs); + if (buf_stat) { + RTE_LOG(ERR, USER1, "%s: Cannot allocate packet buffers\n", __func__); + goto unwind_txq; + } + unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) bufs, num_bufs, NULL); + if (enqd != num_bufs) { + RTE_LOG(ERR, USER1, "%s: Cannot enqueue freebufs\n", __func__); + goto unwind_txq; + } + } + } while (num_bufs > 0); LIST_INSERT_HEAD(&port->txq_list, q, entry); *queue = q; return 0; + +unwind_txq: + while (!rte_ring_empty(q->freebufs)) { + struct rte_mbuf *buf; + if (rte_ring_dequeue(q->freebufs, (void **) &buf) == 0) + rte_free(buf); + } + rte_ring_free(q->freebufs); + rte_ring_free(q->queue); + rte_ring_free(q->retry_queue); + rte_free(q); + return -ENOENT; } /* Finish setting up UDP socket (unless ARP needs to be done) @@ -134,11 +150,14 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) return -EADDRINUSE; } + size_t num_bufs = (pdata->xferd_pkts < (UHD_DPDK_RX_BURST_SIZE + 1)) ? + UHD_DPDK_RX_BURST_SIZE + 1 : pdata->xferd_pkts; + pdata->xferd_pkts = 0; char name[32]; snprintf(name, sizeof(name), "rx_ring_udp_%u.%u", port->id, ntohs(pdata->dst_port)); sock->rx_ring = rte_ring_create( name, - UHD_DPDK_RXQ_SIZE, + num_bufs, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ ); @@ -172,6 +191,9 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) /* Are we doing TX? */ if (sock->tx_queue) { + size_t num_bufs = (pdata->xferd_pkts < (UHD_DPDK_TX_BURST_SIZE + 1)) ? + UHD_DPDK_TX_BURST_SIZE + 1 : pdata->xferd_pkts; + pdata->xferd_pkts = 0; sock->tx_queue = NULL; struct uhd_dpdk_tx_queue *q = NULL; // FIXME Not sharing txq across all thread's sockets for now @@ -184,7 +206,7 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) // } //} if (!sock->tx_queue) { - retval = _alloc_txq(port, sock->tid, &q); + retval = _alloc_txq(port, sock->tid, &q, num_bufs); if (retval) { _uhd_dpdk_config_req_compl(req, retval); return retval; @@ -385,10 +407,13 @@ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req, data->src_port = arg->local_port; data->dst_port = arg->remote_port; sock->tx_queue = (struct uhd_dpdk_tx_queue *) sock; + data->xferd_pkts = arg->num_bufs; } else { data->src_port = arg->remote_port; data->dst_port = arg->local_port; sock->rx_ring = (struct rte_ring *) sock; + data->xferd_pkts = arg->num_bufs; + data->filter_bcast = arg->filter_bcast; } /* TODO: Add support for I/O thread calling (skip locking and sleep) */ @@ -433,6 +458,7 @@ static void uhd_dpdk_ipv4_prep(struct uhd_dpdk_port *port, ip_hdr->time_to_live = 64; ip_hdr->next_proto_id = proto_id; ip_hdr->hdr_checksum = 0; /* FIXME: Assuming hardware can offload */ + mbuf->ol_flags |= PKT_TX_IP_CKSUM; ip_hdr->src_addr = port->ipv4_addr; ip_hdr->dst_addr = dst_ipv4_addr; diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h index 39fcb8597..d7ca5609b 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h @@ -15,6 +15,7 @@ struct uhd_dpdk_udp_priv { uint32_t dst_ipv4_addr; size_t dropped_pkts; size_t xferd_pkts; + bool filter_bcast; /* TODO: Cache destination address ptr to avoid ARP table lookup cost? */ //struct uhd_dpdk_arp_entry *arp_entry; }; -- cgit v1.2.3