From 4e38eef817813c1bbd8a9cf972e4cf0134d24308 Mon Sep 17 00:00:00 2001 From: Alex Williams Date: Sun, 1 Dec 2019 21:58:13 -0800 Subject: dpdk: Add new DPDK stack to integrate with I/O services docs: Update DPDK docs with new parameters: Parameter names have had their hyphens changed to underscores, and the I/O CPU argument is now named after the lcores and reflects the naming used by DPDK. transport: Add new udp_dpdk_link, based atop the new APIs: This link is tightly coupled with the DPDK I/O service. The link class carries all the address information to communicate with the other host, and it can send packets directly through the DPDK NIC ports. However, for receiving packets, the I/O service must pull the packets from the DMA queue and attach them to the appropriate link object. The link object merely formats the frame_buff object underneath, which is embedded in the rte_mbuf container. For get_recv_buff, the link will pull buffers only from its internal queue (the one filled by the I/O service). transport: Add DPDK-specific I/O service: The I/O service is split into two parts, the user threads and the I/O worker threads. The user threads submit requests through various appropriate queues, and the I/O threads perform all the I/O on their behalf. This includes routing UDP packets to the correct receiver and getting the MAC address of a destination (by performing the ARP request and handling the ARP replies). The DPDK context stores I/O services. The context spawns all I/O services on init(), and I/O services can be fetched from the dpdk_ctx object by using a port ID. I/O service clients: The clients have two lockless ring buffers. One is to get a buffer from the I/O service; the other is to release a buffer back to the I/O service. Threads sleeping on buffer I/O are kept in a separate list from the service queue and are processed in the course of doing RX or TX. The list nodes are embedded in the dpdk_io_if, and the head of the list is on the dpdk_io_service. The I/O service will transfer the embedded wait_req to the list if it cannot acquire the mutex to complete the condition for waking. Co-authored-by: Martin Braun Co-authored-by: Ciro Nishiguchi Co-authored-by: Brent Stapleton --- host/lib/include/uhdlib/transport/dpdk/arp.hpp | 29 +++ host/lib/include/uhdlib/transport/dpdk/common.hpp | 202 ++++++++++----- .../uhdlib/transport/dpdk/service_queue.hpp | 30 ++- host/lib/include/uhdlib/transport/dpdk/udp.hpp | 115 +++++++++ .../include/uhdlib/transport/dpdk_io_service.hpp | 246 ++++++++++++++++++ .../uhdlib/transport/dpdk_io_service_client.hpp | 285 +++++++++++++++++++++ host/lib/include/uhdlib/transport/dpdk_simple.hpp | 33 +-- host/lib/include/uhdlib/transport/link_base.hpp | 4 +- .../lib/include/uhdlib/transport/udp_dpdk_link.hpp | 267 +++++++++++++++++++ 9 files changed, 1121 insertions(+), 90 deletions(-) create mode 100644 host/lib/include/uhdlib/transport/dpdk/arp.hpp create mode 100644 host/lib/include/uhdlib/transport/dpdk/udp.hpp create mode 100644 host/lib/include/uhdlib/transport/dpdk_io_service.hpp create mode 100644 host/lib/include/uhdlib/transport/dpdk_io_service_client.hpp create mode 100644 host/lib/include/uhdlib/transport/udp_dpdk_link.hpp (limited to 'host/lib/include/uhdlib/transport') diff --git a/host/lib/include/uhdlib/transport/dpdk/arp.hpp b/host/lib/include/uhdlib/transport/dpdk/arp.hpp new file mode 100644 index 000000000..e71119bb3 --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk/arp.hpp @@ -0,0 +1,29 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_ARP_HPP_ +#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_ARP_HPP_ + +#include +#include +#include + +namespace uhd { namespace transport { namespace dpdk { + +struct arp_request +{ + struct ether_addr tha; + port_id_t port; + ipv4_addr tpa; +}; + +struct arp_entry +{ + struct ether_addr mac_addr; + std::vector reqs; +}; + +}}} /* namespace uhd::transport::dpdk */ +#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_ARP_HPP_ */ diff --git a/host/lib/include/uhdlib/transport/dpdk/common.hpp b/host/lib/include/uhdlib/transport/dpdk/common.hpp index 1f4466a0f..ac3526e49 100644 --- a/host/lib/include/uhdlib/transport/dpdk/common.hpp +++ b/host/lib/include/uhdlib/transport/dpdk/common.hpp @@ -1,44 +1,130 @@ // -// Copyright 2019 Ettus Research, a National Instruments brand +// Copyright 2019 Ettus Research, a National Instruments Brand // // SPDX-License-Identifier: GPL-3.0-or-later // + #ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_ #define _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_ #include +#include #include -#include -#include +#include #include #include #include #include #include +#include #include #include #include #include #include +#include #include -/* NOTE: There are changes to rte_eth_addr in 19.x */ +/* NOTE: There are changes to all the network standard fields in 19.x */ + + +namespace uhd { namespace transport { + +class dpdk_io_service; + +namespace dpdk { -namespace uhd { namespace transport { namespace dpdk { +struct arp_entry; using queue_id_t = uint16_t; using port_id_t = uint16_t; using ipv4_addr = uint32_t; +class dpdk_adapter_info : public adapter_info +{ +public: + dpdk_adapter_info(port_id_t port) : _port(port) {} + ~dpdk_adapter_info() {} + + std::string to_string() + { + return std::string("DPDK:") + std::to_string(_port); + } + + bool operator==(const dpdk_adapter_info& rhs) const + { + return (_port == rhs._port); + } + +private: + // Port ID + port_id_t _port; +}; + + +/*! + * Packet/Frame buffer class for DPDK + * + * This class is intended to be placed in the private area of the rte_mbuf, and + * its memory is part of the rte_mbuf, so its life is tied to the underlying + * buffer (or more precisely, the encapsulating one). + */ +class dpdk_frame_buff : public frame_buff +{ +public: + dpdk_frame_buff(struct rte_mbuf* mbuf) : _mbuf(mbuf) + { + _data = rte_pktmbuf_mtod(mbuf, void*); + _packet_size = 0; + } + + ~dpdk_frame_buff() = default; + + /*! + * Simple getter for the underlying rte_mbuf. + * The rte_mbuf may need further modification before sending packets, + * like adjusting the IP and UDP lengths. + */ + inline struct rte_mbuf* get_pktmbuf() + { + return _mbuf; + } + + /*! + * Move the data pointer by the indicated size, to some desired + * encapsulated frame. + * + * \param hdr_size Size (in bytes) of the headers to skip. Can be negative + * to pull the header back. + */ + inline void header_jump(ssize_t hdr_size) + { + _data = (void*)((uint8_t*)_data + hdr_size); + } + + //! Embedded list node's next ptr + dpdk_frame_buff* next = nullptr; + //! Embedded list node's prev ptr + dpdk_frame_buff* prev = nullptr; + +private: + struct rte_mbuf* _mbuf; +}; + + +/*! + * The size (in bytes) of the private area reserved within the rte_mbuf. + * This portion of the rte_mbuf is used for the embedded dpdk_frame_buff data + * structure. + */ +constexpr size_t DPDK_MBUF_PRIV_SIZE = + RTE_ALIGN(sizeof(struct dpdk_frame_buff), RTE_MBUF_PRIV_ALIGN); + /*! * Class representing a DPDK NIC port * * The dpdk_port object possesses all the data needed to send and receive - * packets between this port and a remote host. A logical link should specify - * which packets are destined for it and allocate a DMA queue with the - * dpdk_port::alloc_queue() function. A logical link should not, however, - * specify ARP packets for its set of received packets. That functionality is - * reserved for the special queue 0. + * packets between this port and a remote host. * * The logical link can then get the packet buffer pools associated with this * NIC port and use them to send and receive packets. @@ -55,7 +141,7 @@ public: * \param port The port ID * \param mtu The intended MTU for the port * \param num_queues Number of DMA queues to reserve for this port - * \param num_mbufs The number of packet buffers per queue + * \param num_desc The number of descriptors per DMA queue * \param rx_pktbuf_pool A pointer to the port's RX packet buffer pool * \param tx_pktbuf_pool A pointer to the port's TX packet buffer pool * \param ipv4_address The IPv4 network address (w/ netmask) @@ -64,7 +150,7 @@ public: static dpdk_port::uptr make(port_id_t port, size_t mtu, uint16_t num_queues, - size_t num_mbufs, + uint16_t num_desc, struct rte_mempool* rx_pktbuf_pool, struct rte_mempool* tx_pktbuf_pool, std::string ipv4_address); @@ -72,11 +158,13 @@ public: dpdk_port(port_id_t port, size_t mtu, uint16_t num_queues, - size_t num_mbufs, + uint16_t num_desc, struct rte_mempool* rx_pktbuf_pool, struct rte_mempool* tx_pktbuf_pool, std::string ipv4_address); + ~dpdk_port(); + /*! Getter for this port's ID * \return this port's ID */ @@ -85,6 +173,19 @@ public: return _port; } + inline dpdk_adapter_info get_adapter_info() const + { + return dpdk_adapter_info(_port); + } + + /*! Getter for this port's MAC address + * \return this port's MAC address + */ + inline ether_addr get_mac_addr() const + { + return _mac_addr; + } + /*! Getter for this port's MTU * \return this port's MTU */ @@ -141,68 +242,45 @@ public: * \param dst_ipv4_addr The destination IPv4 address (in network order) * \return whether the destination address matches this port's broadcast address */ - inline bool dst_is_broadcast(const uint32_t dst_ipv4_addr) const + inline bool dst_is_broadcast(const ipv4_addr dst_ipv4_addr) const { uint32_t network = _netmask | ((~_netmask) & dst_ipv4_addr); return (network == 0xffffffff); } - /*! Allocate a DMA queue (TX/RX pair) and use the specified flow pattern - * to route packets to the RX queue. - * - * \pattern recv_pattern The flow pattern to use for directing traffic to - * the allocated RX queue. - * \return The queue ID for the allocated queue - * \throw uhd::runtime_error when there are no free queues - */ - queue_id_t alloc_queue(struct rte_flow_pattern recv_pattern[]); - - /*! Free a previously allocated queue and tear down the associated flow rule - * \param queue The ID of the queue to free - * \throw std::out_of_range when the queue ID is not currently allocated - */ - void free_queue(queue_id_t queue); - /*! - * Process ARP request/reply + * Allocate a UDP port and return it in network order + * + * \param udp_port UDP port to attempt to allocate. Use 0 for no preference. + * \return 0 for failure, else the allocated UDP port in network order. */ - // int process_arp(struct rte_mempool *tx_pktbuf_pool, struct arp_hdr *arp_frame); + uint16_t alloc_udp_port(uint16_t udp_port); private: + friend uhd::transport::dpdk_io_service; + /*! * Construct and transmit an ARP reply (for the given ARP request) */ - int _arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* arp_req); + int _arp_reply(queue_id_t queue_id, struct arp_hdr* arp_req); port_id_t _port; size_t _mtu; + size_t _num_queues; struct rte_mempool* _rx_pktbuf_pool; struct rte_mempool* _tx_pktbuf_pool; struct ether_addr _mac_addr; ipv4_addr _ipv4; ipv4_addr _netmask; - size_t _num_queues; - std::vector _free_queues; - std::unordered_map _flow_rules; - /* Need ARP table - * To implement ARP service, maybe create ARP xport - * Then need dpdk_udp_link and dpdk_raw_link - * - * ...Or just do it inline with dpdk_ctx - * - * And link can just save the result (do it during constructor) - * - * - * But what about the service that _responds_ to ARP requests?! - * - * Maybe have to connect a DPDK link in stages: - * First, create the ARP service and attach it to the dpdk_ctx - * dpdk_ctx must own the links...? - * Or! Always burn a DMA engine for ARP - * - * Maybe have a shared_ptr to an ARP service here? - */ + + // Structures protected by mutex std::mutex _mutex; + std::set _udp_ports; + uint16_t _next_udp_port = 0xffff; + + // Structures protected by spin lock + rte_spinlock_t _spinlock = RTE_SPINLOCK_INITIALIZER; + std::unordered_map _arp_table; }; @@ -267,17 +345,21 @@ public: int get_port_link_status(port_id_t portid) const; /*! - * Get port ID for routing packet destined for given address + * Get port for routing packet destined for given address * \param addr Destination address - * \return port ID from routing table + * \return pointer to the port from routing table */ - int get_route(const std::string& addr) const; + dpdk_port* get_route(const std::string& addr) const; /*! * \return whether init() has been called */ bool is_init_done(void) const; + /*! Return a reference to an IO service given a port ID + */ + std::shared_ptr get_io_service(const size_t port_id); + private: /*! Convert the args to DPDK's EAL args and Initialize the EAL * @@ -312,8 +394,12 @@ private: std::unordered_map _ports; std::vector _rx_pktbuf_pools; std::vector _tx_pktbuf_pools; + // Store all the I/O services, and also store the corresponding port ID + std::map, std::vector> + _io_srv_portid_map; }; -}}} // namespace uhd::transport::dpdk +} // namespace dpdk +}} // namespace uhd::transport #endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_ */ diff --git a/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp b/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp index 7c9917079..c95786864 100644 --- a/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp +++ b/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp @@ -28,6 +28,14 @@ enum wait_type { WAIT_FLOW_OPEN, //! Wake once the flow/socket is destroyed WAIT_FLOW_CLOSE, + //! Wake once the new transport is connected + WAIT_XPORT_CONNECT, + //! Wake once the transport is disconnected + WAIT_XPORT_DISCONNECT, + //! Wake when MAC address found for IP address + WAIT_ARP, + //! Wake once the I/O worker terminates + WAIT_LCORE_TERM, //! Number of possible reasons for waiting WAIT_TYPE_COUNT }; @@ -50,6 +58,8 @@ struct wait_req std::mutex mutex; //! Whether the request was completed bool complete; + //! The status or error code associated with the request + int retval; //! An atomic reference counter for managing this request object's memory rte_atomic32_t refcnt; }; @@ -110,7 +120,7 @@ class service_queue public: /*! * Create a service queue - * \param depth Must be a power of 2 + * \param depth Must be a power of 2. Actual size is less. * \param lcore_id The DPDK lcore_id associated with this service queue */ service_queue(size_t depth, unsigned int lcore_id) @@ -227,6 +237,24 @@ public: return stat; } + /*! + * Get the size of the service queue + * \return size of the service queue + */ + inline size_t get_size() + { + return rte_ring_get_size(_waiter_ring); + } + + /*! + * Get the capacity of the service queue + * \return capacity of the service queue + */ + inline size_t get_capacity() + { + return rte_ring_get_capacity(_waiter_ring); + } + private: //! Multi-producer, single-consumer ring for requests struct rte_ring* _waiter_ring; diff --git a/host/lib/include/uhdlib/transport/dpdk/udp.hpp b/host/lib/include/uhdlib/transport/dpdk/udp.hpp new file mode 100644 index 000000000..65e561315 --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk/udp.hpp @@ -0,0 +1,115 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_UDP_HPP_ +#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_UDP_HPP_ + +#include +#include +#include +#include +#include +#include + +namespace uhd { namespace transport { namespace dpdk { + +constexpr size_t HDR_SIZE_UDP_IPV4 = + (sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + sizeof(struct udp_hdr)); + +/*! + * An enumerated type representing the type of flow for an IPv4 client + * Currently, only UDP is supported (FLOW_TYPE_UDP) + */ +enum flow_type { + FLOW_TYPE_UDP, + FLOW_TYPE_COUNT, +}; + +/*! + * A tuple for IPv4 flows that can be used for hashing + */ +struct ipv4_5tuple +{ + enum flow_type flow_type; + ipv4_addr src_ip; + ipv4_addr dst_ip; + uint16_t src_port; + uint16_t dst_port; +}; + +inline void fill_ipv4_hdr(struct rte_mbuf* mbuf, + const dpdk_port* port, + uint32_t dst_ipv4_addr, + uint8_t proto_id, + uint32_t payload_len) +{ + struct ether_hdr* eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*); + struct ipv4_hdr* ip_hdr = (struct ipv4_hdr*)ð_hdr[1]; + + ip_hdr->version_ihl = 0x40 | 5; + ip_hdr->type_of_service = 0; + ip_hdr->total_length = rte_cpu_to_be_16(20 + payload_len); + ip_hdr->packet_id = 0; + ip_hdr->fragment_offset = rte_cpu_to_be_16(IPV4_HDR_DF_FLAG); + ip_hdr->time_to_live = 64; + ip_hdr->next_proto_id = proto_id; + ip_hdr->hdr_checksum = 0; // Require HW offload + ip_hdr->src_addr = port->get_ipv4(); + ip_hdr->dst_addr = dst_ipv4_addr; + + mbuf->ol_flags = PKT_TX_IP_CKSUM | PKT_TX_IPV4; + mbuf->l2_len = sizeof(struct ether_hdr); + mbuf->l3_len = sizeof(struct ipv4_hdr); + mbuf->pkt_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len; + mbuf->data_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len; +} + +/* All values except payload length must be in network order */ +inline void fill_udp_hdr(struct rte_mbuf* mbuf, + const dpdk_port* port, + uint32_t dst_ipv4_addr, + uint16_t src_port, + uint16_t dst_port, + uint32_t payload_len) +{ + struct ether_hdr* eth_hdr; + struct ipv4_hdr* ip_hdr; + struct udp_hdr* tx_hdr; + + fill_ipv4_hdr( + mbuf, port, dst_ipv4_addr, IPPROTO_UDP, sizeof(struct udp_hdr) + payload_len); + + eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*); + ip_hdr = (struct ipv4_hdr*)ð_hdr[1]; + tx_hdr = (struct udp_hdr*)&ip_hdr[1]; + + tx_hdr->src_port = src_port; + tx_hdr->dst_port = dst_port; + tx_hdr->dgram_len = rte_cpu_to_be_16(8 + payload_len); + tx_hdr->dgram_cksum = 0; + mbuf->l4_len = sizeof(struct udp_hdr); +} + +//! Return an IPv4 address (numeric, in network order) into a string +inline std::string ipv4_num_to_str(const uint32_t ip_addr) +{ + char addr_str[INET_ADDRSTRLEN]; + struct in_addr ipv4_addr; + ipv4_addr.s_addr = ip_addr; + inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str)); + return std::string(addr_str); +} + +inline std::string eth_addr_to_string(const struct ether_addr mac_addr) +{ + auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx"); + mac_stream % (uint32_t)mac_addr.addr_bytes[0] % (uint32_t)mac_addr.addr_bytes[1] + % (uint32_t)mac_addr.addr_bytes[2] % (uint32_t)mac_addr.addr_bytes[3] + % (uint32_t)mac_addr.addr_bytes[4] % (uint32_t)mac_addr.addr_bytes[5]; + return mac_stream.str(); +} + +}}} /* namespace uhd::transport::dpdk */ +#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_UDP_HPP_ */ diff --git a/host/lib/include/uhdlib/transport/dpdk_io_service.hpp b/host/lib/include/uhdlib/transport/dpdk_io_service.hpp new file mode 100644 index 000000000..8e1fb29d0 --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk_io_service.hpp @@ -0,0 +1,246 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_HPP_ +#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace uhd { namespace transport { + +class dpdk_send_io; +class dpdk_recv_io; +struct dpdk_io_if; + +class dpdk_io_service : public virtual io_service, + public std::enable_shared_from_this +{ +public: + using sptr = std::shared_ptr; + + static sptr make( + unsigned int lcore_id, std::vector ports, size_t servq_depth); + + ~dpdk_io_service(); + + // Add entry to RX flow table + // This yields a link, which is then used for attaching to a buffer + // We yank from the link immediately following, then process at the transport level + // (so two tables here, one for transports, one for links) + void attach_recv_link(recv_link_if::sptr link); + + // Create object to hold set of queues, to go in TX table + void attach_send_link(send_link_if::sptr link); + + void detach_recv_link(recv_link_if::sptr link); + + void detach_send_link(send_link_if::sptr link); + + recv_io_if::sptr make_recv_client(recv_link_if::sptr data_link, + size_t num_recv_frames, + recv_callback_t cb, + send_link_if::sptr fc_link, + size_t num_send_frames, + recv_io_if::fc_callback_t fc_cb); + + send_io_if::sptr make_send_client(send_link_if::sptr send_link, + size_t num_send_frames, + send_io_if::send_callback_t send_cb, + recv_link_if::sptr recv_link, + size_t num_recv_frames, + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb); + + +private: + friend class dpdk_recv_io; + friend class dpdk_send_io; + + dpdk_io_service( + unsigned int lcore_id, std::vector ports, size_t servq_depth); + dpdk_io_service(const dpdk_io_service&) = delete; + + /*! + * I/O worker function to be passed to the DPDK lcore + * + * The argument must be a pointer to *this* dpdk_io_service + * + * \param arg a pointer to this dpdk_io_service + * \return 0 for normal termination, else nonzero + */ + static int _io_worker(void* arg); + + /*! + * Helper function for I/O thread to process requests on its service queue + */ + int _service_requests(); + + /*! + * Helper function for I/O thread to service a WAIT_FLOW_OPEN request + * + * \param req The requester's wait_req object + */ + void _service_flow_open(dpdk::wait_req* req); + + /*! + * Helper function for I/O thread to service a WAIT_FLOW_CLOSE request + * + * \param req The requester's wait_req object + */ + void _service_flow_close(dpdk::wait_req* req); + + /*! + * Helper function for I/O thread to service a WAIT_XPORT_CONNECT request + * + * \param req The requester's wait_req object + */ + void _service_xport_connect(dpdk::wait_req* req); + + /*! + * Helper function for I/O thread to service a WAIT_XPORT_DISCONNECT request + * + * \param req The requester's wait_req object + */ + void _service_xport_disconnect(dpdk::wait_req* req); + + /*! + * Get Ethernet MAC address for the given IPv4 address, and wake the + * requester when finished. + * This may only be called by an I/O service, on behalf of a requester's + * WAIT_ARP request. + * + * \param req The requester's wait_req object + * \return 0 if address was written, -EAGAIN if request was queued for + * later completion, -ENOMEM if ran out of memory to complete + * request + */ + int _service_arp_request(dpdk::wait_req* req); + + /*! + * Helper function for I/O thread to do a burst of packet retrieval and + * processing on an RX queue + * + * \param port the DPDK NIC port used for RX + * \param queue the DMA queue on the port to recv from + */ + int _rx_burst(dpdk::dpdk_port* port, dpdk::queue_id_t queue); + + /*! + * Helper function for I/O thread to do a burst of packet transmission on a + * TX queue + * + * \param port the DPDK NIC port used for TX + * \return number of buffers transmitted + */ + int _tx_burst(dpdk::dpdk_port* port); + + /*! + * Helper function for I/O thread to release a burst of buffers from an RX + * release queue + * + * \param port the DPDK NIC port used for RX + * \return number of buffers released + */ + int _rx_release(dpdk::dpdk_port* port); + + /*! + * Helper function for I/O thread to do send an ARP request + * + * \param port the DPDK NIC port to send the ARP request through + * \param queue the DMA queue on the port to send to + * \param ip the IPv4 address for which the caller is seeking a MAC address + */ + int _send_arp_request( + dpdk::dpdk_port* port, dpdk::queue_id_t queue, dpdk::ipv4_addr ip); + + /*! + * Helper function for I/O thread to process an ARP request/reply + * + * \param port the DPDK NIC port to send any ARP replies from + * \param queue the DMA queue on the port to send ARP replies to + * \param arp_frame a pointer to the ARP frame + */ + int _process_arp( + dpdk::dpdk_port* port, dpdk::queue_id_t queue_id, struct arp_hdr* arp_frame); + + /*! + * Helper function for I/O thread to process an IPv4 packet + * + * \param port the DPDK NIC port to send any ARP replies from + * \param mbuf a pointer to the packet buffer container + * \param pkt a pointer to the IPv4 header of the packet + */ + int _process_ipv4(dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct ipv4_hdr* pkt); + + /*! + * Helper function for I/O thread to process an IPv4 packet + * + * \param port the DPDK NIC port to send any ARP replies from + * \param mbuf a pointer to the packet buffer container + * \param pkt a pointer to the UDP header of the packet + * \param bcast whether this packet was destined for the port's broadcast + * IPv4 address + */ + int _process_udp( + dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct udp_hdr* pkt, bool bcast); + + /*! + * Helper function to get a unique client ID + * + * \return a unique client ID + */ + uint16_t _get_unique_client_id(); + + /*! + * Attempt to wake client + */ + void _wake_client(dpdk_io_if* dpdk_io); + + //! The reference to the DPDK context + std::weak_ptr _ctx; + //! The lcore running this dpdk_io_service's work routine + unsigned int _lcore_id; + //! The NIC ports served by this dpdk_io_service + std::vector _ports; + //! The set of TX queues associated with a given port + std::unordered_map> _tx_queues; + //! The list of recv_io for each port + std::unordered_map> _recv_xport_map; + //! The RX table, which provides lists of dpdk_recv_io for an IPv4 tuple + struct rte_hash* _rx_table; + //! Service queue for clients to make requests + dpdk::service_queue _servq; + //! Retry list for waking clients + dpdk_io_if* _retry_head = NULL; + + //! Mutex to protect below data structures + std::mutex _mutex; + //! The recv links attached to this I/O service (managed client side) + std::list _recv_links; + //! The send links attached to this I/O service (managed client side) + std::list _send_links; + //! Set of IDs for new clients + std::set _client_id_set; + //! Next ID to try + uint16_t _next_client_id; + + static constexpr int MAX_PENDING_SERVICE_REQS = 32; + static constexpr int MAX_FLOWS = 128; + static constexpr int MAX_CLIENTS = 2048; + static constexpr int RX_BURST_SIZE = 16; + static constexpr int TX_BURST_SIZE = 16; +}; + +}} // namespace uhd::transport + +#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_HPP_ */ diff --git a/host/lib/include/uhdlib/transport/dpdk_io_service_client.hpp b/host/lib/include/uhdlib/transport/dpdk_io_service_client.hpp new file mode 100644 index 000000000..451cc1531 --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk_io_service_client.hpp @@ -0,0 +1,285 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_CLIENT_HPP_ +#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_CLIENT_HPP_ + +#include +#include +#include +#include +#include +#include +#include + +namespace uhd { namespace transport { + +struct dpdk_flow_data +{ + //! The UDP DPDK link + udp_dpdk_link* link; + //! Is it a recv_link_if? Or a send_link_if? + bool is_recv; +}; + +/*! DPDK I/O interface for service requests + * + * This is used to pass around information about the I/O clients. It is what is + * passed into the data portion of a request, for connect and disconnect + * requests. + * + */ +struct dpdk_io_if +{ + dpdk_io_if(bool is_recv, + udp_dpdk_link* link, + dpdk_io_service::sptr io_srv, + recv_callback_t recv_cb) + : is_recv(is_recv), link(link), io_srv(io_srv), recv_cb(recv_cb) + { + } + + bool is_recv; + udp_dpdk_link* link; + dpdk_io_service::sptr io_srv; + recv_callback_t recv_cb; + void* io_client; + //! Embedded list node + dpdk_io_if* next = NULL; + dpdk_io_if* prev = NULL; +}; + +// This must be tied to a link for frame_buffs +// so need map of dpdk_send_io to udp_dpdk_link +// Have queue pair: buffs to send + free buffs +// There used to be a retry queue: Still needed? (Maybe 1 per port?) +class dpdk_send_io : public virtual send_io_if +{ +public: + using sptr = std::shared_ptr; + + dpdk_send_io(dpdk_io_service::sptr io_srv, + udp_dpdk_link* link, + size_t num_send_frames, + send_callback_t send_cb, + size_t num_recv_frames, + recv_callback_t recv_cb, + fc_callback_t fc_cb) + : _dpdk_io_if(false, link, io_srv, recv_cb) + , _servq(io_srv->_servq) + , _send_cb(send_cb) + , _fc_cb(fc_cb) + { + // Get reference to DPDK context, since this owns some DPDK memory + _ctx = dpdk::dpdk_ctx::get(); + _num_send_frames = num_send_frames; + _num_recv_frames = num_recv_frames; + + // Create the free buffer and send queues + // Must be power of 2, and add one since usable ring is size-1 + size_t queue_size = (size_t)exp2(ceil(log2(num_send_frames + 1))); + dpdk::port_id_t nic_port = link->get_port()->get_port_id(); + uint16_t id = io_srv->_get_unique_client_id(); + char name[16]; + snprintf(name, sizeof(name), "tx%hu-%hu", nic_port, id); + _buffer_queue = rte_ring_create( + name, queue_size, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); + snprintf(name, sizeof(name), "~tx%hu-%hu", nic_port, id); + _send_queue = rte_ring_create( + name, queue_size, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); + UHD_LOG_TRACE("DPDK::SEND_IO", "dpdk_send_io() " << _buffer_queue->name); + + // Create the wait_request object that gets passed around + _waiter = dpdk::wait_req_alloc(dpdk::wait_type::WAIT_TX_BUF, (void*)&_dpdk_io_if); + _waiter->complete = true; + } + + ~dpdk_send_io() + { + UHD_LOG_TRACE("DPDK::SEND_IO", "~dpdk_send_io() " << _buffer_queue->name); + // Deregister with I/O service + auto xport_req = dpdk::wait_req_alloc( + dpdk::wait_type::WAIT_XPORT_DISCONNECT, (void*)&_dpdk_io_if); + _servq.submit(xport_req, std::chrono::microseconds(-1)); + + // Clean up + wait_req_put(xport_req); + rte_ring_free(_send_queue); + rte_ring_free(_buffer_queue); + wait_req_put(_waiter); + } + + bool wait_for_dest_ready(size_t /*num_bytes*/, int32_t /*timeout_ms*/) + { + // For this I/O service, the destination is the queue to the offload + // thread. The queue is always able to accomodate new packets since it + // is sized to fit all the frames reserved from the link. + return true; + } + + frame_buff::uptr get_send_buff(int32_t timeout_ms) + { + frame_buff* buff_ptr; + if (rte_ring_dequeue(_buffer_queue, (void**)&buff_ptr)) { + if (!timeout_ms) { + return frame_buff::uptr(); + } + // Nothing in the queue. Try waiting if there is a timeout. + auto timeout_point = + std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); + std::unique_lock lock(_waiter->mutex); + wait_req_get(_waiter); + _waiter->complete = false; + auto is_complete = [this] { return !rte_ring_empty(_buffer_queue); }; + if (timeout_ms < 0) { + _waiter->cond.wait(lock, is_complete); + } else { + auto status = _waiter->cond.wait_until(lock, timeout_point, is_complete); + if (!status) { + return frame_buff::uptr(); + } + } + if (rte_ring_dequeue(_buffer_queue, (void**)&buff_ptr)) { + return frame_buff::uptr(); + } + } + return frame_buff::uptr(buff_ptr); + } + + void release_send_buff(frame_buff::uptr buff) + { + auto buff_ptr = (dpdk::dpdk_frame_buff*)buff.release(); + assert(buff_ptr); + int status = rte_ring_enqueue(_send_queue, buff_ptr); + if (status != 0) { + assert(false); + } + // TODO: Should we retry if it failed? + } + +private: + friend class dpdk_io_service; + + dpdk_io_if _dpdk_io_if; + size_t _num_frames_in_use = 0; + + dpdk::service_queue& _servq; + dpdk::dpdk_ctx::sptr _ctx; + struct rte_ring* _buffer_queue; + struct rte_ring* _send_queue; + dpdk::wait_req* _waiter; + send_callback_t _send_cb; + fc_callback_t _fc_cb; +}; + +class dpdk_recv_io : public virtual recv_io_if +{ +public: + using sptr = std::shared_ptr; + + dpdk_recv_io(dpdk_io_service::sptr io_srv, + udp_dpdk_link* link, + size_t num_recv_frames, + recv_callback_t recv_cb, + size_t num_send_frames, + fc_callback_t fc_cb) + : _dpdk_io_if(true, link, io_srv, recv_cb) + , _servq(io_srv->_servq) + , _fc_cb(fc_cb) // Call on release + { + // Get reference to DPDK context, since this owns some DPDK memory + _ctx = dpdk::dpdk_ctx::get(); + _num_send_frames = num_send_frames; + _num_recv_frames = num_recv_frames; + // Create the recv and release queues + // Must be power of 2, and add one since usable ring is size-1 + size_t queue_size = (size_t)exp2(ceil(log2(num_recv_frames + 1))); + dpdk::port_id_t nic_port = link->get_port()->get_port_id(); + uint16_t id = io_srv->_get_unique_client_id(); + UHD_LOG_DEBUG("DPDK::IO_SERVICE", "Creating recv client with queue size of " << queue_size); + char name[16]; + snprintf(name, sizeof(name), "rx%hu-%hu", nic_port, id); + _recv_queue = rte_ring_create( + name, queue_size, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); + snprintf(name, sizeof(name), "~rx%hu-%hu", nic_port, id); + _release_queue = rte_ring_create( + name, queue_size, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); + UHD_LOG_TRACE("DPDK::RECV_IO", "dpdk_recv_io() " << _recv_queue->name); + // Create the wait_request object that gets passed around + _waiter = dpdk::wait_req_alloc(dpdk::wait_type::WAIT_RX, (void*)&_dpdk_io_if); + _waiter->complete = true; + } + + ~dpdk_recv_io() + { + // Deregister with I/O service + UHD_LOG_TRACE("DPDK::RECV_IO", "~dpdk_recv_io() " << _recv_queue->name); + auto xport_req = dpdk::wait_req_alloc( + dpdk::wait_type::WAIT_XPORT_DISCONNECT, (void*)&_dpdk_io_if); + _servq.submit(xport_req, std::chrono::microseconds(-1)); + + // Clean up + wait_req_put(xport_req); + rte_ring_free(_recv_queue); + rte_ring_free(_release_queue); + wait_req_put(_waiter); + } + + frame_buff::uptr get_recv_buff(int32_t timeout_ms) + { + frame_buff* buff_ptr; + if (rte_ring_dequeue(_recv_queue, (void**)&buff_ptr)) { + if (!timeout_ms) { + return frame_buff::uptr(); + } + // Nothing in the queue. Try waiting if there is a timeout. + auto timeout_point = + std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); + std::unique_lock lock(_waiter->mutex); + wait_req_get(_waiter); + _waiter->complete = false; + auto is_complete = [this] { return !rte_ring_empty(_recv_queue); }; + if (timeout_ms < 0) { + _waiter->cond.wait(lock, is_complete); + } else { + auto status = _waiter->cond.wait_until(lock, timeout_point, is_complete); + if (!status) { + return frame_buff::uptr(); + } + } + if (rte_ring_dequeue(_recv_queue, (void**)&buff_ptr)) { + return frame_buff::uptr(); + } + } + return frame_buff::uptr(buff_ptr); + } + + void release_recv_buff(frame_buff::uptr buff) + { + frame_buff* buff_ptr = buff.release(); + int status = rte_ring_enqueue(_release_queue, buff_ptr); + if (status != 0) { + assert(false); + } + } + +private: + friend class dpdk_io_service; + + dpdk_io_if _dpdk_io_if; + size_t _num_frames_in_use = 0; + + dpdk::service_queue& _servq; + dpdk::dpdk_ctx::sptr _ctx; + struct rte_ring* _recv_queue; + struct rte_ring* _release_queue; + dpdk::wait_req* _waiter; + fc_callback_t _fc_cb; +}; + + +}} // namespace uhd::transport + +#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_CLIENT_HPP_ */ diff --git a/host/lib/include/uhdlib/transport/dpdk_simple.hpp b/host/lib/include/uhdlib/transport/dpdk_simple.hpp index 86abaeff8..8420510ea 100644 --- a/host/lib/include/uhdlib/transport/dpdk_simple.hpp +++ b/host/lib/include/uhdlib/transport/dpdk_simple.hpp @@ -8,7 +8,6 @@ #define INCLUDED_DPDK_SIMPLE_HPP #include -#include namespace uhd { namespace transport { @@ -17,35 +16,11 @@ class dpdk_simple : public udp_simple public: virtual ~dpdk_simple(void) = 0; - /*! - * Make a new connected dpdk transport: - * This transport is for sending and receiving - * between this host and a single endpoint. - * The primary usage for this transport will be control transactions. - * - * The address must be an ipv4 address. - * The port must be a number. - * - * \param addr a string representing the destination address - * \param port a string representing the destination port - */ - static udp_simple::sptr make_connected(struct uhd_dpdk_ctx &ctx, - const std::string &addr, const std::string &port); + static udp_simple::sptr make_connected( + const std::string& addr, const std::string& port); - /*! - * Make a new broadcasting dpdk transport: - * This transport can send broadcast datagrams - * and receive datagrams from multiple sources. - * The primary usage for this transport will be to discover devices. - * - * The address must be an ipv4 address. - * The port must be a number. - * - * \param addr a string representing the destination address - * \param port a string representing the destination port - */ - static udp_simple::sptr make_broadcast(struct uhd_dpdk_ctx &ctx, - const std::string &addr, const std::string &port); + static udp_simple::sptr make_broadcast( + const std::string& addr, const std::string& port); /*! * Send a single buffer. diff --git a/host/lib/include/uhdlib/transport/link_base.hpp b/host/lib/include/uhdlib/transport/link_base.hpp index e4d329e2a..a57b681ca 100644 --- a/host/lib/include/uhdlib/transport/link_base.hpp +++ b/host/lib/include/uhdlib/transport/link_base.hpp @@ -56,7 +56,7 @@ private: * the link interface methods. * * This template requires the following methods in the derived class: - * bool get_send_buf_derived(frame_buff& buf, int32_t timeout_ms); + * bool get_send_buff_derived(frame_buff& buf, int32_t timeout_ms); * void release_send_buf_derived(frame_buff& buf); * * Additionally, the subclass must call preload_free_buf for each frame_buff @@ -145,7 +145,7 @@ private: * the link interface methods. * * This template requires the following methods in the derived class: - * bool get_recv_buff_derived(frame_buff& buff, int32_t timeout_ms); + * size_t get_recv_buff_derived(frame_buff& buff, int32_t timeout_ms); * void release_recv_buff_derived(frame_buff& buff); * * Additionally, the subclass must call preload_free_buff for each diff --git a/host/lib/include/uhdlib/transport/udp_dpdk_link.hpp b/host/lib/include/uhdlib/transport/udp_dpdk_link.hpp new file mode 100644 index 000000000..eaf3cf7c4 --- /dev/null +++ b/host/lib/include/uhdlib/transport/udp_dpdk_link.hpp @@ -0,0 +1,267 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_UDP_DPDK_LINK_HPP +#define INCLUDED_UHDLIB_TRANSPORT_UDP_DPDK_LINK_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace uhd { namespace transport { + +/*! + * A zero copy transport interface to the dpdk DMA library. + */ +class udp_dpdk_link : public virtual recv_link_if, public virtual send_link_if +{ +public: + using sptr = std::shared_ptr; + + udp_dpdk_link(const dpdk::port_id_t port_id, + const std::string& remote_addr, + const std::string& remote_port, + const std::string& local_port, + const link_params_t& params); + + ~udp_dpdk_link(); + + /*! + * Make a new dpdk link. Get port ID from routing table. + * + * \param remote_addr Remote IP address + * \param remote_port Remote UDP port + * \param params Values for frame sizes, num frames, and buffer sizes + * \return a shared_ptr to a new udp dpdk link + */ + static sptr make(const std::string& remote_addr, + const std::string& remote_port, + const link_params_t& params); + + /*! + * Make a new dpdk link. User specifies DPDK port ID directly. + * + * \param port_id DPDK port ID to use for communication + * \param remote_addr Remote IP address + * \param remote_port Remote UDP port + * \param local_port Local UDP port + * \param params Values for frame sizes, num frames, and buffer sizes + * \return a shared_ptr to a new udp dpdk link + */ + static sptr make(const dpdk::port_id_t port_id, + const std::string& remote_addr, + const std::string& remote_port, + const std::string& local_port, + const link_params_t& params); + + /*! + * Get the associated dpdk_port + * + * \return a pointer to the dpdk_port used by this link + */ + inline dpdk::dpdk_port* get_port() + { + return _port; + } + + /*! + * Get the DMA queue associated with this link + * + * \return the queue ID for this link's DMA queue + */ + inline dpdk::queue_id_t get_queue_id() + { + return _queue; + } + + /*! + * Get the local UDP port used by this link + * + * \return the local UDP port, in network order + */ + inline uint16_t get_local_port() + { + return _local_port; + } + + /*! + * Get the remote UDP port used by this link + * + * \return the remote UDP port, in network order + */ + inline uint16_t get_remote_port() + { + return _remote_port; + } + + /*! + * Get the remote IPv4 address used by this link + * + * \return the remote IPv4 address, in network order + */ + inline uint32_t get_remote_ipv4() + { + return _remote_ipv4; + } + + /*! + * Set the remote host's MAC address + * This MAC address must be filled in for the remote IPv4 address before + * the link can reach its destination. + * + * \param mac the remote host's MAC address + */ + inline void set_remote_mac(struct ether_addr& mac) + { + ether_addr_copy(&mac, &_remote_mac); + } + + /*! + * Get the remote host's MAC address + * + * \param mac Where to write the MAC address + */ + inline void get_remote_mac(struct ether_addr& dst) + { + ether_addr_copy(&_remote_mac, &dst); + } + + /*! + * Get the number of frame buffers that can be queued by this link. + */ + size_t get_num_send_frames() const + { + return _num_send_frames; + } + + /*! + * Get the maximum capacity of a frame buffer. + */ + size_t get_send_frame_size() const + { + return _send_frame_size; + } + + /*! + * Get the physical adapter ID used for this link + */ + inline adapter_id_t get_send_adapter_id() const + { + return _adapter_id; + } + + /*! + * Get the number of frame buffers that can be queued by this link. + */ + size_t get_num_recv_frames() const + { + return _num_recv_frames; + } + + /*! + * Get the maximum capacity of a frame buffer. + */ + size_t get_recv_frame_size() const + { + return _recv_frame_size; + } + + /*! + * Get the physical adapter ID used for this link + */ + inline adapter_id_t get_recv_adapter_id() const + { + return _adapter_id; + } + + /*! + * Enqueue a received mbuf, which can be pulled via get_recv_buff() + */ + void enqueue_recv_mbuf(struct rte_mbuf* mbuf); + + /*! + * Receive a packet and return a frame buffer containing the packet data. + * The timeout argument is ignored. + * + * Received buffers are pulled from the frame buffer list. No buffers can + * be retrieved unless the corresponding rte_mbufs were placed in the list + * via the enqueue_recv_mbuf() method. + * + * \return a frame buffer, or null uptr if timeout occurs + */ + frame_buff::uptr get_recv_buff(int32_t /*timeout_ms*/); + + /*! + * Release a frame buffer, allowing the link driver to reuse it. + * + * \param buffer frame buffer to release for reuse by the link + */ + void release_recv_buff(frame_buff::uptr buff); + + /*! + * Get an empty frame buffer in which to write packet contents. + * + * \param timeout_ms a positive timeout value specifies the maximum number + of ms to wait, a negative value specifies to block + until successful, and a value of 0 specifies no wait. + * \return a frame buffer, or null uptr if timeout occurs + */ + frame_buff::uptr get_send_buff(int32_t /*timeout_ms*/); + + /*! + * Send a packet with the contents of the frame buffer and release the + * buffer, allowing the link driver to reuse it. If the size of the frame + * buffer is 0, the buffer is released with no packet being sent. + * + * Note that this function will only fill in the L2 header and send the + * mbuf. The L3 and L4 headers, in addition to the lengths in the rte_mbuf + * fields, must be set in the I/O service. + * + * \param buffer frame buffer containing packet data + * + * Throws an exception if an I/O error occurs while sending + */ + void release_send_buff(frame_buff::uptr buff); + +private: + //! A reference to the DPDK context + dpdk::dpdk_ctx::sptr _ctx; + //! The DPDK NIC port used by this link + dpdk::dpdk_port* _port; + //! Local UDP port, in network order + uint16_t _local_port; + //! Remote UDP port, in network order + uint16_t _remote_port; + //! Remote IPv4 address, in network order + uint32_t _remote_ipv4; + //! Remote host's MAC address + struct ether_addr _remote_mac; + //! Number of recv frames is not validated + size_t _num_recv_frames; + //! Maximum bytes of UDP payload data in recv frame + size_t _recv_frame_size; + //! Number of send frames is not validated + size_t _num_send_frames; + //! Maximum bytes of UDP payload data in send frame + size_t _send_frame_size; + //! Registered adapter ID for this link's DPDK NIC port + adapter_id_t _adapter_id; + //! The RX frame buff list head + dpdk::dpdk_frame_buff* _recv_buff_head = nullptr; + // TODO: Implement ability to use multiple queues + dpdk::queue_id_t _queue = 0; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_UDP_DPDK_LINK_HPP */ -- cgit v1.2.3