Diffstat (limited to 'host')
18 files changed, 2633 insertions, 352 deletions
diff --git a/host/docs/dpdk.dox b/host/docs/dpdk.dox index e840fc235..3f71231a5 100644 --- a/host/docs/dpdk.dox +++ b/host/docs/dpdk.dox @@ -75,7 +75,23 @@ load that driver with the following command:      modprobe vfio-pci  For NICs that require vfio-pci (like Intel's X520), you'll want to use the -`dpdk-devbind.py` script to the vfio-pci driver. +`dpdk-devbind.py` script to bind the NIC to the vfio-pci driver. This script is shipped with +DPDK and installed to `$prefix/share/dpdk/usertools`. If the NIC uses the +vfio-pci driver, and the package was installed via apt-get, then a typical invocation +might be + +    /usr/share/dpdk/usertools/dpdk-devbind.py --bind=vfio-pci ens6f0 + +If successful, the script might provide an updated status like this: + +    /usr/share/dpdk/usertools/dpdk-devbind.py -s +     +    Network devices using DPDK-compatible driver +    ============================================ +    0000:02:00.0 '82599ES 10-Gigabit SFI/SFP+ Network Connection 10fb' drv=vfio-pci unused=ixgbe +    [...] + +  See https://doc.dpdk.org/guides-18.11/linux_gsg/linux_drivers.html#binding-and-unbinding-network-ports-to-from-the-kernel-modules  for more details. @@ -102,7 +118,7 @@ options:      ;instead and swap between them      [use_dpdk=1]      ;dpdk_mtu is the NIC's MTU setting -    ;This is separate from MPM's maximum packet size--tops out at 4000 +    ;This is separate from MPM's maximum packet size      dpdk_mtu=9000      ;dpdk_driver is the -d flag for the DPDK EAL. If DPDK doesn't pick up the driver for your NIC      ;automatically, you may need this argument to point it to the folder where it can find the drivers @@ -115,8 +131,8 @@ options:      ;dpdk_num_mbufs is the total number of packet buffers allocated      ;to each direction's packet buffer pool      ;This will be multiplied by the number of NICs, but NICs on the same -    ;CPU socket share a pool -    dpdk_num_mbufs=512 +    ;CPU socket share a pool. +    dpdk_num_mbufs=4096      ;dpdk_mbuf_cache_size is the number of buffers to cache for a CPU      ;The cache reduces the interaction with the global pool      dpdk_mbuf_cache_size=64 @@ -127,20 +143,24 @@ address, and it must be in a particular format. Hex digits must all be lower  case, and octets must be separated by colons. Here is an example:      [dpdk_mac=3c:fd:fe:a2:a9:09] -    ;dpdk_io_cpu selects the CPU that this NIC's driver will run on -    ;Multiple NICs may occupy one CPU, but the I/O thread will completely -    ;consume that CPU. Also, 0 is reserved for the master thread (i.e. +    ;dpdk_lcore selects the lcore that this NIC's driver will run on +    ;Multiple NICs may occupy one lcore, but the I/O thread will completely +    ;consume that lcore's CPU. Also, 0 is reserved for the master thread (i.e.      ;the initial UHD thread that calls init() for DPDK). Attempting to      ;use it as an I/O thread will only result in hanging. -    dpdk_io_cpu = 1 +    ;Note also that by default, the lcore ID will be the same as the CPU ID. +    dpdk_lcore = 1      ;dpdk_ipv4 specifies the IPv4 address, and both the address and      ;subnet mask are required (and in this format!). DPDK uses the      ;netmask to create a basic routing table. Routing to other networks      ;(i.e. via gateways) is not permitted.      dpdk_ipv4 = 192.168.10.1/24 +    ;dpdk_num_desc is the number of descriptors in each DMA ring. +    ;Must be a power of 2. 
+    dpdk_num_desc=4096      [dpdk_mac=3c:fd:fe:a2:a9:0a] -    dpdk_io_cpu = 1 +    dpdk_lcore = 1      dpdk_ipv4 = 192.168.20.1/24  \section dpdk_using Using DPDK in UHD diff --git a/host/lib/include/uhdlib/transport/dpdk/arp.hpp b/host/lib/include/uhdlib/transport/dpdk/arp.hpp new file mode 100644 index 000000000..e71119bb3 --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk/arp.hpp @@ -0,0 +1,29 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_ARP_HPP_ +#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_ARP_HPP_ + +#include <uhdlib/transport/dpdk/common.hpp> +#include <uhdlib/transport/dpdk/service_queue.hpp> +#include <rte_arp.h> + +namespace uhd { namespace transport { namespace dpdk { + +struct arp_request +{ +    struct ether_addr tha; +    port_id_t port; +    ipv4_addr tpa; +}; + +struct arp_entry +{ +    struct ether_addr mac_addr; +    std::vector<wait_req*> reqs; +}; + +}}} /* namespace uhd::transport::dpdk */ +#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_ARP_HPP_ */ diff --git a/host/lib/include/uhdlib/transport/dpdk/common.hpp b/host/lib/include/uhdlib/transport/dpdk/common.hpp index 1f4466a0f..ac3526e49 100644 --- a/host/lib/include/uhdlib/transport/dpdk/common.hpp +++ b/host/lib/include/uhdlib/transport/dpdk/common.hpp @@ -1,44 +1,130 @@  // -// Copyright 2019 Ettus Research, a National Instruments brand +// Copyright 2019 Ettus Research, a National Instruments Brand  //  // SPDX-License-Identifier: GPL-3.0-or-later  // +  #ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_  #define _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_  #include <uhd/config.hpp> +#include <uhd/transport/frame_buff.hpp>  #include <uhd/types/device_addr.hpp> -#include <uhd/utils/noncopyable.hpp> -#include <uhd/utils/static.hpp> +#include <uhdlib/transport/adapter_info.hpp>  #include <rte_ethdev.h>  #include <rte_ether.h>  #include <rte_flow.h>  #include <rte_mbuf.h>  #include <rte_mempool.h> +#include <rte_spinlock.h>  #include <rte_version.h>  #include <unordered_map>  #include <array>  #include <atomic>  #include <mutex> +#include <set>  #include <string> -/* NOTE: There are changes to rte_eth_addr in 19.x */ +/* NOTE: There are changes to all the network standard fields in 19.x */ + + +namespace uhd { namespace transport { + +class dpdk_io_service; + +namespace dpdk { -namespace uhd { namespace transport { namespace dpdk { +struct arp_entry;  using queue_id_t = uint16_t;  using port_id_t  = uint16_t;  using ipv4_addr  = uint32_t; +class dpdk_adapter_info : public adapter_info +{ +public: +    dpdk_adapter_info(port_id_t port) : _port(port) {} +    ~dpdk_adapter_info() {} + +    std::string to_string() +    { +        return std::string("DPDK:") + std::to_string(_port); +    } + +    bool operator==(const dpdk_adapter_info& rhs) const +    { +        return (_port == rhs._port); +    } + +private: +    // Port ID +    port_id_t _port; +}; + + +/*! + * Packet/Frame buffer class for DPDK + * + * This class is intended to be placed in the private area of the rte_mbuf, and + * its memory is part of the rte_mbuf, so its life is tied to the underlying + * buffer (or more precisely, the encapsulating one). + */ +class dpdk_frame_buff : public frame_buff +{ +public: +    dpdk_frame_buff(struct rte_mbuf* mbuf) : _mbuf(mbuf) +    { +        _data        = rte_pktmbuf_mtod(mbuf, void*); +        _packet_size = 0; +    } + +    ~dpdk_frame_buff() = default; + +    /*! 
+     * Simple getter for the underlying rte_mbuf. +     * The rte_mbuf may need further modification before sending packets, +     * like adjusting the IP and UDP lengths. +     */ +    inline struct rte_mbuf* get_pktmbuf() +    { +        return _mbuf; +    } + +    /*! +     * Move the data pointer by the indicated size, to some desired +     * encapsulated frame. +     * +     * \param hdr_size Size (in bytes) of the headers to skip. Can be negative +     *                 to pull the header back. +     */ +    inline void header_jump(ssize_t hdr_size) +    { +        _data = (void*)((uint8_t*)_data + hdr_size); +    } + +    //! Embedded list node's next ptr +    dpdk_frame_buff* next = nullptr; +    //! Embedded list node's prev ptr +    dpdk_frame_buff* prev = nullptr; + +private: +    struct rte_mbuf* _mbuf; +}; + + +/*! + * The size (in bytes) of the private area reserved within the rte_mbuf. + * This portion of the rte_mbuf is used for the embedded dpdk_frame_buff data + * structure. + */ +constexpr size_t DPDK_MBUF_PRIV_SIZE = +    RTE_ALIGN(sizeof(struct dpdk_frame_buff), RTE_MBUF_PRIV_ALIGN); +  /*!   * Class representing a DPDK NIC port   *   * The dpdk_port object possesses all the data needed to send and receive - * packets between this port and a remote host. A logical link should specify - * which packets are destined for it and allocate a DMA queue with the - * dpdk_port::alloc_queue() function. A logical link should not, however, - * specify ARP packets for its set of received packets. That functionality is - * reserved for the special queue 0. + * packets between this port and a remote host.   *   * The logical link can then get the packet buffer pools associated with this   * NIC port and use them to send and receive packets. @@ -55,7 +141,7 @@ public:       * \param port The port ID       * \param mtu The intended MTU for the port       * \param num_queues Number of DMA queues to reserve for this port -     * \param num_mbufs The number of packet buffers per queue +     * \param num_desc The number of descriptors per DMA queue       * \param rx_pktbuf_pool A pointer to the port's RX packet buffer pool       * \param tx_pktbuf_pool A pointer to the port's TX packet buffer pool       * \param ipv4_address The IPv4 network address (w/ netmask) @@ -64,7 +150,7 @@ public:      static dpdk_port::uptr make(port_id_t port,          size_t mtu,          uint16_t num_queues, -        size_t num_mbufs, +        uint16_t num_desc,          struct rte_mempool* rx_pktbuf_pool,          struct rte_mempool* tx_pktbuf_pool,          std::string ipv4_address); @@ -72,11 +158,13 @@ public:      dpdk_port(port_id_t port,          size_t mtu,          uint16_t num_queues, -        size_t num_mbufs, +        uint16_t num_desc,          struct rte_mempool* rx_pktbuf_pool,          struct rte_mempool* tx_pktbuf_pool,          std::string ipv4_address); +    ~dpdk_port(); +      /*! Getter for this port's ID       * \return this port's ID       */ @@ -85,6 +173,19 @@ public:          return _port;      } +    inline dpdk_adapter_info get_adapter_info() const +    { +        return dpdk_adapter_info(_port); +    } + +    /*! Getter for this port's MAC address +     * \return this port's MAC address +     */ +    inline ether_addr get_mac_addr() const +    { +        return _mac_addr; +    } +      /*! 
Getter for this port's MTU       * \return this port's MTU       */ @@ -141,68 +242,45 @@ public:       * \param dst_ipv4_addr The destination IPv4 address (in network order)       * \return whether the destination address matches this port's broadcast address       */ -    inline bool dst_is_broadcast(const uint32_t dst_ipv4_addr) const +    inline bool dst_is_broadcast(const ipv4_addr dst_ipv4_addr) const      {          uint32_t network = _netmask | ((~_netmask) & dst_ipv4_addr);          return (network == 0xffffffff);      } -    /*! Allocate a DMA queue (TX/RX pair) and use the specified flow pattern -     * to route packets to the RX queue. -     * -     * \pattern recv_pattern The flow pattern to use for directing traffic to -     *                       the allocated RX queue. -     * \return The queue ID for the allocated queue -     * \throw uhd::runtime_error when there are no free queues -     */ -    queue_id_t alloc_queue(struct rte_flow_pattern recv_pattern[]); - -    /*! Free a previously allocated queue and tear down the associated flow rule -     * \param queue The ID of the queue to free -     * \throw std::out_of_range when the queue ID is not currently allocated -     */ -    void free_queue(queue_id_t queue); -      /*! -     * Process ARP request/reply +     * Allocate a UDP port and return it in network order +     * +     * \param udp_port UDP port to attempt to allocate. Use 0 for no preference. +     * \return 0 for failure, else the allocated UDP port in network order.       */ -    // int process_arp(struct rte_mempool *tx_pktbuf_pool, struct arp_hdr *arp_frame); +    uint16_t alloc_udp_port(uint16_t udp_port);  private: +    friend uhd::transport::dpdk_io_service; +      /*!       * Construct and transmit an ARP reply (for the given ARP request)       */ -    int _arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* arp_req); +    int _arp_reply(queue_id_t queue_id, struct arp_hdr* arp_req);      port_id_t _port;      size_t _mtu; +    size_t _num_queues;      struct rte_mempool* _rx_pktbuf_pool;      struct rte_mempool* _tx_pktbuf_pool;      struct ether_addr _mac_addr;      ipv4_addr _ipv4;      ipv4_addr _netmask; -    size_t _num_queues; -    std::vector<queue_id_t> _free_queues; -    std::unordered_map<queue_id_t, struct rte_flow*> _flow_rules; -    /* Need ARP table -     * To implement ARP service, maybe create ARP xport -     * Then need dpdk_udp_link and dpdk_raw_link -     * -     * ...Or just do it inline with dpdk_ctx -     * -     * And link can just save the result (do it during constructor) -     * -     * -     * But what about the service that _responds_ to ARP requests?! -     * -     * Maybe have to connect a DPDK link in stages: -     * First, create the ARP service and attach it to the dpdk_ctx -     *  dpdk_ctx must own the links...? -     *  Or! Always burn a DMA engine for ARP -     * -     * Maybe have a shared_ptr to an ARP service here? -     */ + +    // Structures protected by mutex      std::mutex _mutex; +    std::set<uint16_t> _udp_ports; +    uint16_t _next_udp_port = 0xffff; + +    // Structures protected by spin lock +    rte_spinlock_t _spinlock = RTE_SPINLOCK_INITIALIZER; +    std::unordered_map<ipv4_addr, struct arp_entry*> _arp_table;  }; @@ -267,17 +345,21 @@ public:      int get_port_link_status(port_id_t portid) const;      /*! 
-     * Get port ID for routing packet destined for given address +     * Get the port for routing packets destined for a given address       * \param addr Destination address -     * \return port ID from routing table +     * \return a pointer to the port from the routing table       */ -    int get_route(const std::string& addr) const; +    dpdk_port* get_route(const std::string& addr) const;      /*!       * \return whether init() has been called       */      bool is_init_done(void) const; +    /*! Return the I/O service associated with the given port ID +     */ +    std::shared_ptr<uhd::transport::dpdk_io_service> get_io_service(const size_t port_id); +  private:      /*! Convert the args to DPDK's EAL args and Initialize the EAL       * @@ -312,8 +394,12 @@ private:      std::unordered_map<port_id_t, dpdk_port::uptr> _ports;      std::vector<struct rte_mempool*> _rx_pktbuf_pools;      std::vector<struct rte_mempool*> _tx_pktbuf_pools; +    // Store all the I/O services along with their corresponding port IDs +    std::map<std::shared_ptr<uhd::transport::dpdk_io_service>, std::vector<size_t>> +        _io_srv_portid_map; }; -}}} // namespace uhd::transport::dpdk +} // namespace dpdk +}} // namespace uhd::transport  #endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_ */ diff --git a/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp b/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp index 7c9917079..c95786864 100644 --- a/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp +++ b/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp @@ -28,6 +28,14 @@ enum wait_type {      WAIT_FLOW_OPEN,      //! Wake once the flow/socket is destroyed      WAIT_FLOW_CLOSE, +    //! Wake once the new transport is connected +    WAIT_XPORT_CONNECT, +    //! Wake once the transport is disconnected +    WAIT_XPORT_DISCONNECT, +    //! Wake when MAC address found for IP address +    WAIT_ARP, +    //! Wake once the I/O worker terminates +    WAIT_LCORE_TERM,      //! Number of possible reasons for waiting      WAIT_TYPE_COUNT  }; @@ -50,6 +58,8 @@ struct wait_req      std::mutex mutex;      //! Whether the request was completed      bool complete; +    //! The status or error code associated with the request +    int retval;      //! An atomic reference counter for managing this request object's memory      rte_atomic32_t refcnt;  }; @@ -110,7 +120,7 @@ class service_queue  public:      /*!       * Create a service queue -     * \param depth Must be a power of 2 +     * \param depth Must be a power of 2. The usable capacity is one less than this.       * \param lcore_id The DPDK lcore_id associated with this service queue       */      service_queue(size_t depth, unsigned int lcore_id) @@ -227,6 +237,24 @@ public:          return stat;      } +    /*! +     * Get the size of the service queue +     * \return size of the service queue +     */ +    inline size_t get_size() +    { +        return rte_ring_get_size(_waiter_ring); +    } + +    /*! +     * Get the capacity of the service queue +     * \return capacity of the service queue +     */ +    inline size_t get_capacity() +    { +        return rte_ring_get_capacity(_waiter_ring); +    } +  private:      //! 
Multi-producer, single-consumer ring for requests      struct rte_ring* _waiter_ring; diff --git a/host/lib/include/uhdlib/transport/dpdk/udp.hpp b/host/lib/include/uhdlib/transport/dpdk/udp.hpp new file mode 100644 index 000000000..65e561315 --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk/udp.hpp @@ -0,0 +1,115 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_UDP_HPP_ +#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_UDP_HPP_ + +#include <uhdlib/transport/dpdk/common.hpp> +#include <arpa/inet.h> +#include <netinet/udp.h> +#include <rte_ip.h> +#include <rte_udp.h> +#include <boost/format.hpp> + +namespace uhd { namespace transport { namespace dpdk { + +constexpr size_t HDR_SIZE_UDP_IPV4 = +    (sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + sizeof(struct udp_hdr)); + +/*! + * An enumerated type representing the type of flow for an IPv4 client + * Currently, only UDP is supported (FLOW_TYPE_UDP) + */ +enum flow_type { +    FLOW_TYPE_UDP, +    FLOW_TYPE_COUNT, +}; + +/*! + * A tuple for IPv4 flows that can be used for hashing + */ +struct ipv4_5tuple +{ +    enum flow_type flow_type; +    ipv4_addr src_ip; +    ipv4_addr dst_ip; +    uint16_t src_port; +    uint16_t dst_port; +}; + +inline void fill_ipv4_hdr(struct rte_mbuf* mbuf, +    const dpdk_port* port, +    uint32_t dst_ipv4_addr, +    uint8_t proto_id, +    uint32_t payload_len) +{ +    struct ether_hdr* eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*); +    struct ipv4_hdr* ip_hdr   = (struct ipv4_hdr*)&eth_hdr[1]; + +    ip_hdr->version_ihl     = 0x40 | 5; +    ip_hdr->type_of_service = 0; +    ip_hdr->total_length    = rte_cpu_to_be_16(20 + payload_len); +    ip_hdr->packet_id       = 0; +    ip_hdr->fragment_offset = rte_cpu_to_be_16(IPV4_HDR_DF_FLAG); +    ip_hdr->time_to_live    = 64; +    ip_hdr->next_proto_id   = proto_id; +    ip_hdr->hdr_checksum    = 0; // Require HW offload +    ip_hdr->src_addr        = port->get_ipv4(); +    ip_hdr->dst_addr        = dst_ipv4_addr; + +    mbuf->ol_flags = PKT_TX_IP_CKSUM | PKT_TX_IPV4; +    mbuf->l2_len   = sizeof(struct ether_hdr); +    mbuf->l3_len   = sizeof(struct ipv4_hdr); +    mbuf->pkt_len  = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len; +    mbuf->data_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len; +} + +/* All values except payload length must be in network order */ +inline void fill_udp_hdr(struct rte_mbuf* mbuf, +    const dpdk_port* port, +    uint32_t dst_ipv4_addr, +    uint16_t src_port, +    uint16_t dst_port, +    uint32_t payload_len) +{ +    struct ether_hdr* eth_hdr; +    struct ipv4_hdr* ip_hdr; +    struct udp_hdr* tx_hdr; + +    fill_ipv4_hdr( +        mbuf, port, dst_ipv4_addr, IPPROTO_UDP, sizeof(struct udp_hdr) + payload_len); + +    eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*); +    ip_hdr  = (struct ipv4_hdr*)&eth_hdr[1]; +    tx_hdr  = (struct udp_hdr*)&ip_hdr[1]; + +    tx_hdr->src_port    = src_port; +    tx_hdr->dst_port    = dst_port; +    tx_hdr->dgram_len   = rte_cpu_to_be_16(8 + payload_len); +    tx_hdr->dgram_cksum = 0; +    mbuf->l4_len        = sizeof(struct udp_hdr); +} + +//! 
Convert an IPv4 address (numeric, in network order) into a string +inline std::string ipv4_num_to_str(const uint32_t ip_addr) +{ +    char addr_str[INET_ADDRSTRLEN]; +    struct in_addr ipv4_addr; +    ipv4_addr.s_addr = ip_addr; +    inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str)); +    return std::string(addr_str); +} + +inline std::string eth_addr_to_string(const struct ether_addr mac_addr) +{ +    auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx"); +    mac_stream % (uint32_t)mac_addr.addr_bytes[0] % (uint32_t)mac_addr.addr_bytes[1] +        % (uint32_t)mac_addr.addr_bytes[2] % (uint32_t)mac_addr.addr_bytes[3] +        % (uint32_t)mac_addr.addr_bytes[4] % (uint32_t)mac_addr.addr_bytes[5]; +    return mac_stream.str(); +} + +}}} /* namespace uhd::transport::dpdk */ +#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_UDP_HPP_ */ diff --git a/host/lib/include/uhdlib/transport/dpdk_io_service.hpp b/host/lib/include/uhdlib/transport/dpdk_io_service.hpp new file mode 100644 index 000000000..8e1fb29d0 --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk_io_service.hpp @@ -0,0 +1,246 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_HPP_ +#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_HPP_ + +#include <uhdlib/transport/dpdk/common.hpp> +#include <uhdlib/transport/dpdk/service_queue.hpp> +#include <uhdlib/transport/io_service.hpp> +#include <rte_arp.h> +#include <rte_hash.h> +#include <rte_ip.h> +#include <rte_mbuf.h> +#include <rte_udp.h> +#include <vector> + +namespace uhd { namespace transport { + +class dpdk_send_io; +class dpdk_recv_io; +struct dpdk_io_if; + +class dpdk_io_service : public virtual io_service, +                        public std::enable_shared_from_this<dpdk_io_service> +{ +public: +    using sptr = std::shared_ptr<dpdk_io_service>; + +    static sptr make( +        unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth); + +    ~dpdk_io_service(); + +    // Add entry to RX flow table +    // This yields a link, which is then used for attaching to a buffer +    // We yank from the link immediately following, then process at the transport level +    // (so two tables here, one for transports, one for links) +    void attach_recv_link(recv_link_if::sptr link); + +    // Create object to hold set of queues, to go in TX table +    void attach_send_link(send_link_if::sptr link); + +    void detach_recv_link(recv_link_if::sptr link); + +    void detach_send_link(send_link_if::sptr link); + +    recv_io_if::sptr make_recv_client(recv_link_if::sptr data_link, +        size_t num_recv_frames, +        recv_callback_t cb, +        send_link_if::sptr fc_link, +        size_t num_send_frames, +        recv_io_if::fc_callback_t fc_cb); + +    send_io_if::sptr make_send_client(send_link_if::sptr send_link, +        size_t num_send_frames, +        send_io_if::send_callback_t send_cb, +        recv_link_if::sptr recv_link, +        size_t num_recv_frames, +        recv_callback_t recv_cb, +        send_io_if::fc_callback_t fc_cb); + + +private: +    friend class dpdk_recv_io; +    friend class dpdk_send_io; + +    dpdk_io_service( +        unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth); +    dpdk_io_service(const dpdk_io_service&) = delete; + +    /*! 
+     * I/O worker function to be passed to the DPDK lcore +     * +     * The argument must be a pointer to  *this* dpdk_io_service +     * +     * \param arg a pointer to this dpdk_io_service +     * \return 0 for normal termination, else nonzero +     */ +    static int _io_worker(void* arg); + +    /*! +     * Helper function for I/O thread to process requests on its service queue +     */ +    int _service_requests(); + +    /*! +     * Helper function for I/O thread to service a WAIT_FLOW_OPEN request +     * +     * \param req The requester's wait_req object +     */ +    void _service_flow_open(dpdk::wait_req* req); + +    /*! +     * Helper function for I/O thread to service a WAIT_FLOW_CLOSE request +     * +     * \param req The requester's wait_req object +     */ +    void _service_flow_close(dpdk::wait_req* req); + +    /*! +     * Helper function for I/O thread to service a WAIT_XPORT_CONNECT request +     * +     * \param req The requester's wait_req object +     */ +    void _service_xport_connect(dpdk::wait_req* req); + +    /*! +     * Helper function for I/O thread to service a WAIT_XPORT_DISCONNECT request +     * +     * \param req The requester's wait_req object +     */ +    void _service_xport_disconnect(dpdk::wait_req* req); + +    /*! +     * Get Ethernet MAC address for the given IPv4 address, and wake the +     * requester when finished. +     * This may only be called by an I/O service, on behalf of a requester's +     * WAIT_ARP request. +     * +     * \param req The requester's wait_req object +     * \return 0 if address was written, -EAGAIN if request was queued for +     *         later completion, -ENOMEM if ran out of memory to complete +     *         request +     */ +    int _service_arp_request(dpdk::wait_req* req); + +    /*! +     * Helper function for I/O thread to do a burst of packet retrieval and +     * processing on an RX queue +     * +     * \param port the DPDK NIC port used for RX +     * \param queue the DMA queue on the port to recv from +     */ +    int _rx_burst(dpdk::dpdk_port* port, dpdk::queue_id_t queue); + +    /*! +     * Helper function for I/O thread to do a burst of packet transmission on a +     * TX queue +     * +     * \param port the DPDK NIC port used for TX +     * \return number of buffers transmitted +     */ +    int _tx_burst(dpdk::dpdk_port* port); + +    /*! +     * Helper function for I/O thread to release a burst of buffers from an RX +     * release queue +     * +     * \param port the DPDK NIC port used for RX +     * \return number of buffers released +     */ +    int _rx_release(dpdk::dpdk_port* port); + +    /*! +     * Helper function for I/O thread to do send an ARP request +     * +     * \param port the DPDK NIC port to send the ARP request through +     * \param queue the DMA queue on the port to send to +     * \param ip the IPv4 address for which the caller is seeking a MAC address +     */ +    int _send_arp_request( +        dpdk::dpdk_port* port, dpdk::queue_id_t queue, dpdk::ipv4_addr ip); + +    /*! +     * Helper function for I/O thread to process an ARP request/reply +     * +     * \param port the DPDK NIC port to send any ARP replies from +     * \param queue the DMA queue on the port to send ARP replies to +     * \param arp_frame a pointer to the ARP frame +     */ +    int _process_arp( +        dpdk::dpdk_port* port, dpdk::queue_id_t queue_id, struct arp_hdr* arp_frame); + +    /*! 
+     * Helper function for I/O thread to process an IPv4 packet +     * +     * \param port the DPDK NIC port to send any ARP replies from +     * \param mbuf a pointer to the packet buffer container +     * \param pkt a pointer to the IPv4 header of the packet +     */ +    int _process_ipv4(dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct ipv4_hdr* pkt); + +    /*! +     * Helper function for I/O thread to process a UDP packet +     * +     * \param port the DPDK NIC port that received the packet +     * \param mbuf a pointer to the packet buffer container +     * \param pkt a pointer to the UDP header of the packet +     * \param bcast whether this packet was destined for the port's broadcast +     *              IPv4 address +     */ +    int _process_udp( +        dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct udp_hdr* pkt, bool bcast); + +    /*! +     * Helper function to get a unique client ID +     * +     * \return a unique client ID +     */ +    uint16_t _get_unique_client_id(); + +    /*! +     * Attempt to wake client +     */ +    void _wake_client(dpdk_io_if* dpdk_io); + +    //! The reference to the DPDK context +    std::weak_ptr<dpdk::dpdk_ctx> _ctx; +    //! The lcore running this dpdk_io_service's work routine +    unsigned int _lcore_id; +    //! The NIC ports served by this dpdk_io_service +    std::vector<dpdk::dpdk_port*> _ports; +    //! The set of TX queues associated with a given port +    std::unordered_map<dpdk::port_id_t, std::list<dpdk_send_io*>> _tx_queues; +    //! The list of recv_io for each port +    std::unordered_map<dpdk::port_id_t, std::list<dpdk_recv_io*>> _recv_xport_map; +    //! The RX table, which provides lists of dpdk_recv_io for an IPv4 tuple +    struct rte_hash* _rx_table; +    //! Service queue for clients to make requests +    dpdk::service_queue _servq; +    //! Retry list for waking clients +    dpdk_io_if* _retry_head = NULL; + +    //! Mutex to protect below data structures +    std::mutex _mutex; +    //! The recv links attached to this I/O service (managed client side) +    std::list<recv_link_if::sptr> _recv_links; +    //! The send links attached to this I/O service (managed client side) +    std::list<send_link_if::sptr> _send_links; +    //! Set of IDs for new clients +    std::set<uint16_t> _client_id_set; +    //! 
Next ID to try +    uint16_t _next_client_id; + +    static constexpr int MAX_PENDING_SERVICE_REQS = 32; +    static constexpr int MAX_FLOWS                = 128; +    static constexpr int MAX_CLIENTS              = 2048; +    static constexpr int RX_BURST_SIZE            = 16; +    static constexpr int TX_BURST_SIZE            = 16; +}; + +}} // namespace uhd::transport + +#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_HPP_ */ diff --git a/host/lib/include/uhdlib/transport/dpdk_io_service_client.hpp b/host/lib/include/uhdlib/transport/dpdk_io_service_client.hpp new file mode 100644 index 000000000..451cc1531 --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk_io_service_client.hpp @@ -0,0 +1,285 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_CLIENT_HPP_ +#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_CLIENT_HPP_ + +#include <uhdlib/transport/dpdk/common.hpp> +#include <uhdlib/transport/dpdk/service_queue.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/dpdk_io_service.hpp> +#include <uhdlib/transport/udp_dpdk_link.hpp> +#include <rte_ring.h> +#include <chrono> + +namespace uhd { namespace transport { + +struct dpdk_flow_data +{ +    //! The UDP DPDK link +    udp_dpdk_link* link; +    //! Is it a recv_link_if? Or a send_link_if? +    bool is_recv; +}; + +/*! DPDK I/O interface for service requests + * + * This is used to pass around information about the I/O clients. It is what is + * passed into the data portion of a request, for connect and disconnect + * requests. + * + */ +struct dpdk_io_if +{ +    dpdk_io_if(bool is_recv, +        udp_dpdk_link* link, +        dpdk_io_service::sptr io_srv, +        recv_callback_t recv_cb) +        : is_recv(is_recv), link(link), io_srv(io_srv), recv_cb(recv_cb) +    { +    } + +    bool is_recv; +    udp_dpdk_link* link; +    dpdk_io_service::sptr io_srv; +    recv_callback_t recv_cb; +    void* io_client; +    //! Embedded list node +    dpdk_io_if* next = NULL; +    dpdk_io_if* prev = NULL; +}; + +// This must be tied to a link for frame_buffs +// so need map of dpdk_send_io to udp_dpdk_link +// Have queue pair: buffs to send + free buffs +// There used to be a retry queue: Still needed? (Maybe 1 per port?) 
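+// Illustrative usage sketch (hypothetical, not part of this header's API):
+// how the client classes below are typically obtained and used. `link`,
+// `send_cb`, `fc_cb`, `payload`, and `nbytes` are assumed to exist; the call
+// pattern mirrors the dpdk_simple_impl usage in dpdk_simple.cpp further down
+// in this changeset.
+//
+//     auto ctx     = dpdk::dpdk_ctx::get();
+//     auto io_srv  = ctx->get_io_service(link->get_port()->get_port_id());
+//     auto send_io = io_srv->make_send_client(link,
+//         32,       // num_send_frames
+//         send_cb,
+//         nullptr,  // no FC link
+//         0,        // no recv frames
+//         nullptr,  // no recv callback
+//         fc_cb);
+//     if (auto buff = send_io->get_send_buff(100 /* ms */)) {
+//         std::memcpy(buff->data(), payload, nbytes);
+//         buff->set_packet_size(nbytes);
+//         send_io->release_send_buff(std::move(buff)); // enqueue for TX
+//     }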
+class dpdk_send_io : public virtual send_io_if +{ +public: +    using sptr = std::shared_ptr<dpdk_send_io>; + +    dpdk_send_io(dpdk_io_service::sptr io_srv, +        udp_dpdk_link* link, +        size_t num_send_frames, +        send_callback_t send_cb, +        size_t num_recv_frames, +        recv_callback_t recv_cb, +        fc_callback_t fc_cb) +        : _dpdk_io_if(false, link, io_srv, recv_cb) +        , _servq(io_srv->_servq) +        , _send_cb(send_cb) +        , _fc_cb(fc_cb) +    { +        // Get reference to DPDK context, since this owns some DPDK memory +        _ctx             = dpdk::dpdk_ctx::get(); +        _num_send_frames = num_send_frames; +        _num_recv_frames = num_recv_frames; + +        // Create the free buffer and send queues +        // Must be power of 2, and add one since usable ring is size-1 +        size_t queue_size        = (size_t)exp2(ceil(log2(num_send_frames + 1))); +        dpdk::port_id_t nic_port = link->get_port()->get_port_id(); +        uint16_t id              = io_srv->_get_unique_client_id(); +        char name[16]; +        snprintf(name, sizeof(name), "tx%hu-%hu", nic_port, id); +        _buffer_queue = rte_ring_create( +            name, queue_size, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); +        snprintf(name, sizeof(name), "~tx%hu-%hu", nic_port, id); +        _send_queue = rte_ring_create( +            name, queue_size, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); +        UHD_LOG_TRACE("DPDK::SEND_IO", "dpdk_send_io() " << _buffer_queue->name); + +        // Create the wait_request object that gets passed around +        _waiter = dpdk::wait_req_alloc(dpdk::wait_type::WAIT_TX_BUF, (void*)&_dpdk_io_if); +        _waiter->complete = true; +    } + +    ~dpdk_send_io() +    { +        UHD_LOG_TRACE("DPDK::SEND_IO", "~dpdk_send_io() " << _buffer_queue->name); +        // Deregister with I/O service +        auto xport_req = dpdk::wait_req_alloc( +            dpdk::wait_type::WAIT_XPORT_DISCONNECT, (void*)&_dpdk_io_if); +        _servq.submit(xport_req, std::chrono::microseconds(-1)); + +        // Clean up +        wait_req_put(xport_req); +        rte_ring_free(_send_queue); +        rte_ring_free(_buffer_queue); +        wait_req_put(_waiter); +    } + +    bool wait_for_dest_ready(size_t /*num_bytes*/, int32_t /*timeout_ms*/) +    { +        // For this I/O service, the destination is the queue to the offload +        // thread. The queue is always able to accommodate new packets since it +        // is sized to fit all the frames reserved from the link. +        return true; +    } + +    frame_buff::uptr get_send_buff(int32_t timeout_ms) +    { +        frame_buff* buff_ptr; +        if (rte_ring_dequeue(_buffer_queue, (void**)&buff_ptr)) { +            if (!timeout_ms) { +                return frame_buff::uptr(); +            } +            // Nothing in the queue. Try waiting if there is a timeout. +            auto timeout_point = +                std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); +            std::unique_lock<std::mutex> lock(_waiter->mutex); +            wait_req_get(_waiter); +            _waiter->complete = false; +            auto is_complete  = [this] { return !rte_ring_empty(_buffer_queue); }; +            if (timeout_ms < 0) { +                _waiter->cond.wait(lock, is_complete); +            } else { +                auto status = _waiter->cond.wait_until(lock, timeout_point, is_complete); +                if (!status) { +                    return frame_buff::uptr(); +                } +            } +            if (rte_ring_dequeue(_buffer_queue, (void**)&buff_ptr)) { +                return frame_buff::uptr(); +            } +        } +        return frame_buff::uptr(buff_ptr); +    } + +    void release_send_buff(frame_buff::uptr buff) +    { +        auto buff_ptr = (dpdk::dpdk_frame_buff*)buff.release(); +        assert(buff_ptr); +        int status = rte_ring_enqueue(_send_queue, buff_ptr); +        if (status != 0) { +            assert(false); +        } +        // TODO: Should we retry if it failed? +    } + +private: +    friend class dpdk_io_service; + +    dpdk_io_if _dpdk_io_if; +    size_t _num_frames_in_use = 0; + +    dpdk::service_queue& _servq; +    dpdk::dpdk_ctx::sptr _ctx; +    struct rte_ring* _buffer_queue; +    struct rte_ring* _send_queue; +    dpdk::wait_req* _waiter; +    send_callback_t _send_cb; +    fc_callback_t _fc_cb; +}; + +class dpdk_recv_io : public virtual recv_io_if +{ +public: +    using sptr = std::shared_ptr<dpdk_recv_io>; + +    dpdk_recv_io(dpdk_io_service::sptr io_srv, +        udp_dpdk_link* link, +        size_t num_recv_frames, +        recv_callback_t recv_cb, +        size_t num_send_frames, +        fc_callback_t fc_cb) +        : _dpdk_io_if(true, link, io_srv, recv_cb) +        , _servq(io_srv->_servq) +        , _fc_cb(fc_cb) // Call on release +    { +        // Get reference to DPDK context, since this owns some DPDK memory +        _ctx             = dpdk::dpdk_ctx::get(); +        _num_send_frames = num_send_frames; +        _num_recv_frames = num_recv_frames; +        // Create the recv and release queues +        // Must be power of 2, and add one since usable ring is size-1 +        size_t queue_size        = (size_t)exp2(ceil(log2(num_recv_frames + 1))); +        dpdk::port_id_t nic_port = link->get_port()->get_port_id(); +        uint16_t id              = io_srv->_get_unique_client_id(); +        UHD_LOG_DEBUG("DPDK::IO_SERVICE", "Creating recv client with queue size of " << queue_size); +        char name[16]; +        snprintf(name, sizeof(name), "rx%hu-%hu", nic_port, id); +        _recv_queue = rte_ring_create( +            name, queue_size, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); +        snprintf(name, sizeof(name), "~rx%hu-%hu", nic_port, id); +        _release_queue = rte_ring_create( +            name, queue_size, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); +        UHD_LOG_TRACE("DPDK::RECV_IO", "dpdk_recv_io() " << _recv_queue->name); +        // Create the wait_request object that gets passed around +        _waiter = dpdk::wait_req_alloc(dpdk::wait_type::WAIT_RX, (void*)&_dpdk_io_if); +        _waiter->complete = true; +    } + +    ~dpdk_recv_io() +    { +        // Deregister with I/O service +        UHD_LOG_TRACE("DPDK::RECV_IO", "~dpdk_recv_io() " << _recv_queue->name); +        auto xport_req = dpdk::wait_req_alloc( 
dpdk::wait_type::WAIT_XPORT_DISCONNECT, (void*)&_dpdk_io_if); +        _servq.submit(xport_req, std::chrono::microseconds(-1)); + +        // Clean up +        wait_req_put(xport_req); +        rte_ring_free(_recv_queue); +        rte_ring_free(_release_queue); +        wait_req_put(_waiter); +    } + +    frame_buff::uptr get_recv_buff(int32_t timeout_ms) +    { +        frame_buff* buff_ptr; +        if (rte_ring_dequeue(_recv_queue, (void**)&buff_ptr)) { +            if (!timeout_ms) { +                return frame_buff::uptr(); +            } +            // Nothing in the queue. Try waiting if there is a timeout. +            auto timeout_point = +                std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); +            std::unique_lock<std::mutex> lock(_waiter->mutex); +            wait_req_get(_waiter); +            _waiter->complete = false; +            auto is_complete  = [this] { return !rte_ring_empty(_recv_queue); }; +            if (timeout_ms < 0) { +                _waiter->cond.wait(lock, is_complete); +            } else { +                auto status = _waiter->cond.wait_until(lock, timeout_point, is_complete); +                if (!status) { +                    return frame_buff::uptr(); +                } +            } +            if (rte_ring_dequeue(_recv_queue, (void**)&buff_ptr)) { +                return frame_buff::uptr(); +            } +        } +        return frame_buff::uptr(buff_ptr); +    } + +    void release_recv_buff(frame_buff::uptr buff) +    { +        frame_buff* buff_ptr = buff.release(); +        int status           = rte_ring_enqueue(_release_queue, buff_ptr); +        if (status != 0) { +            assert(false); +        } +    } + +private: +    friend class dpdk_io_service; + +    dpdk_io_if _dpdk_io_if; +    size_t _num_frames_in_use = 0; + +    dpdk::service_queue& _servq; +    dpdk::dpdk_ctx::sptr _ctx; +    struct rte_ring* _recv_queue; +    struct rte_ring* _release_queue; +    dpdk::wait_req* _waiter; +    fc_callback_t _fc_cb; +}; + + +}} // namespace uhd::transport + +#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_CLIENT_HPP_ */ diff --git a/host/lib/include/uhdlib/transport/dpdk_simple.hpp b/host/lib/include/uhdlib/transport/dpdk_simple.hpp index 86abaeff8..8420510ea 100644 --- a/host/lib/include/uhdlib/transport/dpdk_simple.hpp +++ b/host/lib/include/uhdlib/transport/dpdk_simple.hpp @@ -8,7 +8,6 @@  #define INCLUDED_DPDK_SIMPLE_HPP  #include <uhd/transport/udp_simple.hpp> -#include <uhdlib/transport/dpdk_common.hpp>  namespace uhd { namespace transport { @@ -17,35 +16,11 @@ class dpdk_simple : public udp_simple  public:      virtual ~dpdk_simple(void) = 0; -    /*! -     * Make a new connected dpdk transport: -     * This transport is for sending and receiving -     * between this host and a single endpoint. -     * The primary usage for this transport will be control transactions. -     * -     * The address must be an ipv4 address. -     * The port must be a number. -     * -     * \param addr a string representing the destination address -     * \param port a string representing the destination port -     */ -    static udp_simple::sptr make_connected(struct uhd_dpdk_ctx &ctx, -        const std::string &addr, const std::string &port); +    static udp_simple::sptr make_connected( +        const std::string& addr, const std::string& port); -    /*! 
-     * Make a new broadcasting dpdk transport: -     * This transport can send broadcast datagrams -     * and receive datagrams from multiple sources. -     * The primary usage for this transport will be to discover devices. -     * -     * The address must be an ipv4 address. -     * The port must be a number. -     * -     * \param addr a string representing the destination address -     * \param port a string representing the destination port -     */ -    static udp_simple::sptr make_broadcast(struct uhd_dpdk_ctx &ctx, -        const std::string &addr, const std::string &port); +    static udp_simple::sptr make_broadcast( +        const std::string& addr, const std::string& port);      /*!       * Send a single buffer. diff --git a/host/lib/include/uhdlib/transport/link_base.hpp b/host/lib/include/uhdlib/transport/link_base.hpp index e4d329e2a..a57b681ca 100644 --- a/host/lib/include/uhdlib/transport/link_base.hpp +++ b/host/lib/include/uhdlib/transport/link_base.hpp @@ -56,7 +56,7 @@ private:   * the link interface methods.   *   * This template requires the following methods in the derived class: - *   bool get_send_buf_derived(frame_buff& buf, int32_t timeout_ms); + *   bool get_send_buff_derived(frame_buff& buf, int32_t timeout_ms);   *   void release_send_buf_derived(frame_buff& buf);   *   * Additionally, the subclass must call preload_free_buf for each frame_buff @@ -145,7 +145,7 @@ private:   * the link interface methods.   *   * This template requires the following methods in the derived class: - *   bool get_recv_buff_derived(frame_buff& buff, int32_t timeout_ms); + *   size_t get_recv_buff_derived(frame_buff& buff, int32_t timeout_ms);   *   void release_recv_buff_derived(frame_buff& buff);   *   * Additionally, the subclass must call preload_free_buff for each diff --git a/host/lib/include/uhdlib/transport/udp_dpdk_link.hpp b/host/lib/include/uhdlib/transport/udp_dpdk_link.hpp new file mode 100644 index 000000000..eaf3cf7c4 --- /dev/null +++ b/host/lib/include/uhdlib/transport/udp_dpdk_link.hpp @@ -0,0 +1,267 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_UDP_DPDK_LINK_HPP +#define INCLUDED_UHDLIB_TRANSPORT_UDP_DPDK_LINK_HPP + +#include <uhd/config.hpp> +#include <uhd/transport/buffer_pool.hpp> +#include <uhd/types/device_addr.hpp> +#include <uhdlib/transport/dpdk/common.hpp> +#include <uhdlib/transport/link_if.hpp> +#include <uhdlib/transport/links.hpp> +#include <rte_udp.h> +#include <cassert> +#include <string> +#include <vector> + +namespace uhd { namespace transport { + +/*! + * A zero copy transport interface to the dpdk DMA library. + */ +class udp_dpdk_link : public virtual recv_link_if, public virtual send_link_if +{ +public: +    using sptr = std::shared_ptr<udp_dpdk_link>; + +    udp_dpdk_link(const dpdk::port_id_t port_id, +        const std::string& remote_addr, +        const std::string& remote_port, +        const std::string& local_port, +        const link_params_t& params); + +    ~udp_dpdk_link(); + +    /*! +     * Make a new dpdk link. Get port ID from routing table. +     * +     * \param remote_addr Remote IP address +     * \param remote_port Remote UDP port +     * \param params Values for frame sizes, num frames, and buffer sizes +     * \return a shared_ptr to a new udp dpdk link +     */ +    static sptr make(const std::string& remote_addr, +        const std::string& remote_port, +        const link_params_t& params); + +    /*! 
+     * Make a new dpdk link. User specifies DPDK port ID directly. +     * +     * \param port_id DPDK port ID to use for communication +     * \param remote_addr Remote IP address +     * \param remote_port Remote UDP port +     * \param local_port Local UDP port +     * \param params Values for frame sizes, num frames, and buffer sizes +     * \return a shared_ptr to a new udp dpdk link +     */ +    static sptr make(const dpdk::port_id_t port_id, +        const std::string& remote_addr, +        const std::string& remote_port, +        const std::string& local_port, +        const link_params_t& params); + +    /*! +     * Get the associated dpdk_port +     * +     * \return a pointer to the dpdk_port used by this link +     */ +    inline dpdk::dpdk_port* get_port() +    { +        return _port; +    } + +    /*! +     * Get the DMA queue associated with this link +     * +     * \return the queue ID for this link's DMA queue +     */ +    inline dpdk::queue_id_t get_queue_id() +    { +        return _queue; +    } + +    /*! +     * Get the local UDP port used by this link +     * +     * \return the local UDP port, in network order +     */ +    inline uint16_t get_local_port() +    { +        return _local_port; +    } + +    /*! +     * Get the remote UDP port used by this link +     * +     * \return the remote UDP port, in network order +     */ +    inline uint16_t get_remote_port() +    { +        return _remote_port; +    } + +    /*! +     * Get the remote IPv4 address used by this link +     * +     * \return the remote IPv4 address, in network order +     */ +    inline uint32_t get_remote_ipv4() +    { +        return _remote_ipv4; +    } + +    /*! +     * Set the remote host's MAC address +     * This MAC address must be filled in for the remote IPv4 address before +     * the link can reach its destination. +     * +     * \param mac the remote host's MAC address +     */ +    inline void set_remote_mac(struct ether_addr& mac) +    { +        ether_addr_copy(&mac, &_remote_mac); +    } + +    /*! +     * Get the remote host's MAC address +     * +     * \param dst Where to write the MAC address +     */ +    inline void get_remote_mac(struct ether_addr& dst) +    { +        ether_addr_copy(&_remote_mac, &dst); +    } + +    /*! +     * Get the number of frame buffers that can be queued by this link. +     */ +    size_t get_num_send_frames() const +    { +        return _num_send_frames; +    } + +    /*! +     * Get the maximum capacity of a frame buffer. +     */ +    size_t get_send_frame_size() const +    { +        return _send_frame_size; +    } + +    /*! +     * Get the physical adapter ID used for this link +     */ +    inline adapter_id_t get_send_adapter_id() const +    { +        return _adapter_id; +    } + +    /*! +     * Get the number of frame buffers that can be queued by this link. +     */ +    size_t get_num_recv_frames() const +    { +        return _num_recv_frames; +    } + +    /*! +     * Get the maximum capacity of a frame buffer. +     */ +    size_t get_recv_frame_size() const +    { +        return _recv_frame_size; +    } + +    /*! +     * Get the physical adapter ID used for this link +     */ +    inline adapter_id_t get_recv_adapter_id() const +    { +        return _adapter_id; +    } + +    /*! +     * Enqueue a received mbuf, which can be pulled via get_recv_buff() +     */ +    void enqueue_recv_mbuf(struct rte_mbuf* mbuf); + +    /*! +     * Receive a packet and return a frame buffer containing the packet data. 
+     * The timeout argument is ignored. +     * +     * Received buffers are pulled from the frame buffer list. No buffers can +     * be retrieved unless the corresponding rte_mbufs were placed in the list +     * via the enqueue_recv_mbuf() method. +     * +     * \return a frame buffer, or null uptr if timeout occurs +     */ +    frame_buff::uptr get_recv_buff(int32_t /*timeout_ms*/); + +    /*! +     * Release a frame buffer, allowing the link driver to reuse it. +     * +     * \param buffer frame buffer to release for reuse by the link +     */ +    void release_recv_buff(frame_buff::uptr buff); + +    /*! +     * Get an empty frame buffer in which to write packet contents. +     * +     * \param timeout_ms a positive timeout value specifies the maximum number +                         of ms to wait, a negative value specifies to block +                         until successful, and a value of 0 specifies no wait. +     * \return a frame buffer, or null uptr if timeout occurs +     */ +    frame_buff::uptr get_send_buff(int32_t /*timeout_ms*/); + +    /*! +     * Send a packet with the contents of the frame buffer and release the +     * buffer, allowing the link driver to reuse it. If the size of the frame +     * buffer is 0, the buffer is released with no packet being sent. +     * +     * Note that this function will only fill in the L2 header and send the +     * mbuf. The L3 and L4 headers, in addition to the lengths in the rte_mbuf +     * fields, must be set in the I/O service. +     * +     * \param buffer frame buffer containing packet data +     * +     * Throws an exception if an I/O error occurs while sending +     */ +    void release_send_buff(frame_buff::uptr buff); + +private: +    //! A reference to the DPDK context +    dpdk::dpdk_ctx::sptr _ctx; +    //! The DPDK NIC port used by this link +    dpdk::dpdk_port* _port; +    //! Local UDP port, in network order +    uint16_t _local_port; +    //! Remote UDP port, in network order +    uint16_t _remote_port; +    //! Remote IPv4 address, in network order +    uint32_t _remote_ipv4; +    //! Remote host's MAC address +    struct ether_addr _remote_mac; +    //! Number of recv frames is not validated +    size_t _num_recv_frames; +    //! Maximum bytes of UDP payload data in recv frame +    size_t _recv_frame_size; +    //! Number of send frames is not validated +    size_t _num_send_frames; +    //! Maximum bytes of UDP payload data in send frame +    size_t _send_frame_size; +    //! Registered adapter ID for this link's DPDK NIC port +    adapter_id_t _adapter_id; +    //! 
The RX frame buff list head +    dpdk::dpdk_frame_buff* _recv_buff_head = nullptr; +    // TODO: Implement ability to use multiple queues +    dpdk::queue_id_t _queue = 0; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_UDP_DPDK_LINK_HPP */ diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 646b2837e..d88631ae3 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -143,5 +143,14 @@ endif(ENABLE_LIBERIO)  if(ENABLE_DPDK)      INCLUDE_SUBDIRECTORY(uhd-dpdk) + +    LIBUHD_APPEND_SOURCES( +        ${CMAKE_CURRENT_SOURCE_DIR}/udp_dpdk_link.cpp +        ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_simple.cpp +    ) +    set_source_files_properties( +        ${CMAKE_CURRENT_SOURCE_DIR}/udp_dpdk_link.cpp +        PROPERTIES COMPILE_FLAGS "${UHD_DPDK_CFLAGS} -D_GNU_SOURCE" +    )  endif(ENABLE_DPDK) diff --git a/host/lib/transport/dpdk_simple.cpp b/host/lib/transport/dpdk_simple.cpp index 001775934..50855f36a 100644 --- a/host/lib/transport/dpdk_simple.cpp +++ b/host/lib/transport/dpdk_simple.cpp @@ -1,179 +1,203 @@  // -// Copyright 2019 Ettus Research, a National Instruments Company +// Copyright 2019 Ettus Research, a National Instruments Brand  //  // SPDX-License-Identifier: GPL-3.0-or-later  // +#include <uhd/transport/frame_buff.hpp>  #include <uhd/utils/log.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/dpdk_io_service.hpp> +#include <uhdlib/transport/dpdk_io_service_client.hpp>  #include <uhdlib/transport/dpdk_simple.hpp> -#include <uhdlib/transport/uhd-dpdk.h> +#include <uhdlib/transport/links.hpp> +#include <uhdlib/transport/udp_dpdk_link.hpp>  #include <arpa/inet.h> +  namespace uhd { namespace transport {  namespace { -    constexpr uint64_t USEC = 1000000; -    // Non-data fields are headers (Ethernet + IPv4 + UDP) + CRC -    constexpr size_t DPDK_SIMPLE_NONDATA_SIZE = 14 + 20 + 8 + 4; +constexpr double SEND_TIMEOUT_MS = 500; // milliseconds  }  class dpdk_simple_impl : public dpdk_simple {  public: -    dpdk_simple_impl(struct uhd_dpdk_ctx &ctx, const std::string &addr, -                     const std::string &port, bool filter_bcast) +    dpdk_simple_impl(const std::string& addr, const std::string& port)      { -        UHD_ASSERT_THROW(ctx.is_init_done()); - -        // Get NIC that can route to addr -        int port_id = ctx.get_route(addr); -        UHD_ASSERT_THROW(port_id >= 0); - -        _port_id = port_id; -        uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str()); -        uint16_t dst_port = htons(std::stoi(port, NULL, 0)); - -        struct uhd_dpdk_sockarg_udp sockarg = { -            .is_tx = false, -            .filter_bcast = filter_bcast, -            .local_port = 0, -            .remote_port = dst_port, -            .dst_addr = dst_ipv4, -            .num_bufs = 1 +        link_params_t link_params = _get_default_link_params(); +        _link = +            uhd::transport::udp_dpdk_link::make(addr, port, link_params); +        UHD_LOG_TRACE("DPDK::SIMPLE", +            "Creating simple UDP object for " << addr << ":" << port +                                              << ", DPDK port index " +                                              << _link->get_port()->get_port_id()); +        // The context must be initialized, or we'd never get here +        auto ctx = uhd::transport::dpdk::dpdk_ctx::get(); +        UHD_ASSERT_THROW(ctx->is_init_done()); + +        // Init I/O service +        _port_id = _link->get_port()->get_port_id(); +        
_io_service = ctx->get_io_service(_port_id); +        // This is normally done by the I/O service manager, but with DPDK, this +        // is all it does so we skip that step +        UHD_LOG_TRACE("DPDK::SIMPLE", "Attaching link to I/O service..."); +        _io_service->attach_recv_link(_link); +        _io_service->attach_send_link(_link); + +        auto recv_cb = [this](buff_t::uptr& buff, recv_link_if*, send_link_if*) { +            return this->_recv_callback(buff); +        }; + +        auto fc_cb = [this](buff_t::uptr buff, recv_link_if*, send_link_if*) { +            this->_recv_fc_callback(std::move(buff));          }; -        _rx_sock = uhd_dpdk_sock_open(_port_id, UHD_DPDK_SOCK_UDP, &sockarg); -        UHD_ASSERT_THROW(_rx_sock != nullptr); - -        // Backfill the local port, in case it was auto-assigned -        uhd_dpdk_udp_get_info(_rx_sock, &sockarg); -        sockarg.is_tx = true; -        sockarg.remote_port = dst_port; -        sockarg.dst_addr = dst_ipv4; -        sockarg.num_bufs = 1; -        _tx_sock = uhd_dpdk_sock_open(_port_id, UHD_DPDK_SOCK_UDP, &sockarg); -        UHD_ASSERT_THROW(_tx_sock != nullptr); -        UHD_LOG_TRACE("DPDK", "Created simple transports between " << addr << ":" -                               << ntohs(dst_port) << " and NIC(" << _port_id -                               << "):" << ntohs(sockarg.local_port)); + +        _recv_io = _io_service->make_recv_client(_link, +            link_params.num_recv_frames, +            recv_cb, +            nullptr, // No send/fc link +            0, // No send frames +            fc_cb); + +        auto send_cb = [this](buff_t::uptr buff, transport::send_link_if*) { +            this->_send_callback(std::move(buff)); +        }; +        _send_io = _io_service->make_send_client(_link, +            link_params.num_send_frames, +            send_cb, +            nullptr, // no FC link +            0, +            nullptr, // No receive callback necessary +            [](const size_t) { return true; } // We can always send +        ); +        UHD_LOG_TRACE("DPDK::SIMPLE", "Constructor complete");      } -    ~dpdk_simple_impl(void) {} +    ~dpdk_simple_impl(void) +    { +        UHD_LOG_TRACE("DPDK::SIMPLE", +            "~dpdk_simple_impl(), DPDK port index " << _link->get_port()->get_port_id()); +        // Disconnect the clients from the I/O service +        _send_io.reset(); +        _recv_io.reset(); +        // Disconnect the link from the I/O service +        _io_service->detach_recv_link(_link); +        _io_service->detach_send_link(_link); +    } -    /*! -     * Send and release outstanding buffer +    /*! 
Send and release outstanding buffer       *       * \param length bytes of data to send       * \return number of bytes sent (releases buffer if sent)       */ -    size_t send(const boost::asio::const_buffer& buff) +    size_t send(const boost::asio::const_buffer& user_buff)      { -        struct rte_mbuf* tx_mbuf; -        size_t frame_size = _get_tx_buf(&tx_mbuf); -        UHD_ASSERT_THROW(tx_mbuf) -        size_t nbytes = boost::asio::buffer_size(buff); -        UHD_ASSERT_THROW(nbytes <= frame_size) -        const uint8_t* user_data = boost::asio::buffer_cast<const uint8_t*>(buff); - -        uint8_t* pkt_data = (uint8_t*) uhd_dpdk_buf_to_data(_tx_sock, tx_mbuf); -        std::memcpy(pkt_data, user_data, nbytes); -        tx_mbuf->pkt_len = nbytes; -        tx_mbuf->data_len = nbytes; - -        int num_tx = uhd_dpdk_send(_tx_sock, &tx_mbuf, 1); -        if (num_tx == 0) -            return 0; +        // Extract buff and sanity check +        const size_t nbytes = boost::asio::buffer_size(user_buff); +        UHD_ASSERT_THROW(nbytes <= _link->get_send_frame_size()) +        const uint8_t* user_data = boost::asio::buffer_cast<const uint8_t*>(user_buff); + +        // Get send buff +        auto buff = _send_io->get_send_buff(SEND_TIMEOUT_MS); +        UHD_ASSERT_THROW(buff); +        buff->set_packet_size(nbytes); +        std::memcpy(buff->data(), user_data, nbytes); + +        // Release send buff (send the packet) +        _send_io->release_send_buff(std::move(buff));          return nbytes;      } -    /*! -     * Receive a single packet. +    /*! Receive a single packet. +     *       * Buffer provided by transport (must be freed before next operation).       *       * \param buf a pointer to place to write buffer location       * \param timeout the timeout in seconds       * \return the number of bytes received or zero on timeout       */ -    size_t recv(const boost::asio::mutable_buffer& buff, double timeout) +    size_t recv(const boost::asio::mutable_buffer& user_buff, double timeout)      { -        struct rte_mbuf *rx_mbuf; -        size_t buff_size = boost::asio::buffer_size(buff); -        uint8_t* user_data = boost::asio::buffer_cast<uint8_t*>(buff); +        size_t user_buff_size = boost::asio::buffer_size(user_buff); +        uint8_t* user_data = boost::asio::buffer_cast<uint8_t*>(user_buff); -        int bufs = uhd_dpdk_recv(_rx_sock, &rx_mbuf, 1, (int) (timeout*USEC)); -        if (bufs != 1 || rx_mbuf == nullptr) { -            return 0; -        } -        if ((rx_mbuf->ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) { -            uhd_dpdk_free_buf(rx_mbuf); +        auto buff = _recv_io->get_recv_buff(static_cast<int32_t>(timeout * 1e3)); +        if (!buff) {              return 0;          } -        uhd_dpdk_get_src_ipv4(_rx_sock, rx_mbuf, &_last_recv_addr); -        const size_t nbytes = uhd_dpdk_get_len(_rx_sock, rx_mbuf); -        UHD_ASSERT_THROW(nbytes <= buff_size); +        // Extract the sender's address. 
This is only possible because we know +        // the memory layout of the buff +        struct udp_hdr* udp_hdr_end = (struct udp_hdr*)buff->data(); +        struct ipv4_hdr* ip_hdr_end = (struct ipv4_hdr*)(&udp_hdr_end[-1]); +        struct ipv4_hdr* ip_hdr     = (struct ipv4_hdr*)(&ip_hdr_end[-1]); +        _last_recv_addr             = ip_hdr->src_addr; + +        // Extract the buffer data +        const size_t copy_len = std::min(user_buff_size, buff->packet_size()); +        if (copy_len < buff->packet_size()) { +            UHD_LOG_WARNING("DPDK", "Truncating recv packet"); +        } +        std::memcpy(user_data, buff->data(), copy_len); -        uint8_t* pkt_data = (uint8_t*) uhd_dpdk_buf_to_data(_rx_sock, rx_mbuf); -        std::memcpy(user_data, pkt_data, nbytes); -        _put_rx_buf(rx_mbuf); -        return nbytes; +        // Housekeeping +        _recv_io->release_recv_buff(std::move(buff)); +        return copy_len;      } -    /*! -     * Get the last IP address as seen by recv(). -     * Only use this with the broadcast socket. -     */      std::string get_recv_addr(void)      { -        char addr_str[INET_ADDRSTRLEN]; -        struct in_addr ipv4_addr; -        ipv4_addr.s_addr = _last_recv_addr; -        inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str)); -        return std::string(addr_str); +        return dpdk::ipv4_num_to_str(_last_recv_addr);      } -    /*! -     * Get the IP address for the destination -     */      std::string get_send_addr(void)      { -        struct in_addr ipv4_addr; -        int status = uhd_dpdk_get_ipv4_addr(_port_id, &ipv4_addr.s_addr, nullptr); -        UHD_ASSERT_THROW(status); -        char addr_str[INET_ADDRSTRLEN]; -        inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str)); -        return std::string(addr_str); +        return dpdk::ipv4_num_to_str(_link->get_remote_ipv4());      } +  private: -    /*! -     * Request a single send buffer of specified size. -     * -     * \param buf a pointer to place to write buffer location -     * \return the maximum length of the buffer -     */ -    size_t _get_tx_buf(struct rte_mbuf** buf) +    using buff_t = frame_buff; + +    link_params_t _get_default_link_params()      { -        int bufs = uhd_dpdk_request_tx_bufs(_tx_sock, buf, 1, 0); -        if (bufs != 1) { -            *buf = nullptr; -            return 0; -        } -        return _mtu - DPDK_SIMPLE_NONDATA_SIZE; +        link_params_t link_params; +        link_params.recv_frame_size = 8000; +        link_params.send_frame_size = 8000; +        link_params.num_recv_frames = 1; +        link_params.num_send_frames = 1; +        link_params.recv_buff_size  = 8000; +        link_params.send_buff_size  = 8000; +        return link_params;      } -    /*! 
-     * Return/free receive buffer -     * Can also use to free un-sent TX bufs -     */ -    void _put_rx_buf(struct rte_mbuf *rx_mbuf) +    void _send_callback(buff_t::uptr buff) +    { +        _link->release_send_buff(std::move(buff)); +    } + +    bool _recv_callback(buff_t::uptr&)      { -        UHD_ASSERT_THROW(rx_mbuf) -        uhd_dpdk_free_buf(rx_mbuf); +        // Queue it up +        return true;      } +    void _recv_fc_callback(buff_t::uptr buff) +    { +        _link->release_recv_buff(std::move(buff)); +    } + +    /*** Attributes **********************************************************/      unsigned int _port_id; -    size_t _mtu; -    struct uhd_dpdk_socket *_tx_sock; -    struct uhd_dpdk_socket *_rx_sock;      uint32_t _last_recv_addr; + +    udp_dpdk_link::sptr _link; + +    dpdk_io_service::sptr _io_service; + +    send_io_if::sptr _send_io; + +    recv_io_if::sptr _recv_io;  };  dpdk_simple::~dpdk_simple(void) {} @@ -182,16 +206,16 @@ dpdk_simple::~dpdk_simple(void) {}   * DPDK simple transport public make functions   **********************************************************************/  udp_simple::sptr dpdk_simple::make_connected( -    struct uhd_dpdk_ctx &ctx, const std::string &addr, const std::string &port -){ -    return udp_simple::sptr(new dpdk_simple_impl(ctx, addr, port, true)); +    const std::string& addr, const std::string& port) +{ +    return udp_simple::sptr(new dpdk_simple_impl(addr, port));  } +// For DPDK, this is not special and the same as make_connected  udp_simple::sptr dpdk_simple::make_broadcast( -    struct uhd_dpdk_ctx &ctx, const std::string &addr, const std::string &port -){ -    return udp_simple::sptr(new dpdk_simple_impl(ctx, addr, port, false)); +    const std::string& addr, const std::string& port) +{ +    return udp_simple::sptr(new dpdk_simple_impl(addr, port));  }  }} // namespace uhd::transport - diff --git a/host/lib/transport/udp_dpdk_link.cpp b/host/lib/transport/udp_dpdk_link.cpp new file mode 100644 index 000000000..dc56de43c --- /dev/null +++ b/host/lib/transport/udp_dpdk_link.cpp @@ -0,0 +1,198 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/config.hpp> +#include <uhd/utils/log.hpp> +#include <uhd/utils/static.hpp> +#include <uhdlib/transport/adapter.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/udp_dpdk_link.hpp> +#include <arpa/inet.h> +#include <memory> + +using namespace uhd::transport; +using namespace uhd::transport::dpdk; + +udp_dpdk_link::udp_dpdk_link(dpdk::port_id_t port_id, +    const std::string& remote_addr, +    const std::string& remote_port, +    const std::string& local_port, +    const link_params_t& params) +    : _num_recv_frames(params.num_recv_frames) +    , _recv_frame_size(params.recv_frame_size) +    , _num_send_frames(params.num_send_frames) +    , _send_frame_size(params.send_frame_size) +{ +    // Get a reference to the context, since this class manages DPDK memory +    _ctx = dpdk_ctx::get(); +    UHD_ASSERT_THROW(_ctx); + +    // Fill in remote IPv4 address and UDP port +    // NOTE: Remote MAC address is filled in later by I/O service +    int status = inet_pton(AF_INET, remote_addr.c_str(), &_remote_ipv4); +    if (status != 1) { +        UHD_LOG_ERROR("DPDK", std::string("Invalid destination address ") + remote_addr); +        throw uhd::runtime_error( +            std::string("DPDK: Invalid destination address ") + remote_addr); +    } +    _remote_port = 
rte_cpu_to_be_16(std::stoul(remote_port)); + +    // Grab the port with a route to the remote host +    _port = _ctx->get_port(port_id); + +    uint16_t local_port_num = rte_cpu_to_be_16(std::stoul(local_port)); +    // Get an unused UDP port for listening +    _local_port = _port->alloc_udp_port(local_port_num); + +    // Validate params +    const size_t max_frame_size = _port->get_mtu() - dpdk::HDR_SIZE_UDP_IPV4; +    UHD_ASSERT_THROW(params.send_frame_size <= max_frame_size); +    UHD_ASSERT_THROW(params.recv_frame_size <= max_frame_size); + +    // Register the adapter +    auto info      = _port->get_adapter_info(); +    auto& adap_ctx = adapter_ctx::get(); +    _adapter_id    = adap_ctx.register_adapter(info); +    UHD_LOGGER_TRACE("DPDK") << boost::format("Created udp_dpdk_link to (%s:%s)") +                                    % remote_addr % remote_port; +    UHD_LOGGER_TRACE("DPDK") +        << boost::format("num_recv_frames=%d, recv_frame_size=%d, num_send_frames=%d, " +                         "send_frame_size=%d") +               % params.num_recv_frames % params.recv_frame_size % params.num_send_frames +               % params.send_frame_size; +} + +udp_dpdk_link::~udp_dpdk_link() {} + +udp_dpdk_link::sptr udp_dpdk_link::make(const std::string& remote_addr, +    const std::string& remote_port, +    const link_params_t& params) +{ +    auto ctx  = dpdk::dpdk_ctx::get(); +    auto port = ctx->get_route(remote_addr); +    if (!port) { +        UHD_LOG_ERROR("DPDK", +            std::string("Could not find route to destination address ") + remote_addr); +        throw uhd::runtime_error( +            std::string("DPDK: Could not find route to destination address ") +            + remote_addr); +    } +    return make(port->get_port_id(), remote_addr, remote_port, "0", params); +} + +udp_dpdk_link::sptr udp_dpdk_link::make(const dpdk::port_id_t port_id, +    const std::string& remote_addr, +    const std::string& remote_port, +    const std::string& local_port, +    const link_params_t& params) +{ +    UHD_ASSERT_THROW(params.recv_frame_size > 0); +    UHD_ASSERT_THROW(params.send_frame_size > 0); +    UHD_ASSERT_THROW(params.num_send_frames > 0); +    UHD_ASSERT_THROW(params.num_recv_frames > 0); + +    return std::make_shared<udp_dpdk_link>( +        port_id, remote_addr, remote_port, local_port, params); +} + +void udp_dpdk_link::enqueue_recv_mbuf(struct rte_mbuf* mbuf) +{ +    // Get packet size +    struct udp_hdr* hdr = rte_pktmbuf_mtod_offset( +        mbuf, struct udp_hdr*, sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr)); +    size_t packet_size = rte_be_to_cpu_16(hdr->dgram_len) - sizeof(struct udp_hdr); +    // Prepare the dpdk_frame_buff +    auto buff = new (rte_mbuf_to_priv(mbuf)) dpdk_frame_buff(mbuf); +    buff->header_jump( +        sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + sizeof(struct udp_hdr)); +    buff->set_packet_size(packet_size); +    // Add the dpdk_frame_buff to the list +    if (_recv_buff_head) { +        buff->prev                  = _recv_buff_head->prev; +        buff->next                  = _recv_buff_head; +        _recv_buff_head->prev->next = buff; +        _recv_buff_head->prev       = buff; +    } else { +        _recv_buff_head = buff; +        buff->next      = buff; +        buff->prev      = buff; +    } +} + +frame_buff::uptr udp_dpdk_link::get_recv_buff(int32_t /*timeout_ms*/) +{ +    auto buff = _recv_buff_head; +    if (buff) { +        if (_recv_buff_head->next == buff) { +            /* Only had the one buff, so the 
list is empty */ +            _recv_buff_head = nullptr; +        } else { +            /* Make the next buff the new list head */ +            _recv_buff_head->next->prev = _recv_buff_head->prev; +            _recv_buff_head->prev->next = _recv_buff_head->next; +            _recv_buff_head             = _recv_buff_head->next; +        } +        buff->next = nullptr; +        buff->prev = nullptr; +        return frame_buff::uptr(buff); +    } +    return frame_buff::uptr(); +} + +void udp_dpdk_link::release_recv_buff(frame_buff::uptr buff) +{ +    dpdk_frame_buff* buff_ptr = (dpdk_frame_buff*)buff.release(); +    assert(buff_ptr); +    rte_pktmbuf_free(buff_ptr->get_pktmbuf()); +} + +frame_buff::uptr udp_dpdk_link::get_send_buff(int32_t /*timeout_ms*/) +{ +    auto mbuf = rte_pktmbuf_alloc(_port->get_tx_pktbuf_pool()); +    if (mbuf) { +        auto buff = new (rte_mbuf_to_priv(mbuf)) dpdk_frame_buff(mbuf); +        buff->header_jump( +            sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + sizeof(struct udp_hdr)); +        return frame_buff::uptr(buff); +    } +    return frame_buff::uptr(); +} + +void udp_dpdk_link::release_send_buff(frame_buff::uptr buff) +{ +    dpdk_frame_buff* buff_ptr = (dpdk_frame_buff*)buff.release(); +    assert(buff_ptr); +    auto mbuf = buff_ptr->get_pktmbuf(); +    if (buff_ptr->packet_size()) { +        // Fill in L2 header +        auto local_mac           = _port->get_mac_addr(); +        struct ether_hdr* l2_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*); +        ether_addr_copy(&_remote_mac, &l2_hdr->d_addr); +        ether_addr_copy(&local_mac, &l2_hdr->s_addr); +        l2_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4); +        // Fill in L3 and L4 headers +        dpdk::fill_udp_hdr(mbuf, +            _port, +            _remote_ipv4, +            _local_port, +            _remote_port, +            buff_ptr->packet_size()); +        // Prepare the packet buffer and send it out +        int status = rte_eth_tx_prepare(_port->get_port_id(), _queue, &mbuf, 1); +        if (status != 1) { +            throw uhd::runtime_error("DPDK: Failed to prepare TX buffer for send"); +        } +        status = rte_eth_tx_burst(_port->get_port_id(), _queue, &mbuf, 1); +        while (status != 1) { +            status = rte_eth_tx_burst(_port->get_port_id(), _queue, &mbuf, 1); +            // FIXME: Should we make available retrying? 
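+            // One possible answer to the FIXME above, as a sketch (the
+            // bounded retry budget is an assumption, not part of this
+            // changeset): stop spinning after a fixed number of failed
+            // bursts, drop the packet, and surface an error, e.g.:
+            //
+            //     for (int retry = 0; status != 1 && retry < 1000; retry++) {
+            //         status = rte_eth_tx_burst(_port->get_port_id(), _queue, &mbuf, 1);
+            //     }
+            //     if (status != 1) {
+            //         rte_pktmbuf_free(mbuf);
+            //         throw uhd::io_error("DPDK: TX descriptor ring stayed full");
+            //     }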
+            // throw uhd::runtime_error("DPDK: Failed to send TX buffer"); +        } +    } else { +        // Release the buffer if there is nothing in it +        rte_pktmbuf_free(mbuf); +    } +} diff --git a/host/lib/transport/uhd-dpdk/CMakeLists.txt b/host/lib/transport/uhd-dpdk/CMakeLists.txt index ea78aa1e8..a13886653 100644 --- a/host/lib/transport/uhd-dpdk/CMakeLists.txt +++ b/host/lib/transport/uhd-dpdk/CMakeLists.txt @@ -19,9 +19,11 @@ if(ENABLE_DPDK)      LIBUHD_APPEND_SOURCES(          ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_common.cpp +        ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_io_service.cpp      )      set_source_files_properties(          ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_common.cpp +        ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_io_service.cpp          PROPERTIES COMPILE_FLAGS "${UHD_DPDK_CFLAGS} -D_GNU_SOURCE"      )      include_directories(${DPDK_INCLUDE_DIR}) diff --git a/host/lib/transport/uhd-dpdk/dpdk_common.cpp b/host/lib/transport/uhd-dpdk/dpdk_common.cpp index 43a1507bb..46818e973 100644 --- a/host/lib/transport/uhd-dpdk/dpdk_common.cpp +++ b/host/lib/transport/uhd-dpdk/dpdk_common.cpp @@ -3,8 +3,13 @@  //  // SPDX-License-Identifier: GPL-3.0-or-later  // + +#include <uhd/utils/algorithm.hpp>  #include <uhd/utils/log.hpp> +#include <uhdlib/transport/dpdk/arp.hpp>  #include <uhdlib/transport/dpdk/common.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/dpdk_io_service.hpp>  #include <uhdlib/utils/prefs.hpp>  #include <arpa/inet.h>  #include <rte_arp.h> @@ -23,6 +28,7 @@ constexpr uint16_t DPDK_DEFAULT_RING_SIZE = 512;  inline char* eal_add_opt(      std::vector<const char*>& argv, size_t n, char* dst, const char* opt, const char* arg)  { +    UHD_LOG_TRACE("DPDK", opt << " " << arg);      char* ptr = dst;      strncpy(ptr, opt, n);      argv.push_back(ptr); @@ -34,15 +40,6 @@ inline char* eal_add_opt(      return ptr;  } -inline std::string eth_addr_to_string(const struct ether_addr mac_addr) -{ -    auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx"); -    mac_stream % (uint32_t)mac_addr.addr_bytes[0] % (uint32_t)mac_addr.addr_bytes[1] -        % (uint32_t)mac_addr.addr_bytes[2] % (uint32_t)mac_addr.addr_bytes[3] -        % (uint32_t)mac_addr.addr_bytes[4] % (uint32_t)mac_addr.addr_bytes[5]; -    return mac_stream.str(); -} -  inline void separate_ipv4_addr(      const std::string ipv4, uint32_t& ipv4_addr, uint32_t& netmask)  { @@ -59,28 +56,29 @@ inline void separate_ipv4_addr(  dpdk_port::uptr dpdk_port::make(port_id_t port,      size_t mtu,      uint16_t num_queues, -    size_t num_mbufs, +    uint16_t num_desc,      struct rte_mempool* rx_pktbuf_pool,      struct rte_mempool* tx_pktbuf_pool,      std::string ipv4_address)  {      return std::make_unique<dpdk_port>( -        port, mtu, num_queues, num_mbufs, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address); +        port, mtu, num_queues, num_desc, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address);  }  dpdk_port::dpdk_port(port_id_t port,      size_t mtu,      uint16_t num_queues, -    size_t num_mbufs, +    uint16_t num_desc,      struct rte_mempool* rx_pktbuf_pool,      struct rte_mempool* tx_pktbuf_pool,      std::string ipv4_address)      : _port(port)      , _mtu(mtu) +    , _num_queues(num_queues)      , _rx_pktbuf_pool(rx_pktbuf_pool)      , _tx_pktbuf_pool(tx_pktbuf_pool)  { -    /* 1. 
Set MTU and IPv4 address */ +    /* Set MTU and IPv4 address */      int retval;      retval = rte_eth_dev_set_mtu(_port, _mtu); @@ -96,7 +94,7 @@ dpdk_port::dpdk_port(port_id_t port,      separate_ipv4_addr(ipv4_address, _ipv4, _netmask); -    /* 2. Set hardware offloads */ +    /* Set hardware offloads */      struct rte_eth_dev_info dev_info;      rte_eth_dev_info_get(_port, &dev_info);      uint64_t rx_offloads = DEV_RX_OFFLOAD_IPV4_CKSUM; @@ -132,13 +130,13 @@ dpdk_port::dpdk_port(port_id_t port,          throw uhd::runtime_error("DPDK: Failed to configure the device");      } -    /* 3. Set descriptor ring sizes */ -    uint16_t rx_desc = num_mbufs; +    /* Set descriptor ring sizes */ +    uint16_t rx_desc = num_desc;      if (dev_info.rx_desc_lim.nb_max < rx_desc || dev_info.rx_desc_lim.nb_min > rx_desc          || (dev_info.rx_desc_lim.nb_align - 1) & rx_desc) {          UHD_LOGGER_ERROR("DPDK")              << boost::format("%d: %d RX descriptors requested, but must be in [%d,%d]") -                   % _port % num_mbufs % dev_info.rx_desc_lim.nb_min +                   % _port % num_desc % dev_info.rx_desc_lim.nb_min                     % dev_info.rx_desc_lim.nb_max;          UHD_LOGGER_ERROR("DPDK")              << boost::format("Num RX descriptors must also be aligned to 0x%x") @@ -146,12 +144,12 @@ dpdk_port::dpdk_port(port_id_t port,          throw uhd::runtime_error("DPDK: Failed to allocate RX descriptors");      } -    uint16_t tx_desc = num_mbufs; +    uint16_t tx_desc = num_desc;      if (dev_info.tx_desc_lim.nb_max < tx_desc || dev_info.tx_desc_lim.nb_min > tx_desc          || (dev_info.tx_desc_lim.nb_align - 1) & tx_desc) {          UHD_LOGGER_ERROR("DPDK")              << boost::format("%d: %d TX descriptors requested, but must be in [%d,%d]") -                   % _port % num_mbufs % dev_info.tx_desc_lim.nb_min +                   % _port % num_desc % dev_info.tx_desc_lim.nb_min                     % dev_info.tx_desc_lim.nb_max;          UHD_LOGGER_ERROR("DPDK")              << boost::format("Num TX descriptors must also be aligned to 0x%x") @@ -165,7 +163,7 @@ dpdk_port::dpdk_port(port_id_t port,          throw uhd::runtime_error("DPDK: Failed to configure the DMA queues");      } -    /* 4. Set up the RX and TX DMA queues (May not be generally supported after +    /* Set up the RX and TX DMA queues (May not be generally supported after       * eth_dev_start) */      unsigned int cpu_socket = rte_eth_dev_socket_id(_port);      for (uint16_t i = 0; i < _num_queues; i++) { @@ -187,36 +185,9 @@ dpdk_port::dpdk_port(port_id_t port,          }      } -    /* 5. 
Set up initial flow table */ +    /* TODO: Enable multiple queues (only support 1 right now) */ -    /* Append all free queues except 0, which is reserved for ARP */ -    _free_queues.reserve(_num_queues - 1); -    for (unsigned int i = 1; i < _num_queues; i++) { -        _free_queues.push_back(i); -    } - -    // struct rte_flow_attr flow_attr; -    // flow_attr.group = 0; -    // flow_attr.priority = 1; -    // flow_attr.ingress = 1; -    // flow_attr.egress = 0; -    // flow_attr.transfer = 0; -    // flow_attr.reserved = 0; - -    // struct rte_flow_item[] flow_pattern = { -    //}; -    // int rte_flow_validate(uint16_t port_id, -    //              const struct rte_flow_attr *attr, -    //              const struct rte_flow_item pattern[], -    //              const struct rte_flow_action actions[], -    //              struct rte_flow_error *error); -    // struct rte_flow * rte_flow_create(uint16_t port_id, -    //            const struct rte_flow_attr *attr, -    //            const struct rte_flow_item pattern[], -    //            const struct rte_flow_action *actions[], -    //            struct rte_flow_error *error); - -    /* 6. Start the Ethernet device */ +    /* Start the Ethernet device */      retval = rte_eth_dev_start(_port);      if (retval < 0) {          UHD_LOGGER_ERROR("DPDK") @@ -230,39 +201,60 @@ dpdk_port::dpdk_port(port_id_t port,                               << " MAC: " << eth_addr_to_string(_mac_addr);  } -/* TODO: Do flow directions */ -queue_id_t dpdk_port::alloc_queue(struct rte_flow_pattern recv_pattern[]) +dpdk_port::~dpdk_port()  { -    std::lock_guard<std::mutex> lock(_mutex); -    UHD_ASSERT_THROW(_free_queues.size() != 0); -    auto queue = _free_queues.back(); -    _free_queues.pop_back(); -    return queue; +    rte_eth_dev_stop(_port); +    rte_spinlock_lock(&_spinlock); +    for (auto kv : _arp_table) { +        for (auto req : kv.second->reqs) { +            req->cond.notify_one(); +        } +        rte_free(kv.second); +    } +    _arp_table.clear(); +    rte_spinlock_unlock(&_spinlock);  } -void dpdk_port::free_queue(queue_id_t queue) +uint16_t dpdk_port::alloc_udp_port(uint16_t udp_port)  { +    uint16_t port_selected;      std::lock_guard<std::mutex> lock(_mutex); -    auto flow  = _flow_rules.at(queue); -    int status = rte_flow_destroy(_port, flow, NULL); -    if (status) { -        UHD_LOGGER_ERROR("DPDK") -            << boost::format("Failed to destroy flow rule on port %u, queue %u") % _port -                   % queue; -        throw uhd::runtime_error("DPDK: Failed to destroy flow rule"); +    if (udp_port) { +        if (_udp_ports.count(rte_be_to_cpu_16(udp_port))) { +            return 0; +        } +        port_selected = rte_be_to_cpu_16(udp_port);      } else { -        _flow_rules.erase(queue); +        if (_udp_ports.size() >= 65535) { +            UHD_LOG_WARNING("DPDK", "Attempted to allocate UDP port, but none remain"); +            return 0; +        } +        port_selected = _next_udp_port; +        while (true) { +            if (port_selected == 0) { +                continue; +            } +            if (_udp_ports.count(port_selected) == 0) { +                _next_udp_port = port_selected - 1; +                break; +            } +            if (port_selected - 1 == _next_udp_port) { +                return 0; +            } +            port_selected--; +        }      } -    _free_queues.push_back(queue); +    _udp_ports.insert(port_selected); +    return rte_cpu_to_be_16(port_selected);  } -int 
dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* arp_req) +int dpdk_port::_arp_reply(queue_id_t queue_id, struct arp_hdr* arp_req)  {      struct rte_mbuf* mbuf;      struct ether_hdr* hdr;      struct arp_hdr* arp_frame; -    mbuf = rte_pktmbuf_alloc(tx_pktbuf_pool); +    mbuf = rte_pktmbuf_alloc(_tx_pktbuf_pool);      if (unlikely(mbuf == NULL)) {          UHD_LOG_WARNING("DPDK", "Could not allocate packet buffer for ARP response");          return -ENOMEM; @@ -288,8 +280,7 @@ int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* ar      mbuf->pkt_len  = 42;      mbuf->data_len = 42; -    // ARP replies always on queue 0 -    if (rte_eth_tx_burst(_port, 0, &mbuf, 1) != 1) { +    if (rte_eth_tx_burst(_port, queue_id, &mbuf, 1) != 1) {          UHD_LOGGER_WARNING("DPDK")              << boost::format("%s: TX descriptor ring is full") % __func__;          rte_pktmbuf_free(mbuf); @@ -298,61 +289,16 @@ int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* ar      return 0;  } -// TODO: ARP processing for queue 0 -// int dpdk_port::process_arp(struct rte_mempool *tx_pktbuf_pool, struct arp_hdr -// *arp_frame) -//{ -//    std::lock_guard<std::mutex> lock(_mutex); -//    uint32_t dest_ip = arp_frame->arp_data.arp_sip; -//    struct ether_addr dest_addr = arp_frame->arp_data.arp_sha; -// -//    /* Add entry to ARP table */ -//    struct uhd_dpdk_arp_entry *entry = NULL; -//    rte_hash_lookup_data(_arp_table, &dest_ip, (void **) &entry); -//    if (!entry) { -//        entry = rte_zmalloc(NULL, sizeof(*entry), 0); -//        if (!entry) { -//            return -ENOMEM; -//        } -//        LIST_INIT(&entry->pending_list); -//        ether_addr_copy(&dest_addr, &entry->mac_addr); -//        if (rte_hash_add_key_data(port->arp_table, &dest_ip, entry) < 0) { -//            rte_free(entry); -//            return -ENOSPC; -//        } -//    } else { -//        struct uhd_dpdk_config_req *req = NULL; -//        ether_addr_copy(&dest_addr, &entry->mac_addr); -//        /* Now wake any config reqs waiting for the ARP */ -//        LIST_FOREACH(req, &entry->pending_list, entry) { -//            _uhd_dpdk_config_req_compl(req, 0); -//        } -//        while (entry->pending_list.lh_first != NULL) { -//            LIST_REMOVE(entry->pending_list.lh_first, entry); -//        } -//    } -//    /* Respond if this was an ARP request */ -//    if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST) && -//        arp_frame->arp_data.arp_tip == port->ipv4_addr) { -//        _arp_reply(tx_pktbuf_pool, arp_frame); -//    } -// -//    return 0; -// -//} - -static dpdk_ctx* global_ctx = nullptr; +static dpdk_ctx::sptr global_ctx = nullptr;  static std::mutex global_ctx_mutex;  dpdk_ctx::sptr dpdk_ctx::get()  {      std::lock_guard<std::mutex> lock(global_ctx_mutex);      if (!global_ctx) { -        auto new_ctx = std::make_shared<dpdk_ctx>(); -        global_ctx   = new_ctx.get(); -        return new_ctx; +        global_ctx = std::make_shared<dpdk_ctx>();      } -    return global_ctx->shared_from_this(); +    return global_ctx;  }  dpdk_ctx::dpdk_ctx(void) : _init_done(false) {} @@ -384,6 +330,7 @@ void dpdk_ctx::_eal_init(const device_addr_t& eal_args)      auto args = new std::array<char, 4096>();      char* opt = args->data();      char* end = args->data() + args->size(); +    UHD_LOG_TRACE("DPDK", "EAL init options: ");      for (std::string& key : eal_args.keys()) {          std::string val = eal_args[key];          if (key == 
"dpdk_coremask") { @@ -452,10 +399,16 @@ void dpdk_ctx::init(const device_addr_t& user_args)          _num_mbufs = dpdk_args.cast<int>("dpdk_num_mbufs", DEFAULT_NUM_MBUFS);          _mbuf_cache_size =              dpdk_args.cast<int>("dpdk_mbuf_cache_size", DEFAULT_MBUF_CACHE_SIZE); +        UHD_LOG_TRACE("DPDK", +            "mtu: " << _mtu << " num_mbufs: " << _num_mbufs +                    << " mbuf_cache_size: " << _mbuf_cache_size);          /* Get device info for all the NIC ports */          int num_dpdk_ports = rte_eth_dev_count_avail(); -        UHD_ASSERT_THROW(num_dpdk_ports > 0); +        if (num_dpdk_ports == 0) { +            UHD_LOG_ERROR("DPDK", "No available DPDK devices (ports) found!"); +            throw uhd::runtime_error("No available DPDK devices (ports) found!"); +        }          device_addrs_t nics(num_dpdk_ports);          RTE_ETH_FOREACH_DEV(i)          { @@ -480,6 +433,8 @@ void dpdk_ctx::init(const device_addr_t& user_args)              }              /* Now combine user args with conf file */              auto conf = uhd::prefs::get_dpdk_nic_args(nic); +            // TODO: Enable the use of multiple DMA queues +            conf["dpdk_num_queues"] = "1";              /* Update config, and remove ports that aren't fully configured */              if (conf.has_key("dpdk_ipv4")) { @@ -491,10 +446,17 @@ void dpdk_ctx::init(const device_addr_t& user_args)              }          } +        std::map<size_t, std::vector<size_t>> lcore_to_port_id_map;          RTE_ETH_FOREACH_DEV(i)          {              auto& conf = nics.at(i);              if (conf.has_key("dpdk_ipv4")) { +                UHD_ASSERT_THROW(conf.has_key("dpdk_lcore")); +                const size_t lcore_id = conf.cast<size_t>("dpdk_lcore", 0); +                if (!lcore_to_port_id_map.count(lcore_id)) { +                    lcore_to_port_id_map.insert({lcore_id, {}}); +                } +                  // Allocating enough buffers for all DMA queues for each CPU socket                  // - This is a bit inefficient for larger systems, since NICs may not                  //   all be on one socket @@ -507,10 +469,13 @@ void dpdk_ctx::init(const device_addr_t& user_args)                  _ports[i] = dpdk_port::make(i,                      _mtu,                      conf.cast<uint16_t>("dpdk_num_queues", rte_lcore_count()), -                    _num_mbufs, +                    conf.cast<uint16_t>("dpdk_num_desc", DPDK_DEFAULT_RING_SIZE),                      rx_pool,                      tx_pool,                      conf["dpdk_ipv4"]); + +                // Remember all port IDs that map to an lcore +                lcore_to_port_id_map.at(lcore_id).push_back(i);              }          } @@ -522,12 +487,29 @@ void dpdk_ctx::init(const device_addr_t& user_args)              rte_eth_link_get(portid, &link);              unsigned int link_status = link.link_status;              unsigned int link_speed  = link.link_speed; -            UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps\n") -                                            % portid % link_status % link_speed; +            UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps") % portid +                                            % link_status % link_speed;          } -        UHD_LOG_TRACE("DPDK", "Init DONE!"); - +        UHD_LOG_TRACE("DPDK", "Init done -- spawning IO services");          _init_done = true; + +        // Links are up, now create one IO service per lcore +        for (auto& 
lcore_portids_pair : lcore_to_port_id_map) { +            const size_t lcore_id = lcore_portids_pair.first; +            std::vector<dpdk_port*> dpdk_ports; +            dpdk_ports.reserve(lcore_portids_pair.second.size()); +            for (const size_t port_id : lcore_portids_pair.second) { +                dpdk_ports.push_back(get_port(port_id)); +            } +            const size_t servq_depth = 32; // FIXME +            UHD_LOG_TRACE("DPDK", +                "Creating I/O service for lcore " +                    << lcore_id << ", servicing " << dpdk_ports.size() +                    << " ports, service queue depth " << servq_depth); +            _io_srv_portid_map.insert( +                {uhd::transport::dpdk_io_service::make(lcore_id, dpdk_ports, servq_depth), +                    lcore_portids_pair.second}); +        }      }  } @@ -577,7 +559,7 @@ int dpdk_ctx::get_port_link_status(port_id_t portid) const      return link.link_status;  } -int dpdk_ctx::get_route(const std::string& addr) const +dpdk_port* dpdk_ctx::get_route(const std::string& addr) const  {      const uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str());      for (const auto& port : _ports) { @@ -586,10 +568,10 @@ int dpdk_ctx::get_route(const std::string& addr) const          uint32_t src_ipv4 = port.second->get_ipv4();          uint32_t netmask  = port.second->get_netmask();          if ((src_ipv4 & netmask) == (dst_ipv4 & netmask)) { -            return (int)port.first; +            return port.second.get();          }      } -    return -ENODEV; +    return NULL;  } @@ -598,6 +580,20 @@ bool dpdk_ctx::is_init_done(void) const      return _init_done.load();  } +uhd::transport::dpdk_io_service::sptr dpdk_ctx::get_io_service(const size_t port_id) +{ +    for (auto& io_srv_portid_pair : _io_srv_portid_map) { +        if (uhd::has(io_srv_portid_pair.second, port_id)) { +            return io_srv_portid_pair.first; +        } +    } + +    std::string err_msg = std::string("Cannot look up I/O service for port ID: ") +                          + std::to_string(port_id) + ". 
No such port ID!"; +    UHD_LOG_ERROR("DPDK", err_msg); +    throw uhd::lookup_error(err_msg); +} +  struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool(      unsigned int cpu_socket, size_t num_bufs)  { @@ -605,8 +601,12 @@ struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool(          const int mbuf_size = _mtu + RTE_PKTMBUF_HEADROOM;          char name[32];          snprintf(name, sizeof(name), "rx_mbuf_pool_%u", cpu_socket); -        _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create( -            name, num_bufs, _mbuf_cache_size, 0, mbuf_size, SOCKET_ID_ANY); +        _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create(name, +            num_bufs, +            _mbuf_cache_size, +            DPDK_MBUF_PRIV_SIZE, +            mbuf_size, +            SOCKET_ID_ANY);          if (!_rx_pktbuf_pools.at(cpu_socket)) {              UHD_LOG_ERROR("DPDK", "Could not allocate RX pktbuf pool");              throw uhd::runtime_error("DPDK: Could not allocate RX pktbuf pool"); diff --git a/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp b/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp new file mode 100644 index 000000000..1fcedca51 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp @@ -0,0 +1,950 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/utils/log.hpp> +#include <uhd/utils/thread.hpp> +#include <uhdlib/transport/dpdk/arp.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/dpdk_io_service_client.hpp> +#include <uhdlib/utils/narrow.hpp> +#include <cmath> + +/* + * Memory management + * + * Every object that allocates and frees DPDK memory has a reference to the + * dpdk_ctx. + * + * Ownership hierarchy: + * + * dpdk_io_service_mgr (1) => + *     dpdk_ctx::sptr + *     dpdk_io_service::sptr + * + * xport (1) => + *     dpdk_send_io::sptr + *     dpdk_recv_io::sptr + * + * usrp link_mgr (1) => + *     udp_dpdk_link::sptr + * + * dpdk_send_io (2) => + *     dpdk_ctx::sptr + *     dpdk_io_service::sptr + * + * dpdk_recv_io (2) => + *     dpdk_ctx::sptr + *     dpdk_io_service::sptr + * + * dpdk_io_service (3) => + *     dpdk_ctx::wptr (weak_ptr) + *     udp_dpdk_link::sptr + * + * udp_dpdk_link (4) => + *     dpdk_ctx::sptr + */ + +using namespace uhd::transport; + +dpdk_io_service::dpdk_io_service( +    unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth) +    : _ctx(dpdk::dpdk_ctx::get()) +    , _lcore_id(lcore_id) +    , _ports(ports) +    , _servq(servq_depth, lcore_id) +{ +    UHD_LOG_TRACE("DPDK::IO_SERVICE", "Launching I/O service for lcore " << lcore_id); +    for (auto port : _ports) { +        UHD_LOG_TRACE("DPDK::IO_SERVICE", +            "lcore_id " << lcore_id << ": Adding port index " << port->get_port_id()); +        _tx_queues[port->get_port_id()]      = std::list<dpdk_send_io*>(); +        _recv_xport_map[port->get_port_id()] = std::list<dpdk_recv_io*>(); +    } +    int status = rte_eal_remote_launch(_io_worker, this, lcore_id); +    if (status) { +        throw uhd::runtime_error("DPDK: I/O service cannot launch on busy lcore"); +    } +} + +dpdk_io_service::sptr dpdk_io_service::make( +    unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth) +{ +    return dpdk_io_service::sptr(new dpdk_io_service(lcore_id, ports, servq_depth)); +} + +dpdk_io_service::~dpdk_io_service() +{ +    UHD_LOG_TRACE( +        "DPDK::IO_SERVICE", "Shutting down I/O service for lcore " << _lcore_id); +    
dpdk::wait_req* req = dpdk::wait_req_alloc(dpdk::wait_type::WAIT_LCORE_TERM, NULL); +    if (!req) { +        UHD_LOG_ERROR("DPDK::IO_SERVICE", +            "Could not allocate request for lcore termination for lcore " << _lcore_id); +        return; +    } +    dpdk::wait_req_get(req); +    _servq.submit(req, std::chrono::microseconds(-1)); +    dpdk::wait_req_put(req); +} + +void dpdk_io_service::attach_recv_link(recv_link_if::sptr link) +{ +    struct dpdk_flow_data data; +    data.link    = dynamic_cast<udp_dpdk_link*>(link.get()); +    data.is_recv = true; +    assert(data.link); +    auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data); +    if (!req) { +        UHD_LOG_ERROR( +            "DPDK::IO_SERVICE", "Could not allocate wait_req to attach recv_link"); +        throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach recv_link"); +    } +    _servq.submit(req, std::chrono::microseconds(-1)); +    dpdk::wait_req_put(req); +    { +        std::lock_guard<std::mutex> lock(_mutex); +        _recv_links.push_back(link); +    } +} + +void dpdk_io_service::attach_send_link(send_link_if::sptr link) +{ +    udp_dpdk_link* dpdk_link = dynamic_cast<udp_dpdk_link*>(link.get()); +    assert(dpdk_link); + +    // First, fill in destination MAC address +    struct dpdk::arp_request arp_data; +    arp_data.tpa  = dpdk_link->get_remote_ipv4(); +    arp_data.port = dpdk_link->get_port()->get_port_id(); +    if (dpdk_link->get_port()->dst_is_broadcast(arp_data.tpa)) { +        // If a broadcast IP, skip the ARP and fill with broadcast MAC addr +        memset(arp_data.tha.addr_bytes, 0xFF, 6); +    } else { +        auto arp_req = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data); +        if (!arp_req) { +            UHD_LOG_ERROR( +                "DPDK::IO_SERVICE", "Could not allocate wait_req for ARP request"); +            throw uhd::runtime_error("DPDK: Could not allocate wait_req for ARP request"); +        } +        if (_servq.submit(arp_req, std::chrono::microseconds(3000000))) { +            // Try one more time... 
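+            // (The first submit() above waits up to 3 seconds for the ARP
+            // reply; this second attempt re-sends the request and waits up
+            // to 30 seconds before giving up and reporting the host as
+            // unreachable.)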
+            auto arp_req2 = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data); +            if (_servq.submit(arp_req2, std::chrono::microseconds(30000000))) { +                wait_req_put(arp_req); +                wait_req_put(arp_req2); +                throw uhd::io_error("DPDK: Could not reach host"); +            } +            wait_req_put(arp_req2); +        } +        wait_req_put(arp_req); +    } +    dpdk_link->set_remote_mac(arp_data.tha); + +    // Then, submit the link to the I/O service thread +    struct dpdk_flow_data data; +    data.link    = dpdk_link; +    data.is_recv = false; +    auto req     = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data); +    if (!req) { +        UHD_LOG_ERROR( +            "DPDK::IO_SERVICE", "Could not allocate wait_req to attach send_link"); +        throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach send_link"); +    } +    _servq.submit(req, std::chrono::microseconds(-1)); +    wait_req_put(req); +    { +        std::lock_guard<std::mutex> lock(_mutex); +        _send_links.push_back(link); +    } +} + +void dpdk_io_service::detach_recv_link(recv_link_if::sptr link) +{ +    auto link_ptr = link.get(); +    struct dpdk_flow_data data; +    data.link    = dynamic_cast<udp_dpdk_link*>(link_ptr); +    data.is_recv = true; +    auto req     = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data); +    if (!req) { +        UHD_LOG_ERROR( +            "DPDK::IO_SERVICE", "Could not allocate wait_req to detach recv_link"); +        throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach recv_link"); +    } +    _servq.submit(req, std::chrono::microseconds(-1)); +    wait_req_put(req); +    { +        std::lock_guard<std::mutex> lock(_mutex); +        _recv_links.remove_if( +            [link_ptr](recv_link_if::sptr& item) { return item.get() == link_ptr; }); +    } +} + +void dpdk_io_service::detach_send_link(send_link_if::sptr link) +{ +    auto link_ptr = link.get(); +    struct dpdk_flow_data data; +    data.link    = dynamic_cast<udp_dpdk_link*>(link_ptr); +    data.is_recv = false; +    auto req     = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data); +    if (!req) { +        UHD_LOG_ERROR( +            "DPDK::IO_SERVICE", "Could not allocate wait_req to detach send_link"); +        throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach send_link"); +    } +    _servq.submit(req, std::chrono::microseconds(-1)); +    wait_req_put(req); +    { +        std::lock_guard<std::mutex> lock(_mutex); +        _send_links.remove_if( +            [link_ptr](send_link_if::sptr& item) { return item.get() == link_ptr; }); +    } +} + +recv_io_if::sptr dpdk_io_service::make_recv_client(recv_link_if::sptr data_link, +    size_t num_recv_frames, +    recv_callback_t cb, +    send_link_if::sptr /*fc_link*/, +    size_t num_send_frames, +    recv_io_if::fc_callback_t fc_cb) +{ +    auto link    = dynamic_cast<udp_dpdk_link*>(data_link.get()); +    auto recv_io = std::make_shared<dpdk_recv_io>( +        shared_from_this(), link, num_recv_frames, cb, num_send_frames, fc_cb); + +    // Register with I/O service +    recv_io->_dpdk_io_if.io_client = static_cast<void*>(recv_io.get()); +    auto xport_req                 = dpdk::wait_req_alloc( +        dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&recv_io->_dpdk_io_if); +    _servq.submit(xport_req, std::chrono::microseconds(-1)); +    wait_req_put(xport_req); +    return recv_io; +} + +send_io_if::sptr 
dpdk_io_service::make_send_client(send_link_if::sptr send_link, +    size_t num_send_frames, +    send_io_if::send_callback_t send_cb, +    recv_link_if::sptr /*recv_link*/, +    size_t num_recv_frames, +    recv_callback_t recv_cb, +    send_io_if::fc_callback_t fc_cb) +{ +    auto link    = dynamic_cast<udp_dpdk_link*>(send_link.get()); +    auto send_io = std::make_shared<dpdk_send_io>(shared_from_this(), +        link, +        num_send_frames, +        send_cb, +        num_recv_frames, +        recv_cb, +        fc_cb); + +    // Register with I/O service +    send_io->_dpdk_io_if.io_client = static_cast<void*>(send_io.get()); +    auto xport_req                 = dpdk::wait_req_alloc( +        dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&send_io->_dpdk_io_if); +    _servq.submit(xport_req, std::chrono::microseconds(-1)); +    wait_req_put(xport_req); +    return send_io; +} + + +int dpdk_io_service::_io_worker(void* arg) +{ +    if (!arg) +        return -EINVAL; +    dpdk_io_service* srv = (dpdk_io_service*)arg; + +    /* Check that this is a valid lcore */ +    unsigned int lcore_id = rte_lcore_id(); +    if (lcore_id == LCORE_ID_ANY) +        return -ENODEV; + +    /* Check that this lcore has ports */ +    if (srv->_ports.size() == 0) +        return -ENODEV; + +    char name[16]; +    snprintf(name, sizeof(name), "dpdk-io_%hu", (uint16_t)lcore_id); +    rte_thread_setname(pthread_self(), name); +    UHD_LOG_TRACE("DPDK::IO_SERVICE", +        "I/O service thread '" << name << "' started on lcore " << lcore_id); + +    uhd::set_thread_priority_safe(); + +    snprintf(name, sizeof(name), "rx-tbl_%hu", (uint16_t)lcore_id); +    struct rte_hash_parameters hash_params = {.name = name, +        .entries                                    = MAX_FLOWS, +        .reserved                                   = 0, +        .key_len                                    = sizeof(struct dpdk::ipv4_5tuple), +        .hash_func                                  = NULL, +        .hash_func_init_val                         = 0, +        .socket_id  = uhd::narrow_cast<int>(rte_socket_id()), +        .extra_flag = 0}; +    srv->_rx_table                         = rte_hash_create(&hash_params); +    if (srv->_rx_table == NULL) { +        return rte_errno; +    } + +    int status = 0; +    while (!status) { +        /* For each port, attempt to receive packets and process */ +        for (auto port : srv->_ports) { +            srv->_rx_burst(port, 0); +        } +        /* For each port's TX queues, do TX */ +        for (auto port : srv->_ports) { +            srv->_tx_burst(port); +        } +        /* For each port's RX release queues, release buffers */ +        for (auto port : srv->_ports) { +            srv->_rx_release(port); +        } +        /* Retry waking clients */ +        if (srv->_retry_head) { +            dpdk_io_if* node = srv->_retry_head; +            dpdk_io_if* end  = srv->_retry_head->prev; +            while (true) { +                dpdk_io_if* next = node->next; +                srv->_wake_client(node); +                if (node == end) { +                    break; +                } else { +                    node = next; +                    next = node->next; +                } +            } +        } +        /* Check for open()/close()/term() requests and service 1 at a time +         * Leave this last so we immediately terminate if requested +         */ +        status = srv->_service_requests(); +    } + +    return status; +} + +int 
dpdk_io_service::_service_requests() +{ +    for (int i = 0; i < MAX_PENDING_SERVICE_REQS; i++) { +        /* Dequeue */ +        dpdk::wait_req* req = _servq.pop(); +        if (!req) { +            break; +        } +        switch (req->reason) { +            case dpdk::wait_type::WAIT_SIMPLE: +                while (_servq.complete(req) == -ENOBUFS) +                    ; +                break; +            case dpdk::wait_type::WAIT_RX: +            case dpdk::wait_type::WAIT_TX_BUF: +                throw uhd::not_implemented_error( +                    "DPDK: _service_requests(): DPDK is still a WIP"); +            case dpdk::wait_type::WAIT_FLOW_OPEN: +                _service_flow_open(req); +                break; +            case dpdk::wait_type::WAIT_FLOW_CLOSE: +                _service_flow_close(req); +                break; +            case dpdk::wait_type::WAIT_XPORT_CONNECT: +                _service_xport_connect(req); +                break; +            case dpdk::wait_type::WAIT_XPORT_DISCONNECT: +                _service_xport_disconnect(req); +                break; +            case dpdk::wait_type::WAIT_ARP: { +                assert(req->data != NULL); +                int arp_status = _service_arp_request(req); +                assert(arp_status != -ENOMEM); +                if (arp_status == 0) { +                    while (_servq.complete(req) == -ENOBUFS) +                        ; +                } +                break; +            } +            case dpdk::wait_type::WAIT_LCORE_TERM: +                rte_free(_rx_table); +                while (_servq.complete(req) == -ENOBUFS) +                    ; +                // Return a positive value to indicate we should terminate +                return 1; +            default: +                UHD_LOG_ERROR( +                    "DPDK::IO_SERVICE", "Invalid reason associated with wait request"); +                while (_servq.complete(req) == -ENOBUFS) +                    ; +                break; +        } +    } +    return 0; +} + +void dpdk_io_service::_service_flow_open(dpdk::wait_req* req) +{ +    auto flow_req_data = (struct dpdk_flow_data*)req->data; +    assert(flow_req_data); +    if (flow_req_data->is_recv) { +        // If RX, add to RX table. Currently, nothing to do for TX. 
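+        // Note that src_ip and src_port are deliberately left zero: the
+        // 5-tuple is hashed as an opaque byte key, so a single entry with a
+        // wildcarded remote side matches packets from any sender addressed
+        // to this local UDP port.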
+        struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, +            .src_ip                                   = 0, +            .dst_ip   = flow_req_data->link->get_port()->get_ipv4(), +            .src_port = 0, +            .dst_port = flow_req_data->link->get_local_port()}; +        // Check the UDP port isn't in use +        if (rte_hash_lookup(_rx_table, &ht_key) > 0) { +            req->retval = -EADDRINUSE; +            UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add to RX table"); +            while (_servq.complete(req) == -ENOBUFS) +                ; +            return; +        } +        // Add xport list for this UDP port +        auto rx_entry = new std::list<dpdk_io_if*>(); +        if (rte_hash_add_key_data(_rx_table, &ht_key, rx_entry)) { +            UHD_LOG_ERROR("DPDK::IO_SERVICE", "Could not add new RX list to table"); +            delete rx_entry; +            req->retval = -ENOMEM; +            while (_servq.complete(req) == -ENOBUFS) +                ; +            return; +        } +    } +    while (_servq.complete(req) == -ENOBUFS) +        ; +} + +void dpdk_io_service::_service_flow_close(dpdk::wait_req* req) +{ +    auto flow_req_data = (struct dpdk_flow_data*)req->data; +    assert(flow_req_data); +    if (flow_req_data->is_recv) { +        // If RX, remove from RX table. Currently, nothing to do for TX. +        struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, +            .src_ip                                   = 0, +            .dst_ip   = flow_req_data->link->get_port()->get_ipv4(), +            .src_port = 0, +            .dst_port = flow_req_data->link->get_local_port()}; +        std::list<dpdk_io_if*>* xport_list; + +        if (rte_hash_lookup_data(_rx_table, &ht_key, (void**)&xport_list) > 0) { +            UHD_ASSERT_THROW(xport_list->empty()); +            delete xport_list; +            rte_hash_del_key(_rx_table, &ht_key); +            while (_servq.complete(req) == -ENOBUFS) +                ; +            return; +        } +    } +    while (_servq.complete(req) == -ENOBUFS) +        ; +} + +void dpdk_io_service::_service_xport_connect(dpdk::wait_req* req) +{ +    auto dpdk_io = static_cast<dpdk_io_if*>(req->data); +    UHD_ASSERT_THROW(dpdk_io); +    auto port = dpdk_io->link->get_port(); +    if (dpdk_io->recv_cb) { +        // Add to RX table only if have a callback. 
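+        // (The check is on recv_cb rather than is_recv because a send
+        // client may also register a receive callback, e.g. to see
+        // flow-control responses coming back on the same flow.)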
+        struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, +            .src_ip                                   = 0, +            .dst_ip                                   = port->get_ipv4(), +            .src_port                                 = 0, +            .dst_port                                 = dpdk_io->link->get_local_port()}; +        void* hash_data; +        if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) < 0) { +            req->retval = -ENOENT; +            UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add xport to RX table"); +            while (_servq.complete(req) == -ENOBUFS) +                ; +            return; +        } +        // Add to xport list for this UDP port +        auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); +        rx_entry->push_back(dpdk_io); +    } +    if (dpdk_io->is_recv) { +        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX connect request..."); +        // Add to xport list for this NIC port +        auto& xport_list = _recv_xport_map.at(port->get_port_id()); +        xport_list.push_back((dpdk_recv_io*)dpdk_io->io_client); +    } else { +        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX connect request..."); +        dpdk_send_io* send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client); +        // Add to xport list for this NIC port +        auto& xport_list = _tx_queues.at(port->get_port_id()); +        xport_list.push_back(send_io); +        for (size_t i = 0; i < send_io->_num_send_frames; i++) { +            auto buff_ptr = +                (dpdk::dpdk_frame_buff*)dpdk_io->link->get_send_buff(0).release(); +            if (!buff_ptr) { +                UHD_LOG_ERROR("DPDK::IO_SERVICE", +                    "TX mempool out of memory. Please increase dpdk_num_mbufs."); +                break; +            } +            if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) { +                rte_pktmbuf_free(buff_ptr->get_pktmbuf()); +                break; +            } +            send_io->_num_frames_in_use++; +        } +    } +    while (_servq.complete(req) == -ENOBUFS) +        ; +} + +void dpdk_io_service::_service_xport_disconnect(dpdk::wait_req* req) +{ +    auto dpdk_io = (struct dpdk_io_if*)req->data; +    assert(dpdk_io); +    auto port = dpdk_io->link->get_port(); +    if (dpdk_io->recv_cb) { +        // Remove from RX table only if have a callback. 
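+        // (Mirror image of _service_xport_connect(): the client is dropped
+        // from the flow's xport list here, and any buffers still sitting in
+        // its rings are drained and returned to the link below.)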
+        struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, +            .src_ip                                   = 0, +            .dst_ip                                   = port->get_ipv4(), +            .src_port                                 = 0, +            .dst_port                                 = dpdk_io->link->get_local_port()}; +        void* hash_data; +        if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) >= 0) { +            // Remove from xport list for this UDP port +            auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); +            rx_entry->remove(dpdk_io); +        } else { +            req->retval = -EINVAL; +            UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot remove xport from RX table"); +        } +    } +    if (dpdk_io->is_recv) { +        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX disconnect request..."); +        dpdk_recv_io* recv_client = static_cast<dpdk_recv_io*>(dpdk_io->io_client); +        // Remove from xport list for this NIC port +        auto& xport_list = _recv_xport_map.at(port->get_port_id()); +        xport_list.remove(recv_client); +        while (!rte_ring_empty(recv_client->_recv_queue)) { +            frame_buff* buff_ptr; +            rte_ring_dequeue(recv_client->_recv_queue, (void**)&buff_ptr); +            dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr)); +        } +        while (!rte_ring_empty(recv_client->_release_queue)) { +            frame_buff* buff_ptr; +            rte_ring_dequeue(recv_client->_release_queue, (void**)&buff_ptr); +            dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr)); +        } +    } else { +        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX disconnect request..."); +        dpdk_send_io* send_client = static_cast<dpdk_send_io*>(dpdk_io->io_client); +        // Remove from xport list for this NIC port +        auto& xport_list = _tx_queues.at(port->get_port_id()); +        xport_list.remove(send_client); +        while (!rte_ring_empty(send_client->_send_queue)) { +            frame_buff* buff_ptr; +            rte_ring_dequeue(send_client->_send_queue, (void**)&buff_ptr); +            dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr)); +        } +        while (!rte_ring_empty(send_client->_buffer_queue)) { +            frame_buff* buff_ptr; +            rte_ring_dequeue(send_client->_buffer_queue, (void**)&buff_ptr); +            dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr)); +        } +    } +    // Now remove the node if it's on the retry list +    if ((_retry_head == dpdk_io) && (dpdk_io->next == dpdk_io)) { +        _retry_head = NULL; +    } else if (_retry_head) { +        dpdk_io_if* node = _retry_head->next; +        while (node != _retry_head) { +            if (node == dpdk_io) { +                dpdk_io->prev->next = dpdk_io->next; +                dpdk_io->next->prev = dpdk_io->prev; +                break; +            } +            node = node->next; +        } +    } +    while (_servq.complete(req) == -ENOBUFS) +        ; +} + +int dpdk_io_service::_service_arp_request(dpdk::wait_req* req) +{ +    int status               = 0; +    auto arp_req_data        = (struct dpdk::arp_request*)req->data; +    dpdk::ipv4_addr dst_addr = arp_req_data->tpa; +    auto ctx_sptr            = _ctx.lock(); +    UHD_ASSERT_THROW(ctx_sptr); +    dpdk::dpdk_port* port = ctx_sptr->get_port(arp_req_data->port); +    UHD_LOG_TRACE("DPDK::IO_SERVICE", +        "ARP: Requesting address for " 
+int dpdk_io_service::_service_arp_request(dpdk::wait_req* req)
+{
+    int status               = 0;
+    auto arp_req_data        = (struct dpdk::arp_request*)req->data;
+    dpdk::ipv4_addr dst_addr = arp_req_data->tpa;
+    auto ctx_sptr            = _ctx.lock();
+    UHD_ASSERT_THROW(ctx_sptr);
+    dpdk::dpdk_port* port = ctx_sptr->get_port(arp_req_data->port);
+    UHD_LOG_TRACE("DPDK::IO_SERVICE",
+        "ARP: Requesting address for " << dpdk::ipv4_num_to_str(dst_addr));
+
+    rte_spinlock_lock(&port->_spinlock);
+    struct dpdk::arp_entry* entry = NULL;
+    if (port->_arp_table.count(dst_addr) == 0) {
+        entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0);
+        if (!entry) {
+            status = -ENOMEM;
+            goto arp_end;
+        }
+        entry = new (entry) dpdk::arp_entry();
+        entry->reqs.push_back(req);
+        port->_arp_table[dst_addr] = entry;
+        status                     = -EAGAIN;
+        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Address not in table. Sending ARP request.");
+        _send_arp_request(port, 0, arp_req_data->tpa);
+    } else {
+        entry = port->_arp_table.at(dst_addr);
+        if (is_zero_ether_addr(&entry->mac_addr)) {
+            UHD_LOG_TRACE("DPDK::IO_SERVICE",
+                "ARP: Address in table, but not populated yet. Resending ARP request.");
+            port->_arp_table.at(dst_addr)->reqs.push_back(req);
+            status = -EAGAIN;
+            _send_arp_request(port, 0, arp_req_data->tpa);
+        } else {
+            UHD_LOG_TRACE("DPDK::IO_SERVICE", "ARP: Address in table.");
+            ether_addr_copy(&entry->mac_addr, &arp_req_data->tha);
+            status = 0;
+        }
+    }
+arp_end:
+    rte_spinlock_unlock(&port->_spinlock);
+    return status;
+}
+
+int dpdk_io_service::_send_arp_request(
+    dpdk::dpdk_port* port, dpdk::queue_id_t queue, dpdk::ipv4_addr ip)
+{
+    struct rte_mbuf* mbuf;
+    struct ether_hdr* hdr;
+    struct arp_hdr* arp_frame;
+
+    mbuf = rte_pktmbuf_alloc(port->get_tx_pktbuf_pool());
+    if (unlikely(mbuf == NULL)) {
+        UHD_LOG_WARNING(
+            "DPDK::IO_SERVICE", "Could not allocate packet buffer for ARP request");
+        return -ENOMEM;
+    }
+
+    hdr       = rte_pktmbuf_mtod(mbuf, struct ether_hdr*);
+    arp_frame = (struct arp_hdr*)&hdr[1];
+
+    memset(hdr->d_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN);
+    hdr->s_addr     = port->get_mac_addr();
+    hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP);
+
+    arp_frame->arp_hrd          = rte_cpu_to_be_16(ARP_HRD_ETHER);
+    arp_frame->arp_pro          = rte_cpu_to_be_16(ETHER_TYPE_IPv4);
+    arp_frame->arp_hln          = 6;
+    arp_frame->arp_pln          = 4;
+    arp_frame->arp_op           = rte_cpu_to_be_16(ARP_OP_REQUEST);
+    arp_frame->arp_data.arp_sha = port->get_mac_addr();
+    arp_frame->arp_data.arp_sip = port->get_ipv4();
+    memset(arp_frame->arp_data.arp_tha.addr_bytes, 0x00, ETHER_ADDR_LEN);
+    arp_frame->arp_data.arp_tip = ip;
+
+    // 42 bytes = 14-byte Ethernet header + 28-byte ARP payload
+    mbuf->pkt_len  = 42;
+    mbuf->data_len = 42;
+
+    if (rte_eth_tx_burst(port->get_port_id(), queue, &mbuf, 1) != 1) {
+        UHD_LOG_WARNING("DPDK::IO_SERVICE", "ARP request not sent: Descriptor ring full");
+        rte_pktmbuf_free(mbuf);
+        return -EAGAIN;
+    }
+    return 0;
+}
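+
+// Receive path: pull a burst of packets from the NIC queue and dispatch by
+// EtherType. ARP is handled inline; IPv4 packets are checksum-checked and
+// handed to _process_ipv4() for UDP demuxing.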
+/* Do a burst of RX on port */
+int dpdk_io_service::_rx_burst(dpdk::dpdk_port* port, dpdk::queue_id_t queue)
+{
+    struct ether_hdr* hdr;
+    char* l2_data;
+    struct rte_mbuf* bufs[RX_BURST_SIZE];
+    const uint16_t num_rx =
+        rte_eth_rx_burst(port->get_port_id(), queue, bufs, RX_BURST_SIZE);
+    if (unlikely(num_rx == 0)) {
+        return 0;
+    }
+
+    for (int buf = 0; buf < num_rx; buf++) {
+        uint64_t ol_flags = bufs[buf]->ol_flags;
+        hdr               = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr*);
+        l2_data           = (char*)&hdr[1];
+        switch (rte_be_to_cpu_16(hdr->ether_type)) {
+            case ETHER_TYPE_ARP:
+                _process_arp(port, queue, (struct arp_hdr*)l2_data);
+                rte_pktmbuf_free(bufs[buf]);
+                break;
+            case ETHER_TYPE_IPv4:
+                if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) {
+                    UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet has bad IP cksum");
+                    // Dropped packets must still be freed
+                    rte_pktmbuf_free(bufs[buf]);
+                } else if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_NONE) {
+                    UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet missing IP cksum");
+                    rte_pktmbuf_free(bufs[buf]);
+                } else {
+                    _process_ipv4(port, bufs[buf], (struct ipv4_hdr*)l2_data);
+                }
+                break;
+            default:
+                rte_pktmbuf_free(bufs[buf]);
+                break;
+        }
+    }
+    return num_rx;
+}
+
+int dpdk_io_service::_process_arp(
+    dpdk::dpdk_port* port, dpdk::queue_id_t queue_id, struct arp_hdr* arp_frame)
+{
+    uint32_t src_ip            = arp_frame->arp_data.arp_sip;
+    struct ether_addr src_addr = arp_frame->arp_data.arp_sha;
+    UHD_LOG_TRACE("DPDK::IO_SERVICE",
+        "Processing ARP packet: " << dpdk::ipv4_num_to_str(src_ip) << " -> "
+                                  << dpdk::eth_addr_to_string(src_addr));
+    /* Add the sender's entry to the ARP table */
+    rte_spinlock_lock(&port->_spinlock);
+    struct dpdk::arp_entry* entry = NULL;
+    if (port->_arp_table.count(src_ip) == 0) {
+        entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0);
+        if (!entry) {
+            // Don't leave the port spinlock held on the error path
+            rte_spinlock_unlock(&port->_spinlock);
+            return -ENOMEM;
+        }
+        entry = new (entry) dpdk::arp_entry();
+        ether_addr_copy(&src_addr, &entry->mac_addr);
+        port->_arp_table[src_ip] = entry;
+    } else {
+        entry = port->_arp_table.at(src_ip);
+        ether_addr_copy(&src_addr, &entry->mac_addr);
+        for (auto req : entry->reqs) {
+            auto arp_data = (struct dpdk::arp_request*)req->data;
+            ether_addr_copy(&src_addr, &arp_data->tha);
+            while (_servq.complete(req) == -ENOBUFS)
+                ;
+        }
+        entry->reqs.clear();
+    }
+    rte_spinlock_unlock(&port->_spinlock);
+
+    /* Respond if this was an ARP request */
+    if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST)
+        && arp_frame->arp_data.arp_tip == port->get_ipv4()) {
+        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Sending ARP reply.");
+        port->_arp_reply(queue_id, arp_frame);
+    }
+
+    return 0;
+}
+
+int dpdk_io_service::_process_ipv4(
+    dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct ipv4_hdr* pkt)
+{
+    bool bcast = port->dst_is_broadcast(pkt->dst_addr);
+    if (pkt->dst_addr != port->get_ipv4() && !bcast) {
+        rte_pktmbuf_free(mbuf);
+        return -ENODEV;
+    }
+    if (pkt->next_proto_id == IPPROTO_UDP) {
+        return _process_udp(port, mbuf, (struct udp_hdr*)&pkt[1], bcast);
+    }
+    rte_pktmbuf_free(mbuf);
+    return -EINVAL;
+}
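+
+// Demux one UDP packet to its transport. The RX table key zeroes the source
+// fields, so all traffic to a given local UDP port shares a single entry;
+// the entry's list holds every receiver muxed onto that port.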
UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No link entry in rx table"); +        rte_pktmbuf_free(mbuf); +        return -ENOENT; +    } +    // Get xport list for this UDP port +    auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); +    if (rx_entry->empty()) { +        UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No xports for link"); +        rte_pktmbuf_free(mbuf); +        return -ENOENT; +    } +    // Turn rte_mbuf -> dpdk_frame_buff +    auto link = rx_entry->front()->link; +    link->enqueue_recv_mbuf(mbuf); +    auto buff       = link->get_recv_buff(0); +    bool rcvr_found = false; +    for (auto client_if : *rx_entry) { +        // Check all the muxed receivers... +        if (client_if->recv_cb(buff, link, link)) { +            rcvr_found = true; +            if (buff) { +                assert(client_if->is_recv); +                auto recv_io  = (dpdk_recv_io*)client_if->io_client; +                auto buff_ptr = (dpdk::dpdk_frame_buff*)buff.release(); +                if (rte_ring_enqueue(recv_io->_recv_queue, buff_ptr)) { +                    rte_pktmbuf_free(buff_ptr->get_pktmbuf()); +                    UHD_LOG_WARNING( +                        "DPDK::IO_SERVICE", "Dropping packet: No space in recv queue"); +                } else { +                    recv_io->_num_frames_in_use++; +                    assert(recv_io->_num_frames_in_use <= recv_io->_num_recv_frames); +                    _wake_client(client_if); +                } +            } +            break; +        } +    } +    if (!rcvr_found) { +        UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No receiver xport found"); +        // Release the buffer if no receiver found +        link->release_recv_buff(std::move(buff)); +        return -ENOENT; +    } +    return 0; +} + +/* Do a burst of TX on port's tx queues */ +int dpdk_io_service::_tx_burst(dpdk::dpdk_port* port) +{ +    unsigned int total_tx = 0; +    auto& queues          = _tx_queues.at(port->get_port_id()); + +    for (auto& send_io : queues) { +        unsigned int num_tx   = rte_ring_count(send_io->_send_queue); +        num_tx                = (num_tx < TX_BURST_SIZE) ? num_tx : TX_BURST_SIZE; +        bool replaced_buffers = false; +        for (unsigned int i = 0; i < num_tx; i++) { +            size_t frame_size = send_io->_dpdk_io_if.link->get_send_frame_size(); +            if (send_io->_fc_cb && !send_io->_fc_cb(frame_size)) { +                break; +            } +            dpdk::dpdk_frame_buff* buff_ptr; +            int status = rte_ring_dequeue(send_io->_send_queue, (void**)&buff_ptr); +            if (status) { +                UHD_LOG_ERROR("DPDK::IO_SERVICE", "TX Q Count doesn't match actual"); +                break; +            } +            send_io->_send_cb(frame_buff::uptr(buff_ptr), send_io->_dpdk_io_if.link); +            // Attempt to replace buffer +            buff_ptr = (dpdk::dpdk_frame_buff*)send_io->_dpdk_io_if.link->get_send_buff(0) +                           .release(); +            if (!buff_ptr) { +                UHD_LOG_ERROR("DPDK::IO_SERVICE", +                    "TX mempool out of memory. 
Please increase dpdk_num_mbufs."); +                send_io->_num_frames_in_use--; +            } else if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) { +                rte_pktmbuf_free(buff_ptr->get_pktmbuf()); +                send_io->_num_frames_in_use--; +            } else { +                replaced_buffers = true; +            } +        } +        if (replaced_buffers) { +            _wake_client(&send_io->_dpdk_io_if); +        } +        total_tx += num_tx; +    } + +    return total_tx; +} + +int dpdk_io_service::_rx_release(dpdk::dpdk_port* port) +{ +    unsigned int total_bufs = 0; +    auto& queues            = _recv_xport_map.at(port->get_port_id()); + +    for (auto& recv_io : queues) { +        unsigned int num_buf = rte_ring_count(recv_io->_release_queue); +        num_buf              = (num_buf < RX_BURST_SIZE) ? num_buf : RX_BURST_SIZE; +        for (unsigned int i = 0; i < num_buf; i++) { +            dpdk::dpdk_frame_buff* buff_ptr; +            int status = rte_ring_dequeue(recv_io->_release_queue, (void**)&buff_ptr); +            if (status) { +                UHD_LOG_ERROR("DPDK::IO_SERVICE", "RX Q Count doesn't match actual"); +                break; +            } +            recv_io->_fc_cb(frame_buff::uptr(buff_ptr), +                recv_io->_dpdk_io_if.link, +                recv_io->_dpdk_io_if.link); +            recv_io->_num_frames_in_use--; +        } +        total_bufs += num_buf; +    } + +    return total_bufs; +} + +uint16_t dpdk_io_service::_get_unique_client_id() +{ +    std::lock_guard<std::mutex> lock(_mutex); +    if (_client_id_set.size() >= MAX_CLIENTS) { +        UHD_LOG_ERROR("DPDK::IO_SERVICE", "Exceeded maximum number of clients"); +        throw uhd::runtime_error("DPDK::IO_SERVICE: Exceeded maximum number of clients"); +    } + +    uint16_t id = _next_client_id++; +    while (_client_id_set.count(id)) { +        id = _next_client_id++; +    } +    _client_id_set.insert(id); +    return id; +} + +void dpdk_io_service::_wake_client(dpdk_io_if* dpdk_io) +{ +    dpdk::wait_req* req; +    if (dpdk_io->is_recv) { +        auto recv_io = static_cast<dpdk_recv_io*>(dpdk_io->io_client); +        req          = recv_io->_waiter; +    } else { +        auto send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client); +        req          = send_io->_waiter; +    } +    bool stat = req->mutex.try_lock(); +    if (stat) { +        bool active_req = !req->complete; +        if (dpdk_io->next) { +            // On the list: Take it off +            if (dpdk_io->next == dpdk_io) { +                // Only node on the list +                _retry_head = NULL; +            } else { +                // Other nodes are on the list +                if (_retry_head == dpdk_io) { +                    // Move list head to next +                    _retry_head = dpdk_io->next; +                } +                dpdk_io->next->prev = dpdk_io->prev; +                dpdk_io->prev->next = dpdk_io->next; +            } +            dpdk_io->next = NULL; +            dpdk_io->prev = NULL; +        } +        if (active_req) { +            req->complete = true; +            req->cond.notify_one(); +        } +        req->mutex.unlock(); +        if (active_req) { +            wait_req_put(req); +        } +    } else { +        // Put on the retry list, if it isn't already +        if (!dpdk_io->next) { +            if (_retry_head) { +                dpdk_io->next           = _retry_head; +                dpdk_io->prev           = _retry_head->prev; +   
+void dpdk_io_service::_wake_client(dpdk_io_if* dpdk_io)
+{
+    dpdk::wait_req* req;
+    if (dpdk_io->is_recv) {
+        auto recv_io = static_cast<dpdk_recv_io*>(dpdk_io->io_client);
+        req          = recv_io->_waiter;
+    } else {
+        auto send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client);
+        req          = send_io->_waiter;
+    }
+    bool stat = req->mutex.try_lock();
+    if (stat) {
+        bool active_req = !req->complete;
+        if (dpdk_io->next) {
+            // On the list: Take it off
+            if (dpdk_io->next == dpdk_io) {
+                // Only node on the list
+                _retry_head = NULL;
+            } else {
+                // Other nodes are on the list
+                if (_retry_head == dpdk_io) {
+                    // Move list head to next
+                    _retry_head = dpdk_io->next;
+                }
+                dpdk_io->next->prev = dpdk_io->prev;
+                dpdk_io->prev->next = dpdk_io->next;
+            }
+            dpdk_io->next = NULL;
+            dpdk_io->prev = NULL;
+        }
+        if (active_req) {
+            req->complete = true;
+            req->cond.notify_one();
+        }
+        req->mutex.unlock();
+        if (active_req) {
+            wait_req_put(req);
+        }
+    } else {
+        // Put on the retry list, if it isn't already
+        if (!dpdk_io->next) {
+            if (_retry_head) {
+                dpdk_io->next           = _retry_head;
+                dpdk_io->prev           = _retry_head->prev;
+                _retry_head->prev->next = dpdk_io;
+                _retry_head->prev       = dpdk_io;
+            } else {
+                _retry_head   = dpdk_io;
+                dpdk_io->next = dpdk_io;
+                dpdk_io->prev = dpdk_io;
+            }
+        }
+    }
+}
diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt
index 8e58dd591..2c53e4905 100644
--- a/host/tests/CMakeLists.txt
+++ b/host/tests/CMakeLists.txt
@@ -124,7 +124,10 @@ if(ENABLE_DPDK)
         ${CMAKE_SOURCE_DIR}/lib/utils/paths.cpp
         ${CMAKE_SOURCE_DIR}/lib/utils/pathslib.cpp
         ${CMAKE_SOURCE_DIR}/lib/utils/prefs.cpp
+        ${CMAKE_SOURCE_DIR}/lib/transport/adapter.cpp
         ${CMAKE_SOURCE_DIR}/lib/transport/uhd-dpdk/dpdk_common.cpp
+        ${CMAKE_SOURCE_DIR}/lib/transport/uhd-dpdk/dpdk_io_service.cpp
+        ${CMAKE_SOURCE_DIR}/lib/transport/udp_dpdk_link.cpp
         INCLUDE_DIRS
         ${DPDK_INCLUDE_DIR}
         EXTRA_LIBS ${DPDK_LIBRARIES}
@@ -132,6 +135,8 @@ if(ENABLE_DPDK)
     )
     set_source_files_properties(
         ${CMAKE_SOURCE_DIR}/lib/transport/uhd-dpdk/dpdk_common.cpp
+        ${CMAKE_SOURCE_DIR}/lib/transport/uhd-dpdk/dpdk_io_service.cpp
+        ${CMAKE_SOURCE_DIR}/lib/transport/udp_dpdk_link.cpp
         PROPERTIES COMPILE_FLAGS "-march=native -D_GNU_SOURCE"
     )
 ENDIF(ENABLE_DPDK)
diff --git a/host/tests/dpdk_port_test.cpp b/host/tests/dpdk_port_test.cpp
index 7ef386c52..8f6b20c34 100644
--- a/host/tests/dpdk_port_test.cpp
+++ b/host/tests/dpdk_port_test.cpp
@@ -6,6 +6,8 @@
 #include <uhdlib/transport/dpdk/common.hpp>
 #include <uhdlib/transport/dpdk/service_queue.hpp>
+#include <uhdlib/transport/dpdk_io_service.hpp>
+#include <uhdlib/transport/udp_dpdk_link.hpp>
 #include <boost/program_options.hpp>
 #include <iostream>
 #include <memory>
@@ -101,6 +103,46 @@ int main(int argc, char **argv)
     service_thread.join();
     std::cout << "PASS: Service thread terminated" << std::endl;
     delete queue;
+
+    std::cout << "Starting up ARP thread..." << std::endl;
+    std::vector<uhd::transport::dpdk::dpdk_port*> ports;
+    ports.push_back(port);
+    // auto io_srv = uhd::transport::dpdk_io_service::make(1, ports, 16);
+    auto io_srv = ctx->get_io_service(1);
+
+    // Create link
+    std::cout << "Creating UDP link..." << std::endl;
+    uhd::transport::link_params_t params;
+    params.recv_frame_size = 8000;
+    params.send_frame_size = 8000;
+    params.num_recv_frames = 511;
+    params.num_send_frames = 511;
+    params.recv_buff_size  = params.recv_frame_size * params.num_recv_frames;
+    params.send_buff_size  = params.send_frame_size * params.num_send_frames;
+    auto link = uhd::transport::udp_dpdk_link::make("192.168.10.2", "49600", params);
+
+    // Attach link
+    std::cout << "Attaching UDP send link..." << std::endl;
+    io_srv->attach_send_link(link);
+    struct ether_addr dest_mac;
+    link->get_remote_mac(dest_mac);
+    char mac_str[20];
+    ether_format_addr(mac_str, 20, &dest_mac);
+    std::cout << "Remote MAC address is " << mac_str << std::endl;
+    std::cout << std::endl;
+    std::cout << "Attaching UDP recv link..." << std::endl;
+    io_srv->attach_recv_link(link);
+    std::cout << "Press Enter to quit..." << std::endl;
+    std::cin.get();
+
+    // Shut down
+    std::cout << "Detaching UDP send link..." << std::endl;
+    io_srv->detach_send_link(link);
+    std::cout << "Detaching UDP recv link..." << std::endl;
+    io_srv->detach_recv_link(link);
+    std::cout << "Shutting down I/O service..." << std::endl;
+    io_srv.reset();
+    std::cout << "Shutting down context..." << std::endl;
     ctx.reset();
     return 0;
 }
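For context (not part of this change): the RX dispatch in `_process_udp()` above assumes `_rx_table` is an `rte_hash` keyed on `dpdk::ipv4_5tuple` with zeroed source fields. The following is a minimal sketch of how such a table could be created and populated with DPDK's hash library; the table name, size, and the header providing `ipv4_5tuple` are assumptions here, not taken from this commit:

    #include <rte_hash.h>
    #include <rte_jhash.h>
    #include <rte_lcore.h>
    #include <uhdlib/transport/dpdk/common.hpp> // assumed location of ipv4_5tuple

    using namespace uhd::transport;

    // Illustrative only: build a 5-tuple hash table and register a receiver
    // list for one local UDP port, mirroring the lookups in _process_udp().
    static struct rte_hash* make_example_rx_table(
        dpdk::ipv4_addr my_ip, uint16_t udp_port, void* xport_list)
    {
        struct rte_hash_parameters params = {};
        params.name      = "rx_table_example"; // hypothetical name
        params.entries   = 64;                 // assumed capacity
        params.key_len   = sizeof(struct dpdk::ipv4_5tuple);
        params.hash_func = rte_jhash;
        params.socket_id = rte_socket_id();
        struct rte_hash* table = rte_hash_create(&params);
        if (!table) {
            return NULL;
        }
        // Zeroed source fields act as wildcards, so every remote endpoint
        // sending to this local UDP port maps to the same entry.
        struct dpdk::ipv4_5tuple key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP,
            .src_ip   = 0,
            .dst_ip   = my_ip,
            .src_port = 0,
            .dst_port = udp_port};
        rte_hash_add_key_data(table, &key, xport_list);
        return table;
    }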
