about | summary | refs | log | tree | commit | diff | stats
path: root/host/lib/transport/uhd-dpdk/dpdk_common.cpp
diff options
context:
space:
mode:
author: Alex Williams <alex.williams@ni.com>, 2019-12-01 21:58:13 -0800
committer: Brent Stapleton <brent.stapleton@ettus.com>, 2019-12-20 16:32:22 -0800
commit: 4e38eef817813c1bbd8a9cf972e4cf0134d24308 (patch)
tree: f6200a048a7da5b7b588a4a9aae881ce7551825e /host/lib/transport/uhd-dpdk/dpdk_common.cpp
parent: 797d54bc2573688eebcb2c639cb07e4ab6d5ab9d (diff)
download: uhd-4e38eef817813c1bbd8a9cf972e4cf0134d24308.tar.gz
uhd-4e38eef817813c1bbd8a9cf972e4cf0134d24308.tar.bz2
uhd-4e38eef817813c1bbd8a9cf972e4cf0134d24308.zip
dpdk: Add new DPDK stack to integrate with I/O services

docs: Update DPDK docs with new parameters:
Parameter names have had their hyphens changed to underscores, and the I/O CPU argument is now named after the lcores and reflects the naming used by DPDK.

transport: Add new udp_dpdk_link, based atop the new APIs:
This link is tightly coupled with the DPDK I/O service. The link class carries all the address information to communicate with the other host, and it can send packets directly through the DPDK NIC ports. However, for receiving packets, the I/O service must pull the packets from the DMA queue and attach them to the appropriate link object. The link object merely formats the frame_buff object underneath, which is embedded in the rte_mbuf container. For get_recv_buff, the link will pull buffers only from its internal queue (the one filled by the I/O service).

transport: Add DPDK-specific I/O service:
The I/O service is split into two parts, the user threads and the I/O worker threads. The user threads submit requests through various appropriate queues, and the I/O threads perform all the I/O on their behalf. This includes routing UDP packets to the correct receiver and getting the MAC address of a destination (by performing the ARP request and handling the ARP replies).

The DPDK context stores I/O services. The context spawns all I/O services on init(), and I/O services can be fetched from the dpdk_ctx object by using a port ID.

I/O service clients:
The clients have two lockless ring buffers. One is to get a buffer from the I/O service; the other is to release a buffer back to the I/O service. Threads sleeping on buffer I/O are kept in a separate list from the service queue and are processed in the course of doing RX or TX. The list nodes are embedded in the dpdk_io_if, and the head of the list is on the dpdk_io_service. The I/O service will transfer the embedded wait_req to the list if it cannot acquire the mutex to complete the condition for waking.

