diff options
Diffstat (limited to 'host')
18 files changed, 2633 insertions, 352 deletions
diff --git a/host/docs/dpdk.dox b/host/docs/dpdk.dox index e840fc235..3f71231a5 100644 --- a/host/docs/dpdk.dox +++ b/host/docs/dpdk.dox @@ -75,7 +75,23 @@ load that driver with the following command: modprobe vfio-pci For NICs that require vfio-pci (like Intel's X520), you'll want to use the -`dpdk-devbind.py` script to the vfio-pci driver. +`dpdk-devbind.py` script to the vfio-pci driver. This script is shipped with +DPDK and installed to `$prefix/share/dpdk/usertools`. If the NIC uses the +vfio-pci driver, and the package was installed apt-get, then a typical invocation +might be + + /usr/share/dpdk/usertools/dpdk-devbind.py --bind=vfio-pci ens6f0 + +If successful, the script might provide an updated status like this: + + /usr/share/dpdk/usertools/dpdk-devbind.py -s + + Network devices using DPDK-compatible driver + ============================================ + 0000:02:00.0 '82599ES 10-Gigabit SFI/SFP+ Network Connection 10fb' drv=vfio-pci unused=ixgbe + [...] + + See https://doc.dpdk.org/guides-18.11/linux_gsg/linux_drivers.html#binding-and-unbinding-network-ports-to-from-the-kernel-modules for more details. @@ -102,7 +118,7 @@ options: ;instead and swap between them [use_dpdk=1] ;dpdk_mtu is the NIC's MTU setting - ;This is separate from MPM's maximum packet size--tops out at 4000 + ;This is separate from MPM's maximum packet size dpdk_mtu=9000 ;dpdk_driver is the -d flag for the DPDK EAL. If DPDK doesn't pick up the driver for your NIC ;automatically, you may need this argument to point it to the folder where it can find the drivers @@ -115,8 +131,8 @@ options: ;dpdk_num_mbufs is the total number of packet buffers allocated ;to each direction's packet buffer pool ;This will be multiplied by the number of NICs, but NICs on the same - ;CPU socket share a pool - dpdk_num_mbufs=512 + ;CPU socket share a pool. + dpdk_num_mbufs=4096 ;dpdk_mbuf_cache_size is the number of buffers to cache for a CPU ;The cache reduces the interaction with the global pool dpdk_mbuf_cache_size=64 @@ -127,20 +143,24 @@ address, and it must be in a particular format. Hex digits must all be lower case, and octets must be separated by colons. Here is an example: [dpdk_mac=3c:fd:fe:a2:a9:09] - ;dpdk_io_cpu selects the CPU that this NIC's driver will run on - ;Multiple NICs may occupy one CPU, but the I/O thread will completely - ;consume that CPU. Also, 0 is reserved for the master thread (i.e. + ;dpdk_lcore selects the lcore that this NIC's driver will run on + ;Multiple NICs may occupy one lcore, but the I/O thread will completely + ;consume that lcore's CPU. Also, 0 is reserved for the master thread (i.e. ;the initial UHD thread that calls init() for DPDK). Attempting to ;use it as an I/O thread will only result in hanging. - dpdk_io_cpu = 1 + ;Note also that by default, the lcore ID will be the same as the CPU ID. + dpdk_lcore = 1 ;dpdk_ipv4 specifies the IPv4 address, and both the address and ;subnet mask are required (and in this format!). DPDK uses the ;netmask to create a basic routing table. Routing to other networks ;(i.e. via gateways) is not permitted. dpdk_ipv4 = 192.168.10.1/24 + ;dpdk_num_desc is the number of descriptors in each DMA ring. + ;Must be a power of 2. + dpdk_num_desc=4096 [dpdk_mac=3c:fd:fe:a2:a9:0a] - dpdk_io_cpu = 1 + dpdk_lcore = 1 dpdk_ipv4 = 192.168.20.1/24 \section dpdk_using Using DPDK in UHD diff --git a/host/lib/include/uhdlib/transport/dpdk/arp.hpp b/host/lib/include/uhdlib/transport/dpdk/arp.hpp new file mode 100644 index 000000000..e71119bb3 --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk/arp.hpp @@ -0,0 +1,29 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_ARP_HPP_ +#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_ARP_HPP_ + +#include <uhdlib/transport/dpdk/common.hpp> +#include <uhdlib/transport/dpdk/service_queue.hpp> +#include <rte_arp.h> + +namespace uhd { namespace transport { namespace dpdk { + +struct arp_request +{ + struct ether_addr tha; + port_id_t port; + ipv4_addr tpa; +}; + +struct arp_entry +{ + struct ether_addr mac_addr; + std::vector<wait_req*> reqs; +}; + +}}} /* namespace uhd::transport::dpdk */ +#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_ARP_HPP_ */ diff --git a/host/lib/include/uhdlib/transport/dpdk/common.hpp b/host/lib/include/uhdlib/transport/dpdk/common.hpp index 1f4466a0f..ac3526e49 100644 --- a/host/lib/include/uhdlib/transport/dpdk/common.hpp +++ b/host/lib/include/uhdlib/transport/dpdk/common.hpp @@ -1,44 +1,130 @@ // -// Copyright 2019 Ettus Research, a National Instruments brand +// Copyright 2019 Ettus Research, a National Instruments Brand // // SPDX-License-Identifier: GPL-3.0-or-later // + #ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_ #define _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_ #include <uhd/config.hpp> +#include <uhd/transport/frame_buff.hpp> #include <uhd/types/device_addr.hpp> -#include <uhd/utils/noncopyable.hpp> -#include <uhd/utils/static.hpp> +#include <uhdlib/transport/adapter_info.hpp> #include <rte_ethdev.h> #include <rte_ether.h> #include <rte_flow.h> #include <rte_mbuf.h> #include <rte_mempool.h> +#include <rte_spinlock.h> #include <rte_version.h> #include <unordered_map> #include <array> #include <atomic> #include <mutex> +#include <set> #include <string> -/* NOTE: There are changes to rte_eth_addr in 19.x */ +/* NOTE: There are changes to all the network standard fields in 19.x */ + + +namespace uhd { namespace transport { + +class dpdk_io_service; + +namespace dpdk { -namespace uhd { namespace transport { namespace dpdk { +struct arp_entry; using queue_id_t = uint16_t; using port_id_t = uint16_t; using ipv4_addr = uint32_t; +class dpdk_adapter_info : public adapter_info +{ +public: + dpdk_adapter_info(port_id_t port) : _port(port) {} + ~dpdk_adapter_info() {} + + std::string to_string() + { + return std::string("DPDK:") + std::to_string(_port); + } + + bool operator==(const dpdk_adapter_info& rhs) const + { + return (_port == rhs._port); + } + +private: + // Port ID + port_id_t _port; +}; + + +/*! + * Packet/Frame buffer class for DPDK + * + * This class is intended to be placed in the private area of the rte_mbuf, and + * its memory is part of the rte_mbuf, so its life is tied to the underlying + * buffer (or more precisely, the encapsulating one). + */ +class dpdk_frame_buff : public frame_buff +{ +public: + dpdk_frame_buff(struct rte_mbuf* mbuf) : _mbuf(mbuf) + { + _data = rte_pktmbuf_mtod(mbuf, void*); + _packet_size = 0; + } + + ~dpdk_frame_buff() = default; + + /*! + * Simple getter for the underlying rte_mbuf. + * The rte_mbuf may need further modification before sending packets, + * like adjusting the IP and UDP lengths. + */ + inline struct rte_mbuf* get_pktmbuf() + { + return _mbuf; + } + + /*! + * Move the data pointer by the indicated size, to some desired + * encapsulated frame. + * + * \param hdr_size Size (in bytes) of the headers to skip. Can be negative + * to pull the header back. + */ + inline void header_jump(ssize_t hdr_size) + { + _data = (void*)((uint8_t*)_data + hdr_size); + } + + //! Embedded list node's next ptr + dpdk_frame_buff* next = nullptr; + //! Embedded list node's prev ptr + dpdk_frame_buff* prev = nullptr; + +private: + struct rte_mbuf* _mbuf; +}; + + +/*! + * The size (in bytes) of the private area reserved within the rte_mbuf. + * This portion of the rte_mbuf is used for the embedded dpdk_frame_buff data + * structure. + */ +constexpr size_t DPDK_MBUF_PRIV_SIZE = + RTE_ALIGN(sizeof(struct dpdk_frame_buff), RTE_MBUF_PRIV_ALIGN); + /*! * Class representing a DPDK NIC port * * The dpdk_port object possesses all the data needed to send and receive - * packets between this port and a remote host. A logical link should specify - * which packets are destined for it and allocate a DMA queue with the - * dpdk_port::alloc_queue() function. A logical link should not, however, - * specify ARP packets for its set of received packets. That functionality is - * reserved for the special queue 0. + * packets between this port and a remote host. * * The logical link can then get the packet buffer pools associated with this * NIC port and use them to send and receive packets. @@ -55,7 +141,7 @@ public: * \param port The port ID * \param mtu The intended MTU for the port * \param num_queues Number of DMA queues to reserve for this port - * \param num_mbufs The number of packet buffers per queue + * \param num_desc The number of descriptors per DMA queue * \param rx_pktbuf_pool A pointer to the port's RX packet buffer pool * \param tx_pktbuf_pool A pointer to the port's TX packet buffer pool * \param ipv4_address The IPv4 network address (w/ netmask) @@ -64,7 +150,7 @@ public: static dpdk_port::uptr make(port_id_t port, size_t mtu, uint16_t num_queues, - size_t num_mbufs, + uint16_t num_desc, struct rte_mempool* rx_pktbuf_pool, struct rte_mempool* tx_pktbuf_pool, std::string ipv4_address); @@ -72,11 +158,13 @@ public: dpdk_port(port_id_t port, size_t mtu, uint16_t num_queues, - size_t num_mbufs, + uint16_t num_desc, struct rte_mempool* rx_pktbuf_pool, struct rte_mempool* tx_pktbuf_pool, std::string ipv4_address); + ~dpdk_port(); + /*! Getter for this port's ID * \return this port's ID */ @@ -85,6 +173,19 @@ public: return _port; } + inline dpdk_adapter_info get_adapter_info() const + { + return dpdk_adapter_info(_port); + } + + /*! Getter for this port's MAC address + * \return this port's MAC address + */ + inline ether_addr get_mac_addr() const + { + return _mac_addr; + } + /*! Getter for this port's MTU * \return this port's MTU */ @@ -141,68 +242,45 @@ public: * \param dst_ipv4_addr The destination IPv4 address (in network order) * \return whether the destination address matches this port's broadcast address */ - inline bool dst_is_broadcast(const uint32_t dst_ipv4_addr) const + inline bool dst_is_broadcast(const ipv4_addr dst_ipv4_addr) const { uint32_t network = _netmask | ((~_netmask) & dst_ipv4_addr); return (network == 0xffffffff); } - /*! Allocate a DMA queue (TX/RX pair) and use the specified flow pattern - * to route packets to the RX queue. - * - * \pattern recv_pattern The flow pattern to use for directing traffic to - * the allocated RX queue. - * \return The queue ID for the allocated queue - * \throw uhd::runtime_error when there are no free queues - */ - queue_id_t alloc_queue(struct rte_flow_pattern recv_pattern[]); - - /*! Free a previously allocated queue and tear down the associated flow rule - * \param queue The ID of the queue to free - * \throw std::out_of_range when the queue ID is not currently allocated - */ - void free_queue(queue_id_t queue); - /*! - * Process ARP request/reply + * Allocate a UDP port and return it in network order + * + * \param udp_port UDP port to attempt to allocate. Use 0 for no preference. + * \return 0 for failure, else the allocated UDP port in network order. */ - // int process_arp(struct rte_mempool *tx_pktbuf_pool, struct arp_hdr *arp_frame); + uint16_t alloc_udp_port(uint16_t udp_port); private: + friend uhd::transport::dpdk_io_service; + /*! * Construct and transmit an ARP reply (for the given ARP request) */ - int _arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* arp_req); + int _arp_reply(queue_id_t queue_id, struct arp_hdr* arp_req); port_id_t _port; size_t _mtu; + size_t _num_queues; struct rte_mempool* _rx_pktbuf_pool; struct rte_mempool* _tx_pktbuf_pool; struct ether_addr _mac_addr; ipv4_addr _ipv4; ipv4_addr _netmask; - size_t _num_queues; - std::vector<queue_id_t> _free_queues; - std::unordered_map<queue_id_t, struct rte_flow*> _flow_rules; - /* Need ARP table - * To implement ARP service, maybe create ARP xport - * Then need dpdk_udp_link and dpdk_raw_link - * - * ...Or just do it inline with dpdk_ctx - * - * And link can just save the result (do it during constructor) - * - * - * But what about the service that _responds_ to ARP requests?! - * - * Maybe have to connect a DPDK link in stages: - * First, create the ARP service and attach it to the dpdk_ctx - * dpdk_ctx must own the links...? - * Or! Always burn a DMA engine for ARP - * - * Maybe have a shared_ptr to an ARP service here? - */ + + // Structures protected by mutex std::mutex _mutex; + std::set<uint16_t> _udp_ports; + uint16_t _next_udp_port = 0xffff; + + // Structures protected by spin lock + rte_spinlock_t _spinlock = RTE_SPINLOCK_INITIALIZER; + std::unordered_map<ipv4_addr, struct arp_entry*> _arp_table; }; @@ -267,17 +345,21 @@ public: int get_port_link_status(port_id_t portid) const; /*! - * Get port ID for routing packet destined for given address + * Get port for routing packet destined for given address * \param addr Destination address - * \return port ID from routing table + * \return pointer to the port from routing table */ - int get_route(const std::string& addr) const; + dpdk_port* get_route(const std::string& addr) const; /*! * \return whether init() has been called */ bool is_init_done(void) const; + /*! Return a reference to an IO service given a port ID + */ + std::shared_ptr<uhd::transport::dpdk_io_service> get_io_service(const size_t port_id); + private: /*! Convert the args to DPDK's EAL args and Initialize the EAL * @@ -312,8 +394,12 @@ private: std::unordered_map<port_id_t, dpdk_port::uptr> _ports; std::vector<struct rte_mempool*> _rx_pktbuf_pools; std::vector<struct rte_mempool*> _tx_pktbuf_pools; + // Store all the I/O services, and also store the corresponding port ID + std::map<std::shared_ptr<uhd::transport::dpdk_io_service>, std::vector<size_t>> + _io_srv_portid_map; }; -}}} // namespace uhd::transport::dpdk +} // namespace dpdk +}} // namespace uhd::transport #endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_ */ diff --git a/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp b/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp index 7c9917079..c95786864 100644 --- a/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp +++ b/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp @@ -28,6 +28,14 @@ enum wait_type { WAIT_FLOW_OPEN, //! Wake once the flow/socket is destroyed WAIT_FLOW_CLOSE, + //! Wake once the new transport is connected + WAIT_XPORT_CONNECT, + //! Wake once the transport is disconnected + WAIT_XPORT_DISCONNECT, + //! Wake when MAC address found for IP address + WAIT_ARP, + //! Wake once the I/O worker terminates + WAIT_LCORE_TERM, //! Number of possible reasons for waiting WAIT_TYPE_COUNT }; @@ -50,6 +58,8 @@ struct wait_req std::mutex mutex; //! Whether the request was completed bool complete; + //! The status or error code associated with the request + int retval; //! An atomic reference counter for managing this request object's memory rte_atomic32_t refcnt; }; @@ -110,7 +120,7 @@ class service_queue public: /*! * Create a service queue - * \param depth Must be a power of 2 + * \param depth Must be a power of 2. Actual size is less. * \param lcore_id The DPDK lcore_id associated with this service queue */ service_queue(size_t depth, unsigned int lcore_id) @@ -227,6 +237,24 @@ public: return stat; } + /*! + * Get the size of the service queue + * \return size of the service queue + */ + inline size_t get_size() + { + return rte_ring_get_size(_waiter_ring); + } + + /*! + * Get the capacity of the service queue + * \return capacity of the service queue + */ + inline size_t get_capacity() + { + return rte_ring_get_capacity(_waiter_ring); + } + private: //! Multi-producer, single-consumer ring for requests struct rte_ring* _waiter_ring; diff --git a/host/lib/include/uhdlib/transport/dpdk/udp.hpp b/host/lib/include/uhdlib/transport/dpdk/udp.hpp new file mode 100644 index 000000000..65e561315 --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk/udp.hpp @@ -0,0 +1,115 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_UDP_HPP_ +#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_UDP_HPP_ + +#include <uhdlib/transport/dpdk/common.hpp> +#include <arpa/inet.h> +#include <netinet/udp.h> +#include <rte_ip.h> +#include <rte_udp.h> +#include <boost/format.hpp> + +namespace uhd { namespace transport { namespace dpdk { + +constexpr size_t HDR_SIZE_UDP_IPV4 = + (sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + sizeof(struct udp_hdr)); + +/*! + * An enumerated type representing the type of flow for an IPv4 client + * Currently, only UDP is supported (FLOW_TYPE_UDP) + */ +enum flow_type { + FLOW_TYPE_UDP, + FLOW_TYPE_COUNT, +}; + +/*! + * A tuple for IPv4 flows that can be used for hashing + */ +struct ipv4_5tuple +{ + enum flow_type flow_type; + ipv4_addr src_ip; + ipv4_addr dst_ip; + uint16_t src_port; + uint16_t dst_port; +}; + +inline void fill_ipv4_hdr(struct rte_mbuf* mbuf, + const dpdk_port* port, + uint32_t dst_ipv4_addr, + uint8_t proto_id, + uint32_t payload_len) +{ + struct ether_hdr* eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*); + struct ipv4_hdr* ip_hdr = (struct ipv4_hdr*)ð_hdr[1]; + + ip_hdr->version_ihl = 0x40 | 5; + ip_hdr->type_of_service = 0; + ip_hdr->total_length = rte_cpu_to_be_16(20 + payload_len); + ip_hdr->packet_id = 0; + ip_hdr->fragment_offset = rte_cpu_to_be_16(IPV4_HDR_DF_FLAG); + ip_hdr->time_to_live = 64; + ip_hdr->next_proto_id = proto_id; + ip_hdr->hdr_checksum = 0; // Require HW offload + ip_hdr->src_addr = port->get_ipv4(); + ip_hdr->dst_addr = dst_ipv4_addr; + + mbuf->ol_flags = PKT_TX_IP_CKSUM | PKT_TX_IPV4; + mbuf->l2_len = sizeof(struct ether_hdr); + mbuf->l3_len = sizeof(struct ipv4_hdr); + mbuf->pkt_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len; + mbuf->data_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len; +} + +/* All values except payload length must be in network order */ +inline void fill_udp_hdr(struct rte_mbuf* mbuf, + const dpdk_port* port, + uint32_t dst_ipv4_addr, + uint16_t src_port, + uint16_t dst_port, + uint32_t payload_len) +{ + struct ether_hdr* eth_hdr; + struct ipv4_hdr* ip_hdr; + struct udp_hdr* tx_hdr; + + fill_ipv4_hdr( + mbuf, port, dst_ipv4_addr, IPPROTO_UDP, sizeof(struct udp_hdr) + payload_len); + + eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*); + ip_hdr = (struct ipv4_hdr*)ð_hdr[1]; + tx_hdr = (struct udp_hdr*)&ip_hdr[1]; + + tx_hdr->src_port = src_port; + tx_hdr->dst_port = dst_port; + tx_hdr->dgram_len = rte_cpu_to_be_16(8 + payload_len); + tx_hdr->dgram_cksum = 0; + mbuf->l4_len = sizeof(struct udp_hdr); +} + +//! Return an IPv4 address (numeric, in network order) into a string +inline std::string ipv4_num_to_str(const uint32_t ip_addr) +{ + char addr_str[INET_ADDRSTRLEN]; + struct in_addr ipv4_addr; + ipv4_addr.s_addr = ip_addr; + inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str)); + return std::string(addr_str); +} + +inline std::string eth_addr_to_string(const struct ether_addr mac_addr) +{ + auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx"); + mac_stream % (uint32_t)mac_addr.addr_bytes[0] % (uint32_t)mac_addr.addr_bytes[1] + % (uint32_t)mac_addr.addr_bytes[2] % (uint32_t)mac_addr.addr_bytes[3] + % (uint32_t)mac_addr.addr_bytes[4] % (uint32_t)mac_addr.addr_bytes[5]; + return mac_stream.str(); +} + +}}} /* namespace uhd::transport::dpdk */ +#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_UDP_HPP_ */ diff --git a/host/lib/include/uhdlib/transport/dpdk_io_service.hpp b/host/lib/include/uhdlib/transport/dpdk_io_service.hpp new file mode 100644 index 000000000..8e1fb29d0 --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk_io_service.hpp @@ -0,0 +1,246 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_HPP_ +#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_HPP_ + +#include <uhdlib/transport/dpdk/common.hpp> +#include <uhdlib/transport/dpdk/service_queue.hpp> +#include <uhdlib/transport/io_service.hpp> +#include <rte_arp.h> +#include <rte_hash.h> +#include <rte_ip.h> +#include <rte_mbuf.h> +#include <rte_udp.h> +#include <vector> + +namespace uhd { namespace transport { + +class dpdk_send_io; +class dpdk_recv_io; +struct dpdk_io_if; + +class dpdk_io_service : public virtual io_service, + public std::enable_shared_from_this<dpdk_io_service> +{ +public: + using sptr = std::shared_ptr<dpdk_io_service>; + + static sptr make( + unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth); + + ~dpdk_io_service(); + + // Add entry to RX flow table + // This yields a link, which is then used for attaching to a buffer + // We yank from the link immediately following, then process at the transport level + // (so two tables here, one for transports, one for links) + void attach_recv_link(recv_link_if::sptr link); + + // Create object to hold set of queues, to go in TX table + void attach_send_link(send_link_if::sptr link); + + void detach_recv_link(recv_link_if::sptr link); + + void detach_send_link(send_link_if::sptr link); + + recv_io_if::sptr make_recv_client(recv_link_if::sptr data_link, + size_t num_recv_frames, + recv_callback_t cb, + send_link_if::sptr fc_link, + size_t num_send_frames, + recv_io_if::fc_callback_t fc_cb); + + send_io_if::sptr make_send_client(send_link_if::sptr send_link, + size_t num_send_frames, + send_io_if::send_callback_t send_cb, + recv_link_if::sptr recv_link, + size_t num_recv_frames, + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb); + + +private: + friend class dpdk_recv_io; + friend class dpdk_send_io; + + dpdk_io_service( + unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth); + dpdk_io_service(const dpdk_io_service&) = delete; + + /*! + * I/O worker function to be passed to the DPDK lcore + * + * The argument must be a pointer to *this* dpdk_io_service + * + * \param arg a pointer to this dpdk_io_service + * \return 0 for normal termination, else nonzero + */ + static int _io_worker(void* arg); + + /*! + * Helper function for I/O thread to process requests on its service queue + */ + int _service_requests(); + + /*! + * Helper function for I/O thread to service a WAIT_FLOW_OPEN request + * + * \param req The requester's wait_req object + */ + void _service_flow_open(dpdk::wait_req* req); + + /*! + * Helper function for I/O thread to service a WAIT_FLOW_CLOSE request + * + * \param req The requester's wait_req object + */ + void _service_flow_close(dpdk::wait_req* req); + + /*! + * Helper function for I/O thread to service a WAIT_XPORT_CONNECT request + * + * \param req The requester's wait_req object + */ + void _service_xport_connect(dpdk::wait_req* req); + + /*! + * Helper function for I/O thread to service a WAIT_XPORT_DISCONNECT request + * + * \param req The requester's wait_req object + */ + void _service_xport_disconnect(dpdk::wait_req* req); + + /*! + * Get Ethernet MAC address for the given IPv4 address, and wake the + * requester when finished. + * This may only be called by an I/O service, on behalf of a requester's + * WAIT_ARP request. + * + * \param req The requester's wait_req object + * \return 0 if address was written, -EAGAIN if request was queued for + * later completion, -ENOMEM if ran out of memory to complete + * request + */ + int _service_arp_request(dpdk::wait_req* req); + + /*! + * Helper function for I/O thread to do a burst of packet retrieval and + * processing on an RX queue + * + * \param port the DPDK NIC port used for RX + * \param queue the DMA queue on the port to recv from + */ + int _rx_burst(dpdk::dpdk_port* port, dpdk::queue_id_t queue); + + /*! + * Helper function for I/O thread to do a burst of packet transmission on a + * TX queue + * + * \param port the DPDK NIC port used for TX + * \return number of buffers transmitted + */ + int _tx_burst(dpdk::dpdk_port* port); + + /*! + * Helper function for I/O thread to release a burst of buffers from an RX + * release queue + * + * \param port the DPDK NIC port used for RX + * \return number of buffers released + */ + int _rx_release(dpdk::dpdk_port* port); + + /*! + * Helper function for I/O thread to do send an ARP request + * + * \param port the DPDK NIC port to send the ARP request through + * \param queue the DMA queue on the port to send to + * \param ip the IPv4 address for which the caller is seeking a MAC address + */ + int _send_arp_request( + dpdk::dpdk_port* port, dpdk::queue_id_t queue, dpdk::ipv4_addr ip); + + /*! + * Helper function for I/O thread to process an ARP request/reply + * + * \param port the DPDK NIC port to send any ARP replies from + * \param queue the DMA queue on the port to send ARP replies to + * \param arp_frame a pointer to the ARP frame + */ + int _process_arp( + dpdk::dpdk_port* port, dpdk::queue_id_t queue_id, struct arp_hdr* arp_frame); + + /*! + * Helper function for I/O thread to process an IPv4 packet + * + * \param port the DPDK NIC port to send any ARP replies from + * \param mbuf a pointer to the packet buffer container + * \param pkt a pointer to the IPv4 header of the packet + */ + int _process_ipv4(dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct ipv4_hdr* pkt); + + /*! + * Helper function for I/O thread to process an IPv4 packet + * + * \param port the DPDK NIC port to send any ARP replies from + * \param mbuf a pointer to the packet buffer container + * \param pkt a pointer to the UDP header of the packet + * \param bcast whether this packet was destined for the port's broadcast + * IPv4 address + */ + int _process_udp( + dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct udp_hdr* pkt, bool bcast); + + /*! + * Helper function to get a unique client ID + * + * \return a unique client ID + */ + uint16_t _get_unique_client_id(); + + /*! + * Attempt to wake client + */ + void _wake_client(dpdk_io_if* dpdk_io); + + //! The reference to the DPDK context + std::weak_ptr<dpdk::dpdk_ctx> _ctx; + //! The lcore running this dpdk_io_service's work routine + unsigned int _lcore_id; + //! The NIC ports served by this dpdk_io_service + std::vector<dpdk::dpdk_port*> _ports; + //! The set of TX queues associated with a given port + std::unordered_map<dpdk::port_id_t, std::list<dpdk_send_io*>> _tx_queues; + //! The list of recv_io for each port + std::unordered_map<dpdk::port_id_t, std::list<dpdk_recv_io*>> _recv_xport_map; + //! The RX table, which provides lists of dpdk_recv_io for an IPv4 tuple + struct rte_hash* _rx_table; + //! Service queue for clients to make requests + dpdk::service_queue _servq; + //! Retry list for waking clients + dpdk_io_if* _retry_head = NULL; + + //! Mutex to protect below data structures + std::mutex _mutex; + //! The recv links attached to this I/O service (managed client side) + std::list<recv_link_if::sptr> _recv_links; + //! The send links attached to this I/O service (managed client side) + std::list<send_link_if::sptr> _send_links; + //! Set of IDs for new clients + std::set<uint16_t> _client_id_set; + //! Next ID to try + uint16_t _next_client_id; + + static constexpr int MAX_PENDING_SERVICE_REQS = 32; + static constexpr int MAX_FLOWS = 128; + static constexpr int MAX_CLIENTS = 2048; + static constexpr int RX_BURST_SIZE = 16; + static constexpr int TX_BURST_SIZE = 16; +}; + +}} // namespace uhd::transport + +#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_HPP_ */ diff --git a/host/lib/include/uhdlib/transport/dpdk_io_service_client.hpp b/host/lib/include/uhdlib/transport/dpdk_io_service_client.hpp new file mode 100644 index 000000000..451cc1531 --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk_io_service_client.hpp @@ -0,0 +1,285 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_CLIENT_HPP_ +#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_CLIENT_HPP_ + +#include <uhdlib/transport/dpdk/common.hpp> +#include <uhdlib/transport/dpdk/service_queue.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/dpdk_io_service.hpp> +#include <uhdlib/transport/udp_dpdk_link.hpp> +#include <rte_ring.h> +#include <chrono> + +namespace uhd { namespace transport { + +struct dpdk_flow_data +{ + //! The UDP DPDK link + udp_dpdk_link* link; + //! Is it a recv_link_if? Or a send_link_if? + bool is_recv; +}; + +/*! DPDK I/O interface for service requests + * + * This is used to pass around information about the I/O clients. It is what is + * passed into the data portion of a request, for connect and disconnect + * requests. + * + */ +struct dpdk_io_if +{ + dpdk_io_if(bool is_recv, + udp_dpdk_link* link, + dpdk_io_service::sptr io_srv, + recv_callback_t recv_cb) + : is_recv(is_recv), link(link), io_srv(io_srv), recv_cb(recv_cb) + { + } + + bool is_recv; + udp_dpdk_link* link; + dpdk_io_service::sptr io_srv; + recv_callback_t recv_cb; + void* io_client; + //! Embedded list node + dpdk_io_if* next = NULL; + dpdk_io_if* prev = NULL; +}; + +// This must be tied to a link for frame_buffs +// so need map of dpdk_send_io to udp_dpdk_link +// Have queue pair: buffs to send + free buffs +// There used to be a retry queue: Still needed? (Maybe 1 per port?) +class dpdk_send_io : public virtual send_io_if +{ +public: + using sptr = std::shared_ptr<dpdk_send_io>; + + dpdk_send_io(dpdk_io_service::sptr io_srv, + udp_dpdk_link* link, + size_t num_send_frames, + send_callback_t send_cb, + size_t num_recv_frames, + recv_callback_t recv_cb, + fc_callback_t fc_cb) + : _dpdk_io_if(false, link, io_srv, recv_cb) + , _servq(io_srv->_servq) + , _send_cb(send_cb) + , _fc_cb(fc_cb) + { + // Get reference to DPDK context, since this owns some DPDK memory + _ctx = dpdk::dpdk_ctx::get(); + _num_send_frames = num_send_frames; + _num_recv_frames = num_recv_frames; + + // Create the free buffer and send queues + // Must be power of 2, and add one since usable ring is size-1 + size_t queue_size = (size_t)exp2(ceil(log2(num_send_frames + 1))); + dpdk::port_id_t nic_port = link->get_port()->get_port_id(); + uint16_t id = io_srv->_get_unique_client_id(); + char name[16]; + snprintf(name, sizeof(name), "tx%hu-%hu", nic_port, id); + _buffer_queue = rte_ring_create( + name, queue_size, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); + snprintf(name, sizeof(name), "~tx%hu-%hu", nic_port, id); + _send_queue = rte_ring_create( + name, queue_size, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); + UHD_LOG_TRACE("DPDK::SEND_IO", "dpdk_send_io() " << _buffer_queue->name); + + // Create the wait_request object that gets passed around + _waiter = dpdk::wait_req_alloc(dpdk::wait_type::WAIT_TX_BUF, (void*)&_dpdk_io_if); + _waiter->complete = true; + } + + ~dpdk_send_io() + { + UHD_LOG_TRACE("DPDK::SEND_IO", "~dpdk_send_io() " << _buffer_queue->name); + // Deregister with I/O service + auto xport_req = dpdk::wait_req_alloc( + dpdk::wait_type::WAIT_XPORT_DISCONNECT, (void*)&_dpdk_io_if); + _servq.submit(xport_req, std::chrono::microseconds(-1)); + + // Clean up + wait_req_put(xport_req); + rte_ring_free(_send_queue); + rte_ring_free(_buffer_queue); + wait_req_put(_waiter); + } + + bool wait_for_dest_ready(size_t /*num_bytes*/, int32_t /*timeout_ms*/) + { + // For this I/O service, the destination is the queue to the offload + // thread. The queue is always able to accomodate new packets since it + // is sized to fit all the frames reserved from the link. + return true; + } + + frame_buff::uptr get_send_buff(int32_t timeout_ms) + { + frame_buff* buff_ptr; + if (rte_ring_dequeue(_buffer_queue, (void**)&buff_ptr)) { + if (!timeout_ms) { + return frame_buff::uptr(); + } + // Nothing in the queue. Try waiting if there is a timeout. + auto timeout_point = + std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); + std::unique_lock<std::mutex> lock(_waiter->mutex); + wait_req_get(_waiter); + _waiter->complete = false; + auto is_complete = [this] { return !rte_ring_empty(_buffer_queue); }; + if (timeout_ms < 0) { + _waiter->cond.wait(lock, is_complete); + } else { + auto status = _waiter->cond.wait_until(lock, timeout_point, is_complete); + if (!status) { + return frame_buff::uptr(); + } + } + if (rte_ring_dequeue(_buffer_queue, (void**)&buff_ptr)) { + return frame_buff::uptr(); + } + } + return frame_buff::uptr(buff_ptr); + } + + void release_send_buff(frame_buff::uptr buff) + { + auto buff_ptr = (dpdk::dpdk_frame_buff*)buff.release(); + assert(buff_ptr); + int status = rte_ring_enqueue(_send_queue, buff_ptr); + if (status != 0) { + assert(false); + } + // TODO: Should we retry if it failed? + } + +private: + friend class dpdk_io_service; + + dpdk_io_if _dpdk_io_if; + size_t _num_frames_in_use = 0; + + dpdk::service_queue& _servq; + dpdk::dpdk_ctx::sptr _ctx; + struct rte_ring* _buffer_queue; + struct rte_ring* _send_queue; + dpdk::wait_req* _waiter; + send_callback_t _send_cb; + fc_callback_t _fc_cb; +}; + +class dpdk_recv_io : public virtual recv_io_if +{ +public: + using sptr = std::shared_ptr<dpdk_recv_io>; + + dpdk_recv_io(dpdk_io_service::sptr io_srv, + udp_dpdk_link* link, + size_t num_recv_frames, + recv_callback_t recv_cb, + size_t num_send_frames, + fc_callback_t fc_cb) + : _dpdk_io_if(true, link, io_srv, recv_cb) + , _servq(io_srv->_servq) + , _fc_cb(fc_cb) // Call on release + { + // Get reference to DPDK context, since this owns some DPDK memory + _ctx = dpdk::dpdk_ctx::get(); + _num_send_frames = num_send_frames; + _num_recv_frames = num_recv_frames; + // Create the recv and release queues + // Must be power of 2, and add one since usable ring is size-1 + size_t queue_size = (size_t)exp2(ceil(log2(num_recv_frames + 1))); + dpdk::port_id_t nic_port = link->get_port()->get_port_id(); + uint16_t id = io_srv->_get_unique_client_id(); + UHD_LOG_DEBUG("DPDK::IO_SERVICE", "Creating recv client with queue size of " << queue_size); + char name[16]; + snprintf(name, sizeof(name), "rx%hu-%hu", nic_port, id); + _recv_queue = rte_ring_create( + name, queue_size, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); + snprintf(name, sizeof(name), "~rx%hu-%hu", nic_port, id); + _release_queue = rte_ring_create( + name, queue_size, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); + UHD_LOG_TRACE("DPDK::RECV_IO", "dpdk_recv_io() " << _recv_queue->name); + // Create the wait_request object that gets passed around + _waiter = dpdk::wait_req_alloc(dpdk::wait_type::WAIT_RX, (void*)&_dpdk_io_if); + _waiter->complete = true; + } + + ~dpdk_recv_io() + { + // Deregister with I/O service + UHD_LOG_TRACE("DPDK::RECV_IO", "~dpdk_recv_io() " << _recv_queue->name); + auto xport_req = dpdk::wait_req_alloc( + dpdk::wait_type::WAIT_XPORT_DISCONNECT, (void*)&_dpdk_io_if); + _servq.submit(xport_req, std::chrono::microseconds(-1)); + + // Clean up + wait_req_put(xport_req); + rte_ring_free(_recv_queue); + rte_ring_free(_release_queue); + wait_req_put(_waiter); + } + + frame_buff::uptr get_recv_buff(int32_t timeout_ms) + { + frame_buff* buff_ptr; + if (rte_ring_dequeue(_recv_queue, (void**)&buff_ptr)) { + if (!timeout_ms) { + return frame_buff::uptr(); + } + // Nothing in the queue. Try waiting if there is a timeout. + auto timeout_point = + std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); + std::unique_lock<std::mutex> lock(_waiter->mutex); + wait_req_get(_waiter); + _waiter->complete = false; + auto is_complete = [this] { return !rte_ring_empty(_recv_queue); }; + if (timeout_ms < 0) { + _waiter->cond.wait(lock, is_complete); + } else { + auto status = _waiter->cond.wait_until(lock, timeout_point, is_complete); + if (!status) { + return frame_buff::uptr(); + } + } + if (rte_ring_dequeue(_recv_queue, (void**)&buff_ptr)) { + return frame_buff::uptr(); + } + } + return frame_buff::uptr(buff_ptr); + } + + void release_recv_buff(frame_buff::uptr buff) + { + frame_buff* buff_ptr = buff.release(); + int status = rte_ring_enqueue(_release_queue, buff_ptr); + if (status != 0) { + assert(false); + } + } + +private: + friend class dpdk_io_service; + + dpdk_io_if _dpdk_io_if; + size_t _num_frames_in_use = 0; + + dpdk::service_queue& _servq; + dpdk::dpdk_ctx::sptr _ctx; + struct rte_ring* _recv_queue; + struct rte_ring* _release_queue; + dpdk::wait_req* _waiter; + fc_callback_t _fc_cb; +}; + + +}} // namespace uhd::transport + +#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_CLIENT_HPP_ */ diff --git a/host/lib/include/uhdlib/transport/dpdk_simple.hpp b/host/lib/include/uhdlib/transport/dpdk_simple.hpp index 86abaeff8..8420510ea 100644 --- a/host/lib/include/uhdlib/transport/dpdk_simple.hpp +++ b/host/lib/include/uhdlib/transport/dpdk_simple.hpp @@ -8,7 +8,6 @@ #define INCLUDED_DPDK_SIMPLE_HPP #include <uhd/transport/udp_simple.hpp> -#include <uhdlib/transport/dpdk_common.hpp> namespace uhd { namespace transport { @@ -17,35 +16,11 @@ class dpdk_simple : public udp_simple public: virtual ~dpdk_simple(void) = 0; - /*! - * Make a new connected dpdk transport: - * This transport is for sending and receiving - * between this host and a single endpoint. - * The primary usage for this transport will be control transactions. - * - * The address must be an ipv4 address. - * The port must be a number. - * - * \param addr a string representing the destination address - * \param port a string representing the destination port - */ - static udp_simple::sptr make_connected(struct uhd_dpdk_ctx &ctx, - const std::string &addr, const std::string &port); + static udp_simple::sptr make_connected( + const std::string& addr, const std::string& port); - /*! - * Make a new broadcasting dpdk transport: - * This transport can send broadcast datagrams - * and receive datagrams from multiple sources. - * The primary usage for this transport will be to discover devices. - * - * The address must be an ipv4 address. - * The port must be a number. - * - * \param addr a string representing the destination address - * \param port a string representing the destination port - */ - static udp_simple::sptr make_broadcast(struct uhd_dpdk_ctx &ctx, - const std::string &addr, const std::string &port); + static udp_simple::sptr make_broadcast( + const std::string& addr, const std::string& port); /*! * Send a single buffer. diff --git a/host/lib/include/uhdlib/transport/link_base.hpp b/host/lib/include/uhdlib/transport/link_base.hpp index e4d329e2a..a57b681ca 100644 --- a/host/lib/include/uhdlib/transport/link_base.hpp +++ b/host/lib/include/uhdlib/transport/link_base.hpp @@ -56,7 +56,7 @@ private: * the link interface methods. * * This template requires the following methods in the derived class: - * bool get_send_buf_derived(frame_buff& buf, int32_t timeout_ms); + * bool get_send_buff_derived(frame_buff& buf, int32_t timeout_ms); * void release_send_buf_derived(frame_buff& buf); * * Additionally, the subclass must call preload_free_buf for each frame_buff @@ -145,7 +145,7 @@ private: * the link interface methods. * * This template requires the following methods in the derived class: - * bool get_recv_buff_derived(frame_buff& buff, int32_t timeout_ms); + * size_t get_recv_buff_derived(frame_buff& buff, int32_t timeout_ms); * void release_recv_buff_derived(frame_buff& buff); * * Additionally, the subclass must call preload_free_buff for each diff --git a/host/lib/include/uhdlib/transport/udp_dpdk_link.hpp b/host/lib/include/uhdlib/transport/udp_dpdk_link.hpp new file mode 100644 index 000000000..eaf3cf7c4 --- /dev/null +++ b/host/lib/include/uhdlib/transport/udp_dpdk_link.hpp @@ -0,0 +1,267 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_UDP_DPDK_LINK_HPP +#define INCLUDED_UHDLIB_TRANSPORT_UDP_DPDK_LINK_HPP + +#include <uhd/config.hpp> +#include <uhd/transport/buffer_pool.hpp> +#include <uhd/types/device_addr.hpp> +#include <uhdlib/transport/dpdk/common.hpp> +#include <uhdlib/transport/link_if.hpp> +#include <uhdlib/transport/links.hpp> +#include <rte_udp.h> +#include <cassert> +#include <string> +#include <vector> + +namespace uhd { namespace transport { + +/*! + * A zero copy transport interface to the dpdk DMA library. + */ +class udp_dpdk_link : public virtual recv_link_if, public virtual send_link_if +{ +public: + using sptr = std::shared_ptr<udp_dpdk_link>; + + udp_dpdk_link(const dpdk::port_id_t port_id, + const std::string& remote_addr, + const std::string& remote_port, + const std::string& local_port, + const link_params_t& params); + + ~udp_dpdk_link(); + + /*! + * Make a new dpdk link. Get port ID from routing table. + * + * \param remote_addr Remote IP address + * \param remote_port Remote UDP port + * \param params Values for frame sizes, num frames, and buffer sizes + * \return a shared_ptr to a new udp dpdk link + */ + static sptr make(const std::string& remote_addr, + const std::string& remote_port, + const link_params_t& params); + + /*! + * Make a new dpdk link. User specifies DPDK port ID directly. + * + * \param port_id DPDK port ID to use for communication + * \param remote_addr Remote IP address + * \param remote_port Remote UDP port + * \param local_port Local UDP port + * \param params Values for frame sizes, num frames, and buffer sizes + * \return a shared_ptr to a new udp dpdk link + */ + static sptr make(const dpdk::port_id_t port_id, + const std::string& remote_addr, + const std::string& remote_port, + const std::string& local_port, + const link_params_t& params); + + /*! + * Get the associated dpdk_port + * + * \return a pointer to the dpdk_port used by this link + */ + inline dpdk::dpdk_port* get_port() + { + return _port; + } + + /*! + * Get the DMA queue associated with this link + * + * \return the queue ID for this link's DMA queue + */ + inline dpdk::queue_id_t get_queue_id() + { + return _queue; + } + + /*! + * Get the local UDP port used by this link + * + * \return the local UDP port, in network order + */ + inline uint16_t get_local_port() + { + return _local_port; + } + + /*! + * Get the remote UDP port used by this link + * + * \return the remote UDP port, in network order + */ + inline uint16_t get_remote_port() + { + return _remote_port; + } + + /*! + * Get the remote IPv4 address used by this link + * + * \return the remote IPv4 address, in network order + */ + inline uint32_t get_remote_ipv4() + { + return _remote_ipv4; + } + + /*! + * Set the remote host's MAC address + * This MAC address must be filled in for the remote IPv4 address before + * the link can reach its destination. + * + * \param mac the remote host's MAC address + */ + inline void set_remote_mac(struct ether_addr& mac) + { + ether_addr_copy(&mac, &_remote_mac); + } + + /*! + * Get the remote host's MAC address + * + * \param mac Where to write the MAC address + */ + inline void get_remote_mac(struct ether_addr& dst) + { + ether_addr_copy(&_remote_mac, &dst); + } + + /*! + * Get the number of frame buffers that can be queued by this link. + */ + size_t get_num_send_frames() const + { + return _num_send_frames; + } + + /*! + * Get the maximum capacity of a frame buffer. + */ + size_t get_send_frame_size() const + { + return _send_frame_size; + } + + /*! + * Get the physical adapter ID used for this link + */ + inline adapter_id_t get_send_adapter_id() const + { + return _adapter_id; + } + + /*! + * Get the number of frame buffers that can be queued by this link. + */ + size_t get_num_recv_frames() const + { + return _num_recv_frames; + } + + /*! + * Get the maximum capacity of a frame buffer. + */ + size_t get_recv_frame_size() const + { + return _recv_frame_size; + } + + /*! + * Get the physical adapter ID used for this link + */ + inline adapter_id_t get_recv_adapter_id() const + { + return _adapter_id; + } + + /*! + * Enqueue a received mbuf, which can be pulled via get_recv_buff() + */ + void enqueue_recv_mbuf(struct rte_mbuf* mbuf); + + /*! + * Receive a packet and return a frame buffer containing the packet data. + * The timeout argument is ignored. + * + * Received buffers are pulled from the frame buffer list. No buffers can + * be retrieved unless the corresponding rte_mbufs were placed in the list + * via the enqueue_recv_mbuf() method. + * + * \return a frame buffer, or null uptr if timeout occurs + */ + frame_buff::uptr get_recv_buff(int32_t /*timeout_ms*/); + + /*! + * Release a frame buffer, allowing the link driver to reuse it. + * + * \param buffer frame buffer to release for reuse by the link + */ + void release_recv_buff(frame_buff::uptr buff); + + /*! + * Get an empty frame buffer in which to write packet contents. + * + * \param timeout_ms a positive timeout value specifies the maximum number + of ms to wait, a negative value specifies to block + until successful, and a value of 0 specifies no wait. + * \return a frame buffer, or null uptr if timeout occurs + */ + frame_buff::uptr get_send_buff(int32_t /*timeout_ms*/); + + /*! + * Send a packet with the contents of the frame buffer and release the + * buffer, allowing the link driver to reuse it. If the size of the frame + * buffer is 0, the buffer is released with no packet being sent. + * + * Note that this function will only fill in the L2 header and send the + * mbuf. The L3 and L4 headers, in addition to the lengths in the rte_mbuf + * fields, must be set in the I/O service. + * + * \param buffer frame buffer containing packet data + * + * Throws an exception if an I/O error occurs while sending + */ + void release_send_buff(frame_buff::uptr buff); + +private: + //! A reference to the DPDK context + dpdk::dpdk_ctx::sptr _ctx; + //! The DPDK NIC port used by this link + dpdk::dpdk_port* _port; + //! Local UDP port, in network order + uint16_t _local_port; + //! Remote UDP port, in network order + uint16_t _remote_port; + //! Remote IPv4 address, in network order + uint32_t _remote_ipv4; + //! Remote host's MAC address + struct ether_addr _remote_mac; + //! Number of recv frames is not validated + size_t _num_recv_frames; + //! Maximum bytes of UDP payload data in recv frame + size_t _recv_frame_size; + //! Number of send frames is not validated + size_t _num_send_frames; + //! Maximum bytes of UDP payload data in send frame + size_t _send_frame_size; + //! Registered adapter ID for this link's DPDK NIC port + adapter_id_t _adapter_id; + //! The RX frame buff list head + dpdk::dpdk_frame_buff* _recv_buff_head = nullptr; + // TODO: Implement ability to use multiple queues + dpdk::queue_id_t _queue = 0; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_UDP_DPDK_LINK_HPP */ diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 646b2837e..d88631ae3 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -143,5 +143,14 @@ endif(ENABLE_LIBERIO) if(ENABLE_DPDK) INCLUDE_SUBDIRECTORY(uhd-dpdk) + + LIBUHD_APPEND_SOURCES( + ${CMAKE_CURRENT_SOURCE_DIR}/udp_dpdk_link.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_simple.cpp + ) + set_source_files_properties( + ${CMAKE_CURRENT_SOURCE_DIR}/udp_dpdk_link.cpp + PROPERTIES COMPILE_FLAGS "${UHD_DPDK_CFLAGS} -D_GNU_SOURCE" + ) endif(ENABLE_DPDK) diff --git a/host/lib/transport/dpdk_simple.cpp b/host/lib/transport/dpdk_simple.cpp index 001775934..50855f36a 100644 --- a/host/lib/transport/dpdk_simple.cpp +++ b/host/lib/transport/dpdk_simple.cpp @@ -1,179 +1,203 @@ // -// Copyright 2019 Ettus Research, a National Instruments Company +// Copyright 2019 Ettus Research, a National Instruments Brand // // SPDX-License-Identifier: GPL-3.0-or-later // +#include <uhd/transport/frame_buff.hpp> #include <uhd/utils/log.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/dpdk_io_service.hpp> +#include <uhdlib/transport/dpdk_io_service_client.hpp> #include <uhdlib/transport/dpdk_simple.hpp> -#include <uhdlib/transport/uhd-dpdk.h> +#include <uhdlib/transport/links.hpp> +#include <uhdlib/transport/udp_dpdk_link.hpp> #include <arpa/inet.h> + namespace uhd { namespace transport { namespace { - constexpr uint64_t USEC = 1000000; - // Non-data fields are headers (Ethernet + IPv4 + UDP) + CRC - constexpr size_t DPDK_SIMPLE_NONDATA_SIZE = 14 + 20 + 8 + 4; +constexpr double SEND_TIMEOUT_MS = 500; // seconds } class dpdk_simple_impl : public dpdk_simple { public: - dpdk_simple_impl(struct uhd_dpdk_ctx &ctx, const std::string &addr, - const std::string &port, bool filter_bcast) + dpdk_simple_impl(const std::string& addr, const std::string& port) { - UHD_ASSERT_THROW(ctx.is_init_done()); - - // Get NIC that can route to addr - int port_id = ctx.get_route(addr); - UHD_ASSERT_THROW(port_id >= 0); - - _port_id = port_id; - uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str()); - uint16_t dst_port = htons(std::stoi(port, NULL, 0)); - - struct uhd_dpdk_sockarg_udp sockarg = { - .is_tx = false, - .filter_bcast = filter_bcast, - .local_port = 0, - .remote_port = dst_port, - .dst_addr = dst_ipv4, - .num_bufs = 1 + link_params_t link_params = _get_default_link_params(); + _link = + uhd::transport::udp_dpdk_link::make(addr, port, link_params); + UHD_LOG_TRACE("DPDK::SIMPLE", + "Creating simple UDP object for " << addr << ":" << port + << ", DPDK port index " + << _link->get_port()->get_port_id()); + // The context must be initialized, or we'd never get here + auto ctx = uhd::transport::dpdk::dpdk_ctx::get(); + UHD_ASSERT_THROW(ctx->is_init_done()); + + // Init I/O service + _port_id = _link->get_port()->get_port_id(); + _io_service = ctx->get_io_service(_port_id); + // This is normally done by the I/O service manager, but with DPDK, this + // is all it does so we skip that step + UHD_LOG_TRACE("DPDK::SIMPLE", "Attaching link to I/O service..."); + _io_service->attach_recv_link(_link); + _io_service->attach_send_link(_link); + + auto recv_cb = [this](buff_t::uptr& buff, recv_link_if*, send_link_if*) { + return this->_recv_callback(buff); + }; + + auto fc_cb = [this](buff_t::uptr buff, recv_link_if*, send_link_if*) { + this->_recv_fc_callback(std::move(buff)); }; - _rx_sock = uhd_dpdk_sock_open(_port_id, UHD_DPDK_SOCK_UDP, &sockarg); - UHD_ASSERT_THROW(_rx_sock != nullptr); - - // Backfill the local port, in case it was auto-assigned - uhd_dpdk_udp_get_info(_rx_sock, &sockarg); - sockarg.is_tx = true; - sockarg.remote_port = dst_port; - sockarg.dst_addr = dst_ipv4; - sockarg.num_bufs = 1; - _tx_sock = uhd_dpdk_sock_open(_port_id, UHD_DPDK_SOCK_UDP, &sockarg); - UHD_ASSERT_THROW(_tx_sock != nullptr); - UHD_LOG_TRACE("DPDK", "Created simple transports between " << addr << ":" - << ntohs(dst_port) << " and NIC(" << _port_id - << "):" << ntohs(sockarg.local_port)); + + _recv_io = _io_service->make_recv_client(_link, + link_params.num_recv_frames, + recv_cb, + nullptr, // No send/fc link + 0, // No send frames + fc_cb); + + auto send_cb = [this](buff_t::uptr buff, transport::send_link_if*) { + this->_send_callback(std::move(buff)); + }; + _send_io = _io_service->make_send_client(_link, + link_params.num_send_frames, + send_cb, + nullptr, // no FC link + 0, + nullptr, // No receive callback necessary + [](const size_t) { return true; } // We can always send + ); + UHD_LOG_TRACE("DPDK::SIMPLE", "Constructor complete"); } - ~dpdk_simple_impl(void) {} + ~dpdk_simple_impl(void) + { + UHD_LOG_TRACE("DPDK::SIMPLE", + "~dpdk_simple_impl(), DPDK port index " << _link->get_port()->get_port_id()); + // Disconnect the clients from the I/O service + _send_io.reset(); + _recv_io.reset(); + // Disconnect the link from the I/O service + _io_service->detach_recv_link(_link); + _io_service->detach_send_link(_link); + } - /*! - * Send and release outstanding buffer + /*! Send and release outstanding buffer * * \param length bytes of data to send * \return number of bytes sent (releases buffer if sent) */ - size_t send(const boost::asio::const_buffer& buff) + size_t send(const boost::asio::const_buffer& user_buff) { - struct rte_mbuf* tx_mbuf; - size_t frame_size = _get_tx_buf(&tx_mbuf); - UHD_ASSERT_THROW(tx_mbuf) - size_t nbytes = boost::asio::buffer_size(buff); - UHD_ASSERT_THROW(nbytes <= frame_size) - const uint8_t* user_data = boost::asio::buffer_cast<const uint8_t*>(buff); - - uint8_t* pkt_data = (uint8_t*) uhd_dpdk_buf_to_data(_tx_sock, tx_mbuf); - std::memcpy(pkt_data, user_data, nbytes); - tx_mbuf->pkt_len = nbytes; - tx_mbuf->data_len = nbytes; - - int num_tx = uhd_dpdk_send(_tx_sock, &tx_mbuf, 1); - if (num_tx == 0) - return 0; + // Extract buff and sanity check + const size_t nbytes = boost::asio::buffer_size(user_buff); + UHD_ASSERT_THROW(nbytes <= _link->get_send_frame_size()) + const uint8_t* user_data = boost::asio::buffer_cast<const uint8_t*>(user_buff); + + // Get send buff + auto buff = _send_io->get_send_buff(SEND_TIMEOUT_MS); + UHD_ASSERT_THROW(buff); + buff->set_packet_size(nbytes); + std::memcpy(buff->data(), user_data, nbytes); + + // Release send buff (send the packet) + _send_io->release_send_buff(std::move(buff)); return nbytes; } - /*! - * Receive a single packet. + /*! Receive a single packet. + * * Buffer provided by transport (must be freed before next operation). * * \param buf a pointer to place to write buffer location * \param timeout the timeout in seconds * \return the number of bytes received or zero on timeout */ - size_t recv(const boost::asio::mutable_buffer& buff, double timeout) + size_t recv(const boost::asio::mutable_buffer& user_buff, double timeout) { - struct rte_mbuf *rx_mbuf; - size_t buff_size = boost::asio::buffer_size(buff); - uint8_t* user_data = boost::asio::buffer_cast<uint8_t*>(buff); + size_t user_buff_size = boost::asio::buffer_size(user_buff); + uint8_t* user_data = boost::asio::buffer_cast<uint8_t*>(user_buff); - int bufs = uhd_dpdk_recv(_rx_sock, &rx_mbuf, 1, (int) (timeout*USEC)); - if (bufs != 1 || rx_mbuf == nullptr) { - return 0; - } - if ((rx_mbuf->ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) { - uhd_dpdk_free_buf(rx_mbuf); + auto buff = _recv_io->get_recv_buff(static_cast<int32_t>(timeout * 1e3)); + if (!buff) { return 0; } - uhd_dpdk_get_src_ipv4(_rx_sock, rx_mbuf, &_last_recv_addr); - const size_t nbytes = uhd_dpdk_get_len(_rx_sock, rx_mbuf); - UHD_ASSERT_THROW(nbytes <= buff_size); + // Extract the sender's address. This is only possible because we know + // the memory layout of the buff + struct udp_hdr* udp_hdr_end = (struct udp_hdr*)buff->data(); + struct ipv4_hdr* ip_hdr_end = (struct ipv4_hdr*)(&udp_hdr_end[-1]); + struct ipv4_hdr* ip_hdr = (struct ipv4_hdr*)(&ip_hdr_end[-1]); + _last_recv_addr = ip_hdr->src_addr; + + // Extract the buffer data + const size_t copy_len = std::min(user_buff_size, buff->packet_size()); + if (copy_len < buff->packet_size()) { + UHD_LOG_WARNING("DPDK", "Truncating recv packet"); + } + std::memcpy(user_data, buff->data(), copy_len); - uint8_t* pkt_data = (uint8_t*) uhd_dpdk_buf_to_data(_rx_sock, rx_mbuf); - std::memcpy(user_data, pkt_data, nbytes); - _put_rx_buf(rx_mbuf); - return nbytes; + // Housekeeping + _recv_io->release_recv_buff(std::move(buff)); + return copy_len; } - /*! - * Get the last IP address as seen by recv(). - * Only use this with the broadcast socket. - */ std::string get_recv_addr(void) { - char addr_str[INET_ADDRSTRLEN]; - struct in_addr ipv4_addr; - ipv4_addr.s_addr = _last_recv_addr; - inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str)); - return std::string(addr_str); + return dpdk::ipv4_num_to_str(_last_recv_addr); } - /*! - * Get the IP address for the destination - */ std::string get_send_addr(void) { - struct in_addr ipv4_addr; - int status = uhd_dpdk_get_ipv4_addr(_port_id, &ipv4_addr.s_addr, nullptr); - UHD_ASSERT_THROW(status); - char addr_str[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str)); - return std::string(addr_str); + return dpdk::ipv4_num_to_str(_link->get_remote_ipv4()); } + private: - /*! - * Request a single send buffer of specified size. - * - * \param buf a pointer to place to write buffer location - * \return the maximum length of the buffer - */ - size_t _get_tx_buf(struct rte_mbuf** buf) + using buff_t = frame_buff; + + link_params_t _get_default_link_params() { - int bufs = uhd_dpdk_request_tx_bufs(_tx_sock, buf, 1, 0); - if (bufs != 1) { - *buf = nullptr; - return 0; - } - return _mtu - DPDK_SIMPLE_NONDATA_SIZE; + link_params_t link_params; + link_params.recv_frame_size = 8000; + link_params.send_frame_size = 8000; + link_params.num_recv_frames = 1; + link_params.num_send_frames = 1; + link_params.recv_buff_size = 8000; + link_params.send_buff_size = 8000; + return link_params; } - /*! - * Return/free receive buffer - * Can also use to free un-sent TX bufs - */ - void _put_rx_buf(struct rte_mbuf *rx_mbuf) + void _send_callback(buff_t::uptr buff) + { + _link->release_send_buff(std::move(buff)); + } + + bool _recv_callback(buff_t::uptr&) { - UHD_ASSERT_THROW(rx_mbuf) - uhd_dpdk_free_buf(rx_mbuf); + // Queue it up + return true; } + void _recv_fc_callback(buff_t::uptr buff) + { + _link->release_recv_buff(std::move(buff)); + } + + /*** Attributes **********************************************************/ unsigned int _port_id; - size_t _mtu; - struct uhd_dpdk_socket *_tx_sock; - struct uhd_dpdk_socket *_rx_sock; uint32_t _last_recv_addr; + + udp_dpdk_link::sptr _link; + + dpdk_io_service::sptr _io_service; + + send_io_if::sptr _send_io; + + recv_io_if::sptr _recv_io; }; dpdk_simple::~dpdk_simple(void) {} @@ -182,16 +206,16 @@ dpdk_simple::~dpdk_simple(void) {} * DPDK simple transport public make functions **********************************************************************/ udp_simple::sptr dpdk_simple::make_connected( - struct uhd_dpdk_ctx &ctx, const std::string &addr, const std::string &port -){ - return udp_simple::sptr(new dpdk_simple_impl(ctx, addr, port, true)); + const std::string& addr, const std::string& port) +{ + return udp_simple::sptr(new dpdk_simple_impl(addr, port)); } +// For DPDK, this is not special and the same as make_connected udp_simple::sptr dpdk_simple::make_broadcast( - struct uhd_dpdk_ctx &ctx, const std::string &addr, const std::string &port -){ - return udp_simple::sptr(new dpdk_simple_impl(ctx, addr, port, false)); + const std::string& addr, const std::string& port) +{ + return udp_simple::sptr(new dpdk_simple_impl(addr, port)); } }} // namespace uhd::transport - diff --git a/host/lib/transport/udp_dpdk_link.cpp b/host/lib/transport/udp_dpdk_link.cpp new file mode 100644 index 000000000..dc56de43c --- /dev/null +++ b/host/lib/transport/udp_dpdk_link.cpp @@ -0,0 +1,198 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/config.hpp> +#include <uhd/utils/log.hpp> +#include <uhd/utils/static.hpp> +#include <uhdlib/transport/adapter.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/udp_dpdk_link.hpp> +#include <arpa/inet.h> +#include <memory> + +using namespace uhd::transport; +using namespace uhd::transport::dpdk; + +udp_dpdk_link::udp_dpdk_link(dpdk::port_id_t port_id, + const std::string& remote_addr, + const std::string& remote_port, + const std::string& local_port, + const link_params_t& params) + : _num_recv_frames(params.num_recv_frames) + , _recv_frame_size(params.recv_frame_size) + , _num_send_frames(params.num_send_frames) + , _send_frame_size(params.send_frame_size) +{ + // Get a reference to the context, since this class manages DPDK memory + _ctx = dpdk_ctx::get(); + UHD_ASSERT_THROW(_ctx); + + // Fill in remote IPv4 address and UDP port + // NOTE: Remote MAC address is filled in later by I/O service + int status = inet_pton(AF_INET, remote_addr.c_str(), &_remote_ipv4); + if (status != 1) { + UHD_LOG_ERROR("DPDK", std::string("Invalid destination address ") + remote_addr); + throw uhd::runtime_error( + std::string("DPDK: Invalid destination address ") + remote_addr); + } + _remote_port = rte_cpu_to_be_16(std::stoul(remote_port)); + + // Grab the port with a route to the remote host + _port = _ctx->get_port(port_id); + + uint16_t local_port_num = rte_cpu_to_be_16(std::stoul(local_port)); + // Get an unused UDP port for listening + _local_port = _port->alloc_udp_port(local_port_num); + + // Validate params + const size_t max_frame_size = _port->get_mtu() - dpdk::HDR_SIZE_UDP_IPV4; + UHD_ASSERT_THROW(params.send_frame_size <= max_frame_size); + UHD_ASSERT_THROW(params.recv_frame_size <= max_frame_size); + + // Register the adapter + auto info = _port->get_adapter_info(); + auto& adap_ctx = adapter_ctx::get(); + _adapter_id = adap_ctx.register_adapter(info); + UHD_LOGGER_TRACE("DPDK") << boost::format("Created udp_dpdk_link to (%s:%s)") + % remote_addr % remote_port; + UHD_LOGGER_TRACE("DPDK") + << boost::format("num_recv_frames=%d, recv_frame_size=%d, num_send_frames=%d, " + "send_frame_size=%d") + % params.num_recv_frames % params.recv_frame_size % params.num_send_frames + % params.send_frame_size; +} + +udp_dpdk_link::~udp_dpdk_link() {} + +udp_dpdk_link::sptr udp_dpdk_link::make(const std::string& remote_addr, + const std::string& remote_port, + const link_params_t& params) +{ + auto ctx = dpdk::dpdk_ctx::get(); + auto port = ctx->get_route(remote_addr); + if (!port) { + UHD_LOG_ERROR("DPDK", + std::string("Could not find route to destination address ") + remote_addr); + throw uhd::runtime_error( + std::string("DPDK: Could not find route to destination address ") + + remote_addr); + } + return make(port->get_port_id(), remote_addr, remote_port, "0", params); +} + +udp_dpdk_link::sptr udp_dpdk_link::make(const dpdk::port_id_t port_id, + const std::string& remote_addr, + const std::string& remote_port, + const std::string& local_port, + const link_params_t& params) +{ + UHD_ASSERT_THROW(params.recv_frame_size > 0); + UHD_ASSERT_THROW(params.send_frame_size > 0); + UHD_ASSERT_THROW(params.num_send_frames > 0); + UHD_ASSERT_THROW(params.num_recv_frames > 0); + + return std::make_shared<udp_dpdk_link>( + port_id, remote_addr, remote_port, local_port, params); +} + +void udp_dpdk_link::enqueue_recv_mbuf(struct rte_mbuf* mbuf) +{ + // Get packet size + struct udp_hdr* hdr = rte_pktmbuf_mtod_offset( + mbuf, struct udp_hdr*, sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr)); + size_t packet_size = rte_be_to_cpu_16(hdr->dgram_len) - sizeof(struct udp_hdr); + // Prepare the dpdk_frame_buff + auto buff = new (rte_mbuf_to_priv(mbuf)) dpdk_frame_buff(mbuf); + buff->header_jump( + sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + sizeof(struct udp_hdr)); + buff->set_packet_size(packet_size); + // Add the dpdk_frame_buff to the list + if (_recv_buff_head) { + buff->prev = _recv_buff_head->prev; + buff->next = _recv_buff_head; + _recv_buff_head->prev->next = buff; + _recv_buff_head->prev = buff; + } else { + _recv_buff_head = buff; + buff->next = buff; + buff->prev = buff; + } +} + +frame_buff::uptr udp_dpdk_link::get_recv_buff(int32_t /*timeout_ms*/) +{ + auto buff = _recv_buff_head; + if (buff) { + if (_recv_buff_head->next == buff) { + /* Only had the one buff, so the list is empty */ + _recv_buff_head = nullptr; + } else { + /* Make the next buff the new list head */ + _recv_buff_head->next->prev = _recv_buff_head->prev; + _recv_buff_head->prev->next = _recv_buff_head->next; + _recv_buff_head = _recv_buff_head->next; + } + buff->next = nullptr; + buff->prev = nullptr; + return frame_buff::uptr(buff); + } + return frame_buff::uptr(); +} + +void udp_dpdk_link::release_recv_buff(frame_buff::uptr buff) +{ + dpdk_frame_buff* buff_ptr = (dpdk_frame_buff*)buff.release(); + assert(buff_ptr); + rte_pktmbuf_free(buff_ptr->get_pktmbuf()); +} + +frame_buff::uptr udp_dpdk_link::get_send_buff(int32_t /*timeout_ms*/) +{ + auto mbuf = rte_pktmbuf_alloc(_port->get_tx_pktbuf_pool()); + if (mbuf) { + auto buff = new (rte_mbuf_to_priv(mbuf)) dpdk_frame_buff(mbuf); + buff->header_jump( + sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + sizeof(struct udp_hdr)); + return frame_buff::uptr(buff); + } + return frame_buff::uptr(); +} + +void udp_dpdk_link::release_send_buff(frame_buff::uptr buff) +{ + dpdk_frame_buff* buff_ptr = (dpdk_frame_buff*)buff.release(); + assert(buff_ptr); + auto mbuf = buff_ptr->get_pktmbuf(); + if (buff_ptr->packet_size()) { + // Fill in L2 header + auto local_mac = _port->get_mac_addr(); + struct ether_hdr* l2_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*); + ether_addr_copy(&_remote_mac, &l2_hdr->d_addr); + ether_addr_copy(&local_mac, &l2_hdr->s_addr); + l2_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + // Fill in L3 and L4 headers + dpdk::fill_udp_hdr(mbuf, + _port, + _remote_ipv4, + _local_port, + _remote_port, + buff_ptr->packet_size()); + // Prepare the packet buffer and send it out + int status = rte_eth_tx_prepare(_port->get_port_id(), _queue, &mbuf, 1); + if (status != 1) { + throw uhd::runtime_error("DPDK: Failed to prepare TX buffer for send"); + } + status = rte_eth_tx_burst(_port->get_port_id(), _queue, &mbuf, 1); + while (status != 1) { + status = rte_eth_tx_burst(_port->get_port_id(), _queue, &mbuf, 1); + // FIXME: Should we make available retrying? + // throw uhd::runtime_error("DPDK: Failed to send TX buffer"); + } + } else { + // Release the buffer if there is nothing in it + rte_pktmbuf_free(mbuf); + } +} diff --git a/host/lib/transport/uhd-dpdk/CMakeLists.txt b/host/lib/transport/uhd-dpdk/CMakeLists.txt index ea78aa1e8..a13886653 100644 --- a/host/lib/transport/uhd-dpdk/CMakeLists.txt +++ b/host/lib/transport/uhd-dpdk/CMakeLists.txt @@ -19,9 +19,11 @@ if(ENABLE_DPDK) LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_common.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_io_service.cpp ) set_source_files_properties( ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_common.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_io_service.cpp PROPERTIES COMPILE_FLAGS "${UHD_DPDK_CFLAGS} -D_GNU_SOURCE" ) include_directories(${DPDK_INCLUDE_DIR}) diff --git a/host/lib/transport/uhd-dpdk/dpdk_common.cpp b/host/lib/transport/uhd-dpdk/dpdk_common.cpp index 43a1507bb..46818e973 100644 --- a/host/lib/transport/uhd-dpdk/dpdk_common.cpp +++ b/host/lib/transport/uhd-dpdk/dpdk_common.cpp @@ -3,8 +3,13 @@ // // SPDX-License-Identifier: GPL-3.0-or-later // + +#include <uhd/utils/algorithm.hpp> #include <uhd/utils/log.hpp> +#include <uhdlib/transport/dpdk/arp.hpp> #include <uhdlib/transport/dpdk/common.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/dpdk_io_service.hpp> #include <uhdlib/utils/prefs.hpp> #include <arpa/inet.h> #include <rte_arp.h> @@ -23,6 +28,7 @@ constexpr uint16_t DPDK_DEFAULT_RING_SIZE = 512; inline char* eal_add_opt( std::vector<const char*>& argv, size_t n, char* dst, const char* opt, const char* arg) { + UHD_LOG_TRACE("DPDK", opt << " " << arg); char* ptr = dst; strncpy(ptr, opt, n); argv.push_back(ptr); @@ -34,15 +40,6 @@ inline char* eal_add_opt( return ptr; } -inline std::string eth_addr_to_string(const struct ether_addr mac_addr) -{ - auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx"); - mac_stream % (uint32_t)mac_addr.addr_bytes[0] % (uint32_t)mac_addr.addr_bytes[1] - % (uint32_t)mac_addr.addr_bytes[2] % (uint32_t)mac_addr.addr_bytes[3] - % (uint32_t)mac_addr.addr_bytes[4] % (uint32_t)mac_addr.addr_bytes[5]; - return mac_stream.str(); -} - inline void separate_ipv4_addr( const std::string ipv4, uint32_t& ipv4_addr, uint32_t& netmask) { @@ -59,28 +56,29 @@ inline void separate_ipv4_addr( dpdk_port::uptr dpdk_port::make(port_id_t port, size_t mtu, uint16_t num_queues, - size_t num_mbufs, + uint16_t num_desc, struct rte_mempool* rx_pktbuf_pool, struct rte_mempool* tx_pktbuf_pool, std::string ipv4_address) { return std::make_unique<dpdk_port>( - port, mtu, num_queues, num_mbufs, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address); + port, mtu, num_queues, num_desc, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address); } dpdk_port::dpdk_port(port_id_t port, size_t mtu, uint16_t num_queues, - size_t num_mbufs, + uint16_t num_desc, struct rte_mempool* rx_pktbuf_pool, struct rte_mempool* tx_pktbuf_pool, std::string ipv4_address) : _port(port) , _mtu(mtu) + , _num_queues(num_queues) , _rx_pktbuf_pool(rx_pktbuf_pool) , _tx_pktbuf_pool(tx_pktbuf_pool) { - /* 1. Set MTU and IPv4 address */ + /* Set MTU and IPv4 address */ int retval; retval = rte_eth_dev_set_mtu(_port, _mtu); @@ -96,7 +94,7 @@ dpdk_port::dpdk_port(port_id_t port, separate_ipv4_addr(ipv4_address, _ipv4, _netmask); - /* 2. Set hardware offloads */ + /* Set hardware offloads */ struct rte_eth_dev_info dev_info; rte_eth_dev_info_get(_port, &dev_info); uint64_t rx_offloads = DEV_RX_OFFLOAD_IPV4_CKSUM; @@ -132,13 +130,13 @@ dpdk_port::dpdk_port(port_id_t port, throw uhd::runtime_error("DPDK: Failed to configure the device"); } - /* 3. Set descriptor ring sizes */ - uint16_t rx_desc = num_mbufs; + /* Set descriptor ring sizes */ + uint16_t rx_desc = num_desc; if (dev_info.rx_desc_lim.nb_max < rx_desc || dev_info.rx_desc_lim.nb_min > rx_desc || (dev_info.rx_desc_lim.nb_align - 1) & rx_desc) { UHD_LOGGER_ERROR("DPDK") << boost::format("%d: %d RX descriptors requested, but must be in [%d,%d]") - % _port % num_mbufs % dev_info.rx_desc_lim.nb_min + % _port % num_desc % dev_info.rx_desc_lim.nb_min % dev_info.rx_desc_lim.nb_max; UHD_LOGGER_ERROR("DPDK") << boost::format("Num RX descriptors must also be aligned to 0x%x") @@ -146,12 +144,12 @@ dpdk_port::dpdk_port(port_id_t port, throw uhd::runtime_error("DPDK: Failed to allocate RX descriptors"); } - uint16_t tx_desc = num_mbufs; + uint16_t tx_desc = num_desc; if (dev_info.tx_desc_lim.nb_max < tx_desc || dev_info.tx_desc_lim.nb_min > tx_desc || (dev_info.tx_desc_lim.nb_align - 1) & tx_desc) { UHD_LOGGER_ERROR("DPDK") << boost::format("%d: %d TX descriptors requested, but must be in [%d,%d]") - % _port % num_mbufs % dev_info.tx_desc_lim.nb_min + % _port % num_desc % dev_info.tx_desc_lim.nb_min % dev_info.tx_desc_lim.nb_max; UHD_LOGGER_ERROR("DPDK") << boost::format("Num TX descriptors must also be aligned to 0x%x") @@ -165,7 +163,7 @@ dpdk_port::dpdk_port(port_id_t port, throw uhd::runtime_error("DPDK: Failed to configure the DMA queues"); } - /* 4. Set up the RX and TX DMA queues (May not be generally supported after + /* Set up the RX and TX DMA queues (May not be generally supported after * eth_dev_start) */ unsigned int cpu_socket = rte_eth_dev_socket_id(_port); for (uint16_t i = 0; i < _num_queues; i++) { @@ -187,36 +185,9 @@ dpdk_port::dpdk_port(port_id_t port, } } - /* 5. Set up initial flow table */ + /* TODO: Enable multiple queues (only support 1 right now) */ - /* Append all free queues except 0, which is reserved for ARP */ - _free_queues.reserve(_num_queues - 1); - for (unsigned int i = 1; i < _num_queues; i++) { - _free_queues.push_back(i); - } - - // struct rte_flow_attr flow_attr; - // flow_attr.group = 0; - // flow_attr.priority = 1; - // flow_attr.ingress = 1; - // flow_attr.egress = 0; - // flow_attr.transfer = 0; - // flow_attr.reserved = 0; - - // struct rte_flow_item[] flow_pattern = { - //}; - // int rte_flow_validate(uint16_t port_id, - // const struct rte_flow_attr *attr, - // const struct rte_flow_item pattern[], - // const struct rte_flow_action actions[], - // struct rte_flow_error *error); - // struct rte_flow * rte_flow_create(uint16_t port_id, - // const struct rte_flow_attr *attr, - // const struct rte_flow_item pattern[], - // const struct rte_flow_action *actions[], - // struct rte_flow_error *error); - - /* 6. Start the Ethernet device */ + /* Start the Ethernet device */ retval = rte_eth_dev_start(_port); if (retval < 0) { UHD_LOGGER_ERROR("DPDK") @@ -230,39 +201,60 @@ dpdk_port::dpdk_port(port_id_t port, << " MAC: " << eth_addr_to_string(_mac_addr); } -/* TODO: Do flow directions */ -queue_id_t dpdk_port::alloc_queue(struct rte_flow_pattern recv_pattern[]) +dpdk_port::~dpdk_port() { - std::lock_guard<std::mutex> lock(_mutex); - UHD_ASSERT_THROW(_free_queues.size() != 0); - auto queue = _free_queues.back(); - _free_queues.pop_back(); - return queue; + rte_eth_dev_stop(_port); + rte_spinlock_lock(&_spinlock); + for (auto kv : _arp_table) { + for (auto req : kv.second->reqs) { + req->cond.notify_one(); + } + rte_free(kv.second); + } + _arp_table.clear(); + rte_spinlock_unlock(&_spinlock); } -void dpdk_port::free_queue(queue_id_t queue) +uint16_t dpdk_port::alloc_udp_port(uint16_t udp_port) { + uint16_t port_selected; std::lock_guard<std::mutex> lock(_mutex); - auto flow = _flow_rules.at(queue); - int status = rte_flow_destroy(_port, flow, NULL); - if (status) { - UHD_LOGGER_ERROR("DPDK") - << boost::format("Failed to destroy flow rule on port %u, queue %u") % _port - % queue; - throw uhd::runtime_error("DPDK: Failed to destroy flow rule"); + if (udp_port) { + if (_udp_ports.count(rte_be_to_cpu_16(udp_port))) { + return 0; + } + port_selected = rte_be_to_cpu_16(udp_port); } else { - _flow_rules.erase(queue); + if (_udp_ports.size() >= 65535) { + UHD_LOG_WARNING("DPDK", "Attempted to allocate UDP port, but none remain"); + return 0; + } + port_selected = _next_udp_port; + while (true) { + if (port_selected == 0) { + continue; + } + if (_udp_ports.count(port_selected) == 0) { + _next_udp_port = port_selected - 1; + break; + } + if (port_selected - 1 == _next_udp_port) { + return 0; + } + port_selected--; + } } - _free_queues.push_back(queue); + _udp_ports.insert(port_selected); + return rte_cpu_to_be_16(port_selected); } -int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* arp_req) +int dpdk_port::_arp_reply(queue_id_t queue_id, struct arp_hdr* arp_req) { struct rte_mbuf* mbuf; struct ether_hdr* hdr; struct arp_hdr* arp_frame; - mbuf = rte_pktmbuf_alloc(tx_pktbuf_pool); + mbuf = rte_pktmbuf_alloc(_tx_pktbuf_pool); if (unlikely(mbuf == NULL)) { UHD_LOG_WARNING("DPDK", "Could not allocate packet buffer for ARP response"); return -ENOMEM; @@ -288,8 +280,7 @@ int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* ar mbuf->pkt_len = 42; mbuf->data_len = 42; - // ARP replies always on queue 0 - if (rte_eth_tx_burst(_port, 0, &mbuf, 1) != 1) { + if (rte_eth_tx_burst(_port, queue_id, &mbuf, 1) != 1) { UHD_LOGGER_WARNING("DPDK") << boost::format("%s: TX descriptor ring is full") % __func__; rte_pktmbuf_free(mbuf); @@ -298,61 +289,16 @@ int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* ar return 0; } -// TODO: ARP processing for queue 0 -// int dpdk_port::process_arp(struct rte_mempool *tx_pktbuf_pool, struct arp_hdr -// *arp_frame) -//{ -// std::lock_guard<std::mutex> lock(_mutex); -// uint32_t dest_ip = arp_frame->arp_data.arp_sip; -// struct ether_addr dest_addr = arp_frame->arp_data.arp_sha; -// -// /* Add entry to ARP table */ -// struct uhd_dpdk_arp_entry *entry = NULL; -// rte_hash_lookup_data(_arp_table, &dest_ip, (void **) &entry); -// if (!entry) { -// entry = rte_zmalloc(NULL, sizeof(*entry), 0); -// if (!entry) { -// return -ENOMEM; -// } -// LIST_INIT(&entry->pending_list); -// ether_addr_copy(&dest_addr, &entry->mac_addr); -// if (rte_hash_add_key_data(port->arp_table, &dest_ip, entry) < 0) { -// rte_free(entry); -// return -ENOSPC; -// } -// } else { -// struct uhd_dpdk_config_req *req = NULL; -// ether_addr_copy(&dest_addr, &entry->mac_addr); -// /* Now wake any config reqs waiting for the ARP */ -// LIST_FOREACH(req, &entry->pending_list, entry) { -// _uhd_dpdk_config_req_compl(req, 0); -// } -// while (entry->pending_list.lh_first != NULL) { -// LIST_REMOVE(entry->pending_list.lh_first, entry); -// } -// } -// /* Respond if this was an ARP request */ -// if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST) && -// arp_frame->arp_data.arp_tip == port->ipv4_addr) { -// _arp_reply(tx_pktbuf_pool, arp_frame); -// } -// -// return 0; -// -//} - -static dpdk_ctx* global_ctx = nullptr; +static dpdk_ctx::sptr global_ctx = nullptr; static std::mutex global_ctx_mutex; dpdk_ctx::sptr dpdk_ctx::get() { std::lock_guard<std::mutex> lock(global_ctx_mutex); if (!global_ctx) { - auto new_ctx = std::make_shared<dpdk_ctx>(); - global_ctx = new_ctx.get(); - return new_ctx; + global_ctx = std::make_shared<dpdk_ctx>(); } - return global_ctx->shared_from_this(); + return global_ctx; } dpdk_ctx::dpdk_ctx(void) : _init_done(false) {} @@ -384,6 +330,7 @@ void dpdk_ctx::_eal_init(const device_addr_t& eal_args) auto args = new std::array<char, 4096>(); char* opt = args->data(); char* end = args->data() + args->size(); + UHD_LOG_TRACE("DPDK", "EAL init options: "); for (std::string& key : eal_args.keys()) { std::string val = eal_args[key]; if (key == "dpdk_coremask") { @@ -452,10 +399,16 @@ void dpdk_ctx::init(const device_addr_t& user_args) _num_mbufs = dpdk_args.cast<int>("dpdk_num_mbufs", DEFAULT_NUM_MBUFS); _mbuf_cache_size = dpdk_args.cast<int>("dpdk_mbuf_cache_size", DEFAULT_MBUF_CACHE_SIZE); + UHD_LOG_TRACE("DPDK", + "mtu: " << _mtu << " num_mbufs: " << _num_mbufs + << " mbuf_cache_size: " << _mbuf_cache_size); /* Get device info for all the NIC ports */ int num_dpdk_ports = rte_eth_dev_count_avail(); - UHD_ASSERT_THROW(num_dpdk_ports > 0); + if (num_dpdk_ports == 0) { + UHD_LOG_ERROR("DPDK", "No available DPDK devices (ports) found!"); + throw uhd::runtime_error("No available DPDK devices (ports) found!"); + } device_addrs_t nics(num_dpdk_ports); RTE_ETH_FOREACH_DEV(i) { @@ -480,6 +433,8 @@ void dpdk_ctx::init(const device_addr_t& user_args) } /* Now combine user args with conf file */ auto conf = uhd::prefs::get_dpdk_nic_args(nic); + // TODO: Enable the use of multiple DMA queues + conf["dpdk_num_queues"] = "1"; /* Update config, and remove ports that aren't fully configured */ if (conf.has_key("dpdk_ipv4")) { @@ -491,10 +446,17 @@ void dpdk_ctx::init(const device_addr_t& user_args) } } + std::map<size_t, std::vector<size_t>> lcore_to_port_id_map; RTE_ETH_FOREACH_DEV(i) { auto& conf = nics.at(i); if (conf.has_key("dpdk_ipv4")) { + UHD_ASSERT_THROW(conf.has_key("dpdk_lcore")); + const size_t lcore_id = conf.cast<size_t>("dpdk_lcore", 0); + if (!lcore_to_port_id_map.count(lcore_id)) { + lcore_to_port_id_map.insert({lcore_id, {}}); + } + // Allocating enough buffers for all DMA queues for each CPU socket // - This is a bit inefficient for larger systems, since NICs may not // all be on one socket @@ -507,10 +469,13 @@ void dpdk_ctx::init(const device_addr_t& user_args) _ports[i] = dpdk_port::make(i, _mtu, conf.cast<uint16_t>("dpdk_num_queues", rte_lcore_count()), - _num_mbufs, + conf.cast<uint16_t>("dpdk_num_desc", DPDK_DEFAULT_RING_SIZE), rx_pool, tx_pool, conf["dpdk_ipv4"]); + + // Remember all port IDs that map to an lcore + lcore_to_port_id_map.at(lcore_id).push_back(i); } } @@ -522,12 +487,29 @@ void dpdk_ctx::init(const device_addr_t& user_args) rte_eth_link_get(portid, &link); unsigned int link_status = link.link_status; unsigned int link_speed = link.link_speed; - UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps\n") - % portid % link_status % link_speed; + UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps") % portid + % link_status % link_speed; } - UHD_LOG_TRACE("DPDK", "Init DONE!"); - + UHD_LOG_TRACE("DPDK", "Init done -- spawning IO services"); _init_done = true; + + // Links are up, now create one IO service per lcore + for (auto& lcore_portids_pair : lcore_to_port_id_map) { + const size_t lcore_id = lcore_portids_pair.first; + std::vector<dpdk_port*> dpdk_ports; + dpdk_ports.reserve(lcore_portids_pair.second.size()); + for (const size_t port_id : lcore_portids_pair.second) { + dpdk_ports.push_back(get_port(port_id)); + } + const size_t servq_depth = 32; // FIXME + UHD_LOG_TRACE("DPDK", + "Creating I/O service for lcore " + << lcore_id << ", servicing " << dpdk_ports.size() + << " ports, service queue depth " << servq_depth); + _io_srv_portid_map.insert( + {uhd::transport::dpdk_io_service::make(lcore_id, dpdk_ports, servq_depth), + lcore_portids_pair.second}); + } } } @@ -577,7 +559,7 @@ int dpdk_ctx::get_port_link_status(port_id_t portid) const return link.link_status; } -int dpdk_ctx::get_route(const std::string& addr) const +dpdk_port* dpdk_ctx::get_route(const std::string& addr) const { const uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str()); for (const auto& port : _ports) { @@ -586,10 +568,10 @@ int dpdk_ctx::get_route(const std::string& addr) const uint32_t src_ipv4 = port.second->get_ipv4(); uint32_t netmask = port.second->get_netmask(); if ((src_ipv4 & netmask) == (dst_ipv4 & netmask)) { - return (int)port.first; + return port.second.get(); } } - return -ENODEV; + return NULL; } @@ -598,6 +580,20 @@ bool dpdk_ctx::is_init_done(void) const return _init_done.load(); } +uhd::transport::dpdk_io_service::sptr dpdk_ctx::get_io_service(const size_t port_id) +{ + for (auto& io_srv_portid_pair : _io_srv_portid_map) { + if (uhd::has(io_srv_portid_pair.second, port_id)) { + return io_srv_portid_pair.first; + } + } + + std::string err_msg = std::string("Cannot look up I/O service for port ID: ") + + std::to_string(port_id) + ". No such port ID!"; + UHD_LOG_ERROR("DPDK", err_msg); + throw uhd::lookup_error(err_msg); +} + struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool( unsigned int cpu_socket, size_t num_bufs) { @@ -605,8 +601,12 @@ struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool( const int mbuf_size = _mtu + RTE_PKTMBUF_HEADROOM; char name[32]; snprintf(name, sizeof(name), "rx_mbuf_pool_%u", cpu_socket); - _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create( - name, num_bufs, _mbuf_cache_size, 0, mbuf_size, SOCKET_ID_ANY); + _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create(name, + num_bufs, + _mbuf_cache_size, + DPDK_MBUF_PRIV_SIZE, + mbuf_size, + SOCKET_ID_ANY); if (!_rx_pktbuf_pools.at(cpu_socket)) { UHD_LOG_ERROR("DPDK", "Could not allocate RX pktbuf pool"); throw uhd::runtime_error("DPDK: Could not allocate RX pktbuf pool"); diff --git a/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp b/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp new file mode 100644 index 000000000..1fcedca51 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp @@ -0,0 +1,950 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/utils/log.hpp> +#include <uhd/utils/thread.hpp> +#include <uhdlib/transport/dpdk/arp.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/dpdk_io_service_client.hpp> +#include <uhdlib/utils/narrow.hpp> +#include <cmath> + +/* + * Memory management + * + * Every object that allocates and frees DPDK memory has a reference to the + * dpdk_ctx. + * + * Ownership hierarchy: + * + * dpdk_io_service_mgr (1) => + * dpdk_ctx::sptr + * dpdk_io_service::sptr + * + * xport (1) => + * dpdk_send_io::sptr + * dpdk_recv_io::sptr + * + * usrp link_mgr (1) => + * udp_dpdk_link::sptr + * + * dpdk_send_io (2) => + * dpdk_ctx::sptr + * dpdk_io_service::sptr + * + * dpdk_recv_io (2) => + * dpdk_ctx::sptr + * dpdk_io_service::sptr + * + * dpdk_io_service (3) => + * dpdk_ctx::wptr (weak_ptr) + * udp_dpdk_link::sptr + * + * udp_dpdk_link (4) => + * dpdk_ctx::sptr + */ + +using namespace uhd::transport; + +dpdk_io_service::dpdk_io_service( + unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth) + : _ctx(dpdk::dpdk_ctx::get()) + , _lcore_id(lcore_id) + , _ports(ports) + , _servq(servq_depth, lcore_id) +{ + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Launching I/O service for lcore " << lcore_id); + for (auto port : _ports) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "lcore_id " << lcore_id << ": Adding port index " << port->get_port_id()); + _tx_queues[port->get_port_id()] = std::list<dpdk_send_io*>(); + _recv_xport_map[port->get_port_id()] = std::list<dpdk_recv_io*>(); + } + int status = rte_eal_remote_launch(_io_worker, this, lcore_id); + if (status) { + throw uhd::runtime_error("DPDK: I/O service cannot launch on busy lcore"); + } +} + +dpdk_io_service::sptr dpdk_io_service::make( + unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth) +{ + return dpdk_io_service::sptr(new dpdk_io_service(lcore_id, ports, servq_depth)); +} + +dpdk_io_service::~dpdk_io_service() +{ + UHD_LOG_TRACE( + "DPDK::IO_SERVICE", "Shutting down I/O service for lcore " << _lcore_id); + dpdk::wait_req* req = dpdk::wait_req_alloc(dpdk::wait_type::WAIT_LCORE_TERM, NULL); + if (!req) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", + "Could not allocate request for lcore termination for lcore " << _lcore_id); + return; + } + dpdk::wait_req_get(req); + _servq.submit(req, std::chrono::microseconds(-1)); + dpdk::wait_req_put(req); +} + +void dpdk_io_service::attach_recv_link(recv_link_if::sptr link) +{ + struct dpdk_flow_data data; + data.link = dynamic_cast<udp_dpdk_link*>(link.get()); + data.is_recv = true; + assert(data.link); + auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data); + if (!req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req to attach recv_link"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach recv_link"); + } + _servq.submit(req, std::chrono::microseconds(-1)); + dpdk::wait_req_put(req); + { + std::lock_guard<std::mutex> lock(_mutex); + _recv_links.push_back(link); + } +} + +void dpdk_io_service::attach_send_link(send_link_if::sptr link) +{ + udp_dpdk_link* dpdk_link = dynamic_cast<udp_dpdk_link*>(link.get()); + assert(dpdk_link); + + // First, fill in destination MAC address + struct dpdk::arp_request arp_data; + arp_data.tpa = dpdk_link->get_remote_ipv4(); + arp_data.port = dpdk_link->get_port()->get_port_id(); + if (dpdk_link->get_port()->dst_is_broadcast(arp_data.tpa)) { + // If a broadcast IP, skip the ARP and fill with broadcast MAC addr + memset(arp_data.tha.addr_bytes, 0xFF, 6); + } else { + auto arp_req = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data); + if (!arp_req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req for ARP request"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req for ARP request"); + } + if (_servq.submit(arp_req, std::chrono::microseconds(3000000))) { + // Try one more time... + auto arp_req2 = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data); + if (_servq.submit(arp_req2, std::chrono::microseconds(30000000))) { + wait_req_put(arp_req); + wait_req_put(arp_req2); + throw uhd::io_error("DPDK: Could not reach host"); + } + wait_req_put(arp_req2); + } + wait_req_put(arp_req); + } + dpdk_link->set_remote_mac(arp_data.tha); + + // Then, submit the link to the I/O service thread + struct dpdk_flow_data data; + data.link = dpdk_link; + data.is_recv = false; + auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data); + if (!req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req to attach send_link"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach send_link"); + } + _servq.submit(req, std::chrono::microseconds(-1)); + wait_req_put(req); + { + std::lock_guard<std::mutex> lock(_mutex); + _send_links.push_back(link); + } +} + +void dpdk_io_service::detach_recv_link(recv_link_if::sptr link) +{ + auto link_ptr = link.get(); + struct dpdk_flow_data data; + data.link = dynamic_cast<udp_dpdk_link*>(link_ptr); + data.is_recv = true; + auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data); + if (!req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req to detach recv_link"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach recv_link"); + } + _servq.submit(req, std::chrono::microseconds(-1)); + wait_req_put(req); + { + std::lock_guard<std::mutex> lock(_mutex); + _recv_links.remove_if( + [link_ptr](recv_link_if::sptr& item) { return item.get() == link_ptr; }); + } +} + +void dpdk_io_service::detach_send_link(send_link_if::sptr link) +{ + auto link_ptr = link.get(); + struct dpdk_flow_data data; + data.link = dynamic_cast<udp_dpdk_link*>(link_ptr); + data.is_recv = false; + auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data); + if (!req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req to detach send_link"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach send_link"); + } + _servq.submit(req, std::chrono::microseconds(-1)); + wait_req_put(req); + { + std::lock_guard<std::mutex> lock(_mutex); + _send_links.remove_if( + [link_ptr](send_link_if::sptr& item) { return item.get() == link_ptr; }); + } +} + +recv_io_if::sptr dpdk_io_service::make_recv_client(recv_link_if::sptr data_link, + size_t num_recv_frames, + recv_callback_t cb, + send_link_if::sptr /*fc_link*/, + size_t num_send_frames, + recv_io_if::fc_callback_t fc_cb) +{ + auto link = dynamic_cast<udp_dpdk_link*>(data_link.get()); + auto recv_io = std::make_shared<dpdk_recv_io>( + shared_from_this(), link, num_recv_frames, cb, num_send_frames, fc_cb); + + // Register with I/O service + recv_io->_dpdk_io_if.io_client = static_cast<void*>(recv_io.get()); + auto xport_req = dpdk::wait_req_alloc( + dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&recv_io->_dpdk_io_if); + _servq.submit(xport_req, std::chrono::microseconds(-1)); + wait_req_put(xport_req); + return recv_io; +} + +send_io_if::sptr dpdk_io_service::make_send_client(send_link_if::sptr send_link, + size_t num_send_frames, + send_io_if::send_callback_t send_cb, + recv_link_if::sptr /*recv_link*/, + size_t num_recv_frames, + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb) +{ + auto link = dynamic_cast<udp_dpdk_link*>(send_link.get()); + auto send_io = std::make_shared<dpdk_send_io>(shared_from_this(), + link, + num_send_frames, + send_cb, + num_recv_frames, + recv_cb, + fc_cb); + + // Register with I/O service + send_io->_dpdk_io_if.io_client = static_cast<void*>(send_io.get()); + auto xport_req = dpdk::wait_req_alloc( + dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&send_io->_dpdk_io_if); + _servq.submit(xport_req, std::chrono::microseconds(-1)); + wait_req_put(xport_req); + return send_io; +} + + +int dpdk_io_service::_io_worker(void* arg) +{ + if (!arg) + return -EINVAL; + dpdk_io_service* srv = (dpdk_io_service*)arg; + + /* Check that this is a valid lcore */ + unsigned int lcore_id = rte_lcore_id(); + if (lcore_id == LCORE_ID_ANY) + return -ENODEV; + + /* Check that this lcore has ports */ + if (srv->_ports.size() == 0) + return -ENODEV; + + char name[16]; + snprintf(name, sizeof(name), "dpdk-io_%hu", (uint16_t)lcore_id); + rte_thread_setname(pthread_self(), name); + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "I/O service thread '" << name << "' started on lcore " << lcore_id); + + uhd::set_thread_priority_safe(); + + snprintf(name, sizeof(name), "rx-tbl_%hu", (uint16_t)lcore_id); + struct rte_hash_parameters hash_params = {.name = name, + .entries = MAX_FLOWS, + .reserved = 0, + .key_len = sizeof(struct dpdk::ipv4_5tuple), + .hash_func = NULL, + .hash_func_init_val = 0, + .socket_id = uhd::narrow_cast<int>(rte_socket_id()), + .extra_flag = 0}; + srv->_rx_table = rte_hash_create(&hash_params); + if (srv->_rx_table == NULL) { + return rte_errno; + } + + int status = 0; + while (!status) { + /* For each port, attempt to receive packets and process */ + for (auto port : srv->_ports) { + srv->_rx_burst(port, 0); + } + /* For each port's TX queues, do TX */ + for (auto port : srv->_ports) { + srv->_tx_burst(port); + } + /* For each port's RX release queues, release buffers */ + for (auto port : srv->_ports) { + srv->_rx_release(port); + } + /* Retry waking clients */ + if (srv->_retry_head) { + dpdk_io_if* node = srv->_retry_head; + dpdk_io_if* end = srv->_retry_head->prev; + while (true) { + dpdk_io_if* next = node->next; + srv->_wake_client(node); + if (node == end) { + break; + } else { + node = next; + next = node->next; + } + } + } + /* Check for open()/close()/term() requests and service 1 at a time + * Leave this last so we immediately terminate if requested + */ + status = srv->_service_requests(); + } + + return status; +} + +int dpdk_io_service::_service_requests() +{ + for (int i = 0; i < MAX_PENDING_SERVICE_REQS; i++) { + /* Dequeue */ + dpdk::wait_req* req = _servq.pop(); + if (!req) { + break; + } + switch (req->reason) { + case dpdk::wait_type::WAIT_SIMPLE: + while (_servq.complete(req) == -ENOBUFS) + ; + break; + case dpdk::wait_type::WAIT_RX: + case dpdk::wait_type::WAIT_TX_BUF: + throw uhd::not_implemented_error( + "DPDK: _service_requests(): DPDK is still a WIP"); + case dpdk::wait_type::WAIT_FLOW_OPEN: + _service_flow_open(req); + break; + case dpdk::wait_type::WAIT_FLOW_CLOSE: + _service_flow_close(req); + break; + case dpdk::wait_type::WAIT_XPORT_CONNECT: + _service_xport_connect(req); + break; + case dpdk::wait_type::WAIT_XPORT_DISCONNECT: + _service_xport_disconnect(req); + break; + case dpdk::wait_type::WAIT_ARP: { + assert(req->data != NULL); + int arp_status = _service_arp_request(req); + assert(arp_status != -ENOMEM); + if (arp_status == 0) { + while (_servq.complete(req) == -ENOBUFS) + ; + } + break; + } + case dpdk::wait_type::WAIT_LCORE_TERM: + rte_free(_rx_table); + while (_servq.complete(req) == -ENOBUFS) + ; + // Return a positive value to indicate we should terminate + return 1; + default: + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Invalid reason associated with wait request"); + while (_servq.complete(req) == -ENOBUFS) + ; + break; + } + } + return 0; +} + +void dpdk_io_service::_service_flow_open(dpdk::wait_req* req) +{ + auto flow_req_data = (struct dpdk_flow_data*)req->data; + assert(flow_req_data); + if (flow_req_data->is_recv) { + // If RX, add to RX table. Currently, nothing to do for TX. + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = flow_req_data->link->get_port()->get_ipv4(), + .src_port = 0, + .dst_port = flow_req_data->link->get_local_port()}; + // Check the UDP port isn't in use + if (rte_hash_lookup(_rx_table, &ht_key) > 0) { + req->retval = -EADDRINUSE; + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add to RX table"); + while (_servq.complete(req) == -ENOBUFS) + ; + return; + } + // Add xport list for this UDP port + auto rx_entry = new std::list<dpdk_io_if*>(); + if (rte_hash_add_key_data(_rx_table, &ht_key, rx_entry)) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Could not add new RX list to table"); + delete rx_entry; + req->retval = -ENOMEM; + while (_servq.complete(req) == -ENOBUFS) + ; + return; + } + } + while (_servq.complete(req) == -ENOBUFS) + ; +} + +void dpdk_io_service::_service_flow_close(dpdk::wait_req* req) +{ + auto flow_req_data = (struct dpdk_flow_data*)req->data; + assert(flow_req_data); + if (flow_req_data->is_recv) { + // If RX, remove from RX table. Currently, nothing to do for TX. + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = flow_req_data->link->get_port()->get_ipv4(), + .src_port = 0, + .dst_port = flow_req_data->link->get_local_port()}; + std::list<dpdk_io_if*>* xport_list; + + if (rte_hash_lookup_data(_rx_table, &ht_key, (void**)&xport_list) > 0) { + UHD_ASSERT_THROW(xport_list->empty()); + delete xport_list; + rte_hash_del_key(_rx_table, &ht_key); + while (_servq.complete(req) == -ENOBUFS) + ; + return; + } + } + while (_servq.complete(req) == -ENOBUFS) + ; +} + +void dpdk_io_service::_service_xport_connect(dpdk::wait_req* req) +{ + auto dpdk_io = static_cast<dpdk_io_if*>(req->data); + UHD_ASSERT_THROW(dpdk_io); + auto port = dpdk_io->link->get_port(); + if (dpdk_io->recv_cb) { + // Add to RX table only if have a callback. + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = port->get_ipv4(), + .src_port = 0, + .dst_port = dpdk_io->link->get_local_port()}; + void* hash_data; + if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) < 0) { + req->retval = -ENOENT; + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add xport to RX table"); + while (_servq.complete(req) == -ENOBUFS) + ; + return; + } + // Add to xport list for this UDP port + auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); + rx_entry->push_back(dpdk_io); + } + if (dpdk_io->is_recv) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX connect request..."); + // Add to xport list for this NIC port + auto& xport_list = _recv_xport_map.at(port->get_port_id()); + xport_list.push_back((dpdk_recv_io*)dpdk_io->io_client); + } else { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX connect request..."); + dpdk_send_io* send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client); + // Add to xport list for this NIC port + auto& xport_list = _tx_queues.at(port->get_port_id()); + xport_list.push_back(send_io); + for (size_t i = 0; i < send_io->_num_send_frames; i++) { + auto buff_ptr = + (dpdk::dpdk_frame_buff*)dpdk_io->link->get_send_buff(0).release(); + if (!buff_ptr) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", + "TX mempool out of memory. Please increase dpdk_num_mbufs."); + break; + } + if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) { + rte_pktmbuf_free(buff_ptr->get_pktmbuf()); + break; + } + send_io->_num_frames_in_use++; + } + } + while (_servq.complete(req) == -ENOBUFS) + ; +} + +void dpdk_io_service::_service_xport_disconnect(dpdk::wait_req* req) +{ + auto dpdk_io = (struct dpdk_io_if*)req->data; + assert(dpdk_io); + auto port = dpdk_io->link->get_port(); + if (dpdk_io->recv_cb) { + // Remove from RX table only if have a callback. + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = port->get_ipv4(), + .src_port = 0, + .dst_port = dpdk_io->link->get_local_port()}; + void* hash_data; + if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) >= 0) { + // Remove from xport list for this UDP port + auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); + rx_entry->remove(dpdk_io); + } else { + req->retval = -EINVAL; + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot remove xport from RX table"); + } + } + if (dpdk_io->is_recv) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX disconnect request..."); + dpdk_recv_io* recv_client = static_cast<dpdk_recv_io*>(dpdk_io->io_client); + // Remove from xport list for this NIC port + auto& xport_list = _recv_xport_map.at(port->get_port_id()); + xport_list.remove(recv_client); + while (!rte_ring_empty(recv_client->_recv_queue)) { + frame_buff* buff_ptr; + rte_ring_dequeue(recv_client->_recv_queue, (void**)&buff_ptr); + dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr)); + } + while (!rte_ring_empty(recv_client->_release_queue)) { + frame_buff* buff_ptr; + rte_ring_dequeue(recv_client->_release_queue, (void**)&buff_ptr); + dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr)); + } + } else { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX disconnect request..."); + dpdk_send_io* send_client = static_cast<dpdk_send_io*>(dpdk_io->io_client); + // Remove from xport list for this NIC port + auto& xport_list = _tx_queues.at(port->get_port_id()); + xport_list.remove(send_client); + while (!rte_ring_empty(send_client->_send_queue)) { + frame_buff* buff_ptr; + rte_ring_dequeue(send_client->_send_queue, (void**)&buff_ptr); + dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr)); + } + while (!rte_ring_empty(send_client->_buffer_queue)) { + frame_buff* buff_ptr; + rte_ring_dequeue(send_client->_buffer_queue, (void**)&buff_ptr); + dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr)); + } + } + // Now remove the node if it's on the retry list + if ((_retry_head == dpdk_io) && (dpdk_io->next == dpdk_io)) { + _retry_head = NULL; + } else if (_retry_head) { + dpdk_io_if* node = _retry_head->next; + while (node != _retry_head) { + if (node == dpdk_io) { + dpdk_io->prev->next = dpdk_io->next; + dpdk_io->next->prev = dpdk_io->prev; + break; + } + node = node->next; + } + } + while (_servq.complete(req) == -ENOBUFS) + ; +} + +int dpdk_io_service::_service_arp_request(dpdk::wait_req* req) +{ + int status = 0; + auto arp_req_data = (struct dpdk::arp_request*)req->data; + dpdk::ipv4_addr dst_addr = arp_req_data->tpa; + auto ctx_sptr = _ctx.lock(); + UHD_ASSERT_THROW(ctx_sptr); + dpdk::dpdk_port* port = ctx_sptr->get_port(arp_req_data->port); + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "ARP: Requesting address for " << dpdk::ipv4_num_to_str(dst_addr)); + + rte_spinlock_lock(&port->_spinlock); + struct dpdk::arp_entry* entry = NULL; + if (port->_arp_table.count(dst_addr) == 0) { + entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0); + if (!entry) { + status = -ENOMEM; + goto arp_end; + } + entry = new (entry) dpdk::arp_entry(); + entry->reqs.push_back(req); + port->_arp_table[dst_addr] = entry; + status = -EAGAIN; + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Address not in table. Sending ARP request."); + _send_arp_request(port, 0, arp_req_data->tpa); + } else { + entry = port->_arp_table.at(dst_addr); + if (is_zero_ether_addr(&entry->mac_addr)) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "ARP: Address in table, but not populated yet. Resending ARP request."); + port->_arp_table.at(dst_addr)->reqs.push_back(req); + status = -EAGAIN; + _send_arp_request(port, 0, arp_req_data->tpa); + } else { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "ARP: Address in table."); + ether_addr_copy(&entry->mac_addr, &arp_req_data->tha); + status = 0; + } + } +arp_end: + rte_spinlock_unlock(&port->_spinlock); + return status; +} + +int dpdk_io_service::_send_arp_request( + dpdk::dpdk_port* port, dpdk::queue_id_t queue, dpdk::ipv4_addr ip) +{ + struct rte_mbuf* mbuf; + struct ether_hdr* hdr; + struct arp_hdr* arp_frame; + + mbuf = rte_pktmbuf_alloc(port->get_tx_pktbuf_pool()); + if (unlikely(mbuf == NULL)) { + UHD_LOG_WARNING( + "DPDK::IO_SERVICE", "Could not allocate packet buffer for ARP request"); + return -ENOMEM; + } + + hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*); + arp_frame = (struct arp_hdr*)&hdr[1]; + + memset(hdr->d_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN); + hdr->s_addr = port->get_mac_addr(); + hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP); + + arp_frame->arp_hrd = rte_cpu_to_be_16(ARP_HRD_ETHER); + arp_frame->arp_pro = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + arp_frame->arp_hln = 6; + arp_frame->arp_pln = 4; + arp_frame->arp_op = rte_cpu_to_be_16(ARP_OP_REQUEST); + arp_frame->arp_data.arp_sha = port->get_mac_addr(); + arp_frame->arp_data.arp_sip = port->get_ipv4(); + memset(arp_frame->arp_data.arp_tha.addr_bytes, 0x00, ETHER_ADDR_LEN); + arp_frame->arp_data.arp_tip = ip; + + mbuf->pkt_len = 42; + mbuf->data_len = 42; + + if (rte_eth_tx_burst(port->get_port_id(), queue, &mbuf, 1) != 1) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "ARP request not sent: Descriptor ring full"); + rte_pktmbuf_free(mbuf); + return -EAGAIN; + } + return 0; +} + +/* Do a burst of RX on port */ +int dpdk_io_service::_rx_burst(dpdk::dpdk_port* port, dpdk::queue_id_t queue) +{ + struct ether_hdr* hdr; + char* l2_data; + struct rte_mbuf* bufs[RX_BURST_SIZE]; + const uint16_t num_rx = + rte_eth_rx_burst(port->get_port_id(), queue, bufs, RX_BURST_SIZE); + if (unlikely(num_rx == 0)) { + return 0; + } + + for (int buf = 0; buf < num_rx; buf++) { + uint64_t ol_flags = bufs[buf]->ol_flags; + hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr*); + l2_data = (char*)&hdr[1]; + switch (rte_be_to_cpu_16(hdr->ether_type)) { + case ETHER_TYPE_ARP: + _process_arp(port, queue, (struct arp_hdr*)l2_data); + rte_pktmbuf_free(bufs[buf]); + break; + case ETHER_TYPE_IPv4: + if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet has bad IP cksum"); + } else if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_NONE) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet missing IP cksum"); + } else { + _process_ipv4(port, bufs[buf], (struct ipv4_hdr*)l2_data); + } + break; + default: + rte_pktmbuf_free(bufs[buf]); + break; + } + } + return num_rx; +} + +int dpdk_io_service::_process_arp( + dpdk::dpdk_port* port, dpdk::queue_id_t queue_id, struct arp_hdr* arp_frame) +{ + uint32_t dest_ip = arp_frame->arp_data.arp_sip; + struct ether_addr dest_addr = arp_frame->arp_data.arp_sha; + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "Processing ARP packet: " << dpdk::ipv4_num_to_str(dest_ip) << " -> " + << dpdk::eth_addr_to_string(dest_addr)); + /* Add entry to ARP table */ + rte_spinlock_lock(&port->_spinlock); + struct dpdk::arp_entry* entry = NULL; + if (port->_arp_table.count(dest_ip) == 0) { + entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0); + if (!entry) { + return -ENOMEM; + } + entry = new (entry) dpdk::arp_entry(); + ether_addr_copy(&dest_addr, &entry->mac_addr); + port->_arp_table[dest_ip] = entry; + } else { + entry = port->_arp_table.at(dest_ip); + ether_addr_copy(&dest_addr, &entry->mac_addr); + for (auto req : entry->reqs) { + auto arp_data = (struct dpdk::arp_request*)req->data; + ether_addr_copy(&dest_addr, &arp_data->tha); + while (_servq.complete(req) == -ENOBUFS) + ; + } + entry->reqs.clear(); + } + rte_spinlock_unlock(&port->_spinlock); + + /* Respond if this was an ARP request */ + if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST) + && arp_frame->arp_data.arp_tip == port->get_ipv4()) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Sending ARP reply."); + port->_arp_reply(queue_id, arp_frame); + } + + return 0; +} + +int dpdk_io_service::_process_ipv4( + dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct ipv4_hdr* pkt) +{ + bool bcast = port->dst_is_broadcast(pkt->dst_addr); + if (pkt->dst_addr != port->get_ipv4() && !bcast) { + rte_pktmbuf_free(mbuf); + return -ENODEV; + } + if (pkt->next_proto_id == IPPROTO_UDP) { + return _process_udp(port, mbuf, (struct udp_hdr*)&pkt[1], bcast); + } + rte_pktmbuf_free(mbuf); + return -EINVAL; +} + + +int dpdk_io_service::_process_udp( + dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct udp_hdr* pkt, bool /*bcast*/) +{ + // Get the link + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = port->get_ipv4(), + .src_port = 0, + .dst_port = pkt->dst_port}; + void* hash_data; + if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) < 0) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No link entry in rx table"); + rte_pktmbuf_free(mbuf); + return -ENOENT; + } + // Get xport list for this UDP port + auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); + if (rx_entry->empty()) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No xports for link"); + rte_pktmbuf_free(mbuf); + return -ENOENT; + } + // Turn rte_mbuf -> dpdk_frame_buff + auto link = rx_entry->front()->link; + link->enqueue_recv_mbuf(mbuf); + auto buff = link->get_recv_buff(0); + bool rcvr_found = false; + for (auto client_if : *rx_entry) { + // Check all the muxed receivers... + if (client_if->recv_cb(buff, link, link)) { + rcvr_found = true; + if (buff) { + assert(client_if->is_recv); + auto recv_io = (dpdk_recv_io*)client_if->io_client; + auto buff_ptr = (dpdk::dpdk_frame_buff*)buff.release(); + if (rte_ring_enqueue(recv_io->_recv_queue, buff_ptr)) { + rte_pktmbuf_free(buff_ptr->get_pktmbuf()); + UHD_LOG_WARNING( + "DPDK::IO_SERVICE", "Dropping packet: No space in recv queue"); + } else { + recv_io->_num_frames_in_use++; + assert(recv_io->_num_frames_in_use <= recv_io->_num_recv_frames); + _wake_client(client_if); + } + } + break; + } + } + if (!rcvr_found) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No receiver xport found"); + // Release the buffer if no receiver found + link->release_recv_buff(std::move(buff)); + return -ENOENT; + } + return 0; +} + +/* Do a burst of TX on port's tx queues */ +int dpdk_io_service::_tx_burst(dpdk::dpdk_port* port) +{ + unsigned int total_tx = 0; + auto& queues = _tx_queues.at(port->get_port_id()); + + for (auto& send_io : queues) { + unsigned int num_tx = rte_ring_count(send_io->_send_queue); + num_tx = (num_tx < TX_BURST_SIZE) ? num_tx : TX_BURST_SIZE; + bool replaced_buffers = false; + for (unsigned int i = 0; i < num_tx; i++) { + size_t frame_size = send_io->_dpdk_io_if.link->get_send_frame_size(); + if (send_io->_fc_cb && !send_io->_fc_cb(frame_size)) { + break; + } + dpdk::dpdk_frame_buff* buff_ptr; + int status = rte_ring_dequeue(send_io->_send_queue, (void**)&buff_ptr); + if (status) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", "TX Q Count doesn't match actual"); + break; + } + send_io->_send_cb(frame_buff::uptr(buff_ptr), send_io->_dpdk_io_if.link); + // Attempt to replace buffer + buff_ptr = (dpdk::dpdk_frame_buff*)send_io->_dpdk_io_if.link->get_send_buff(0) + .release(); + if (!buff_ptr) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", + "TX mempool out of memory. Please increase dpdk_num_mbufs."); + send_io->_num_frames_in_use--; + } else if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) { + rte_pktmbuf_free(buff_ptr->get_pktmbuf()); + send_io->_num_frames_in_use--; + } else { + replaced_buffers = true; + } + } + if (replaced_buffers) { + _wake_client(&send_io->_dpdk_io_if); + } + total_tx += num_tx; + } + + return total_tx; +} + +int dpdk_io_service::_rx_release(dpdk::dpdk_port* port) +{ + unsigned int total_bufs = 0; + auto& queues = _recv_xport_map.at(port->get_port_id()); + + for (auto& recv_io : queues) { + unsigned int num_buf = rte_ring_count(recv_io->_release_queue); + num_buf = (num_buf < RX_BURST_SIZE) ? num_buf : RX_BURST_SIZE; + for (unsigned int i = 0; i < num_buf; i++) { + dpdk::dpdk_frame_buff* buff_ptr; + int status = rte_ring_dequeue(recv_io->_release_queue, (void**)&buff_ptr); + if (status) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", "RX Q Count doesn't match actual"); + break; + } + recv_io->_fc_cb(frame_buff::uptr(buff_ptr), + recv_io->_dpdk_io_if.link, + recv_io->_dpdk_io_if.link); + recv_io->_num_frames_in_use--; + } + total_bufs += num_buf; + } + + return total_bufs; +} + +uint16_t dpdk_io_service::_get_unique_client_id() +{ + std::lock_guard<std::mutex> lock(_mutex); + if (_client_id_set.size() >= MAX_CLIENTS) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Exceeded maximum number of clients"); + throw uhd::runtime_error("DPDK::IO_SERVICE: Exceeded maximum number of clients"); + } + + uint16_t id = _next_client_id++; + while (_client_id_set.count(id)) { + id = _next_client_id++; + } + _client_id_set.insert(id); + return id; +} + +void dpdk_io_service::_wake_client(dpdk_io_if* dpdk_io) +{ + dpdk::wait_req* req; + if (dpdk_io->is_recv) { + auto recv_io = static_cast<dpdk_recv_io*>(dpdk_io->io_client); + req = recv_io->_waiter; + } else { + auto send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client); + req = send_io->_waiter; + } + bool stat = req->mutex.try_lock(); + if (stat) { + bool active_req = !req->complete; + if (dpdk_io->next) { + // On the list: Take it off + if (dpdk_io->next == dpdk_io) { + // Only node on the list + _retry_head = NULL; + } else { + // Other nodes are on the list + if (_retry_head == dpdk_io) { + // Move list head to next + _retry_head = dpdk_io->next; + } + dpdk_io->next->prev = dpdk_io->prev; + dpdk_io->prev->next = dpdk_io->next; + } + dpdk_io->next = NULL; + dpdk_io->prev = NULL; + } + if (active_req) { + req->complete = true; + req->cond.notify_one(); + } + req->mutex.unlock(); + if (active_req) { + wait_req_put(req); + } + } else { + // Put on the retry list, if it isn't already + if (!dpdk_io->next) { + if (_retry_head) { + dpdk_io->next = _retry_head; + dpdk_io->prev = _retry_head->prev; + _retry_head->prev->next = dpdk_io; + _retry_head->prev = dpdk_io; + } else { + _retry_head = dpdk_io; + dpdk_io->next = dpdk_io; + dpdk_io->prev = dpdk_io; + } + } + } +} diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt index 8e58dd591..2c53e4905 100644 --- a/host/tests/CMakeLists.txt +++ b/host/tests/CMakeLists.txt @@ -124,7 +124,10 @@ if(ENABLE_DPDK) ${CMAKE_SOURCE_DIR}/lib/utils/paths.cpp ${CMAKE_SOURCE_DIR}/lib/utils/pathslib.cpp ${CMAKE_SOURCE_DIR}/lib/utils/prefs.cpp + ${CMAKE_SOURCE_DIR}/lib/transport/adapter.cpp ${CMAKE_SOURCE_DIR}/lib/transport/uhd-dpdk/dpdk_common.cpp + ${CMAKE_SOURCE_DIR}/lib/transport/uhd-dpdk/dpdk_io_service.cpp + ${CMAKE_SOURCE_DIR}/lib/transport/udp_dpdk_link.cpp INCLUDE_DIRS ${DPDK_INCLUDE_DIR} EXTRA_LIBS ${DPDK_LIBRARIES} @@ -132,6 +135,8 @@ if(ENABLE_DPDK) ) set_source_files_properties( ${CMAKE_SOURCE_DIR}/lib/transport/uhd-dpdk/dpdk_common.cpp + ${CMAKE_SOURCE_DIR}/lib/transport/uhd-dpdk/dpdk_io_service.cpp + ${CMAKE_SOURCE_DIR}/lib/transport/udp_dpdk_link.cpp PROPERTIES COMPILE_FLAGS "-march=native -D_GNU_SOURCE" ) ENDIF(ENABLE_DPDK) diff --git a/host/tests/dpdk_port_test.cpp b/host/tests/dpdk_port_test.cpp index 7ef386c52..8f6b20c34 100644 --- a/host/tests/dpdk_port_test.cpp +++ b/host/tests/dpdk_port_test.cpp @@ -6,6 +6,8 @@ #include <uhdlib/transport/dpdk/common.hpp> #include <uhdlib/transport/dpdk/service_queue.hpp> +#include <uhdlib/transport/dpdk_io_service.hpp> +#include <uhdlib/transport/udp_dpdk_link.hpp> #include <boost/program_options.hpp> #include <iostream> #include <memory> @@ -101,6 +103,46 @@ int main(int argc, char **argv) service_thread.join(); std::cout << "PASS: Service thread terminated" << std::endl; delete queue; + + std::cout << "Starting up ARP thread..." << std::endl; + std::vector<uhd::transport::dpdk::dpdk_port*> ports; + ports.push_back(port); + //auto io_srv = uhd::transport::dpdk_io_service::make(1, ports, 16); + auto io_srv = ctx->get_io_service(1); + + // Create link + std::cout << "Creating UDP link..." << std::endl; + uhd::transport::link_params_t params; + params.recv_frame_size = 8000; + params.send_frame_size = 8000; + params.num_recv_frames = 511; + params.num_send_frames = 511; + params.recv_buff_size = params.recv_frame_size*params.num_recv_frames; + params.send_buff_size = params.send_frame_size*params.num_send_frames; + auto link = uhd::transport::udp_dpdk_link::make("192.168.10.2", "49600", params); + + // Attach link + std::cout << "Attaching UDP send link..." << std::endl; + io_srv->attach_send_link(link); + struct ether_addr dest_mac; + link->get_remote_mac(dest_mac); + char mac_str[20]; + ether_format_addr(mac_str, 20, &dest_mac); + std::cout << "Remote MAC address is " << mac_str << std::endl; + std::cout << std::endl; + std::cout << "Attaching UDP recv link..." << std::endl; + io_srv->attach_recv_link(link); + std::cout << "Press any key to quit..." << std::endl; + std::cin.get(); + + // Shut down + std::cout << "Detaching UDP send link..." << std::endl; + io_srv->detach_send_link(link); + std::cout << "Detaching UDP recv link..." << std::endl; + io_srv->detach_recv_link(link); + std::cout << "Shutting down I/O service..." << std::endl; + io_srv.reset(); + std::cout << "Shutting down context..." << std::endl; ctx.reset(); return 0; } |