aboutsummaryrefslogtreecommitdiffstats
path: root/host
diff options
context:
space:
mode:
authorAlex Williams <alex.williams@ni.com>2018-09-12 06:37:06 -0700
committerBrent Stapleton <brent.stapleton@ettus.com>2019-01-15 17:14:57 -0800
commitd2adc9bfdf72d1830d748c411ac9b2b43ebe740b (patch)
treeb45658e7585d62a78ef8dfbcbc5db3db695e847f /host
parente2195ac505bd423d3d2f973bbe94da1c78296aa6 (diff)
downloaduhd-d2adc9bfdf72d1830d748c411ac9b2b43ebe740b.tar.gz
uhd-d2adc9bfdf72d1830d748c411ac9b2b43ebe740b.tar.bz2
uhd-d2adc9bfdf72d1830d748c411ac9b2b43ebe740b.zip
transport: Add dpdk_zero_copy transport
This transport is based on uhd-dpdk, and it includes a global context that must be initialized prior to creating any dpdk_zero_copy objects.
Diffstat (limited to 'host')
-rw-r--r--host/lib/transport/CMakeLists.txt3
-rw-r--r--host/lib/transport/dpdk_zero_copy.cpp418
-rw-r--r--host/lib/transport/dpdk_zero_copy.hpp106
3 files changed, 527 insertions, 0 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index a7f986277..63d44c80b 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -139,6 +139,9 @@ endif(ENABLE_LIBERIO)
if(ENABLE_DPDK)
INCLUDE_SUBDIRECTORY(uhd-dpdk)
+ LIBUHD_APPEND_SOURCES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_zero_copy.cpp
+ )
endif(ENABLE_DPDK)
# Verbose Debug output for send/recv
diff --git a/host/lib/transport/dpdk_zero_copy.cpp b/host/lib/transport/dpdk_zero_copy.cpp
new file mode 100644
index 000000000..fab2098c1
--- /dev/null
+++ b/host/lib/transport/dpdk_zero_copy.cpp
@@ -0,0 +1,418 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include "dpdk_zero_copy.hpp"
+#include <uhd/config.hpp>
+#include <uhd/utils/static.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhd/transport/uhd-dpdk.h>
+#include <boost/make_shared.hpp>
+#include <sys/syslog.h>
+#include <stack>
+#include <arpa/inet.h>
+
+namespace uhd { namespace transport {
+
+namespace {
+ static constexpr uint64_t USEC = 1000000;
+ // FIXME: Make configurable and have uhd-dpdk library track buffer sizes
+ static constexpr size_t DEFAULT_FRAME_SIZE = 8000;
+
+ inline char * eal_add_opt(std::vector<const char*>& argv, size_t n,
+ char *dst, const char *opt, const char *arg)
+ {
+ char *ptr = dst;
+ strncpy(ptr, opt, n);
+ argv.push_back(ptr);
+ ptr += strlen(opt) + 1;
+ n -= ptr - dst;
+ strncpy(ptr, arg, n);
+ argv.push_back(ptr);
+ ptr += strlen(arg) + 1;
+ return ptr;
+ }
+}
+
+uhd_dpdk_ctx::uhd_dpdk_ctx(void) : _init_done(false) {}
+
+uhd_dpdk_ctx::~uhd_dpdk_ctx(void) {}
+
+/* Initialize uhd-dpdk (and do only once) */
+void uhd_dpdk_ctx::init(const dict<std::string, std::string> &eal_args,
+ unsigned int num_ports, int *port_thread_mapping,
+ int num_mbufs, int mbuf_cache_size, size_t mtu)
+{
+ std::lock_guard<std::mutex> lock(_init_mutex);
+ if (!_init_done) {
+ _mtu = mtu;
+ /* Build up argc and argv */
+ std::vector<const char *> argv;
+ argv.push_back("uhd-dpdk");
+ char *args = new char[4096];
+ char *opt = args;
+ char *end = args + sizeof(args);
+ for (std::string &key : eal_args.keys()) {
+ std::string val = eal_args[key];
+ if (key == "coremask") {
+ opt = eal_add_opt(argv, end - opt, opt, "-c",
+ val.c_str());
+ } else if (key == "corelist") {
+ /* NOTE: This arg may have commas, so limited to config file */
+ opt = eal_add_opt(argv, end - opt, opt, "-l",
+ val.c_str());
+ } else if (key == "coremap") {
+ opt = eal_add_opt(argv, end - opt, opt, "--lcores",
+ val.c_str());
+ } else if (key == "master-lcore") {
+ opt = eal_add_opt(argv, end - opt, opt, "--master-lcore",
+ val.c_str());
+ } else if (key == "pci-blacklist") {
+ opt = eal_add_opt(argv, end - opt, opt, "-b",
+ val.c_str());
+ } else if (key == "pci-whitelist") {
+ opt = eal_add_opt(argv, end - opt, opt, "-w",
+ val.c_str());
+ } else if (key == "log-level") {
+ opt = eal_add_opt(argv, end - opt, opt, "--log-level",
+ val.c_str());
+ } else if (key == "huge-dir") {
+ opt = eal_add_opt(argv, end - opt, opt, "--huge-dir",
+ val.c_str());
+ } else if (key == "file-prefix") {
+ opt = eal_add_opt(argv, end - opt, opt, "--file-prefix",
+ val.c_str());
+ }
+ }
+ uhd_dpdk_init(argv.size(), argv.data(), num_ports, port_thread_mapping, num_mbufs,
+ mbuf_cache_size, _mtu);
+ delete args;
+ _init_done = true;
+ }
+}
+
+int uhd_dpdk_ctx::get_port_id(std::array<uint8_t, 6> mac_addr,
+ unsigned int &port_id)
+{
+ UHD_ASSERT_THROW(is_init_done());
+ int num_ports = uhd_dpdk_port_count();
+ for (int i = 0; i < num_ports; i++) {
+ struct eth_addr port_mac_addr = uhd_dpdk_get_eth_addr((unsigned int) i);
+ for (int j = 0; j < 6; j++) {
+ if (mac_addr[j] != port_mac_addr.addr[j]) {
+ break;
+ }
+ if (j == 5) {
+ port_id = (unsigned int) i;
+ return 0;
+ }
+ }
+ }
+ return -1;
+}
+
+int uhd_dpdk_ctx::get_route(const std::string &addr) const
+{
+ const uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str());
+ const unsigned int num_ports = uhd_dpdk_port_count();
+ for (unsigned int port = 0; port < num_ports; port++) {
+ uint32_t src_ipv4;
+ uint32_t netmask;
+ uhd_dpdk_get_ipv4_addr(port, &src_ipv4, &netmask);
+ if ((src_ipv4 & netmask) == (dst_ipv4 & netmask)) {
+ return (int) port;
+ }
+ }
+ return -ENODEV;
+}
+
+int uhd_dpdk_ctx::set_ipv4_addr(unsigned int port_id, uint32_t ipv4_addr,
+ uint32_t netmask)
+{
+ return uhd_dpdk_set_ipv4_addr(port_id, ipv4_addr, netmask);
+}
+
+bool uhd_dpdk_ctx::is_init_done(void)
+{
+ return _init_done.load();
+}
+
+
+class dpdk_zero_copy_msb : public managed_send_buffer {
+public:
+ dpdk_zero_copy_msb(struct uhd_dpdk_socket *sock,
+ std::stack<dpdk_zero_copy_msb *, std::vector<dpdk_zero_copy_msb *>> &free_bufs)
+ : _sock(sock), _buf(nullptr), _free_bufs(free_bufs) {};
+
+ ~dpdk_zero_copy_msb(void) {}
+
+ void release(void)
+ {
+ if (_buf) {
+ _buf->pkt_len = _length;
+ _buf->data_len = _length;
+ int num_tx = uhd_dpdk_send(_sock, &_buf, 1);
+ if (num_tx == 0) {
+ /* Drop packet and free buffer (do not share sockets!) */
+ UHD_LOG_ERROR("DPDK", "Invalid shared socket usage detected. Dropping packet...");
+ uhd_dpdk_free_buf(_buf);
+ }
+ // Push back into pool
+ _free_bufs.push(this);
+ }
+ }
+
+ sptr get_new(double timeout)
+ {
+ int bufs = uhd_dpdk_request_tx_bufs(_sock, &_buf, 1, timeout);
+ if (bufs != 1 || !_buf)
+ return sptr();
+
+ return make(this, uhd_dpdk_buf_to_data(_sock, _buf),
+ DEFAULT_FRAME_SIZE);
+ }
+
+private:
+ struct uhd_dpdk_socket *_sock;
+ struct rte_mbuf *_buf;
+ std::stack<dpdk_zero_copy_msb *, std::vector<dpdk_zero_copy_msb *>> &_free_bufs;
+};
+
+class dpdk_zero_copy_mrb : public managed_recv_buffer {
+public:
+ dpdk_zero_copy_mrb(struct uhd_dpdk_socket *sock,
+ std::stack<dpdk_zero_copy_mrb *, std::vector<dpdk_zero_copy_mrb *>> &free_bufs)
+ : _sock(sock), _buf(nullptr), _free_bufs(free_bufs) {};
+ ~dpdk_zero_copy_mrb(void) {}
+
+ void release(void)
+ {
+ if (_buf) {
+ uhd_dpdk_free_buf(_buf);
+ _free_bufs.push(this);
+ }
+ }
+
+ sptr get_new(double timeout)
+ {
+ int bufs = uhd_dpdk_recv(_sock, &_buf, 1, (int) (timeout*USEC));
+ if (bufs != 1 || _buf == nullptr) {
+ // Push back into pool if we didn't get a real buffer
+ _free_bufs.push(this);
+ return sptr();
+ }
+ if ((_buf->ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) {
+ UHD_LOG_WARNING("DPDK", "IP checksum failure detected. Dropping packet...");
+ uhd_dpdk_free_buf(_buf);
+ _free_bufs.push(this);
+ return sptr();
+ }
+
+ return make(this, uhd_dpdk_buf_to_data(_sock, _buf),
+ uhd_dpdk_get_len(_sock, _buf));
+ }
+
+private:
+ struct uhd_dpdk_socket *_sock;
+ struct rte_mbuf *_buf;
+ std::stack<dpdk_zero_copy_mrb *, std::vector<dpdk_zero_copy_mrb *>> &_free_bufs;
+};
+
+class dpdk_zero_copy_impl : public dpdk_zero_copy {
+public:
+
+ dpdk_zero_copy_impl(struct uhd_dpdk_ctx &ctx,
+ const unsigned int dpdk_port_id,
+ const std::string &addr,
+ const std::string &remote_port,
+ const std::string &local_port,
+ const zero_copy_xport_params& xport_params)
+ : _num_send_frames(xport_params.num_send_frames),
+ _send_frame_size(xport_params.send_frame_size),
+ _num_recv_frames(xport_params.num_recv_frames),
+ _recv_frame_size(xport_params.recv_frame_size),
+ _port_id(dpdk_port_id),
+ _rx_empty_count(0),
+ _tx_empty_count(0)
+ {
+ // TODO: Handle xport_params
+ UHD_ASSERT_THROW(xport_params.recv_frame_size > 0);
+ UHD_ASSERT_THROW(xport_params.send_frame_size > 0);
+ UHD_ASSERT_THROW(xport_params.num_send_frames > 0);
+ UHD_ASSERT_THROW(xport_params.num_recv_frames > 0);
+
+ UHD_ASSERT_THROW(ctx.is_init_done());
+
+ const int num_ports = uhd_dpdk_port_count();
+ UHD_ASSERT_THROW(num_ports > 0);
+ UHD_ASSERT_THROW(dpdk_port_id < (unsigned int) num_ports);
+
+ // Convert ipv4 addr from string to uint32_t, network format
+ uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str());
+ // Convert port from string to uint16_t, network format
+ uint16_t dst_port = htons(std::stoi(remote_port, NULL, 0));
+ uint16_t src_port = htons(std::stoi(local_port, NULL, 0));
+
+ // Create RX socket first
+ struct uhd_dpdk_sockarg_udp sockarg = {
+ .is_tx = false,
+ .local_port = src_port,
+ .remote_port = dst_port,
+ .dst_addr = dst_ipv4
+ };
+ _rx_sock = uhd_dpdk_sock_open(dpdk_port_id, UHD_DPDK_SOCK_UDP, &sockarg);
+ UHD_ASSERT_THROW(_rx_sock != nullptr);
+
+ // Backfill the local port, in case it was auto-assigned
+ uhd_dpdk_udp_get_info(_rx_sock, &sockarg);
+
+ sockarg.is_tx = true;
+ sockarg.remote_port = dst_port;
+ sockarg.dst_addr = dst_ipv4;
+ _tx_sock = uhd_dpdk_sock_open(dpdk_port_id, UHD_DPDK_SOCK_UDP, &sockarg);
+ UHD_ASSERT_THROW(_tx_sock != nullptr);
+
+ // Create managed_buffer containers
+ for (size_t i = 0; i < _num_recv_frames; i++) {
+ _mrb_pool.push(new dpdk_zero_copy_mrb(_rx_sock, _mrb_pool));
+ }
+ for (size_t i = 0; i < _num_send_frames; i++) {
+ _msb_pool.push(new dpdk_zero_copy_msb(_tx_sock, _msb_pool));
+ }
+
+ UHD_LOG_TRACE("DPDK", "Created transports between " << addr << ":"
+ << remote_port << " and NIC(" << dpdk_port_id
+ << "):" << ntohs(sockarg.local_port));
+ }
+
+ ~dpdk_zero_copy_impl(void)
+ {
+ struct uhd_dpdk_sockarg_udp sockarg;
+ size_t count;
+ uhd_dpdk_udp_get_info(_rx_sock, &sockarg);
+ uhd_dpdk_get_drop_count(_rx_sock, &count);
+ UHD_LOG_TRACE("DPDK", "Closing transports between " << sockarg.dst_addr << ":"
+ << ntohs(sockarg.remote_port) << " and local:"
+ << ntohs(sockarg.local_port));
+ UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
+ << " Dropped "<< count << " packets");
+ uhd_dpdk_get_xfer_count(_rx_sock, &count);
+ UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
+ << " Received "<< count << " packets");
+ UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
+ << "RX empty count is " << _rx_empty_count);
+ UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
+ << "TX empty count is " << _tx_empty_count);
+ uhd_dpdk_sock_close(_rx_sock);
+ uhd_dpdk_sock_close(_tx_sock);
+ }
+
+ managed_recv_buffer::sptr get_recv_buff(double timeout = 0.1)
+ {
+ if (_mrb_pool.empty()) {
+ _rx_empty_count++;
+ return managed_recv_buffer::sptr();
+ }
+
+ dpdk_zero_copy_mrb *mrb = _mrb_pool.top();
+ _mrb_pool.pop();
+ managed_recv_buffer::sptr buff = mrb->get_new(timeout);
+ if (!buff)
+ _rx_empty_count++;
+ return buff;
+ }
+
+ size_t get_num_recv_frames(void) const
+ {
+ return _num_recv_frames;
+ }
+
+ size_t get_recv_frame_size(void) const
+ {
+ return _recv_frame_size;
+ }
+
+ managed_send_buffer::sptr get_send_buff(double timeout = 0.1)
+ {
+ if (_msb_pool.empty()) {
+ _tx_empty_count++;
+ return managed_send_buffer::sptr();
+ }
+
+ dpdk_zero_copy_msb *msb = _msb_pool.top();
+ _msb_pool.pop();
+ managed_send_buffer::sptr buff = msb->get_new(timeout);
+ if (!buff)
+ _tx_empty_count++;
+ return buff;
+ }
+
+ size_t get_num_send_frames(void) const
+ {
+ return _num_send_frames;
+ }
+
+ size_t get_send_frame_size(void) const
+ {
+ return _send_frame_size;
+ }
+
+ uint16_t get_local_port(void) const
+ {
+ struct uhd_dpdk_sockarg_udp sockarg;
+ int status = uhd_dpdk_udp_get_info(_rx_sock, &sockarg);
+ return ntohs(sockarg.local_port);
+ }
+
+ std::string get_local_addr(void) const
+ {
+ uint32_t ipv4_addr;
+ int status = uhd_dpdk_get_ipv4_addr(_port_id, &ipv4_addr, NULL);
+ auto retval = std::to_string(ipv4_addr >> 0 & 0xff) +
+ "." +
+ std::to_string(ipv4_addr >> 8 & 0xff) +
+ "." +
+ std::to_string(ipv4_addr >> 16 & 0xff) +
+ "." +
+ std::to_string(ipv4_addr >> 24 & 0xff);
+ return retval;
+ }
+
+ uint32_t get_drop_count(void) const
+ {
+ size_t drop_count = 0;
+ uhd_dpdk_get_drop_count(_rx_sock, &drop_count);
+ return drop_count;
+ }
+private:
+ struct uhd_dpdk_socket *_rx_sock;
+ struct uhd_dpdk_socket *_tx_sock;
+ const size_t _num_send_frames;
+ const size_t _send_frame_size;
+ const size_t _num_recv_frames;
+ const size_t _recv_frame_size;
+ const unsigned int _port_id;
+ unsigned int _rx_empty_count;
+ unsigned int _tx_empty_count;
+
+ std::stack<dpdk_zero_copy_mrb *, std::vector<dpdk_zero_copy_mrb *>> _mrb_pool;
+ std::stack<dpdk_zero_copy_msb *, std::vector<dpdk_zero_copy_msb *>> _msb_pool;
+};
+
+dpdk_zero_copy::sptr dpdk_zero_copy::make(
+ struct uhd_dpdk_ctx &ctx,
+ const unsigned int dpdk_port_id,
+ const std::string &addr,
+ const std::string &remote_port,
+ const std::string &local_port,
+ const zero_copy_xport_params &default_buff_args,
+ const device_addr_t &hints)
+{
+ return dpdk_zero_copy::sptr(
+ new dpdk_zero_copy_impl(ctx, dpdk_port_id, addr, remote_port, local_port, default_buff_args)
+ );
+}
+
+}}
diff --git a/host/lib/transport/dpdk_zero_copy.hpp b/host/lib/transport/dpdk_zero_copy.hpp
new file mode 100644
index 000000000..0e5e30285
--- /dev/null
+++ b/host/lib/transport/dpdk_zero_copy.hpp
@@ -0,0 +1,106 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef DPDK_ZERO_COPY_HPP
+#define DPDK_ZERO_COPY_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/transport/zero_copy.hpp>
+#include <uhd/types/device_addr.hpp>
+#include <uhd/utils/static.hpp>
+#include <uhd/utils/log.hpp>
+#include <boost/shared_ptr.hpp>
+#include <string>
+#include <vector>
+#include <mutex>
+
+
+namespace uhd { namespace transport {
+
+class uhd_dpdk_ctx : boost::noncopyable {
+public:
+ UHD_SINGLETON_FCN(uhd_dpdk_ctx, get);
+
+ ~uhd_dpdk_ctx(void);
+
+ /*!
+ * Initialize uhd-dpdk (and do only once)
+ * \param eal_args Arguments to pass to DPDK rte_eal_init() function
+ * \param num_ports Size of port_thread_mapping array (also number in use)
+ * \param port_thread_mapping Map NICs to threads: index=port, value=thread
+ * \param num_mbufs Number of packet buffers for each port's memory pool
+ * \param mbuf_cache_size Size of per-core packet buffer cache from mempool
+ * \param mtu MTU of NIC ports
+ */
+ void init(const dict<std::string, std::string> &eal_args, unsigned int num_ports,
+ int *port_thread_mapping, int num_mbufs, int mbuf_cache_size,
+ size_t mtu);
+
+ /*!
+ * Get port ID from provided MAC address
+ * \param mac_addr MAC address
+ * \param port_id Int to write ID of port corresponding to MAC address
+ * \return 0 if match found, else no match
+ */
+ int get_port_id(std::array<uint8_t, 6> mac_addr, unsigned int &port_id);
+
+ /*!
+ * Get port ID for routing packet destined for given address
+ * \param addr Destination address
+ * \return port ID from routing table
+ */
+ int get_route(const std::string &addr) const;
+
+ /*!
+ * Set IPv4 address and subnet mask of given NIC port
+ * Not thread-safe. Should only be written before ports are in use.
+ * \param port_id NIC port ID
+ * \param ipv4_addr IPv4 address to write
+ * \param netmask Subnet mask identifying network number in ipv4_addr
+ * \return 0 if successful, else error
+ */
+ int set_ipv4_addr(unsigned int port_id, uint32_t ipv4_addr, uint32_t netmask);
+
+ /*!
+ * \return whether init() has been called
+ */
+ bool is_init_done(void);
+
+private:
+ uhd_dpdk_ctx(void);
+
+ size_t _mtu;
+ std::mutex _init_mutex;
+ std::atomic<bool> _init_done;
+};
+
+/*!
+ * A zero copy transport interface to the dpdk DMA library.
+ */
+class dpdk_zero_copy : public virtual zero_copy_if {
+public:
+ typedef boost::shared_ptr<dpdk_zero_copy> sptr;
+
+ static sptr make(
+ struct uhd_dpdk_ctx &ctx,
+ const unsigned int dpdk_port_id,
+ const std::string &addr,
+ const std::string &remote_port,
+ const std::string &local_port, /* 0 = auto-assign */
+ const zero_copy_xport_params &default_buff_args,
+ const device_addr_t &hints
+ );
+
+ virtual uint16_t get_local_port(void) const = 0;
+
+ virtual std::string get_local_addr(void) const = 0;
+
+ virtual uint32_t get_drop_count(void) const = 0;
+};
+
+}}
+
+#endif /* DPDK_ZERO_COPY_HPP */