Co-authored-by: Martin Braun <martin.braun@ettus.com>
Co-authored-by: Ciro Nishiguchi <ciro.nishiguchi@ni.com>
Co-authored-by: Brent Stapleton <brent.stapleton@ettus.com>
Diffstat (limited to 'host/lib/transport/uhd-dpdk/dpdk_common.cpp')
-rw-r--r-- host/lib/transport/uhd-dpdk/dpdk_common.cpp 258
1 files changed, 129 insertions, 129 deletions
diff --git a/host/lib/transport/uhd-dpdk/dpdk_common.cpp b/host/lib/transport/uhd-dpdk/dpdk_common.cpp
index 43a1507bb..46818e973 100644
--- a/host/lib/transport/uhd-dpdk/dpdk_common.cpp
+++ b/host/lib/transport/uhd-dpdk/dpdk_common.cpp
@@ -3,8 +3,13 @@
//
// SPDX-License-Identifier: GPL-3.0-or-later
//
+
+#include <uhd/utils/algorithm.hpp>
#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/dpdk/arp.hpp>
#include <uhdlib/transport/dpdk/common.hpp>
+#include <uhdlib/transport/dpdk/udp.hpp>
+#include <uhdlib/transport/dpdk_io_service.hpp>
#include <uhdlib/utils/prefs.hpp>
#include <arpa/inet.h>
#include <rte_arp.h>
@@ -23,6 +28,7 @@ constexpr uint16_t DPDK_DEFAULT_RING_SIZE = 512;
inline char* eal_add_opt(
std::vector<const char*>& argv, size_t n, char* dst, const char* opt, const char* arg)
{
+ UHD_LOG_TRACE("DPDK", opt << " " << arg);
char* ptr = dst;
strncpy(ptr, opt, n);
argv.push_back(ptr);
@@ -34,15 +40,6 @@ inline char* eal_add_opt(
return ptr;
}
-inline std::string eth_addr_to_string(const struct ether_addr mac_addr)
-{
- auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx");
- mac_stream % (uint32_t)mac_addr.addr_bytes[0] % (uint32_t)mac_addr.addr_bytes[1]
- % (uint32_t)mac_addr.addr_bytes[2] % (uint32_t)mac_addr.addr_bytes[3]
- % (uint32_t)mac_addr.addr_bytes[4] % (uint32_t)mac_addr.addr_bytes[5];
- return mac_stream.str();
-}
-
inline void separate_ipv4_addr(
const std::string ipv4, uint32_t& ipv4_addr, uint32_t& netmask)
{
@@ -59,28 +56,29 @@ inline void separate_ipv4_addr(
dpdk_port::uptr dpdk_port::make(port_id_t port,
size_t mtu,
uint16_t num_queues,
- size_t num_mbufs,
+ uint16_t num_desc,
struct rte_mempool* rx_pktbuf_pool,
struct rte_mempool* tx_pktbuf_pool,
std::string ipv4_address)
{
return std::make_unique<dpdk_port>(
- port, mtu, num_queues, num_mbufs, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address);
+ port, mtu, num_queues, num_desc, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address);
}
dpdk_port::dpdk_port(port_id_t port,
size_t mtu,
uint16_t num_queues,
- size_t num_mbufs,
+ uint16_t num_desc,
struct rte_mempool* rx_pktbuf_pool,
struct rte_mempool* tx_pktbuf_pool,
std::string ipv4_address)
: _port(port)
, _mtu(mtu)
+ , _num_queues(num_queues)
, _rx_pktbuf_pool(rx_pktbuf_pool)
, _tx_pktbuf_pool(tx_pktbuf_pool)
{
- /* 1. Set MTU and IPv4 address */
+ /* Set MTU and IPv4 address */
int retval;
retval = rte_eth_dev_set_mtu(_port, _mtu);
@@ -96,7 +94,7 @@ dpdk_port::dpdk_port(port_id_t port,
separate_ipv4_addr(ipv4_address, _ipv4, _netmask);
- /* 2. Set hardware offloads */
+ /* Set hardware offloads */
struct rte_eth_dev_info dev_info;
rte_eth_dev_info_get(_port, &dev_info);
uint64_t rx_offloads = DEV_RX_OFFLOAD_IPV4_CKSUM;
@@ -132,13 +130,13 @@ dpdk_port::dpdk_port(port_id_t port,
throw uhd::runtime_error("DPDK: Failed to configure the device");
}
- /* 3. Set descriptor ring sizes */
- uint16_t rx_desc = num_mbufs;
+ /* Set descriptor ring sizes */
+ uint16_t rx_desc = num_desc;
if (dev_info.rx_desc_lim.nb_max < rx_desc || dev_info.rx_desc_lim.nb_min > rx_desc
|| (dev_info.rx_desc_lim.nb_align - 1) & rx_desc) {
UHD_LOGGER_ERROR("DPDK")
<< boost::format("%d: %d RX descriptors requested, but must be in [%d,%d]")
- % _port % num_mbufs % dev_info.rx_desc_lim.nb_min
+ % _port % num_desc % dev_info.rx_desc_lim.nb_min
% dev_info.rx_desc_lim.nb_max;
UHD_LOGGER_ERROR("DPDK")
<< boost::format("Num RX descriptors must also be aligned to 0x%x")
@@ -146,12 +144,12 @@ dpdk_port::dpdk_port(port_id_t port,
throw uhd::runtime_error("DPDK: Failed to allocate RX descriptors");
}
- uint16_t tx_desc = num_mbufs;
+ uint16_t tx_desc = num_desc;
if (dev_info.tx_desc_lim.nb_max < tx_desc || dev_info.tx_desc_lim.nb_min > tx_desc
|| (dev_info.tx_desc_lim.nb_align - 1) & tx_desc) {
UHD_LOGGER_ERROR("DPDK")
<< boost::format("%d: %d TX descriptors requested, but must be in [%d,%d]")
- % _port % num_mbufs % dev_info.tx_desc_lim.nb_min
+ % _port % num_desc % dev_info.tx_desc_lim.nb_min
% dev_info.tx_desc_lim.nb_max;
UHD_LOGGER_ERROR("DPDK")
<< boost::format("Num TX descriptors must also be aligned to 0x%x")
@@ -165,7 +163,7 @@ dpdk_port::dpdk_port(port_id_t port,
throw uhd::runtime_error("DPDK: Failed to configure the DMA queues");
}
- /* 4. Set up the RX and TX DMA queues (May not be generally supported after
+ /* Set up the RX and TX DMA queues (May not be generally supported after
* eth_dev_start) */
unsigned int cpu_socket = rte_eth_dev_socket_id(_port);
for (uint16_t i = 0; i < _num_queues; i++) {
@@ -187,36 +185,9 @@ dpdk_port::dpdk_port(port_id_t port,
}
}
- /* 5. Set up initial flow table */
+ /* TODO: Enable multiple queues (only support 1 right now) */
- /* Append all free queues except 0, which is reserved for ARP */
- _free_queues.reserve(_num_queues - 1);
- for (unsigned int i = 1; i < _num_queues; i++) {
- _free_queues.push_back(i);
- }
-
- // struct rte_flow_attr flow_attr;
- // flow_attr.group = 0;
- // flow_attr.priority = 1;
- // flow_attr.ingress = 1;
- // flow_attr.egress = 0;
- // flow_attr.transfer = 0;
- // flow_attr.reserved = 0;
-
- // struct rte_flow_item[] flow_pattern = {
- //};
- // int rte_flow_validate(uint16_t port_id,
- // const struct rte_flow_attr *attr,
- // const struct rte_flow_item pattern[],
- // const struct rte_flow_action actions[],
- // struct rte_flow_error *error);
- // struct rte_flow * rte_flow_create(uint16_t port_id,
- // const struct rte_flow_attr *attr,
- // const struct rte_flow_item pattern[],
- // const struct rte_flow_action *actions[],
- // struct rte_flow_error *error);
-
- /* 6. Start the Ethernet device */
+ /* Start the Ethernet device */
retval = rte_eth_dev_start(_port);
if (retval < 0) {
UHD_LOGGER_ERROR("DPDK")
@@ -230,39 +201,60 @@ dpdk_port::dpdk_port(port_id_t port,
<< " MAC: " << eth_addr_to_string(_mac_addr);
}
-/* TODO: Do flow directions */
-queue_id_t dpdk_port::alloc_queue(struct rte_flow_pattern recv_pattern[])
+dpdk_port::~dpdk_port()
{
- std::lock_guard<std::mutex> lock(_mutex);
- UHD_ASSERT_THROW(_free_queues.size() != 0);
- auto queue = _free_queues.back();
- _free_queues.pop_back();
- return queue;
+ rte_eth_dev_stop(_port);
+ rte_spinlock_lock(&_spinlock);
+ for (auto kv : _arp_table) {
+ for (auto req : kv.second->reqs) {
+ req->cond.notify_one();
+ }
+ rte_free(kv.second);
+ }
+ _arp_table.clear();
+ rte_spinlock_unlock(&_spinlock);
}
-void dpdk_port::free_queue(queue_id_t queue)
+uint16_t dpdk_port::alloc_udp_port(uint16_t udp_port)
{
+ uint16_t port_selected;
std::lock_guard<std::mutex> lock(_mutex);
- auto flow = _flow_rules.at(queue);
- int status = rte_flow_destroy(_port, flow, NULL);
- if (status) {
- UHD_LOGGER_ERROR("DPDK")
- << boost::format("Failed to destroy flow rule on port %u, queue %u") % _port
- % queue;
- throw uhd::runtime_error("DPDK: Failed to destroy flow rule");
+ if (udp_port) {
+ if (_udp_ports.count(rte_be_to_cpu_16(udp_port))) {
+ return 0;
+ }
+ port_selected = rte_be_to_cpu_16(udp_port);
} else {
- _flow_rules.erase(queue);
+ if (_udp_ports.size() >= 65535) {
+ UHD_LOG_WARNING("DPDK", "Attempted to allocate UDP port, but none remain");
+ return 0;
+ }
+ port_selected = _next_udp_port;
+ while (true) {
+ if (port_selected == 0) {
+ continue;
+ }
+ if (_udp_ports.count(port_selected) == 0) {
+ _next_udp_port = port_selected - 1;
+ break;
+ }
+ if (port_selected - 1 == _next_udp_port) {
+ return 0;
+ }
+ port_selected--;
+ }
}
- _free_queues.push_back(queue);
+ _udp_ports.insert(port_selected);
+ return rte_cpu_to_be_16(port_selected);
}
-int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* arp_req)
+int dpdk_port::_arp_reply(queue_id_t queue_id, struct arp_hdr* arp_req)
{
struct rte_mbuf* mbuf;
struct ether_hdr* hdr;
struct arp_hdr* arp_frame;
- mbuf = rte_pktmbuf_alloc(tx_pktbuf_pool);
+ mbuf = rte_pktmbuf_alloc(_tx_pktbuf_pool);
if (unlikely(mbuf == NULL)) {
UHD_LOG_WARNING("DPDK", "Could not allocate packet buffer for ARP response");
return -ENOMEM;
@@ -288,8 +280,7 @@ int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* ar
mbuf->pkt_len = 42;
mbuf->data_len = 42;
- // ARP replies always on queue 0
- if (rte_eth_tx_burst(_port, 0, &mbuf, 1) != 1) {
+ if (rte_eth_tx_burst(_port, queue_id, &mbuf, 1) != 1) {
UHD_LOGGER_WARNING("DPDK")
<< boost::format("%s: TX descriptor ring is full") % __func__;
rte_pktmbuf_free(mbuf);
@@ -298,61 +289,16 @@ int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* ar
return 0;
}
-// TODO: ARP processing for queue 0
-// int dpdk_port::process_arp(struct rte_mempool *tx_pktbuf_pool, struct arp_hdr
-// *arp_frame)
-//{
-// std::lock_guard<std::mutex> lock(_mutex);
-// uint32_t dest_ip = arp_frame->arp_data.arp_sip;
-// struct ether_addr dest_addr = arp_frame->arp_data.arp_sha;
-//
-// /* Add entry to ARP table */
-// struct uhd_dpdk_arp_entry *entry = NULL;
-// rte_hash_lookup_data(_arp_table, &dest_ip, (void **) &entry);
-// if (!entry) {
-// entry = rte_zmalloc(NULL, sizeof(*entry), 0);
-// if (!entry) {
-// return -ENOMEM;
-// }
-// LIST_INIT(&entry->pending_list);
-// ether_addr_copy(&dest_addr, &entry->mac_addr);
-// if (rte_hash_add_key_data(port->arp_table, &dest_ip, entry) < 0) {
-// rte_free(entry);
-// return -ENOSPC;
-// }
-// } else {
-// struct uhd_dpdk_config_req *req = NULL;
-// ether_addr_copy(&dest_addr, &entry->mac_addr);
-// /* Now wake any config reqs waiting for the ARP */
-// LIST_FOREACH(req, &entry->pending_list, entry) {
-// _uhd_dpdk_config_req_compl(req, 0);
-// }
-// while (entry->pending_list.lh_first != NULL) {
-// LIST_REMOVE(entry->pending_list.lh_first, entry);
-// }
-// }
-// /* Respond if this was an ARP request */
-// if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST) &&
-// arp_frame->arp_data.arp_tip == port->ipv4_addr) {
-// _arp_reply(tx_pktbuf_pool, arp_frame);
-// }
-//
-// return 0;
-//
-//}
-
-static dpdk_ctx* global_ctx = nullptr;
+static dpdk_ctx::sptr global_ctx = nullptr;
static std::mutex global_ctx_mutex;
dpdk_ctx::sptr dpdk_ctx::get()
{
std::lock_guard<std::mutex> lock(global_ctx_mutex);
if (!global_ctx) {
- auto new_ctx = std::make_shared<dpdk_ctx>();
- global_ctx = new_ctx.get();
- return new_ctx;
+ global_ctx = std::make_shared<dpdk_ctx>();
}
- return global_ctx->shared_from_this();
+ return global_ctx;
}
dpdk_ctx::dpdk_ctx(void) : _init_done(false) {}
@@ -384,6 +330,7 @@ void dpdk_ctx::_eal_init(const device_addr_t& eal_args)
auto args = new std::array<char, 4096>();
char* opt = args->data();
char* end = args->data() + args->size();
+ UHD_LOG_TRACE("DPDK", "EAL init options: ");
for (std::string& key : eal_args.keys()) {
std::string val = eal_args[key];
if (key == "dpdk_coremask") {
@@ -452,10 +399,16 @@ void dpdk_ctx::init(const device_addr_t& user_args)
_num_mbufs = dpdk_args.cast<int>("dpdk_num_mbufs", DEFAULT_NUM_MBUFS);
_mbuf_cache_size =
dpdk_args.cast<int>("dpdk_mbuf_cache_size", DEFAULT_MBUF_CACHE_SIZE);
+ UHD_LOG_TRACE("DPDK",
+ "mtu: " << _mtu << " num_mbufs: " << _num_mbufs
+ << " mbuf_cache_size: " << _mbuf_cache_size);
/* Get device info for all the NIC ports */
int num_dpdk_ports = rte_eth_dev_count_avail();
- UHD_ASSERT_THROW(num_dpdk_ports > 0);
+ if (num_dpdk_ports == 0) {
+ UHD_LOG_ERROR("DPDK", "No available DPDK devices (ports) found!");
+ throw uhd::runtime_error("No available DPDK devices (ports) found!");
+ }
device_addrs_t nics(num_dpdk_ports);
RTE_ETH_FOREACH_DEV(i)
{
@@ -480,6 +433,8 @@ void dpdk_ctx::init(const device_addr_t& user_args)
}
/* Now combine user args with conf file */
auto conf = uhd::prefs::get_dpdk_nic_args(nic);
+ // TODO: Enable the use of multiple DMA queues
+ conf["dpdk_num_queues"] = "1";
/* Update config, and remove ports that aren't fully configured */
if (conf.has_key("dpdk_ipv4")) {
@@ -491,10 +446,17 @@ void dpdk_ctx::init(const device_addr_t& user_args)
}
}
+ std::map<size_t, std::vector<size_t>> lcore_to_port_id_map;
RTE_ETH_FOREACH_DEV(i)
{
auto& conf = nics.at(i);
if (conf.has_key("dpdk_ipv4")) {
+ UHD_ASSERT_THROW(conf.has_key("dpdk_lcore"));
+ const size_t lcore_id = conf.cast<size_t>("dpdk_lcore", 0);
+ if (!lcore_to_port_id_map.count(lcore_id)) {
+ lcore_to_port_id_map.insert({lcore_id, {}});
+ }
+
// Allocating enough buffers for all DMA queues for each CPU socket
// - This is a bit inefficient for larger systems, since NICs may not
// all be on one socket
@@ -507,10 +469,13 @@ void dpdk_ctx::init(const device_addr_t& user_args)
_ports[i] = dpdk_port::make(i,
_mtu,
conf.cast<uint16_t>("dpdk_num_queues", rte_lcore_count()),
- _num_mbufs,
+ conf.cast<uint16_t>("dpdk_num_desc", DPDK_DEFAULT_RING_SIZE),
rx_pool,
tx_pool,
conf["dpdk_ipv4"]);
+
+ // Remember all port IDs that map to an lcore
+ lcore_to_port_id_map.at(lcore_id).push_back(i);
}
}
@@ -522,12 +487,29 @@ void dpdk_ctx::init(const device_addr_t& user_args)
rte_eth_link_get(portid, &link);
unsigned int link_status = link.link_status;
unsigned int link_speed = link.link_speed;
- UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps\n")
- % portid % link_status % link_speed;
+ UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps") % portid
+ % link_status % link_speed;
}
- UHD_LOG_TRACE("DPDK", "Init DONE!");
-
+ UHD_LOG_TRACE("DPDK", "Init done -- spawning IO services");
_init_done = true;
+
+ // Links are up, now create one IO service per lcore
+ for (auto& lcore_portids_pair : lcore_to_port_id_map) {
+ const size_t lcore_id = lcore_portids_pair.first;
+ std::vector<dpdk_port*> dpdk_ports;
+ dpdk_ports.reserve(lcore_portids_pair.second.size());
+ for (const size_t port_id : lcore_portids_pair.second) {
+ dpdk_ports.push_back(get_port(port_id));
+ }
+ const size_t servq_depth = 32; // FIXME
+ UHD_LOG_TRACE("DPDK",
+ "Creating I/O service for lcore "
+ << lcore_id << ", servicing " << dpdk_ports.size()
+ << " ports, service queue depth " << servq_depth);
+ _io_srv_portid_map.insert(
+ {uhd::transport::dpdk_io_service::make(lcore_id, dpdk_ports, servq_depth),
+ lcore_portids_pair.second});
+ }
}
}
@@ -577,7 +559,7 @@ int dpdk_ctx::get_port_link_status(port_id_t portid) const
return link.link_status;
}
-int dpdk_ctx::get_route(const std::string& addr) const
+dpdk_port* dpdk_ctx::get_route(const std::string& addr) const
{
const uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str());
for (const auto& port : _ports) {
@@ -586,10 +568,10 @@ int dpdk_ctx::get_route(const std::string& addr) const
uint32_t src_ipv4 = port.second->get_ipv4();
uint32_t netmask = port.second->get_netmask();
if ((src_ipv4 & netmask) == (dst_ipv4 & netmask)) {
- return (int)port.first;
+ return port.second.get();
}
}
- return -ENODEV;
+ return NULL;
}
@@ -598,6 +580,20 @@ bool dpdk_ctx::is_init_done(void) const
return _init_done.load();
}
+uhd::transport::dpdk_io_service::sptr dpdk_ctx::get_io_service(const size_t port_id)
+{
+ for (auto& io_srv_portid_pair : _io_srv_portid_map) {
+ if (uhd::has(io_srv_portid_pair.second, port_id)) {
+ return io_srv_portid_pair.first;
+ }
+ }
+
+ std::string err_msg = std::string("Cannot look up I/O service for port ID: ")
+ + std::to_string(port_id) + ". No such port ID!";
+ UHD_LOG_ERROR("DPDK", err_msg);
+ throw uhd::lookup_error(err_msg);
+}
+
struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool(
unsigned int cpu_socket, size_t num_bufs)
{
@@ -605,8 +601,12 @@ struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool(
const int mbuf_size = _mtu + RTE_PKTMBUF_HEADROOM;
char name[32];
snprintf(name, sizeof(name), "rx_mbuf_pool_%u", cpu_socket);
- _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create(
- name, num_bufs, _mbuf_cache_size, 0, mbuf_size, SOCKET_ID_ANY);
+ _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create(name,
+ num_bufs,
+ _mbuf_cache_size,
+ DPDK_MBUF_PRIV_SIZE,
+ mbuf_size,
+ SOCKET_ID_ANY);
if (!_rx_pktbuf_pools.at(cpu_socket)) {
UHD_LOG_ERROR("DPDK", "Could not allocate RX pktbuf pool");
throw uhd::runtime_error("DPDK: Could not allocate RX pktbuf pool");