diff options
| author | Alex Williams <alex.williams@ni.com> | 2019-12-01 21:58:13 -0800 | 
|---|---|---|
| committer | Brent Stapleton <brent.stapleton@ettus.com> | 2019-12-20 16:32:22 -0800 | 
| commit | 4e38eef817813c1bbd8a9cf972e4cf0134d24308 (patch) | |
| tree | f6200a048a7da5b7b588a4a9aae881ce7551825e /host/lib/transport/uhd-dpdk | |
| parent | 797d54bc2573688eebcb2c639cb07e4ab6d5ab9d (diff) | |
| download | uhd-4e38eef817813c1bbd8a9cf972e4cf0134d24308.tar.gz uhd-4e38eef817813c1bbd8a9cf972e4cf0134d24308.tar.bz2 uhd-4e38eef817813c1bbd8a9cf972e4cf0134d24308.zip | |
dpdk: Add new DPDK stack to integrate with I/O services
docs: Update DPDK docs with new parameters:
Parameter names have had their hyphens changed to underscores, and
the I/O CPU argument is now named after the lcores and reflects
the naming used by DPDK.
transport: Add new udp_dpdk_link, based atop the new APIs:
This link is tightly coupled with the DPDK I/O service. The link class
carries all the address information to communicate with the other
host, and it can send packets directly through the DPDK NIC ports.
However, for receiving packets, the I/O service must pull the packets
from the DMA queue and attach them to the appropriate link object.
The link object merely formats the frame_buff object underneath, which
is embedded in the rte_mbuf container. For get_recv_buff, the link
will pull buffers only from its internal queue (the one filled by the
I/O service).
transport: Add DPDK-specific I/O service:
The I/O service is split into two parts, the user threads and the
I/O worker threads. The user threads submit requests through
various appropriate queues, and the I/O threads perform all the
I/O on their behalf. This includes routing UDP packets to the
correct receiver and getting the MAC address of a destination (by
performing the ARP request and handling the ARP replies).
The DPDK context stores I/O services. The context spawns all I/O
services on init(), and I/O services can be fetched from the dpdk_ctx
object by using a port ID.
I/O service clients:
The clients have two lockless ring buffers. One is to get a buffer
from the I/O service; the other is to release a buffer back to the
I/O service. Threads sleeping on buffer I/O are kept in a separate
list from the service queue and are processed in the course of doing
RX or TX.
The list nodes are embedded in the dpdk_io_if, and the head of the
list is on the dpdk_io_service. The I/O service will transfer the
embedded wait_req to the list if it cannot acquire the mutex to
complete the condition for waking.
Co-authored-by: Martin Braun <martin.braun@ettus.com>
Co-authored-by: Ciro Nishiguchi <ciro.nishiguchi@ni.com>
Co-authored-by: Brent Stapleton <brent.stapleton@ettus.com>
Diffstat (limited to 'host/lib/transport/uhd-dpdk')
| -rw-r--r-- | host/lib/transport/uhd-dpdk/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | host/lib/transport/uhd-dpdk/dpdk_common.cpp | 258 | ||||
| -rw-r--r-- | host/lib/transport/uhd-dpdk/dpdk_io_service.cpp | 950 | 
3 files changed, 1081 insertions, 129 deletions
| diff --git a/host/lib/transport/uhd-dpdk/CMakeLists.txt b/host/lib/transport/uhd-dpdk/CMakeLists.txt index ea78aa1e8..a13886653 100644 --- a/host/lib/transport/uhd-dpdk/CMakeLists.txt +++ b/host/lib/transport/uhd-dpdk/CMakeLists.txt @@ -19,9 +19,11 @@ if(ENABLE_DPDK)      LIBUHD_APPEND_SOURCES(          ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_common.cpp +        ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_io_service.cpp      )      set_source_files_properties(          ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_common.cpp +        ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_io_service.cpp          PROPERTIES COMPILE_FLAGS "${UHD_DPDK_CFLAGS} -D_GNU_SOURCE"      )      include_directories(${DPDK_INCLUDE_DIR}) diff --git a/host/lib/transport/uhd-dpdk/dpdk_common.cpp b/host/lib/transport/uhd-dpdk/dpdk_common.cpp index 43a1507bb..46818e973 100644 --- a/host/lib/transport/uhd-dpdk/dpdk_common.cpp +++ b/host/lib/transport/uhd-dpdk/dpdk_common.cpp @@ -3,8 +3,13 @@  //  // SPDX-License-Identifier: GPL-3.0-or-later  // + +#include <uhd/utils/algorithm.hpp>  #include <uhd/utils/log.hpp> +#include <uhdlib/transport/dpdk/arp.hpp>  #include <uhdlib/transport/dpdk/common.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/dpdk_io_service.hpp>  #include <uhdlib/utils/prefs.hpp>  #include <arpa/inet.h>  #include <rte_arp.h> @@ -23,6 +28,7 @@ constexpr uint16_t DPDK_DEFAULT_RING_SIZE = 512;  inline char* eal_add_opt(      std::vector<const char*>& argv, size_t n, char* dst, const char* opt, const char* arg)  { +    UHD_LOG_TRACE("DPDK", opt << " " << arg);      char* ptr = dst;      strncpy(ptr, opt, n);      argv.push_back(ptr); @@ -34,15 +40,6 @@ inline char* eal_add_opt(      return ptr;  } -inline std::string eth_addr_to_string(const struct ether_addr mac_addr) -{ -    auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx"); -    mac_stream % (uint32_t)mac_addr.addr_bytes[0] % (uint32_t)mac_addr.addr_bytes[1] -        % (uint32_t)mac_addr.addr_bytes[2] % (uint32_t)mac_addr.addr_bytes[3] -        % (uint32_t)mac_addr.addr_bytes[4] % (uint32_t)mac_addr.addr_bytes[5]; -    return mac_stream.str(); -} -  inline void separate_ipv4_addr(      const std::string ipv4, uint32_t& ipv4_addr, uint32_t& netmask)  { @@ -59,28 +56,29 @@ inline void separate_ipv4_addr(  dpdk_port::uptr dpdk_port::make(port_id_t port,      size_t mtu,      uint16_t num_queues, -    size_t num_mbufs, +    uint16_t num_desc,      struct rte_mempool* rx_pktbuf_pool,      struct rte_mempool* tx_pktbuf_pool,      std::string ipv4_address)  {      return std::make_unique<dpdk_port>( -        port, mtu, num_queues, num_mbufs, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address); +        port, mtu, num_queues, num_desc, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address);  }  dpdk_port::dpdk_port(port_id_t port,      size_t mtu,      uint16_t num_queues, -    size_t num_mbufs, +    uint16_t num_desc,      struct rte_mempool* rx_pktbuf_pool,      struct rte_mempool* tx_pktbuf_pool,      std::string ipv4_address)      : _port(port)      , _mtu(mtu) +    , _num_queues(num_queues)      , _rx_pktbuf_pool(rx_pktbuf_pool)      , _tx_pktbuf_pool(tx_pktbuf_pool)  { -    /* 1. Set MTU and IPv4 address */ +    /* Set MTU and IPv4 address */      int retval;      retval = rte_eth_dev_set_mtu(_port, _mtu); @@ -96,7 +94,7 @@ dpdk_port::dpdk_port(port_id_t port,      separate_ipv4_addr(ipv4_address, _ipv4, _netmask); -    /* 2. Set hardware offloads */ +    /* Set hardware offloads */      struct rte_eth_dev_info dev_info;      rte_eth_dev_info_get(_port, &dev_info);      uint64_t rx_offloads = DEV_RX_OFFLOAD_IPV4_CKSUM; @@ -132,13 +130,13 @@ dpdk_port::dpdk_port(port_id_t port,          throw uhd::runtime_error("DPDK: Failed to configure the device");      } -    /* 3. Set descriptor ring sizes */ -    uint16_t rx_desc = num_mbufs; +    /* Set descriptor ring sizes */ +    uint16_t rx_desc = num_desc;      if (dev_info.rx_desc_lim.nb_max < rx_desc || dev_info.rx_desc_lim.nb_min > rx_desc          || (dev_info.rx_desc_lim.nb_align - 1) & rx_desc) {          UHD_LOGGER_ERROR("DPDK")              << boost::format("%d: %d RX descriptors requested, but must be in [%d,%d]") -                   % _port % num_mbufs % dev_info.rx_desc_lim.nb_min +                   % _port % num_desc % dev_info.rx_desc_lim.nb_min                     % dev_info.rx_desc_lim.nb_max;          UHD_LOGGER_ERROR("DPDK")              << boost::format("Num RX descriptors must also be aligned to 0x%x") @@ -146,12 +144,12 @@ dpdk_port::dpdk_port(port_id_t port,          throw uhd::runtime_error("DPDK: Failed to allocate RX descriptors");      } -    uint16_t tx_desc = num_mbufs; +    uint16_t tx_desc = num_desc;      if (dev_info.tx_desc_lim.nb_max < tx_desc || dev_info.tx_desc_lim.nb_min > tx_desc          || (dev_info.tx_desc_lim.nb_align - 1) & tx_desc) {          UHD_LOGGER_ERROR("DPDK")              << boost::format("%d: %d TX descriptors requested, but must be in [%d,%d]") -                   % _port % num_mbufs % dev_info.tx_desc_lim.nb_min +                   % _port % num_desc % dev_info.tx_desc_lim.nb_min                     % dev_info.tx_desc_lim.nb_max;          UHD_LOGGER_ERROR("DPDK")              << boost::format("Num TX descriptors must also be aligned to 0x%x") @@ -165,7 +163,7 @@ dpdk_port::dpdk_port(port_id_t port,          throw uhd::runtime_error("DPDK: Failed to configure the DMA queues");      } -    /* 4. Set up the RX and TX DMA queues (May not be generally supported after +    /* Set up the RX and TX DMA queues (May not be generally supported after       * eth_dev_start) */      unsigned int cpu_socket = rte_eth_dev_socket_id(_port);      for (uint16_t i = 0; i < _num_queues; i++) { @@ -187,36 +185,9 @@ dpdk_port::dpdk_port(port_id_t port,          }      } -    /* 5. Set up initial flow table */ +    /* TODO: Enable multiple queues (only support 1 right now) */ -    /* Append all free queues except 0, which is reserved for ARP */ -    _free_queues.reserve(_num_queues - 1); -    for (unsigned int i = 1; i < _num_queues; i++) { -        _free_queues.push_back(i); -    } - -    // struct rte_flow_attr flow_attr; -    // flow_attr.group = 0; -    // flow_attr.priority = 1; -    // flow_attr.ingress = 1; -    // flow_attr.egress = 0; -    // flow_attr.transfer = 0; -    // flow_attr.reserved = 0; - -    // struct rte_flow_item[] flow_pattern = { -    //}; -    // int rte_flow_validate(uint16_t port_id, -    //              const struct rte_flow_attr *attr, -    //              const struct rte_flow_item pattern[], -    //              const struct rte_flow_action actions[], -    //              struct rte_flow_error *error); -    // struct rte_flow * rte_flow_create(uint16_t port_id, -    //            const struct rte_flow_attr *attr, -    //            const struct rte_flow_item pattern[], -    //            const struct rte_flow_action *actions[], -    //            struct rte_flow_error *error); - -    /* 6. Start the Ethernet device */ +    /* Start the Ethernet device */      retval = rte_eth_dev_start(_port);      if (retval < 0) {          UHD_LOGGER_ERROR("DPDK") @@ -230,39 +201,60 @@ dpdk_port::dpdk_port(port_id_t port,                               << " MAC: " << eth_addr_to_string(_mac_addr);  } -/* TODO: Do flow directions */ -queue_id_t dpdk_port::alloc_queue(struct rte_flow_pattern recv_pattern[]) +dpdk_port::~dpdk_port()  { -    std::lock_guard<std::mutex> lock(_mutex); -    UHD_ASSERT_THROW(_free_queues.size() != 0); -    auto queue = _free_queues.back(); -    _free_queues.pop_back(); -    return queue; +    rte_eth_dev_stop(_port); +    rte_spinlock_lock(&_spinlock); +    for (auto kv : _arp_table) { +        for (auto req : kv.second->reqs) { +            req->cond.notify_one(); +        } +        rte_free(kv.second); +    } +    _arp_table.clear(); +    rte_spinlock_unlock(&_spinlock);  } -void dpdk_port::free_queue(queue_id_t queue) +uint16_t dpdk_port::alloc_udp_port(uint16_t udp_port)  { +    uint16_t port_selected;      std::lock_guard<std::mutex> lock(_mutex); -    auto flow  = _flow_rules.at(queue); -    int status = rte_flow_destroy(_port, flow, NULL); -    if (status) { -        UHD_LOGGER_ERROR("DPDK") -            << boost::format("Failed to destroy flow rule on port %u, queue %u") % _port -                   % queue; -        throw uhd::runtime_error("DPDK: Failed to destroy flow rule"); +    if (udp_port) { +        if (_udp_ports.count(rte_be_to_cpu_16(udp_port))) { +            return 0; +        } +        port_selected = rte_be_to_cpu_16(udp_port);      } else { -        _flow_rules.erase(queue); +        if (_udp_ports.size() >= 65535) { +            UHD_LOG_WARNING("DPDK", "Attempted to allocate UDP port, but none remain"); +            return 0; +        } +        port_selected = _next_udp_port; +        while (true) { +            if (port_selected == 0) { +                continue; +            } +            if (_udp_ports.count(port_selected) == 0) { +                _next_udp_port = port_selected - 1; +                break; +            } +            if (port_selected - 1 == _next_udp_port) { +                return 0; +            } +            port_selected--; +        }      } -    _free_queues.push_back(queue); +    _udp_ports.insert(port_selected); +    return rte_cpu_to_be_16(port_selected);  } -int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* arp_req) +int dpdk_port::_arp_reply(queue_id_t queue_id, struct arp_hdr* arp_req)  {      struct rte_mbuf* mbuf;      struct ether_hdr* hdr;      struct arp_hdr* arp_frame; -    mbuf = rte_pktmbuf_alloc(tx_pktbuf_pool); +    mbuf = rte_pktmbuf_alloc(_tx_pktbuf_pool);      if (unlikely(mbuf == NULL)) {          UHD_LOG_WARNING("DPDK", "Could not allocate packet buffer for ARP response");          return -ENOMEM; @@ -288,8 +280,7 @@ int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* ar      mbuf->pkt_len  = 42;      mbuf->data_len = 42; -    // ARP replies always on queue 0 -    if (rte_eth_tx_burst(_port, 0, &mbuf, 1) != 1) { +    if (rte_eth_tx_burst(_port, queue_id, &mbuf, 1) != 1) {          UHD_LOGGER_WARNING("DPDK")              << boost::format("%s: TX descriptor ring is full") % __func__;          rte_pktmbuf_free(mbuf); @@ -298,61 +289,16 @@ int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* ar      return 0;  } -// TODO: ARP processing for queue 0 -// int dpdk_port::process_arp(struct rte_mempool *tx_pktbuf_pool, struct arp_hdr -// *arp_frame) -//{ -//    std::lock_guard<std::mutex> lock(_mutex); -//    uint32_t dest_ip = arp_frame->arp_data.arp_sip; -//    struct ether_addr dest_addr = arp_frame->arp_data.arp_sha; -// -//    /* Add entry to ARP table */ -//    struct uhd_dpdk_arp_entry *entry = NULL; -//    rte_hash_lookup_data(_arp_table, &dest_ip, (void **) &entry); -//    if (!entry) { -//        entry = rte_zmalloc(NULL, sizeof(*entry), 0); -//        if (!entry) { -//            return -ENOMEM; -//        } -//        LIST_INIT(&entry->pending_list); -//        ether_addr_copy(&dest_addr, &entry->mac_addr); -//        if (rte_hash_add_key_data(port->arp_table, &dest_ip, entry) < 0) { -//            rte_free(entry); -//            return -ENOSPC; -//        } -//    } else { -//        struct uhd_dpdk_config_req *req = NULL; -//        ether_addr_copy(&dest_addr, &entry->mac_addr); -//        /* Now wake any config reqs waiting for the ARP */ -//        LIST_FOREACH(req, &entry->pending_list, entry) { -//            _uhd_dpdk_config_req_compl(req, 0); -//        } -//        while (entry->pending_list.lh_first != NULL) { -//            LIST_REMOVE(entry->pending_list.lh_first, entry); -//        } -//    } -//    /* Respond if this was an ARP request */ -//    if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST) && -//        arp_frame->arp_data.arp_tip == port->ipv4_addr) { -//        _arp_reply(tx_pktbuf_pool, arp_frame); -//    } -// -//    return 0; -// -//} - -static dpdk_ctx* global_ctx = nullptr; +static dpdk_ctx::sptr global_ctx = nullptr;  static std::mutex global_ctx_mutex;  dpdk_ctx::sptr dpdk_ctx::get()  {      std::lock_guard<std::mutex> lock(global_ctx_mutex);      if (!global_ctx) { -        auto new_ctx = std::make_shared<dpdk_ctx>(); -        global_ctx   = new_ctx.get(); -        return new_ctx; +        global_ctx = std::make_shared<dpdk_ctx>();      } -    return global_ctx->shared_from_this(); +    return global_ctx;  }  dpdk_ctx::dpdk_ctx(void) : _init_done(false) {} @@ -384,6 +330,7 @@ void dpdk_ctx::_eal_init(const device_addr_t& eal_args)      auto args = new std::array<char, 4096>();      char* opt = args->data();      char* end = args->data() + args->size(); +    UHD_LOG_TRACE("DPDK", "EAL init options: ");      for (std::string& key : eal_args.keys()) {          std::string val = eal_args[key];          if (key == "dpdk_coremask") { @@ -452,10 +399,16 @@ void dpdk_ctx::init(const device_addr_t& user_args)          _num_mbufs = dpdk_args.cast<int>("dpdk_num_mbufs", DEFAULT_NUM_MBUFS);          _mbuf_cache_size =              dpdk_args.cast<int>("dpdk_mbuf_cache_size", DEFAULT_MBUF_CACHE_SIZE); +        UHD_LOG_TRACE("DPDK", +            "mtu: " << _mtu << " num_mbufs: " << _num_mbufs +                    << " mbuf_cache_size: " << _mbuf_cache_size);          /* Get device info for all the NIC ports */          int num_dpdk_ports = rte_eth_dev_count_avail(); -        UHD_ASSERT_THROW(num_dpdk_ports > 0); +        if (num_dpdk_ports == 0) { +            UHD_LOG_ERROR("DPDK", "No available DPDK devices (ports) found!"); +            throw uhd::runtime_error("No available DPDK devices (ports) found!"); +        }          device_addrs_t nics(num_dpdk_ports);          RTE_ETH_FOREACH_DEV(i)          { @@ -480,6 +433,8 @@ void dpdk_ctx::init(const device_addr_t& user_args)              }              /* Now combine user args with conf file */              auto conf = uhd::prefs::get_dpdk_nic_args(nic); +            // TODO: Enable the use of multiple DMA queues +            conf["dpdk_num_queues"] = "1";              /* Update config, and remove ports that aren't fully configured */              if (conf.has_key("dpdk_ipv4")) { @@ -491,10 +446,17 @@ void dpdk_ctx::init(const device_addr_t& user_args)              }          } +        std::map<size_t, std::vector<size_t>> lcore_to_port_id_map;          RTE_ETH_FOREACH_DEV(i)          {              auto& conf = nics.at(i);              if (conf.has_key("dpdk_ipv4")) { +                UHD_ASSERT_THROW(conf.has_key("dpdk_lcore")); +                const size_t lcore_id = conf.cast<size_t>("dpdk_lcore", 0); +                if (!lcore_to_port_id_map.count(lcore_id)) { +                    lcore_to_port_id_map.insert({lcore_id, {}}); +                } +                  // Allocating enough buffers for all DMA queues for each CPU socket                  // - This is a bit inefficient for larger systems, since NICs may not                  //   all be on one socket @@ -507,10 +469,13 @@ void dpdk_ctx::init(const device_addr_t& user_args)                  _ports[i] = dpdk_port::make(i,                      _mtu,                      conf.cast<uint16_t>("dpdk_num_queues", rte_lcore_count()), -                    _num_mbufs, +                    conf.cast<uint16_t>("dpdk_num_desc", DPDK_DEFAULT_RING_SIZE),                      rx_pool,                      tx_pool,                      conf["dpdk_ipv4"]); + +                // Remember all port IDs that map to an lcore +                lcore_to_port_id_map.at(lcore_id).push_back(i);              }          } @@ -522,12 +487,29 @@ void dpdk_ctx::init(const device_addr_t& user_args)              rte_eth_link_get(portid, &link);              unsigned int link_status = link.link_status;              unsigned int link_speed  = link.link_speed; -            UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps\n") -                                            % portid % link_status % link_speed; +            UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps") % portid +                                            % link_status % link_speed;          } -        UHD_LOG_TRACE("DPDK", "Init DONE!"); - +        UHD_LOG_TRACE("DPDK", "Init done -- spawning IO services");          _init_done = true; + +        // Links are up, now create one IO service per lcore +        for (auto& lcore_portids_pair : lcore_to_port_id_map) { +            const size_t lcore_id = lcore_portids_pair.first; +            std::vector<dpdk_port*> dpdk_ports; +            dpdk_ports.reserve(lcore_portids_pair.second.size()); +            for (const size_t port_id : lcore_portids_pair.second) { +                dpdk_ports.push_back(get_port(port_id)); +            } +            const size_t servq_depth = 32; // FIXME +            UHD_LOG_TRACE("DPDK", +                "Creating I/O service for lcore " +                    << lcore_id << ", servicing " << dpdk_ports.size() +                    << " ports, service queue depth " << servq_depth); +            _io_srv_portid_map.insert( +                {uhd::transport::dpdk_io_service::make(lcore_id, dpdk_ports, servq_depth), +                    lcore_portids_pair.second}); +        }      }  } @@ -577,7 +559,7 @@ int dpdk_ctx::get_port_link_status(port_id_t portid) const      return link.link_status;  } -int dpdk_ctx::get_route(const std::string& addr) const +dpdk_port* dpdk_ctx::get_route(const std::string& addr) const  {      const uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str());      for (const auto& port : _ports) { @@ -586,10 +568,10 @@ int dpdk_ctx::get_route(const std::string& addr) const          uint32_t src_ipv4 = port.second->get_ipv4();          uint32_t netmask  = port.second->get_netmask();          if ((src_ipv4 & netmask) == (dst_ipv4 & netmask)) { -            return (int)port.first; +            return port.second.get();          }      } -    return -ENODEV; +    return NULL;  } @@ -598,6 +580,20 @@ bool dpdk_ctx::is_init_done(void) const      return _init_done.load();  } +uhd::transport::dpdk_io_service::sptr dpdk_ctx::get_io_service(const size_t port_id) +{ +    for (auto& io_srv_portid_pair : _io_srv_portid_map) { +        if (uhd::has(io_srv_portid_pair.second, port_id)) { +            return io_srv_portid_pair.first; +        } +    } + +    std::string err_msg = std::string("Cannot look up I/O service for port ID: ") +                          + std::to_string(port_id) + ". No such port ID!"; +    UHD_LOG_ERROR("DPDK", err_msg); +    throw uhd::lookup_error(err_msg); +} +  struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool(      unsigned int cpu_socket, size_t num_bufs)  { @@ -605,8 +601,12 @@ struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool(          const int mbuf_size = _mtu + RTE_PKTMBUF_HEADROOM;          char name[32];          snprintf(name, sizeof(name), "rx_mbuf_pool_%u", cpu_socket); -        _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create( -            name, num_bufs, _mbuf_cache_size, 0, mbuf_size, SOCKET_ID_ANY); +        _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create(name, +            num_bufs, +            _mbuf_cache_size, +            DPDK_MBUF_PRIV_SIZE, +            mbuf_size, +            SOCKET_ID_ANY);          if (!_rx_pktbuf_pools.at(cpu_socket)) {              UHD_LOG_ERROR("DPDK", "Could not allocate RX pktbuf pool");              throw uhd::runtime_error("DPDK: Could not allocate RX pktbuf pool"); diff --git a/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp b/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp new file mode 100644 index 000000000..1fcedca51 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp @@ -0,0 +1,950 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/utils/log.hpp> +#include <uhd/utils/thread.hpp> +#include <uhdlib/transport/dpdk/arp.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/dpdk_io_service_client.hpp> +#include <uhdlib/utils/narrow.hpp> +#include <cmath> + +/* + * Memory management + * + * Every object that allocates and frees DPDK memory has a reference to the + * dpdk_ctx. + * + * Ownership hierarchy: + * + * dpdk_io_service_mgr (1) => + *     dpdk_ctx::sptr + *     dpdk_io_service::sptr + * + * xport (1) => + *     dpdk_send_io::sptr + *     dpdk_recv_io::sptr + * + * usrp link_mgr (1) => + *     udp_dpdk_link::sptr + * + * dpdk_send_io (2) => + *     dpdk_ctx::sptr + *     dpdk_io_service::sptr + * + * dpdk_recv_io (2) => + *     dpdk_ctx::sptr + *     dpdk_io_service::sptr + * + * dpdk_io_service (3) => + *     dpdk_ctx::wptr (weak_ptr) + *     udp_dpdk_link::sptr + * + * udp_dpdk_link (4) => + *     dpdk_ctx::sptr + */ + +using namespace uhd::transport; + +dpdk_io_service::dpdk_io_service( +    unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth) +    : _ctx(dpdk::dpdk_ctx::get()) +    , _lcore_id(lcore_id) +    , _ports(ports) +    , _servq(servq_depth, lcore_id) +{ +    UHD_LOG_TRACE("DPDK::IO_SERVICE", "Launching I/O service for lcore " << lcore_id); +    for (auto port : _ports) { +        UHD_LOG_TRACE("DPDK::IO_SERVICE", +            "lcore_id " << lcore_id << ": Adding port index " << port->get_port_id()); +        _tx_queues[port->get_port_id()]      = std::list<dpdk_send_io*>(); +        _recv_xport_map[port->get_port_id()] = std::list<dpdk_recv_io*>(); +    } +    int status = rte_eal_remote_launch(_io_worker, this, lcore_id); +    if (status) { +        throw uhd::runtime_error("DPDK: I/O service cannot launch on busy lcore"); +    } +} + +dpdk_io_service::sptr dpdk_io_service::make( +    unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth) +{ +    return dpdk_io_service::sptr(new dpdk_io_service(lcore_id, ports, servq_depth)); +} + +dpdk_io_service::~dpdk_io_service() +{ +    UHD_LOG_TRACE( +        "DPDK::IO_SERVICE", "Shutting down I/O service for lcore " << _lcore_id); +    dpdk::wait_req* req = dpdk::wait_req_alloc(dpdk::wait_type::WAIT_LCORE_TERM, NULL); +    if (!req) { +        UHD_LOG_ERROR("DPDK::IO_SERVICE", +            "Could not allocate request for lcore termination for lcore " << _lcore_id); +        return; +    } +    dpdk::wait_req_get(req); +    _servq.submit(req, std::chrono::microseconds(-1)); +    dpdk::wait_req_put(req); +} + +void dpdk_io_service::attach_recv_link(recv_link_if::sptr link) +{ +    struct dpdk_flow_data data; +    data.link    = dynamic_cast<udp_dpdk_link*>(link.get()); +    data.is_recv = true; +    assert(data.link); +    auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data); +    if (!req) { +        UHD_LOG_ERROR( +            "DPDK::IO_SERVICE", "Could not allocate wait_req to attach recv_link"); +        throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach recv_link"); +    } +    _servq.submit(req, std::chrono::microseconds(-1)); +    dpdk::wait_req_put(req); +    { +        std::lock_guard<std::mutex> lock(_mutex); +        _recv_links.push_back(link); +    } +} + +void dpdk_io_service::attach_send_link(send_link_if::sptr link) +{ +    udp_dpdk_link* dpdk_link = dynamic_cast<udp_dpdk_link*>(link.get()); +    assert(dpdk_link); + +    // First, fill in destination MAC address +    struct dpdk::arp_request arp_data; +    arp_data.tpa  = dpdk_link->get_remote_ipv4(); +    arp_data.port = dpdk_link->get_port()->get_port_id(); +    if (dpdk_link->get_port()->dst_is_broadcast(arp_data.tpa)) { +        // If a broadcast IP, skip the ARP and fill with broadcast MAC addr +        memset(arp_data.tha.addr_bytes, 0xFF, 6); +    } else { +        auto arp_req = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data); +        if (!arp_req) { +            UHD_LOG_ERROR( +                "DPDK::IO_SERVICE", "Could not allocate wait_req for ARP request"); +            throw uhd::runtime_error("DPDK: Could not allocate wait_req for ARP request"); +        } +        if (_servq.submit(arp_req, std::chrono::microseconds(3000000))) { +            // Try one more time... +            auto arp_req2 = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data); +            if (_servq.submit(arp_req2, std::chrono::microseconds(30000000))) { +                wait_req_put(arp_req); +                wait_req_put(arp_req2); +                throw uhd::io_error("DPDK: Could not reach host"); +            } +            wait_req_put(arp_req2); +        } +        wait_req_put(arp_req); +    } +    dpdk_link->set_remote_mac(arp_data.tha); + +    // Then, submit the link to the I/O service thread +    struct dpdk_flow_data data; +    data.link    = dpdk_link; +    data.is_recv = false; +    auto req     = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data); +    if (!req) { +        UHD_LOG_ERROR( +            "DPDK::IO_SERVICE", "Could not allocate wait_req to attach send_link"); +        throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach send_link"); +    } +    _servq.submit(req, std::chrono::microseconds(-1)); +    wait_req_put(req); +    { +        std::lock_guard<std::mutex> lock(_mutex); +        _send_links.push_back(link); +    } +} + +void dpdk_io_service::detach_recv_link(recv_link_if::sptr link) +{ +    auto link_ptr = link.get(); +    struct dpdk_flow_data data; +    data.link    = dynamic_cast<udp_dpdk_link*>(link_ptr); +    data.is_recv = true; +    auto req     = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data); +    if (!req) { +        UHD_LOG_ERROR( +            "DPDK::IO_SERVICE", "Could not allocate wait_req to detach recv_link"); +        throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach recv_link"); +    } +    _servq.submit(req, std::chrono::microseconds(-1)); +    wait_req_put(req); +    { +        std::lock_guard<std::mutex> lock(_mutex); +        _recv_links.remove_if( +            [link_ptr](recv_link_if::sptr& item) { return item.get() == link_ptr; }); +    } +} + +void dpdk_io_service::detach_send_link(send_link_if::sptr link) +{ +    auto link_ptr = link.get(); +    struct dpdk_flow_data data; +    data.link    = dynamic_cast<udp_dpdk_link*>(link_ptr); +    data.is_recv = false; +    auto req     = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data); +    if (!req) { +        UHD_LOG_ERROR( +            "DPDK::IO_SERVICE", "Could not allocate wait_req to detach send_link"); +        throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach send_link"); +    } +    _servq.submit(req, std::chrono::microseconds(-1)); +    wait_req_put(req); +    { +        std::lock_guard<std::mutex> lock(_mutex); +        _send_links.remove_if( +            [link_ptr](send_link_if::sptr& item) { return item.get() == link_ptr; }); +    } +} + +recv_io_if::sptr dpdk_io_service::make_recv_client(recv_link_if::sptr data_link, +    size_t num_recv_frames, +    recv_callback_t cb, +    send_link_if::sptr /*fc_link*/, +    size_t num_send_frames, +    recv_io_if::fc_callback_t fc_cb) +{ +    auto link    = dynamic_cast<udp_dpdk_link*>(data_link.get()); +    auto recv_io = std::make_shared<dpdk_recv_io>( +        shared_from_this(), link, num_recv_frames, cb, num_send_frames, fc_cb); + +    // Register with I/O service +    recv_io->_dpdk_io_if.io_client = static_cast<void*>(recv_io.get()); +    auto xport_req                 = dpdk::wait_req_alloc( +        dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&recv_io->_dpdk_io_if); +    _servq.submit(xport_req, std::chrono::microseconds(-1)); +    wait_req_put(xport_req); +    return recv_io; +} + +send_io_if::sptr dpdk_io_service::make_send_client(send_link_if::sptr send_link, +    size_t num_send_frames, +    send_io_if::send_callback_t send_cb, +    recv_link_if::sptr /*recv_link*/, +    size_t num_recv_frames, +    recv_callback_t recv_cb, +    send_io_if::fc_callback_t fc_cb) +{ +    auto link    = dynamic_cast<udp_dpdk_link*>(send_link.get()); +    auto send_io = std::make_shared<dpdk_send_io>(shared_from_this(), +        link, +        num_send_frames, +        send_cb, +        num_recv_frames, +        recv_cb, +        fc_cb); + +    // Register with I/O service +    send_io->_dpdk_io_if.io_client = static_cast<void*>(send_io.get()); +    auto xport_req                 = dpdk::wait_req_alloc( +        dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&send_io->_dpdk_io_if); +    _servq.submit(xport_req, std::chrono::microseconds(-1)); +    wait_req_put(xport_req); +    return send_io; +} + + +int dpdk_io_service::_io_worker(void* arg) +{ +    if (!arg) +        return -EINVAL; +    dpdk_io_service* srv = (dpdk_io_service*)arg; + +    /* Check that this is a valid lcore */ +    unsigned int lcore_id = rte_lcore_id(); +    if (lcore_id == LCORE_ID_ANY) +        return -ENODEV; + +    /* Check that this lcore has ports */ +    if (srv->_ports.size() == 0) +        return -ENODEV; + +    char name[16]; +    snprintf(name, sizeof(name), "dpdk-io_%hu", (uint16_t)lcore_id); +    rte_thread_setname(pthread_self(), name); +    UHD_LOG_TRACE("DPDK::IO_SERVICE", +        "I/O service thread '" << name << "' started on lcore " << lcore_id); + +    uhd::set_thread_priority_safe(); + +    snprintf(name, sizeof(name), "rx-tbl_%hu", (uint16_t)lcore_id); +    struct rte_hash_parameters hash_params = {.name = name, +        .entries                                    = MAX_FLOWS, +        .reserved                                   = 0, +        .key_len                                    = sizeof(struct dpdk::ipv4_5tuple), +        .hash_func                                  = NULL, +        .hash_func_init_val                         = 0, +        .socket_id  = uhd::narrow_cast<int>(rte_socket_id()), +        .extra_flag = 0}; +    srv->_rx_table                         = rte_hash_create(&hash_params); +    if (srv->_rx_table == NULL) { +        return rte_errno; +    } + +    int status = 0; +    while (!status) { +        /* For each port, attempt to receive packets and process */ +        for (auto port : srv->_ports) { +            srv->_rx_burst(port, 0); +        } +        /* For each port's TX queues, do TX */ +        for (auto port : srv->_ports) { +            srv->_tx_burst(port); +        } +        /* For each port's RX release queues, release buffers */ +        for (auto port : srv->_ports) { +            srv->_rx_release(port); +        } +        /* Retry waking clients */ +        if (srv->_retry_head) { +            dpdk_io_if* node = srv->_retry_head; +            dpdk_io_if* end  = srv->_retry_head->prev; +            while (true) { +                dpdk_io_if* next = node->next; +                srv->_wake_client(node); +                if (node == end) { +                    break; +                } else { +                    node = next; +                    next = node->next; +                } +            } +        } +        /* Check for open()/close()/term() requests and service 1 at a time +         * Leave this last so we immediately terminate if requested +         */ +        status = srv->_service_requests(); +    } + +    return status; +} + +int dpdk_io_service::_service_requests() +{ +    for (int i = 0; i < MAX_PENDING_SERVICE_REQS; i++) { +        /* Dequeue */ +        dpdk::wait_req* req = _servq.pop(); +        if (!req) { +            break; +        } +        switch (req->reason) { +            case dpdk::wait_type::WAIT_SIMPLE: +                while (_servq.complete(req) == -ENOBUFS) +                    ; +                break; +            case dpdk::wait_type::WAIT_RX: +            case dpdk::wait_type::WAIT_TX_BUF: +                throw uhd::not_implemented_error( +                    "DPDK: _service_requests(): DPDK is still a WIP"); +            case dpdk::wait_type::WAIT_FLOW_OPEN: +                _service_flow_open(req); +                break; +            case dpdk::wait_type::WAIT_FLOW_CLOSE: +                _service_flow_close(req); +                break; +            case dpdk::wait_type::WAIT_XPORT_CONNECT: +                _service_xport_connect(req); +                break; +            case dpdk::wait_type::WAIT_XPORT_DISCONNECT: +                _service_xport_disconnect(req); +                break; +            case dpdk::wait_type::WAIT_ARP: { +                assert(req->data != NULL); +                int arp_status = _service_arp_request(req); +                assert(arp_status != -ENOMEM); +                if (arp_status == 0) { +                    while (_servq.complete(req) == -ENOBUFS) +                        ; +                } +                break; +            } +            case dpdk::wait_type::WAIT_LCORE_TERM: +                rte_free(_rx_table); +                while (_servq.complete(req) == -ENOBUFS) +                    ; +                // Return a positive value to indicate we should terminate +                return 1; +            default: +                UHD_LOG_ERROR( +                    "DPDK::IO_SERVICE", "Invalid reason associated with wait request"); +                while (_servq.complete(req) == -ENOBUFS) +                    ; +                break; +        } +    } +    return 0; +} + +void dpdk_io_service::_service_flow_open(dpdk::wait_req* req) +{ +    auto flow_req_data = (struct dpdk_flow_data*)req->data; +    assert(flow_req_data); +    if (flow_req_data->is_recv) { +        // If RX, add to RX table. Currently, nothing to do for TX. +        struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, +            .src_ip                                   = 0, +            .dst_ip   = flow_req_data->link->get_port()->get_ipv4(), +            .src_port = 0, +            .dst_port = flow_req_data->link->get_local_port()}; +        // Check the UDP port isn't in use +        if (rte_hash_lookup(_rx_table, &ht_key) > 0) { +            req->retval = -EADDRINUSE; +            UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add to RX table"); +            while (_servq.complete(req) == -ENOBUFS) +                ; +            return; +        } +        // Add xport list for this UDP port +        auto rx_entry = new std::list<dpdk_io_if*>(); +        if (rte_hash_add_key_data(_rx_table, &ht_key, rx_entry)) { +            UHD_LOG_ERROR("DPDK::IO_SERVICE", "Could not add new RX list to table"); +            delete rx_entry; +            req->retval = -ENOMEM; +            while (_servq.complete(req) == -ENOBUFS) +                ; +            return; +        } +    } +    while (_servq.complete(req) == -ENOBUFS) +        ; +} + +void dpdk_io_service::_service_flow_close(dpdk::wait_req* req) +{ +    auto flow_req_data = (struct dpdk_flow_data*)req->data; +    assert(flow_req_data); +    if (flow_req_data->is_recv) { +        // If RX, remove from RX table. Currently, nothing to do for TX. +        struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, +            .src_ip                                   = 0, +            .dst_ip   = flow_req_data->link->get_port()->get_ipv4(), +            .src_port = 0, +            .dst_port = flow_req_data->link->get_local_port()}; +        std::list<dpdk_io_if*>* xport_list; + +        if (rte_hash_lookup_data(_rx_table, &ht_key, (void**)&xport_list) > 0) { +            UHD_ASSERT_THROW(xport_list->empty()); +            delete xport_list; +            rte_hash_del_key(_rx_table, &ht_key); +            while (_servq.complete(req) == -ENOBUFS) +                ; +            return; +        } +    } +    while (_servq.complete(req) == -ENOBUFS) +        ; +} + +void dpdk_io_service::_service_xport_connect(dpdk::wait_req* req) +{ +    auto dpdk_io = static_cast<dpdk_io_if*>(req->data); +    UHD_ASSERT_THROW(dpdk_io); +    auto port = dpdk_io->link->get_port(); +    if (dpdk_io->recv_cb) { +        // Add to RX table only if have a callback. +        struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, +            .src_ip                                   = 0, +            .dst_ip                                   = port->get_ipv4(), +            .src_port                                 = 0, +            .dst_port                                 = dpdk_io->link->get_local_port()}; +        void* hash_data; +        if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) < 0) { +            req->retval = -ENOENT; +            UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add xport to RX table"); +            while (_servq.complete(req) == -ENOBUFS) +                ; +            return; +        } +        // Add to xport list for this UDP port +        auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); +        rx_entry->push_back(dpdk_io); +    } +    if (dpdk_io->is_recv) { +        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX connect request..."); +        // Add to xport list for this NIC port +        auto& xport_list = _recv_xport_map.at(port->get_port_id()); +        xport_list.push_back((dpdk_recv_io*)dpdk_io->io_client); +    } else { +        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX connect request..."); +        dpdk_send_io* send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client); +        // Add to xport list for this NIC port +        auto& xport_list = _tx_queues.at(port->get_port_id()); +        xport_list.push_back(send_io); +        for (size_t i = 0; i < send_io->_num_send_frames; i++) { +            auto buff_ptr = +                (dpdk::dpdk_frame_buff*)dpdk_io->link->get_send_buff(0).release(); +            if (!buff_ptr) { +                UHD_LOG_ERROR("DPDK::IO_SERVICE", +                    "TX mempool out of memory. Please increase dpdk_num_mbufs."); +                break; +            } +            if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) { +                rte_pktmbuf_free(buff_ptr->get_pktmbuf()); +                break; +            } +            send_io->_num_frames_in_use++; +        } +    } +    while (_servq.complete(req) == -ENOBUFS) +        ; +} + +void dpdk_io_service::_service_xport_disconnect(dpdk::wait_req* req) +{ +    auto dpdk_io = (struct dpdk_io_if*)req->data; +    assert(dpdk_io); +    auto port = dpdk_io->link->get_port(); +    if (dpdk_io->recv_cb) { +        // Remove from RX table only if have a callback. +        struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, +            .src_ip                                   = 0, +            .dst_ip                                   = port->get_ipv4(), +            .src_port                                 = 0, +            .dst_port                                 = dpdk_io->link->get_local_port()}; +        void* hash_data; +        if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) >= 0) { +            // Remove from xport list for this UDP port +            auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); +            rx_entry->remove(dpdk_io); +        } else { +            req->retval = -EINVAL; +            UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot remove xport from RX table"); +        } +    } +    if (dpdk_io->is_recv) { +        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX disconnect request..."); +        dpdk_recv_io* recv_client = static_cast<dpdk_recv_io*>(dpdk_io->io_client); +        // Remove from xport list for this NIC port +        auto& xport_list = _recv_xport_map.at(port->get_port_id()); +        xport_list.remove(recv_client); +        while (!rte_ring_empty(recv_client->_recv_queue)) { +            frame_buff* buff_ptr; +            rte_ring_dequeue(recv_client->_recv_queue, (void**)&buff_ptr); +            dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr)); +        } +        while (!rte_ring_empty(recv_client->_release_queue)) { +            frame_buff* buff_ptr; +            rte_ring_dequeue(recv_client->_release_queue, (void**)&buff_ptr); +            dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr)); +        } +    } else { +        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX disconnect request..."); +        dpdk_send_io* send_client = static_cast<dpdk_send_io*>(dpdk_io->io_client); +        // Remove from xport list for this NIC port +        auto& xport_list = _tx_queues.at(port->get_port_id()); +        xport_list.remove(send_client); +        while (!rte_ring_empty(send_client->_send_queue)) { +            frame_buff* buff_ptr; +            rte_ring_dequeue(send_client->_send_queue, (void**)&buff_ptr); +            dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr)); +        } +        while (!rte_ring_empty(send_client->_buffer_queue)) { +            frame_buff* buff_ptr; +            rte_ring_dequeue(send_client->_buffer_queue, (void**)&buff_ptr); +            dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr)); +        } +    } +    // Now remove the node if it's on the retry list +    if ((_retry_head == dpdk_io) && (dpdk_io->next == dpdk_io)) { +        _retry_head = NULL; +    } else if (_retry_head) { +        dpdk_io_if* node = _retry_head->next; +        while (node != _retry_head) { +            if (node == dpdk_io) { +                dpdk_io->prev->next = dpdk_io->next; +                dpdk_io->next->prev = dpdk_io->prev; +                break; +            } +            node = node->next; +        } +    } +    while (_servq.complete(req) == -ENOBUFS) +        ; +} + +int dpdk_io_service::_service_arp_request(dpdk::wait_req* req) +{ +    int status               = 0; +    auto arp_req_data        = (struct dpdk::arp_request*)req->data; +    dpdk::ipv4_addr dst_addr = arp_req_data->tpa; +    auto ctx_sptr            = _ctx.lock(); +    UHD_ASSERT_THROW(ctx_sptr); +    dpdk::dpdk_port* port = ctx_sptr->get_port(arp_req_data->port); +    UHD_LOG_TRACE("DPDK::IO_SERVICE", +        "ARP: Requesting address for " << dpdk::ipv4_num_to_str(dst_addr)); + +    rte_spinlock_lock(&port->_spinlock); +    struct dpdk::arp_entry* entry = NULL; +    if (port->_arp_table.count(dst_addr) == 0) { +        entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0); +        if (!entry) { +            status = -ENOMEM; +            goto arp_end; +        } +        entry = new (entry) dpdk::arp_entry(); +        entry->reqs.push_back(req); +        port->_arp_table[dst_addr] = entry; +        status                     = -EAGAIN; +        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Address not in table. Sending ARP request."); +        _send_arp_request(port, 0, arp_req_data->tpa); +    } else { +        entry = port->_arp_table.at(dst_addr); +        if (is_zero_ether_addr(&entry->mac_addr)) { +            UHD_LOG_TRACE("DPDK::IO_SERVICE", +                "ARP: Address in table, but not populated yet. Resending ARP request."); +            port->_arp_table.at(dst_addr)->reqs.push_back(req); +            status = -EAGAIN; +            _send_arp_request(port, 0, arp_req_data->tpa); +        } else { +            UHD_LOG_TRACE("DPDK::IO_SERVICE", "ARP: Address in table."); +            ether_addr_copy(&entry->mac_addr, &arp_req_data->tha); +            status = 0; +        } +    } +arp_end: +    rte_spinlock_unlock(&port->_spinlock); +    return status; +} + +int dpdk_io_service::_send_arp_request( +    dpdk::dpdk_port* port, dpdk::queue_id_t queue, dpdk::ipv4_addr ip) +{ +    struct rte_mbuf* mbuf; +    struct ether_hdr* hdr; +    struct arp_hdr* arp_frame; + +    mbuf = rte_pktmbuf_alloc(port->get_tx_pktbuf_pool()); +    if (unlikely(mbuf == NULL)) { +        UHD_LOG_WARNING( +            "DPDK::IO_SERVICE", "Could not allocate packet buffer for ARP request"); +        return -ENOMEM; +    } + +    hdr       = rte_pktmbuf_mtod(mbuf, struct ether_hdr*); +    arp_frame = (struct arp_hdr*)&hdr[1]; + +    memset(hdr->d_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN); +    hdr->s_addr     = port->get_mac_addr(); +    hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP); + +    arp_frame->arp_hrd          = rte_cpu_to_be_16(ARP_HRD_ETHER); +    arp_frame->arp_pro          = rte_cpu_to_be_16(ETHER_TYPE_IPv4); +    arp_frame->arp_hln          = 6; +    arp_frame->arp_pln          = 4; +    arp_frame->arp_op           = rte_cpu_to_be_16(ARP_OP_REQUEST); +    arp_frame->arp_data.arp_sha = port->get_mac_addr(); +    arp_frame->arp_data.arp_sip = port->get_ipv4(); +    memset(arp_frame->arp_data.arp_tha.addr_bytes, 0x00, ETHER_ADDR_LEN); +    arp_frame->arp_data.arp_tip = ip; + +    mbuf->pkt_len  = 42; +    mbuf->data_len = 42; + +    if (rte_eth_tx_burst(port->get_port_id(), queue, &mbuf, 1) != 1) { +        UHD_LOG_WARNING("DPDK::IO_SERVICE", "ARP request not sent: Descriptor ring full"); +        rte_pktmbuf_free(mbuf); +        return -EAGAIN; +    } +    return 0; +} + +/* Do a burst of RX on port */ +int dpdk_io_service::_rx_burst(dpdk::dpdk_port* port, dpdk::queue_id_t queue) +{ +    struct ether_hdr* hdr; +    char* l2_data; +    struct rte_mbuf* bufs[RX_BURST_SIZE]; +    const uint16_t num_rx = +        rte_eth_rx_burst(port->get_port_id(), queue, bufs, RX_BURST_SIZE); +    if (unlikely(num_rx == 0)) { +        return 0; +    } + +    for (int buf = 0; buf < num_rx; buf++) { +        uint64_t ol_flags = bufs[buf]->ol_flags; +        hdr               = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr*); +        l2_data           = (char*)&hdr[1]; +        switch (rte_be_to_cpu_16(hdr->ether_type)) { +            case ETHER_TYPE_ARP: +                _process_arp(port, queue, (struct arp_hdr*)l2_data); +                rte_pktmbuf_free(bufs[buf]); +                break; +            case ETHER_TYPE_IPv4: +                if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) { +                    UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet has bad IP cksum"); +                } else if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_NONE) { +                    UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet missing IP cksum"); +                } else { +                    _process_ipv4(port, bufs[buf], (struct ipv4_hdr*)l2_data); +                } +                break; +            default: +                rte_pktmbuf_free(bufs[buf]); +                break; +        } +    } +    return num_rx; +} + +int dpdk_io_service::_process_arp( +    dpdk::dpdk_port* port, dpdk::queue_id_t queue_id, struct arp_hdr* arp_frame) +{ +    uint32_t dest_ip            = arp_frame->arp_data.arp_sip; +    struct ether_addr dest_addr = arp_frame->arp_data.arp_sha; +    UHD_LOG_TRACE("DPDK::IO_SERVICE", +        "Processing ARP packet: " << dpdk::ipv4_num_to_str(dest_ip) << " -> " +                                  << dpdk::eth_addr_to_string(dest_addr)); +    /* Add entry to ARP table */ +    rte_spinlock_lock(&port->_spinlock); +    struct dpdk::arp_entry* entry = NULL; +    if (port->_arp_table.count(dest_ip) == 0) { +        entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0); +        if (!entry) { +            return -ENOMEM; +        } +        entry = new (entry) dpdk::arp_entry(); +        ether_addr_copy(&dest_addr, &entry->mac_addr); +        port->_arp_table[dest_ip] = entry; +    } else { +        entry = port->_arp_table.at(dest_ip); +        ether_addr_copy(&dest_addr, &entry->mac_addr); +        for (auto req : entry->reqs) { +            auto arp_data = (struct dpdk::arp_request*)req->data; +            ether_addr_copy(&dest_addr, &arp_data->tha); +            while (_servq.complete(req) == -ENOBUFS) +                ; +        } +        entry->reqs.clear(); +    } +    rte_spinlock_unlock(&port->_spinlock); + +    /* Respond if this was an ARP request */ +    if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST) +        && arp_frame->arp_data.arp_tip == port->get_ipv4()) { +        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Sending ARP reply."); +        port->_arp_reply(queue_id, arp_frame); +    } + +    return 0; +} + +int dpdk_io_service::_process_ipv4( +    dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct ipv4_hdr* pkt) +{ +    bool bcast = port->dst_is_broadcast(pkt->dst_addr); +    if (pkt->dst_addr != port->get_ipv4() && !bcast) { +        rte_pktmbuf_free(mbuf); +        return -ENODEV; +    } +    if (pkt->next_proto_id == IPPROTO_UDP) { +        return _process_udp(port, mbuf, (struct udp_hdr*)&pkt[1], bcast); +    } +    rte_pktmbuf_free(mbuf); +    return -EINVAL; +} + + +int dpdk_io_service::_process_udp( +    dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct udp_hdr* pkt, bool /*bcast*/) +{ +    // Get the link +    struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, +        .src_ip                                   = 0, +        .dst_ip                                   = port->get_ipv4(), +        .src_port                                 = 0, +        .dst_port                                 = pkt->dst_port}; +    void* hash_data; +    if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) < 0) { +        UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No link entry in rx table"); +        rte_pktmbuf_free(mbuf); +        return -ENOENT; +    } +    // Get xport list for this UDP port +    auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); +    if (rx_entry->empty()) { +        UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No xports for link"); +        rte_pktmbuf_free(mbuf); +        return -ENOENT; +    } +    // Turn rte_mbuf -> dpdk_frame_buff +    auto link = rx_entry->front()->link; +    link->enqueue_recv_mbuf(mbuf); +    auto buff       = link->get_recv_buff(0); +    bool rcvr_found = false; +    for (auto client_if : *rx_entry) { +        // Check all the muxed receivers... +        if (client_if->recv_cb(buff, link, link)) { +            rcvr_found = true; +            if (buff) { +                assert(client_if->is_recv); +                auto recv_io  = (dpdk_recv_io*)client_if->io_client; +                auto buff_ptr = (dpdk::dpdk_frame_buff*)buff.release(); +                if (rte_ring_enqueue(recv_io->_recv_queue, buff_ptr)) { +                    rte_pktmbuf_free(buff_ptr->get_pktmbuf()); +                    UHD_LOG_WARNING( +                        "DPDK::IO_SERVICE", "Dropping packet: No space in recv queue"); +                } else { +                    recv_io->_num_frames_in_use++; +                    assert(recv_io->_num_frames_in_use <= recv_io->_num_recv_frames); +                    _wake_client(client_if); +                } +            } +            break; +        } +    } +    if (!rcvr_found) { +        UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No receiver xport found"); +        // Release the buffer if no receiver found +        link->release_recv_buff(std::move(buff)); +        return -ENOENT; +    } +    return 0; +} + +/* Do a burst of TX on port's tx queues */ +int dpdk_io_service::_tx_burst(dpdk::dpdk_port* port) +{ +    unsigned int total_tx = 0; +    auto& queues          = _tx_queues.at(port->get_port_id()); + +    for (auto& send_io : queues) { +        unsigned int num_tx   = rte_ring_count(send_io->_send_queue); +        num_tx                = (num_tx < TX_BURST_SIZE) ? num_tx : TX_BURST_SIZE; +        bool replaced_buffers = false; +        for (unsigned int i = 0; i < num_tx; i++) { +            size_t frame_size = send_io->_dpdk_io_if.link->get_send_frame_size(); +            if (send_io->_fc_cb && !send_io->_fc_cb(frame_size)) { +                break; +            } +            dpdk::dpdk_frame_buff* buff_ptr; +            int status = rte_ring_dequeue(send_io->_send_queue, (void**)&buff_ptr); +            if (status) { +                UHD_LOG_ERROR("DPDK::IO_SERVICE", "TX Q Count doesn't match actual"); +                break; +            } +            send_io->_send_cb(frame_buff::uptr(buff_ptr), send_io->_dpdk_io_if.link); +            // Attempt to replace buffer +            buff_ptr = (dpdk::dpdk_frame_buff*)send_io->_dpdk_io_if.link->get_send_buff(0) +                           .release(); +            if (!buff_ptr) { +                UHD_LOG_ERROR("DPDK::IO_SERVICE", +                    "TX mempool out of memory. Please increase dpdk_num_mbufs."); +                send_io->_num_frames_in_use--; +            } else if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) { +                rte_pktmbuf_free(buff_ptr->get_pktmbuf()); +                send_io->_num_frames_in_use--; +            } else { +                replaced_buffers = true; +            } +        } +        if (replaced_buffers) { +            _wake_client(&send_io->_dpdk_io_if); +        } +        total_tx += num_tx; +    } + +    return total_tx; +} + +int dpdk_io_service::_rx_release(dpdk::dpdk_port* port) +{ +    unsigned int total_bufs = 0; +    auto& queues            = _recv_xport_map.at(port->get_port_id()); + +    for (auto& recv_io : queues) { +        unsigned int num_buf = rte_ring_count(recv_io->_release_queue); +        num_buf              = (num_buf < RX_BURST_SIZE) ? num_buf : RX_BURST_SIZE; +        for (unsigned int i = 0; i < num_buf; i++) { +            dpdk::dpdk_frame_buff* buff_ptr; +            int status = rte_ring_dequeue(recv_io->_release_queue, (void**)&buff_ptr); +            if (status) { +                UHD_LOG_ERROR("DPDK::IO_SERVICE", "RX Q Count doesn't match actual"); +                break; +            } +            recv_io->_fc_cb(frame_buff::uptr(buff_ptr), +                recv_io->_dpdk_io_if.link, +                recv_io->_dpdk_io_if.link); +            recv_io->_num_frames_in_use--; +        } +        total_bufs += num_buf; +    } + +    return total_bufs; +} + +uint16_t dpdk_io_service::_get_unique_client_id() +{ +    std::lock_guard<std::mutex> lock(_mutex); +    if (_client_id_set.size() >= MAX_CLIENTS) { +        UHD_LOG_ERROR("DPDK::IO_SERVICE", "Exceeded maximum number of clients"); +        throw uhd::runtime_error("DPDK::IO_SERVICE: Exceeded maximum number of clients"); +    } + +    uint16_t id = _next_client_id++; +    while (_client_id_set.count(id)) { +        id = _next_client_id++; +    } +    _client_id_set.insert(id); +    return id; +} + +void dpdk_io_service::_wake_client(dpdk_io_if* dpdk_io) +{ +    dpdk::wait_req* req; +    if (dpdk_io->is_recv) { +        auto recv_io = static_cast<dpdk_recv_io*>(dpdk_io->io_client); +        req          = recv_io->_waiter; +    } else { +        auto send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client); +        req          = send_io->_waiter; +    } +    bool stat = req->mutex.try_lock(); +    if (stat) { +        bool active_req = !req->complete; +        if (dpdk_io->next) { +            // On the list: Take it off +            if (dpdk_io->next == dpdk_io) { +                // Only node on the list +                _retry_head = NULL; +            } else { +                // Other nodes are on the list +                if (_retry_head == dpdk_io) { +                    // Move list head to next +                    _retry_head = dpdk_io->next; +                } +                dpdk_io->next->prev = dpdk_io->prev; +                dpdk_io->prev->next = dpdk_io->next; +            } +            dpdk_io->next = NULL; +            dpdk_io->prev = NULL; +        } +        if (active_req) { +            req->complete = true; +            req->cond.notify_one(); +        } +        req->mutex.unlock(); +        if (active_req) { +            wait_req_put(req); +        } +    } else { +        // Put on the retry list, if it isn't already +        if (!dpdk_io->next) { +            if (_retry_head) { +                dpdk_io->next           = _retry_head; +                dpdk_io->prev           = _retry_head->prev; +                _retry_head->prev->next = dpdk_io; +                _retry_head->prev       = dpdk_io; +            } else { +                _retry_head   = dpdk_io; +                dpdk_io->next = dpdk_io; +                dpdk_io->prev = dpdk_io; +            } +        } +    } +} | 
