aboutsummaryrefslogtreecommitdiffstats
path: root/host
diff options
context:
space:
mode:
Diffstat (limited to 'host')
-rw-r--r--host/lib/include/uhdlib/transport/dpdk_common.hpp80
-rw-r--r--host/lib/include/uhdlib/transport/dpdk_simple.hpp95
-rw-r--r--host/lib/include/uhdlib/transport/uhd-dpdk.h33
-rw-r--r--host/lib/include/uhdlib/utils/prefs.hpp34
-rw-r--r--host/lib/transport/CMakeLists.txt1
-rw-r--r--host/lib/transport/dpdk_simple.cpp192
-rw-r--r--host/lib/transport/dpdk_zero_copy.cpp405
-rw-r--r--host/lib/transport/dpdk_zero_copy.hpp86
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk.c49
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h13
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c19
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h3
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c74
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h1
-rw-r--r--host/lib/usrp/mpmd/CMakeLists.txt11
-rw-r--r--host/lib/usrp/mpmd/mpmd_find.cpp10
-rw-r--r--host/lib/usrp/mpmd/mpmd_xport_ctrl_dpdk_udp.cpp266
-rw-r--r--host/lib/usrp/mpmd/mpmd_xport_ctrl_dpdk_udp.hpp55
-rw-r--r--host/lib/usrp/mpmd/mpmd_xport_mgr.cpp8
-rw-r--r--host/lib/utils/prefs.cpp45
-rw-r--r--host/tests/CMakeLists.txt4
-rw-r--r--host/tests/dpdk_test.cpp80
22 files changed, 1220 insertions, 344 deletions
diff --git a/host/lib/include/uhdlib/transport/dpdk_common.hpp b/host/lib/include/uhdlib/transport/dpdk_common.hpp
new file mode 100644
index 000000000..2f320e79e
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/dpdk_common.hpp
@@ -0,0 +1,80 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_DPDK_COMMON_HPP
+#define INCLUDED_DPDK_COMMON_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/types/device_addr.hpp>
+#include <uhd/utils/static.hpp>
+#include <array>
+#include <atomic>
+#include <mutex>
+#include <string>
+
+namespace uhd { namespace transport {
+
+class uhd_dpdk_ctx : boost::noncopyable {
+public:
+ UHD_SINGLETON_FCN(uhd_dpdk_ctx, get);
+
+ ~uhd_dpdk_ctx(void);
+
+ /*!
+ * Initialize uhd-dpdk (and do only once)
+ * \param user_args User args passed in to override config files
+ */
+ void init(const device_addr_t &user_args);
+
+ /*!
+ * Get MTU of NICs used by DPDK
+ *
+ * \return Number of Bytes in MTU
+ */
+ size_t get_mtu(void) const;
+
+ /*!
+ * Get port ID from provided MAC address
+ * \param mac_addr MAC address
+ * \param port_id Int to write ID of port corresponding to MAC address
+ * \return 0 if match found, else no match
+ */
+ int get_port_id(std::array<uint8_t, 6> mac_addr, unsigned int &port_id) const;
+
+ /*!
+ * Get port ID for routing packet destined for given address
+ * \param addr Destination address
+ * \return port ID from routing table
+ */
+ int get_route(const std::string &addr) const;
+
+ /*!
+ * Set IPv4 address and subnet mask of given NIC port
+ * Not thread-safe. Should only be written before ports are in use.
+ * \param port_id NIC port ID
+ * \param ipv4_addr IPv4 address to write
+ * \param netmask Subnet mask identifying network number in ipv4_addr
+ * \return 0 if successful, else error
+ */
+ int set_ipv4_addr(unsigned int port_id, uint32_t ipv4_addr, uint32_t netmask);
+
+ /*!
+ * \return whether init() has been called
+ */
+ bool is_init_done(void) const;
+
+private:
+ uhd_dpdk_ctx(void);
+
+ size_t _mtu;
+ std::mutex _init_mutex;
+ std::atomic<bool> _init_done;
+ uhd::dict<uint32_t, unsigned int> _routes;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_DPDK_COMMON_HPP */
diff --git a/host/lib/include/uhdlib/transport/dpdk_simple.hpp b/host/lib/include/uhdlib/transport/dpdk_simple.hpp
new file mode 100644
index 000000000..62728b38d
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/dpdk_simple.hpp
@@ -0,0 +1,95 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_DPDK_SIMPLE_HPP
+#define INCLUDED_DPDK_SIMPLE_HPP
+
+#include <uhdlib/transport/dpdk_common.hpp>
+
+namespace uhd { namespace transport {
+
+class dpdk_simple : boost::noncopyable
+{
+public:
+ typedef boost::shared_ptr<dpdk_simple> sptr;
+
+ virtual ~dpdk_simple(void) = 0;
+
+ /*!
+ * Make a new connected dpdk transport:
+ * This transport is for sending and receiving
+ * between this host and a single endpoint.
+ * The primary usage for this transport will be control transactions.
+ *
+ * The address must be an ipv4 address.
+ * The port must be a number.
+ *
+ * \param addr a string representing the destination address
+ * \param port a string representing the destination port
+ */
+ static sptr make_connected(struct uhd_dpdk_ctx &ctx,
+ const std::string &addr, const std::string &port);
+
+ /*!
+ * Make a new broadcasting dpdk transport:
+ * This transport can send broadcast datagrams
+ * and receive datagrams from multiple sources.
+ * The primary usage for this transport will be to discover devices.
+ *
+ * The address must be an ipv4 address.
+ * The port must be a number.
+ *
+ * \param addr a string representing the destination address
+ * \param port a string representing the destination port
+ */
+ static sptr make_broadcast(struct uhd_dpdk_ctx &ctx,
+ const std::string &addr, const std::string &port);
+
+ /*!
+ * Request a single send buffer of specified size.
+ *
+ * \param buf a pointer to place to write buffer location
+ * \return the maximum length of the buffer in Bytes
+ */
+ virtual size_t get_tx_buf(void** buf) = 0;
+
+ /*!
+ * Send and release outstanding buffer
+ *
+ * \param number of bytes sent (releases buffer if sent)
+ */
+ virtual size_t send(size_t length) = 0;
+
+ /*!
+ * Receive a single packet.
+ * Buffer provided by transport (must be freed).
+ *
+ * \param buf a pointer to place to write buffer location
+ * \param timeout the timeout in seconds
+ * \return the number of bytes received or zero on timeout
+ */
+ virtual size_t recv(void **buf, double timeout = 0.1) = 0;
+
+ /*!
+ * Return/free receive buffer
+ */
+ virtual void put_rx_buf(void) = 0;
+
+ /*!
+ * Get the last IP address as seen by recv().
+ * Only use this with the broadcast socket.
+ */
+ virtual std::string get_recv_addr(void) = 0;
+
+ /*!
+ * Get the IP address for the destination
+ */
+ virtual std::string get_send_addr(void) = 0;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_DPDK_SIMPLE_HPP */
diff --git a/host/lib/include/uhdlib/transport/uhd-dpdk.h b/host/lib/include/uhdlib/transport/uhd-dpdk.h
index 8d46912bd..ae7d31383 100644
--- a/host/lib/include/uhdlib/transport/uhd-dpdk.h
+++ b/host/lib/include/uhdlib/transport/uhd-dpdk.h
@@ -31,12 +31,22 @@ enum uhd_dpdk_sock_type {
};
/**
- * Init UHD-DPDK environment and bring up ports (link UP).
- *
- * Offload capabilities will be used if available
+ * Init UHD-DPDK environment, including DPDK's EAL.
+ * This will make available information about the DPDK-assigned NIC devices.
*
* @param argc passed directly to rte_eal_init()
* @param argv passed directly to rte_eal_init()
+ *
+ * @return Returns negative error code if there were issues, else 0
+ */
+int uhd_dpdk_init(int argc, const char **argv);
+
+/**
+ * Start UHD-DPDK networking stack. Bring ports up (link UP).
+ * uhd_dpdk_init() must be called first.
+ *
+ * Offload capabilities will be used if available
+ *
* @param num_ports number of network interfaces to map
* @param port_thread_mapping array of num_ports entries specifying which thread
* will drive the I/O for a given port (determined by array index)
@@ -47,9 +57,8 @@ enum uhd_dpdk_sock_type {
*
* @return Returns negative error code if there were issues, else 0
*/
-int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports,
- int *port_thread_mapping, int num_mbufs, int mbuf_cache_size,
- int mtu);
+int uhd_dpdk_start(unsigned int num_ports, int *port_thread_mapping,
+ int num_mbufs, int mbuf_cache_size, int mtu);
/**
* @return Returns number of ports registered to DPDK.
@@ -58,6 +67,12 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports,
int uhd_dpdk_port_count(void);
/**
+ * @return Returns 0 if link is down, 1 if link is up, and negative error code
+ * if error occurred.
+ */
+int uhd_dpdk_port_link_status(unsigned int portid);
+
+/**
* @return Returns Ethernet MAC address of requested port
*
* @param portid ID number of network interface
@@ -121,17 +136,21 @@ int uhd_dpdk_sock_close(struct uhd_dpdk_socket *sock);
/**
* Arguments for a UDP socket
- * All data should be provided in network format
+ * All address/port data should be provided in network format
*/
struct uhd_dpdk_sockarg_udp {
/*! True for TX socket, false for RX socket */
bool is_tx;
+ /*! True to filter broadcast packets, else recv */
+ bool filter_bcast;
/*! Local udp port. This is dst_port for RX, src_port for TX */
uint16_t local_port;
/*! Remote udp port. This is dst_port for TX */
uint16_t remote_port;
/*! IPv4 address for destination (TX) */
uint32_t dst_addr;
+ /*! Number of buffers in ring */
+ size_t num_bufs;
};
/**
diff --git a/host/lib/include/uhdlib/utils/prefs.hpp b/host/lib/include/uhdlib/utils/prefs.hpp
index 62831a875..e528450cd 100644
--- a/host/lib/include/uhdlib/utils/prefs.hpp
+++ b/host/lib/include/uhdlib/utils/prefs.hpp
@@ -48,6 +48,40 @@ namespace uhd { namespace prefs {
*/
uhd::device_addr_t get_usrp_args(const uhd::device_addr_t &user_args);
+ /*! Convenience function to update global DPDK args with settings from
+ * config files.
+ *
+ * Searches for a profile attached to the dpdk-conf key, like this:
+ * [dpdk-conf=myconfig]
+ * num_mbufs=4095
+ * mbuf_cache_size=315
+ * mtu=8000
+ *
+ * \param user_args After getting the device args from the config
+ * files, all of these key/value pairs will be applied
+ * and will overwrite the settings from config files
+ * if they exist.
+ */
+ uhd::device_addr_t get_dpdk_args(const uhd::device_addr_t &user_args);
+
+ /*! Convenience function to update per-NIC DPDK args with settings from
+ * config files.
+ *
+ * Grabs settings based on provided MAC address. Sections created like so:
+ * [dpdk-mac=00:01:02:03:04:05]
+ * dpdk-ipv4 = 192.168.20.1/24
+ * dpdk-io-cpu = 1
+ *
+ * [dpdk-mac=00:01:02:03:04:06]
+ * dpdk-ipv4 = 192.168.40.1/24
+ * dpdk-io-cpu = 1
+ *
+ * \param user_args After getting the device args from the config
+ * files, all of these key/value pairs will be applied
+ * and will overwrite the settings from config files
+ * if they exist.
+ */
+ uhd::device_addr_t get_dpdk_nic_args(const uhd::device_addr_t &user_args);
}} /* namespace uhd::prefs */
#endif /* INCLUDED_LIBUHD_UTILS_PREFS_HPP */
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index 63d44c80b..7794f0760 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -141,6 +141,7 @@ if(ENABLE_DPDK)
INCLUDE_SUBDIRECTORY(uhd-dpdk)
LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/dpdk_zero_copy.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_simple.cpp
)
endif(ENABLE_DPDK)
diff --git a/host/lib/transport/dpdk_simple.cpp b/host/lib/transport/dpdk_simple.cpp
new file mode 100644
index 000000000..74bb979ef
--- /dev/null
+++ b/host/lib/transport/dpdk_simple.cpp
@@ -0,0 +1,192 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/dpdk_simple.hpp>
+#include <uhdlib/transport/uhd-dpdk.h>
+#include <arpa/inet.h>
+
+namespace uhd { namespace transport {
+
+namespace {
+ constexpr uint64_t USEC = 1000000;
+ // Non-data fields are headers (Ethernet + IPv4 + UDP) + CRC
+ constexpr size_t DPDK_SIMPLE_NONDATA_SIZE = 14 + 20 + 8 + 4;
+}
+
+class dpdk_simple_impl : public dpdk_simple {
+public:
+ dpdk_simple_impl(struct uhd_dpdk_ctx &ctx, const std::string &addr,
+ const std::string &port, bool filter_bcast)
+ {
+ UHD_ASSERT_THROW(ctx.is_init_done());
+
+ // Get NIC that can route to addr
+ int port_id = ctx.get_route(addr);
+ UHD_ASSERT_THROW(port_id >= 0);
+
+ _port_id = port_id;
+ uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str());
+ uint16_t dst_port = htons(std::stoi(port, NULL, 0));
+
+ struct uhd_dpdk_sockarg_udp sockarg = {
+ .is_tx = false,
+ .filter_bcast = filter_bcast,
+ .local_port = 0,
+ .remote_port = dst_port,
+ .dst_addr = dst_ipv4,
+ .num_bufs = 1
+ };
+ _rx_sock = uhd_dpdk_sock_open(_port_id, UHD_DPDK_SOCK_UDP, &sockarg);
+ UHD_ASSERT_THROW(_rx_sock != nullptr);
+
+ // Backfill the local port, in case it was auto-assigned
+ uhd_dpdk_udp_get_info(_rx_sock, &sockarg);
+ sockarg.is_tx = true;
+ sockarg.remote_port = dst_port;
+ sockarg.dst_addr = dst_ipv4;
+ sockarg.num_bufs = 1;
+ _tx_sock = uhd_dpdk_sock_open(_port_id, UHD_DPDK_SOCK_UDP, &sockarg);
+ UHD_ASSERT_THROW(_tx_sock != nullptr);
+ UHD_LOG_TRACE("DPDK", "Created simple transports between " << addr << ":"
+ << ntohs(dst_port) << " and NIC(" << _port_id
+ << "):" << ntohs(sockarg.local_port));
+ }
+
+ ~dpdk_simple_impl(void)
+ {
+ if (_rx_mbuf)
+ uhd_dpdk_free_buf(_rx_mbuf);
+ if (_tx_mbuf)
+ uhd_dpdk_free_buf(_tx_mbuf);
+ }
+
+ /*!
+ * Request a single send buffer of specified size.
+ *
+ * \param buf a pointer to place to write buffer location
+ * \return the maximum length of the buffer
+ */
+ size_t get_tx_buf(void** buf)
+ {
+ UHD_ASSERT_THROW(!_tx_mbuf);
+ int bufs = uhd_dpdk_request_tx_bufs(_tx_sock, &_tx_mbuf, 1, 0);
+ if (bufs != 1 || !_tx_mbuf) {
+ *buf = nullptr;
+ return 0;
+ }
+ *buf = uhd_dpdk_buf_to_data(_tx_sock, _tx_mbuf);
+ return _mtu - DPDK_SIMPLE_NONDATA_SIZE;
+ }
+
+ /*!
+ * Send and release outstanding buffer
+ *
+ * \param length bytes of data to send
+ * \return number of bytes sent (releases buffer if sent)
+ */
+ size_t send(size_t length)
+ {
+ UHD_ASSERT_THROW(_tx_mbuf)
+ _tx_mbuf->pkt_len = length;
+ _tx_mbuf->data_len = length;
+ int num_tx = uhd_dpdk_send(_tx_sock, &_tx_mbuf, 1);
+ if (num_tx == 0)
+ return 0;
+ _tx_mbuf = nullptr;
+ return length;
+ }
+
+ /*!
+ * Receive a single packet.
+ * Buffer provided by transport (must be freed before next operation).
+ *
+ * \param buf a pointer to place to write buffer location
+ * \param timeout the timeout in seconds
+ * \return the number of bytes received or zero on timeout
+ */
+ size_t recv(void **buf, double timeout)
+ {
+ UHD_ASSERT_THROW(!_rx_mbuf);
+ int bufs = uhd_dpdk_recv(_rx_sock, &_rx_mbuf, 1, (int) (timeout*USEC));
+ if (bufs != 1 || _rx_mbuf == nullptr) {
+ *buf = nullptr;
+ return 0;
+ }
+ if ((_tx_mbuf->ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) {
+ uhd_dpdk_free_buf(_rx_mbuf);
+ _rx_mbuf = nullptr;
+ return 0;
+ }
+ uhd_dpdk_get_src_ipv4(_rx_sock, _rx_mbuf, &_last_recv_addr);
+ *buf = uhd_dpdk_buf_to_data(_rx_sock, _rx_mbuf);
+ return uhd_dpdk_get_len(_rx_sock, _rx_mbuf);
+ }
+
+ /*!
+ * Return/free receive buffer
+ * Can also use to free un-sent TX bufs
+ */
+ void put_rx_buf(void)
+ {
+ UHD_ASSERT_THROW(_rx_mbuf)
+ uhd_dpdk_free_buf(_rx_mbuf);
+ }
+
+ /*!
+ * Get the last IP address as seen by recv().
+ * Only use this with the broadcast socket.
+ */
+ std::string get_recv_addr(void)
+ {
+ char addr_str[INET_ADDRSTRLEN];
+ struct in_addr ipv4_addr;
+ ipv4_addr.s_addr = _last_recv_addr;
+ inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str));
+ return std::string(addr_str);
+ }
+
+ /*!
+ * Get the IP address for the destination
+ */
+ std::string get_send_addr(void)
+ {
+ struct in_addr ipv4_addr;
+ int status = uhd_dpdk_get_ipv4_addr(_port_id, &ipv4_addr.s_addr, nullptr);
+ UHD_ASSERT_THROW(status);
+ char addr_str[INET_ADDRSTRLEN];
+ inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str));
+ return std::string(addr_str);
+ }
+private:
+ unsigned int _port_id;
+ size_t _mtu;
+ struct uhd_dpdk_socket *_tx_sock;
+ struct rte_mbuf *_tx_mbuf = nullptr;
+ struct uhd_dpdk_socket *_rx_sock;
+ struct rte_mbuf *_rx_mbuf = nullptr;
+ uint32_t _last_recv_addr;
+};
+
+dpdk_simple::~dpdk_simple(void) {}
+
+/***********************************************************************
+ * DPDK simple transport public make functions
+ **********************************************************************/
+dpdk_simple::sptr dpdk_simple::make_connected(
+ struct uhd_dpdk_ctx &ctx, const std::string &addr, const std::string &port
+){
+ return sptr(new dpdk_simple_impl(ctx, addr, port, true));
+}
+
+dpdk_simple::sptr dpdk_simple::make_broadcast(
+ struct uhd_dpdk_ctx &ctx, const std::string &addr, const std::string &port
+){
+ return sptr(new dpdk_simple_impl(ctx, addr, port, false));
+}
+}} // namespace uhd::transport
+
+
diff --git a/host/lib/transport/dpdk_zero_copy.cpp b/host/lib/transport/dpdk_zero_copy.cpp
index 51c9b9dba..9649c1cfb 100644
--- a/host/lib/transport/dpdk_zero_copy.cpp
+++ b/host/lib/transport/dpdk_zero_copy.cpp
@@ -7,24 +7,28 @@
#include "dpdk_zero_copy.hpp"
#include <uhd/config.hpp>
#include <uhd/utils/log.hpp>
-#include <uhd/utils/static.hpp>
#include <uhdlib/transport/uhd-dpdk.h>
-#include <arpa/inet.h>
-#include <sys/syslog.h>
+#include <uhdlib/utils/prefs.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/format.hpp>
#include <boost/make_shared.hpp>
#include <stack>
+#include <sys/syslog.h>
+#include <arpa/inet.h>
namespace uhd { namespace transport {
namespace {
-static constexpr uint64_t USEC = 1000000;
-// FIXME: Make configurable and have uhd-dpdk library track buffer sizes
-static constexpr size_t DEFAULT_FRAME_SIZE = 8000;
-
-inline char* eal_add_opt(
- std::vector<const char*>& argv, size_t n, char* dst, const char* opt, const char* arg)
+constexpr uint64_t USEC = 1000000;
+constexpr size_t DEFAULT_FRAME_SIZE = 8000;
+constexpr int DEFAULT_NUM_MBUFS = 4095;
+constexpr int DEFAULT_MBUF_CACHE_SIZE = 315;
+constexpr size_t UHD_DPDK_HEADERS_SIZE = 14 + 20 + 8; // Ethernet + IPv4 + UDP
+
+inline char * eal_add_opt(std::vector<const char*> &argv, size_t n,
+ char *dst, const char *opt, const char *arg)
{
- char* ptr = dst;
+ char *ptr = dst;
strncpy(ptr, opt, n);
argv.push_back(ptr);
ptr += strlen(opt) + 1;
@@ -34,6 +38,78 @@ inline char* eal_add_opt(
ptr += strlen(arg) + 1;
return ptr;
}
+
+inline void uhd_dpdk_eal_init(const device_addr_t &eal_args)
+{
+ /* Build up argc and argv */
+ std::vector<const char *> argv;
+ argv.push_back("uhd-dpdk");
+ auto args = new std::array<char, 4096>();
+ char *opt = args->data();
+ char *end = args->data() + args->size();
+ for (std::string &key : eal_args.keys()) {
+ std::string val = eal_args[key];
+ if (key == "dpdk-coremask") {
+ opt = eal_add_opt(argv, end - opt, opt, "-c",
+ val.c_str());
+ } else if (key == "dpdk-corelist") {
+ /* NOTE: This arg may have commas, so limited to config file */
+ opt = eal_add_opt(argv, end - opt, opt, "-l",
+ val.c_str());
+ } else if (key == "dpdk-coremap") {
+ opt = eal_add_opt(argv, end - opt, opt, "--lcores",
+ val.c_str());
+ } else if (key == "dpdk-master-lcore") {
+ opt = eal_add_opt(argv, end - opt, opt, "--master-lcore",
+ val.c_str());
+ } else if (key == "dpdk-pci-blacklist") {
+ opt = eal_add_opt(argv, end - opt, opt, "-b",
+ val.c_str());
+ } else if (key == "dpdk-pci-whitelist") {
+ opt = eal_add_opt(argv, end - opt, opt, "-w",
+ val.c_str());
+ } else if (key == "dpdk-log-level") {
+ opt = eal_add_opt(argv, end - opt, opt, "--log-level",
+ val.c_str());
+ } else if (key == "dpdk-huge-dir") {
+ opt = eal_add_opt(argv, end - opt, opt, "--huge-dir",
+ val.c_str());
+ } else if (key == "dpdk-file-prefix") {
+ opt = eal_add_opt(argv, end - opt, opt, "--file-prefix",
+ val.c_str());
+ } else if (key == "dpdk-driver") {
+ opt = eal_add_opt(argv, end - opt, opt, "-d",
+ val.c_str());
+ }
+ /* TODO: Change where log goes?
+ int rte_openlog_stream( FILE * f)
+ */
+ }
+ /* Init DPDK's EAL */
+ uhd_dpdk_init(argv.size(), argv.data());
+ delete args;
+}
+
+inline std::string eth_addr_to_string(struct eth_addr mac_addr)
+{
+ auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx");
+ mac_stream % (uint32_t) mac_addr.addr[0] % (uint32_t) mac_addr.addr[1]
+ % (uint32_t) mac_addr.addr[2] % (uint32_t) mac_addr.addr[3]
+ % (uint32_t) mac_addr.addr[4] % (uint32_t) mac_addr.addr[5];
+ return mac_stream.str();
+}
+
+inline void separate_ipv4_addr(const std::string ipv4,
+ uint32_t &ipv4_addr, uint32_t &netmask)
+{
+ std::vector<std::string> result;
+ boost::algorithm::split(result, ipv4,
+ [](const char &in) {return in == '/';}, boost::token_compress_on);
+ UHD_ASSERT_THROW(result.size() == 2);
+ ipv4_addr = (uint32_t) inet_addr(result[0].c_str());
+ int netbits = std::atoi(result[1].c_str());
+ netmask = htonl(0xffffffff << (32-netbits));
+}
} // namespace
uhd_dpdk_ctx::uhd_dpdk_ctx(void) : _init_done(false) {}
@@ -41,69 +117,82 @@ uhd_dpdk_ctx::uhd_dpdk_ctx(void) : _init_done(false) {}
uhd_dpdk_ctx::~uhd_dpdk_ctx(void) {}
/* Initialize uhd-dpdk (and do only once) */
-void uhd_dpdk_ctx::init(const dict<std::string, std::string>& eal_args,
- unsigned int num_ports,
- int* port_thread_mapping,
- int num_mbufs,
- int mbuf_cache_size,
- size_t mtu)
+void uhd_dpdk_ctx::init(const device_addr_t &user_args)
{
std::lock_guard<std::mutex> lock(_init_mutex);
if (!_init_done) {
- _mtu = mtu;
- /* Build up argc and argv */
- std::vector<const char*> argv;
- argv.push_back("uhd-dpdk");
- char* args = new char[4096];
- char* opt = args;
- char* end = args + sizeof(args);
- for (std::string& key : eal_args.keys()) {
- std::string val = eal_args[key];
- if (key == "coremask") {
- opt = eal_add_opt(argv, end - opt, opt, "-c", val.c_str());
- } else if (key == "corelist") {
- /* NOTE: This arg may have commas, so limited to config file */
- opt = eal_add_opt(argv, end - opt, opt, "-l", val.c_str());
- } else if (key == "coremap") {
- opt = eal_add_opt(argv, end - opt, opt, "--lcores", val.c_str());
- } else if (key == "master-lcore") {
- opt = eal_add_opt(argv, end - opt, opt, "--master-lcore", val.c_str());
- } else if (key == "pci-blacklist") {
- opt = eal_add_opt(argv, end - opt, opt, "-b", val.c_str());
- } else if (key == "pci-whitelist") {
- opt = eal_add_opt(argv, end - opt, opt, "-w", val.c_str());
- } else if (key == "log-level") {
- opt = eal_add_opt(argv, end - opt, opt, "--log-level", val.c_str());
- } else if (key == "huge-dir") {
- opt = eal_add_opt(argv, end - opt, opt, "--huge-dir", val.c_str());
- } else if (key == "file-prefix") {
- opt = eal_add_opt(argv, end - opt, opt, "--file-prefix", val.c_str());
+ /* Gather global config, build args for EAL, and init UHD-DPDK */
+ const device_addr_t dpdk_args = uhd::prefs::get_dpdk_args(user_args);
+ UHD_LOG_TRACE("DPDK", "Configuration:" << std::endl
+ << dpdk_args.to_pp_string());
+ uhd_dpdk_eal_init(dpdk_args);
+
+ _mtu = dpdk_args.has_key("dpdk-mtu")
+ ? dpdk_args.cast<size_t>("dpdk-mtu", 0)
+ : DEFAULT_FRAME_SIZE;
+ const int num_mbufs = dpdk_args.has_key("dpdk-num-mbufs")
+ ? dpdk_args.cast<int>("dpdk-num-mbufs", 0)
+ : DEFAULT_NUM_MBUFS;
+ const int mbuf_cache_size = dpdk_args.has_key("dpdk-mbuf-cache-size")
+ ? dpdk_args.cast<int>("dpdk-mbuf-cache-size", 0)
+ : DEFAULT_MBUF_CACHE_SIZE;
+
+ /* Get configuration for all the NIC ports */
+ device_addrs_t args = separate_device_addr(user_args);
+ int num_ports = uhd_dpdk_port_count();
+ std::vector<int> io_cpu_map(num_ports);
+ device_addrs_t nics(num_ports);
+ for (ssize_t i = 0; i < num_ports; i++) {
+ struct eth_addr mac_addr = uhd_dpdk_get_eth_addr(i);
+ nics[i]["dpdk-mac"] = eth_addr_to_string(mac_addr);
+ for (const auto &arg: args) {
+ if (arg.has_key("dpdk-mac")
+ && arg["dpdk-mac"] == nics[i]["dpdk-mac"]) {
+ for (const auto& key: arg.keys()) {
+ nics[i][key] = arg[key];
+ }
+ break;
+ }
}
+ nics[i] = uhd::prefs::get_dpdk_nic_args(nics[i]);
+ if (nics[i].has_key("dpdk-ipv4")
+ && nics[i].has_key("dpdk-io-cpu")) {
+ uint32_t ipv4_addr, netmask;
+ io_cpu_map[i] = std::atoi(nics[i]["dpdk-io-cpu"].c_str());
+ separate_ipv4_addr(nics[i]["dpdk-ipv4"], ipv4_addr, netmask);
+ uhd_dpdk_set_ipv4_addr((unsigned int) i, ipv4_addr, netmask);
+ } else {
+ /* Not enough configuration to use NIC */
+ io_cpu_map[i] = -1;
+ }
+ UHD_LOG_TRACE("DPDK", "Found NIC(" << i << "):" << std::endl
+ << nics[i].to_pp_string());
}
- uhd_dpdk_init(argv.size(),
- argv.data(),
- num_ports,
- port_thread_mapping,
- num_mbufs,
- mbuf_cache_size,
- _mtu);
- delete args;
+ uhd_dpdk_start(num_ports, io_cpu_map.data(), num_mbufs,
+ mbuf_cache_size, _mtu);
_init_done = true;
}
}
-int uhd_dpdk_ctx::get_port_id(std::array<uint8_t, 6> mac_addr, unsigned int& port_id)
+size_t uhd_dpdk_ctx::get_mtu(void) const
+{
+ UHD_ASSERT_THROW(is_init_done());
+ return _mtu;
+}
+
+int uhd_dpdk_ctx::get_port_id(std::array<uint8_t, 6> mac_addr,
+ unsigned int &port_id) const
{
UHD_ASSERT_THROW(is_init_done());
int num_ports = uhd_dpdk_port_count();
for (int i = 0; i < num_ports; i++) {
- struct eth_addr port_mac_addr = uhd_dpdk_get_eth_addr((unsigned int)i);
+ struct eth_addr port_mac_addr = uhd_dpdk_get_eth_addr((unsigned int) i);
for (int j = 0; j < 6; j++) {
if (mac_addr[j] != port_mac_addr.addr[j]) {
break;
}
if (j == 5) {
- port_id = (unsigned int)i;
+ port_id = (unsigned int) i;
return 0;
}
}
@@ -111,52 +200,54 @@ int uhd_dpdk_ctx::get_port_id(std::array<uint8_t, 6> mac_addr, unsigned int& por
return -1;
}
-int uhd_dpdk_ctx::get_route(const std::string& addr) const
+int uhd_dpdk_ctx::get_route(const std::string &addr) const
{
- const uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str());
+ const uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str());
const unsigned int num_ports = uhd_dpdk_port_count();
for (unsigned int port = 0; port < num_ports; port++) {
uint32_t src_ipv4;
uint32_t netmask;
+ if (uhd_dpdk_port_link_status(port) < 1)
+ continue;
uhd_dpdk_get_ipv4_addr(port, &src_ipv4, &netmask);
if ((src_ipv4 & netmask) == (dst_ipv4 & netmask)) {
- return (int)port;
- }
+ return (int) port;
+ }
}
return -ENODEV;
}
-int uhd_dpdk_ctx::set_ipv4_addr(
- unsigned int port_id, uint32_t ipv4_addr, uint32_t netmask)
+int uhd_dpdk_ctx::set_ipv4_addr(unsigned int port_id, uint32_t ipv4_addr,
+ uint32_t netmask)
{
return uhd_dpdk_set_ipv4_addr(port_id, ipv4_addr, netmask);
}
-bool uhd_dpdk_ctx::is_init_done(void)
+bool uhd_dpdk_ctx::is_init_done(void) const
{
return _init_done.load();
}
-class dpdk_zero_copy_msb : public managed_send_buffer
-{
+class dpdk_zero_copy_msb : public managed_send_buffer {
public:
- dpdk_zero_copy_msb(struct uhd_dpdk_socket* sock,
- std::stack<dpdk_zero_copy_msb*, std::vector<dpdk_zero_copy_msb*>>& free_bufs)
- : _sock(sock), _buf(nullptr), _free_bufs(free_bufs){};
+ dpdk_zero_copy_msb(struct uhd_dpdk_socket *sock,
+ std::stack<dpdk_zero_copy_msb *, std::vector<dpdk_zero_copy_msb *>> &free_bufs,
+ size_t frame_size)
+ : _sock(sock), _buf(nullptr), _free_bufs(free_bufs),
+ _frame_size(frame_size) {};
~dpdk_zero_copy_msb(void) {}
void release(void)
{
if (_buf) {
- _buf->pkt_len = _length;
+ _buf->pkt_len = _length;
_buf->data_len = _length;
- int num_tx = uhd_dpdk_send(_sock, &_buf, 1);
+ int num_tx = uhd_dpdk_send(_sock, &_buf, 1);
if (num_tx == 0) {
/* Drop packet and free buffer (do not share sockets!) */
- UHD_LOG_ERROR(
- "DPDK", "Invalid shared socket usage detected. Dropping packet...");
+ UHD_LOG_ERROR("DPDK", "Invalid shared socket usage detected. Dropping packet...");
uhd_dpdk_free_buf(_buf);
}
// Push back into pool
@@ -170,21 +261,22 @@ public:
if (bufs != 1 || !_buf)
return sptr();
- return make(this, uhd_dpdk_buf_to_data(_sock, _buf), DEFAULT_FRAME_SIZE);
+ return make(this, uhd_dpdk_buf_to_data(_sock, _buf),
+ _frame_size);
}
private:
- struct uhd_dpdk_socket* _sock;
- struct rte_mbuf* _buf;
- std::stack<dpdk_zero_copy_msb*, std::vector<dpdk_zero_copy_msb*>>& _free_bufs;
+ struct uhd_dpdk_socket *_sock;
+ struct rte_mbuf *_buf;
+ std::stack<dpdk_zero_copy_msb *, std::vector<dpdk_zero_copy_msb *>> &_free_bufs;
+ size_t _frame_size;
};
-class dpdk_zero_copy_mrb : public managed_recv_buffer
-{
+class dpdk_zero_copy_mrb : public managed_recv_buffer {
public:
- dpdk_zero_copy_mrb(struct uhd_dpdk_socket* sock,
- std::stack<dpdk_zero_copy_mrb*, std::vector<dpdk_zero_copy_mrb*>>& free_bufs)
- : _sock(sock), _buf(nullptr), _free_bufs(free_bufs){};
+ dpdk_zero_copy_mrb(struct uhd_dpdk_socket *sock,
+ std::stack<dpdk_zero_copy_mrb*, std::vector<dpdk_zero_copy_mrb*>> &free_bufs)
+ : _sock(sock), _buf(nullptr), _free_bufs(free_bufs) {};
~dpdk_zero_copy_mrb(void) {}
void release(void)
@@ -197,78 +289,78 @@ public:
sptr get_new(double timeout)
{
- int bufs = uhd_dpdk_recv(_sock, &_buf, 1, (int)(timeout * USEC));
+ int bufs = uhd_dpdk_recv(_sock, &_buf, 1, (int) (timeout*USEC));
if (bufs != 1 || _buf == nullptr) {
// Push back into pool if we didn't get a real buffer
_free_bufs.push(this);
return sptr();
}
- if ((_buf->ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) {
- UHD_LOG_WARNING("DPDK", "IP checksum failure detected. Dropping packet...");
- uhd_dpdk_free_buf(_buf);
- _free_bufs.push(this);
- return sptr();
- }
- return make(
- this, uhd_dpdk_buf_to_data(_sock, _buf), uhd_dpdk_get_len(_sock, _buf));
+ return make(this, uhd_dpdk_buf_to_data(_sock, _buf),
+ uhd_dpdk_get_len(_sock, _buf));
}
private:
- struct uhd_dpdk_socket* _sock;
- struct rte_mbuf* _buf;
- std::stack<dpdk_zero_copy_mrb*, std::vector<dpdk_zero_copy_mrb*>>& _free_bufs;
+ struct uhd_dpdk_socket *_sock;
+ struct rte_mbuf *_buf;
+ std::stack<dpdk_zero_copy_mrb*, std::vector<dpdk_zero_copy_mrb*>> &_free_bufs;
};
-class dpdk_zero_copy_impl : public dpdk_zero_copy
-{
+class dpdk_zero_copy_impl : public dpdk_zero_copy {
public:
- dpdk_zero_copy_impl(struct uhd_dpdk_ctx& ctx,
- const unsigned int dpdk_port_id,
- const std::string& addr,
- const std::string& remote_port,
- const std::string& local_port,
- const zero_copy_xport_params& xport_params)
- : _num_send_frames(xport_params.num_send_frames)
- , _send_frame_size(xport_params.send_frame_size)
- , _num_recv_frames(xport_params.num_recv_frames)
- , _recv_frame_size(xport_params.recv_frame_size)
- , _port_id(dpdk_port_id)
- , _rx_empty_count(0)
- , _tx_empty_count(0)
+
+ dpdk_zero_copy_impl(const struct uhd_dpdk_ctx &ctx,
+ const unsigned int dpdk_port_id,
+ const std::string &addr,
+ const std::string &remote_port,
+ const std::string &local_port,
+ const zero_copy_xport_params& xport_params)
+ : _num_send_frames(xport_params.num_send_frames),
+ _send_frame_size(xport_params.send_frame_size),
+ _num_recv_frames(xport_params.num_recv_frames),
+ _recv_frame_size(xport_params.recv_frame_size),
+ _port_id(dpdk_port_id),
+ _rx_empty_count(0),
+ _tx_empty_count(0)
{
- // TODO: Handle xport_params
UHD_ASSERT_THROW(xport_params.recv_frame_size > 0);
UHD_ASSERT_THROW(xport_params.send_frame_size > 0);
UHD_ASSERT_THROW(xport_params.num_send_frames > 0);
UHD_ASSERT_THROW(xport_params.num_recv_frames > 0);
UHD_ASSERT_THROW(ctx.is_init_done());
+ UHD_ASSERT_THROW(xport_params.recv_frame_size < ctx.get_mtu() - UHD_DPDK_HEADERS_SIZE);
+ UHD_ASSERT_THROW(xport_params.send_frame_size < ctx.get_mtu() - UHD_DPDK_HEADERS_SIZE);
const int num_ports = uhd_dpdk_port_count();
UHD_ASSERT_THROW(num_ports > 0);
- UHD_ASSERT_THROW(dpdk_port_id < (unsigned int)num_ports);
+ UHD_ASSERT_THROW(dpdk_port_id < (unsigned int) num_ports);
// Convert ipv4 addr from string to uint32_t, network format
- uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str());
+ uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str());
// Convert port from string to uint16_t, network format
uint16_t dst_port = htons(std::stoi(remote_port, NULL, 0));
uint16_t src_port = htons(std::stoi(local_port, NULL, 0));
// Create RX socket first
- struct uhd_dpdk_sockarg_udp sockarg = {.is_tx = false,
- .local_port = src_port,
- .remote_port = dst_port,
- .dst_addr = dst_ipv4};
+ struct uhd_dpdk_sockarg_udp sockarg = {
+ .is_tx = false,
+ .filter_bcast = true,
+ .local_port = src_port,
+ .remote_port = dst_port,
+ .dst_addr = dst_ipv4,
+ .num_bufs = _num_recv_frames
+ };
_rx_sock = uhd_dpdk_sock_open(dpdk_port_id, UHD_DPDK_SOCK_UDP, &sockarg);
UHD_ASSERT_THROW(_rx_sock != nullptr);
// Backfill the local port, in case it was auto-assigned
uhd_dpdk_udp_get_info(_rx_sock, &sockarg);
- sockarg.is_tx = true;
+ sockarg.is_tx = true;
+ sockarg.num_bufs = _num_send_frames;
sockarg.remote_port = dst_port;
- sockarg.dst_addr = dst_ipv4;
+ sockarg.dst_addr = dst_ipv4;
_tx_sock = uhd_dpdk_sock_open(dpdk_port_id, UHD_DPDK_SOCK_UDP, &sockarg);
UHD_ASSERT_THROW(_tx_sock != nullptr);
@@ -277,13 +369,12 @@ public:
_mrb_pool.push(new dpdk_zero_copy_mrb(_rx_sock, _mrb_pool));
}
for (size_t i = 0; i < _num_send_frames; i++) {
- _msb_pool.push(new dpdk_zero_copy_msb(_tx_sock, _msb_pool));
+ _msb_pool.push(new dpdk_zero_copy_msb(_tx_sock, _msb_pool, _send_frame_size));
}
- UHD_LOG_TRACE("DPDK",
- "Created transports between " << addr << ":" << remote_port << " and NIC("
- << dpdk_port_id
- << "):" << ntohs(sockarg.local_port));
+ UHD_LOG_TRACE("DPDK", "Created transports between " << addr << ":"
+ << remote_port << " and NIC(" << dpdk_port_id
+ << "):" << ntohs(sockarg.local_port));
}
~dpdk_zero_copy_impl(void)
@@ -292,23 +383,18 @@ public:
size_t count;
uhd_dpdk_udp_get_info(_rx_sock, &sockarg);
uhd_dpdk_get_drop_count(_rx_sock, &count);
- UHD_LOG_TRACE("DPDK",
- "Closing transports between " << sockarg.dst_addr << ":"
- << ntohs(sockarg.remote_port)
- << " and local:" << ntohs(sockarg.local_port));
- UHD_LOG_TRACE("DPDK",
- "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
- << " Dropped " << count << " packets");
+ UHD_LOG_TRACE("DPDK", "Closing transports between " << sockarg.dst_addr << ":"
+ << ntohs(sockarg.remote_port) << " and local:"
+ << ntohs(sockarg.local_port));
+ UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
+ << " Dropped "<< count << " packets");
uhd_dpdk_get_xfer_count(_rx_sock, &count);
- UHD_LOG_TRACE("DPDK",
- "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
- << " Received " << count << " packets");
- UHD_LOG_TRACE("DPDK",
- "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
- << "RX empty count is " << _rx_empty_count);
- UHD_LOG_TRACE("DPDK",
- "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
- << "TX empty count is " << _tx_empty_count);
+ UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
+ << " Received "<< count << " packets");
+ UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
+ << "RX empty count is " << _rx_empty_count);
+ UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
+ << "TX empty count is " << _tx_empty_count);
uhd_dpdk_sock_close(_rx_sock);
uhd_dpdk_sock_close(_tx_sock);
}
@@ -320,7 +406,7 @@ public:
return managed_recv_buffer::sptr();
}
- dpdk_zero_copy_mrb* mrb = _mrb_pool.top();
+ dpdk_zero_copy_mrb *mrb = _mrb_pool.top();
_mrb_pool.pop();
managed_recv_buffer::sptr buff = mrb->get_new(timeout);
if (!buff)
@@ -345,7 +431,7 @@ public:
return managed_send_buffer::sptr();
}
- dpdk_zero_copy_msb* msb = _msb_pool.top();
+ dpdk_zero_copy_msb *msb = _msb_pool.top();
_msb_pool.pop();
managed_send_buffer::sptr buff = msb->get_new(timeout);
if (!buff)
@@ -367,18 +453,18 @@ public:
{
struct uhd_dpdk_sockarg_udp sockarg;
int status = uhd_dpdk_udp_get_info(_rx_sock, &sockarg);
+ UHD_ASSERT_THROW(status == 0);
return ntohs(sockarg.local_port);
}
std::string get_local_addr(void) const
{
- uint32_t ipv4_addr;
- int status = uhd_dpdk_get_ipv4_addr(_port_id, &ipv4_addr, NULL);
- auto retval = std::to_string(ipv4_addr >> 0 & 0xff) + "."
- + std::to_string(ipv4_addr >> 8 & 0xff) + "."
- + std::to_string(ipv4_addr >> 16 & 0xff) + "."
- + std::to_string(ipv4_addr >> 24 & 0xff);
- return retval;
+ struct in_addr ipv4_addr;
+ int status = uhd_dpdk_get_ipv4_addr(_port_id, &ipv4_addr.s_addr, NULL);
+ UHD_ASSERT_THROW(status == 0);
+ char addr_str[INET_ADDRSTRLEN];
+ inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str));
+ return std::string(addr_str);
}
uint32_t get_drop_count(void) const
@@ -387,10 +473,9 @@ public:
uhd_dpdk_get_drop_count(_rx_sock, &drop_count);
return drop_count;
}
-
private:
- struct uhd_dpdk_socket* _rx_sock;
- struct uhd_dpdk_socket* _tx_sock;
+ struct uhd_dpdk_socket *_rx_sock;
+ struct uhd_dpdk_socket *_tx_sock;
const size_t _num_send_frames;
const size_t _send_frame_size;
const size_t _num_recv_frames;
@@ -399,20 +484,22 @@ private:
unsigned int _rx_empty_count;
unsigned int _tx_empty_count;
- std::stack<dpdk_zero_copy_mrb*, std::vector<dpdk_zero_copy_mrb*>> _mrb_pool;
- std::stack<dpdk_zero_copy_msb*, std::vector<dpdk_zero_copy_msb*>> _msb_pool;
+ std::stack<dpdk_zero_copy_mrb *, std::vector<dpdk_zero_copy_mrb *>> _mrb_pool;
+ std::stack<dpdk_zero_copy_msb *, std::vector<dpdk_zero_copy_msb *>> _msb_pool;
};
-dpdk_zero_copy::sptr dpdk_zero_copy::make(struct uhd_dpdk_ctx& ctx,
+dpdk_zero_copy::sptr dpdk_zero_copy::make(
+ const struct uhd_dpdk_ctx &ctx,
const unsigned int dpdk_port_id,
- const std::string& addr,
- const std::string& remote_port,
- const std::string& local_port,
- const zero_copy_xport_params& default_buff_args,
- const device_addr_t& hints)
+ const std::string &addr,
+ const std::string &remote_port,
+ const std::string &local_port,
+ const zero_copy_xport_params &default_buff_args,
+ const device_addr_t &/*hints*/)
{
- return dpdk_zero_copy::sptr(new dpdk_zero_copy_impl(
- ctx, dpdk_port_id, addr, remote_port, local_port, default_buff_args));
+ return dpdk_zero_copy::sptr(
+ new dpdk_zero_copy_impl(ctx, dpdk_port_id, addr, remote_port, local_port, default_buff_args)
+ );
}
}} // namespace uhd::transport
diff --git a/host/lib/transport/dpdk_zero_copy.hpp b/host/lib/transport/dpdk_zero_copy.hpp
index f90e73d0e..8dcce6eee 100644
--- a/host/lib/transport/dpdk_zero_copy.hpp
+++ b/host/lib/transport/dpdk_zero_copy.hpp
@@ -7,95 +7,31 @@
#ifndef DPDK_ZERO_COPY_HPP
#define DPDK_ZERO_COPY_HPP
-#include <uhd/config.hpp>
-#include <uhd/transport/zero_copy.hpp>
+#include <uhdlib/transport/dpdk_common.hpp>
#include <uhd/types/device_addr.hpp>
-#include <uhd/utils/log.hpp>
-#include <uhd/utils/static.hpp>
+#include <uhd/transport/zero_copy.hpp>
#include <boost/shared_ptr.hpp>
-#include <mutex>
#include <string>
-#include <vector>
namespace uhd { namespace transport {
-class uhd_dpdk_ctx : boost::noncopyable
-{
-public:
- UHD_SINGLETON_FCN(uhd_dpdk_ctx, get);
-
- ~uhd_dpdk_ctx(void);
-
- /*!
- * Initialize uhd-dpdk (and do only once)
- * \param eal_args Arguments to pass to DPDK rte_eal_init() function
- * \param num_ports Size of port_thread_mapping array (also number in use)
- * \param port_thread_mapping Map NICs to threads: index=port, value=thread
- * \param num_mbufs Number of packet buffers for each port's memory pool
- * \param mbuf_cache_size Size of per-core packet buffer cache from mempool
- * \param mtu MTU of NIC ports
- */
- void init(const dict<std::string, std::string>& eal_args,
- unsigned int num_ports,
- int* port_thread_mapping,
- int num_mbufs,
- int mbuf_cache_size,
- size_t mtu);
-
- /*!
- * Get port ID from provided MAC address
- * \param mac_addr MAC address
- * \param port_id Int to write ID of port corresponding to MAC address
- * \return 0 if match found, else no match
- */
- int get_port_id(std::array<uint8_t, 6> mac_addr, unsigned int& port_id);
-
- /*!
- * Get port ID for routing packet destined for given address
- * \param addr Destination address
- * \return port ID from routing table
- */
- int get_route(const std::string& addr) const;
-
- /*!
- * Set IPv4 address and subnet mask of given NIC port
- * Not thread-safe. Should only be written before ports are in use.
- * \param port_id NIC port ID
- * \param ipv4_addr IPv4 address to write
- * \param netmask Subnet mask identifying network number in ipv4_addr
- * \return 0 if successful, else error
- */
- int set_ipv4_addr(unsigned int port_id, uint32_t ipv4_addr, uint32_t netmask);
-
- /*!
- * \return whether init() has been called
- */
- bool is_init_done(void);
-
-private:
- uhd_dpdk_ctx(void);
-
- size_t _mtu;
- std::mutex _init_mutex;
- std::atomic<bool> _init_done;
-};
-
/*!
* A zero copy transport interface to the dpdk DMA library.
*/
-class dpdk_zero_copy : public virtual zero_copy_if
-{
+class dpdk_zero_copy : public virtual zero_copy_if {
public:
typedef boost::shared_ptr<dpdk_zero_copy> sptr;
- static sptr make(struct uhd_dpdk_ctx& ctx,
+ static sptr make(
+ const struct uhd_dpdk_ctx &ctx,
const unsigned int dpdk_port_id,
- const std::string& addr,
- const std::string& remote_port,
- const std::string& local_port, /* 0 = auto-assign */
- const zero_copy_xport_params& default_buff_args,
- const device_addr_t& hints);
+ const std::string &addr,
+ const std::string &remote_port,
+ const std::string &local_port, /* 0 = auto-assign */
+ const zero_copy_xport_params &default_buff_args,
+ const device_addr_t &hints
+ );
virtual uint16_t get_local_port(void) const = 0;
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk.c b/host/lib/transport/uhd-dpdk/uhd_dpdk.c
index 1be6b2335..09897eb11 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk.c
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk.c
@@ -34,6 +34,20 @@ int uhd_dpdk_port_count(void)
return ctx->num_ports;
}
+int uhd_dpdk_port_link_status(unsigned int portid)
+{
+ if (!ctx)
+ return -ENODEV;
+
+ struct uhd_dpdk_port *p = find_port(portid);
+ if (p) {
+ struct rte_eth_link link;
+ rte_eth_link_get_nowait(p->id, &link);
+ return link.link_status;
+ }
+ return -ENODEV;
+}
+
struct eth_addr uhd_dpdk_get_eth_addr(unsigned int portid)
{
struct eth_addr retval;
@@ -43,6 +57,7 @@ struct eth_addr uhd_dpdk_get_eth_addr(unsigned int portid)
if (p) {
memcpy(retval.addr, p->mac_addr.addr_bytes, ETHER_ADDR_LEN);
}
+
return retval;
}
@@ -211,19 +226,12 @@ static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int lco
return 0;
}
-
-int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports,
- int *port_thread_mapping, int num_mbufs, int mbuf_cache_size,
- int mtu)
+int uhd_dpdk_init(int argc, const char **argv)
{
/* Init context only once */
if (ctx)
return 1;
- if ((num_ports == 0) || (port_thread_mapping == NULL)) {
- return -EINVAL;
- }
-
/* Grabs arguments intended for DPDK's EAL */
int ret = rte_eal_init(argc, (char **) argv);
if (ret < 0)
@@ -241,8 +249,6 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports,
ctx->num_ports = rte_eth_dev_count();
if (ctx->num_ports < 1)
rte_exit(EXIT_FAILURE, "Error: Found no ports\n");
- if (ctx->num_ports < num_ports)
- rte_exit(EXIT_FAILURE, "Error: User requested more ports than available\n");
/* Get memory for thread and port data structures */
ctx->threads = rte_zmalloc("uhd_dpdk_thread", RTE_MAX_LCORE*sizeof(struct uhd_dpdk_thread), 0);
@@ -252,6 +258,28 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports,
if (!ctx->ports)
rte_exit(EXIT_FAILURE, "Error: Could not allocate memory for port data\n");
+ for (size_t i = 0; i < ctx->num_ports; i++) {
+ struct uhd_dpdk_port *port = &ctx->ports[i];
+ port->id = i;
+ rte_eth_macaddr_get(port->id, &port->mac_addr);
+ }
+
+ return 0;
+}
+
+int uhd_dpdk_start(unsigned int num_ports, int *port_thread_mapping,
+ int num_mbufs, int mbuf_cache_size, int mtu)
+{
+ if (!ctx)
+ return -EIO;
+
+ if ((num_ports == 0) || (port_thread_mapping == NULL)) {
+ return -EINVAL;
+ }
+
+ if (ctx->num_ports < num_ports)
+ rte_exit(EXIT_FAILURE, "Error: User requested more ports than available\n");
+
/* Initialize the thread data structures */
for (int i = rte_get_next_lcore(-1, 1, 0);
(i < RTE_MAX_LCORE);
@@ -306,7 +334,6 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports,
rte_exit(EXIT_FAILURE, "Requested inactive lcore %u for port %u\n", (unsigned int) thread_id, i);
struct uhd_dpdk_port *port = &ctx->ports[i];
- port->id = i;
port->parent = &ctx->threads[thread_id];
ctx->threads[thread_id].num_ports++;
LIST_INSERT_HEAD(&ctx->threads[thread_id].port_list, port, port_entry);
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h
index d497e5d5a..6f43ae1cf 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h
@@ -24,7 +24,7 @@
#define UHD_DPDK_MAX_WAITERS UHD_DPDK_MAX_SOCKET_CNT
#define UHD_DPDK_TXQ_SIZE 64
#define UHD_DPDK_TX_BURST_SIZE (UHD_DPDK_TXQ_SIZE - 1)
-#define UHD_DPDK_RXQ_SIZE 64
+#define UHD_DPDK_RXQ_SIZE 128
#define UHD_DPDK_RX_BURST_SIZE (UHD_DPDK_RXQ_SIZE - 1)
struct uhd_dpdk_port;
@@ -256,13 +256,10 @@ static inline struct uhd_dpdk_port * find_port(unsigned int portid)
if (!ctx)
return NULL;
- for (unsigned int i = 0; i < ctx->num_threads; i++) {
- struct uhd_dpdk_thread *t = &ctx->threads[i];
- struct uhd_dpdk_port *p;
- LIST_FOREACH(p, &t->port_list, port_entry) {
- if (p->id == portid) {
- return p;
- }
+ for (unsigned int i = 0; i < ctx->num_ports; i++) {
+ struct uhd_dpdk_port *p = &ctx->ports[i];
+ if (p->id == portid) {
+ return p;
}
}
return NULL;
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
index f603f1f8f..61ed836df 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
@@ -43,7 +43,6 @@ int _uhd_dpdk_arp_reply(struct uhd_dpdk_port *port, struct arp_hdr *arp_req)
mbuf->pkt_len = 42;
mbuf->data_len = 42;
- mbuf->ol_flags = PKT_TX_IP_CKSUM;
if (rte_eth_tx_burst(port->id, 0, &mbuf, 1) != 1) {
RTE_LOG(WARNING, RING, "%s: TX descriptor ring is full\n", __func__);
@@ -125,7 +124,6 @@ int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port, uint32_t ip)
mbuf->pkt_len = 42;
mbuf->data_len = 42;
- mbuf->ol_flags = PKT_TX_IP_CKSUM;
if (rte_eth_tx_burst(port->id, 0, &mbuf, 1) != 1) {
RTE_LOG(WARNING, RING, "%s: TX descriptor ring is full\n", __func__);
@@ -135,7 +133,8 @@ int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port, uint32_t ip)
return 0;
}
-int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt)
+int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf,
+ struct udp_hdr *pkt, bool bcast)
{
int status = 0;
struct uhd_dpdk_ipv4_5tuple ht_key = {
@@ -155,6 +154,10 @@ int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, str
}
struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) entry->sock->priv;
+ if (bcast && pdata->filter_bcast) {
+ // Filter broadcast packets if not listening
+ goto udp_rx_drop;
+ }
status = rte_ring_enqueue(entry->sock->rx_ring, mbuf);
if (entry->waiter) {
_uhd_dpdk_waiter_wake(entry->waiter, port->parent);
@@ -172,14 +175,16 @@ udp_rx_drop:
return status;
}
-int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt)
+int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf,
+ struct ipv4_hdr *pkt)
{
- if (pkt->dst_addr != port->ipv4_addr) {
+ bool bcast = is_broadcast(port, pkt->dst_addr);
+ if (pkt->dst_addr != port->ipv4_addr && !bcast) {
rte_pktmbuf_free(mbuf);
return -ENODEV;
}
if (pkt->next_proto_id == 0x11) {
- return _uhd_dpdk_process_udp(port, mbuf, (struct udp_hdr *) &pkt[1]);
+ return _uhd_dpdk_process_udp(port, mbuf, (struct udp_hdr *) &pkt[1], bcast);
}
rte_pktmbuf_free(mbuf);
return -EINVAL;
@@ -417,7 +422,7 @@ static inline void _uhd_dpdk_rx_burst(struct uhd_dpdk_port *port)
rte_pktmbuf_free(bufs[buf]);
break;
case ETHER_TYPE_IPv4:
- if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */
+ if (ol_flags & PKT_RX_IP_CKSUM_BAD) {
RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf);
} else {
_uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data);
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h
index b0d3e42cd..f94a678ba 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h
@@ -20,7 +20,8 @@ static inline bool is_broadcast(struct uhd_dpdk_port *port, uint32_t dst_ipv4_ad
int _uhd_dpdk_process_arp(struct uhd_dpdk_port *port, struct arp_hdr *arp_frame);
-int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt);
+int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf,
+ struct udp_hdr *pkt, bool bcast);
int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt);
int _uhd_dpdk_send_udp(struct uhd_dpdk_port *port,
struct uhd_dpdk_socket *sock,
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
index 6ea77b930..4fc375b77 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
@@ -19,7 +19,8 @@
* I/O thread ONLY
*/
-static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk_tx_queue **queue)
+static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid,
+ struct uhd_dpdk_tx_queue **queue, size_t num_bufs)
{
*queue = NULL;
struct uhd_dpdk_tx_queue *q = rte_zmalloc(NULL, sizeof(*q), 0);
@@ -31,25 +32,25 @@ static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk
LIST_INIT(&q->tx_list);
char name[32];
- snprintf(name, sizeof(name), "tx_ring_udp_%u.%u", port->id, tid);
+ snprintf(name, sizeof(name), "tx_q%u.%0lx", port->id, (unsigned long) q);
q->queue = rte_ring_create(
name,
- UHD_DPDK_TXQ_SIZE,
+ num_bufs,
rte_socket_id(),
RING_F_SC_DEQ | RING_F_SP_ENQ
);
- snprintf(name, sizeof(name), "buffer_ring_udp_%u.%u", port->id, tid);
+ snprintf(name, sizeof(name), "free_q%u.%0lx", port->id, (unsigned long) q);
q->freebufs = rte_ring_create(
name,
- UHD_DPDK_TXQ_SIZE,
+ num_bufs,
rte_socket_id(),
RING_F_SC_DEQ | RING_F_SP_ENQ
);
/* Set up retry queue */
- snprintf(name, sizeof(name), "retry_queue_%u", port->id);
+ snprintf(name, sizeof(name), "redo_q%u.%0lx", port->id, (unsigned long) q);
q->retry_queue = rte_ring_create(
name,
- UHD_DPDK_TXQ_SIZE,
+ num_bufs,
rte_socket_id(),
RING_F_SC_DEQ | RING_F_SP_ENQ
);
@@ -65,24 +66,39 @@ static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk
rte_free(q);
return -ENOMEM;
}
- struct rte_mbuf *bufs[UHD_DPDK_TXQ_SIZE];
- unsigned int num_bufs = rte_ring_free_count(q->freebufs);
- int buf_stat = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, bufs, num_bufs);
- if (buf_stat) {
- rte_ring_free(q->freebufs);
- rte_ring_free(q->queue);
- rte_ring_free(q->retry_queue);
- rte_free(q);
- RTE_LOG(ERR, USER1, "%s: Cannot allocate packet buffers\n", __func__);
- return -ENOENT;
- }
- unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) bufs, num_bufs, NULL);
- if (enqd != num_bufs) {
- RTE_LOG(ERR, USER1, "%s: Cannot enqueue freebufs\n", __func__);
- }
+
+ do {
+ struct rte_mbuf *bufs[UHD_DPDK_TXQ_SIZE];
+ num_bufs = rte_ring_free_count(q->freebufs);
+ if (num_bufs > 0) {
+ num_bufs = num_bufs > UHD_DPDK_TXQ_SIZE ? UHD_DPDK_TXQ_SIZE : num_bufs;
+ int buf_stat = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, bufs, num_bufs);
+ if (buf_stat) {
+ RTE_LOG(ERR, USER1, "%s: Cannot allocate packet buffers\n", __func__);
+ goto unwind_txq;
+ }
+ unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) bufs, num_bufs, NULL);
+ if (enqd != num_bufs) {
+ RTE_LOG(ERR, USER1, "%s: Cannot enqueue freebufs\n", __func__);
+ goto unwind_txq;
+ }
+ }
+ } while (num_bufs > 0);
LIST_INSERT_HEAD(&port->txq_list, q, entry);
*queue = q;
return 0;
+
+unwind_txq:
+ while (!rte_ring_empty(q->freebufs)) {
+ struct rte_mbuf *buf;
+ if (rte_ring_dequeue(q->freebufs, (void **) &buf) == 0)
+ rte_free(buf);
+ }
+ rte_ring_free(q->freebufs);
+ rte_ring_free(q->queue);
+ rte_ring_free(q->retry_queue);
+ rte_free(q);
+ return -ENOENT;
}
/* Finish setting up UDP socket (unless ARP needs to be done)
@@ -134,11 +150,14 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req)
return -EADDRINUSE;
}
+ size_t num_bufs = (pdata->xferd_pkts < (UHD_DPDK_RX_BURST_SIZE + 1)) ?
+ UHD_DPDK_RX_BURST_SIZE + 1 : pdata->xferd_pkts;
+ pdata->xferd_pkts = 0;
char name[32];
snprintf(name, sizeof(name), "rx_ring_udp_%u.%u", port->id, ntohs(pdata->dst_port));
sock->rx_ring = rte_ring_create(
name,
- UHD_DPDK_RXQ_SIZE,
+ num_bufs,
rte_socket_id(),
RING_F_SC_DEQ | RING_F_SP_ENQ
);
@@ -172,6 +191,9 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req)
/* Are we doing TX? */
if (sock->tx_queue) {
+ size_t num_bufs = (pdata->xferd_pkts < (UHD_DPDK_TX_BURST_SIZE + 1)) ?
+ UHD_DPDK_TX_BURST_SIZE + 1 : pdata->xferd_pkts;
+ pdata->xferd_pkts = 0;
sock->tx_queue = NULL;
struct uhd_dpdk_tx_queue *q = NULL;
// FIXME Not sharing txq across all thread's sockets for now
@@ -184,7 +206,7 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req)
// }
//}
if (!sock->tx_queue) {
- retval = _alloc_txq(port, sock->tid, &q);
+ retval = _alloc_txq(port, sock->tid, &q, num_bufs);
if (retval) {
_uhd_dpdk_config_req_compl(req, retval);
return retval;
@@ -385,10 +407,13 @@ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req,
data->src_port = arg->local_port;
data->dst_port = arg->remote_port;
sock->tx_queue = (struct uhd_dpdk_tx_queue *) sock;
+ data->xferd_pkts = arg->num_bufs;
} else {
data->src_port = arg->remote_port;
data->dst_port = arg->local_port;
sock->rx_ring = (struct rte_ring *) sock;
+ data->xferd_pkts = arg->num_bufs;
+ data->filter_bcast = arg->filter_bcast;
}
/* TODO: Add support for I/O thread calling (skip locking and sleep) */
@@ -433,6 +458,7 @@ static void uhd_dpdk_ipv4_prep(struct uhd_dpdk_port *port,
ip_hdr->time_to_live = 64;
ip_hdr->next_proto_id = proto_id;
ip_hdr->hdr_checksum = 0; /* FIXME: Assuming hardware can offload */
+ mbuf->ol_flags |= PKT_TX_IP_CKSUM;
ip_hdr->src_addr = port->ipv4_addr;
ip_hdr->dst_addr = dst_ipv4_addr;
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h
index 39fcb8597..d7ca5609b 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h
@@ -15,6 +15,7 @@ struct uhd_dpdk_udp_priv {
uint32_t dst_ipv4_addr;
size_t dropped_pkts;
size_t xferd_pkts;
+ bool filter_bcast;
/* TODO: Cache destination address ptr to avoid ARP table lookup cost? */
//struct uhd_dpdk_arp_entry *arp_entry;
};
diff --git a/host/lib/usrp/mpmd/CMakeLists.txt b/host/lib/usrp/mpmd/CMakeLists.txt
index 774ad6593..67e08fc91 100644
--- a/host/lib/usrp/mpmd/CMakeLists.txt
+++ b/host/lib/usrp/mpmd/CMakeLists.txt
@@ -10,6 +10,11 @@ if(ENABLE_MPMD)
add_definitions(-DHAVE_LIBERIO)
endif(ENABLE_LIBERIO)
+ if(ENABLE_DPDK)
+ message(STATUS "Compiling MPMD with DPDK support...")
+ add_definitions(-DHAVE_DPDK)
+ endif(ENABLE_DPDK)
+
LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/mpmd_find.cpp
${CMAKE_CURRENT_SOURCE_DIR}/mpmd_image_loader.cpp
@@ -27,4 +32,10 @@ if(ENABLE_MPMD)
)
endif(ENABLE_LIBERIO)
+ if(ENABLE_DPDK)
+ LIBUHD_APPEND_SOURCES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/mpmd_xport_ctrl_dpdk_udp.cpp
+ )
+ endif(ENABLE_DPDK)
+
endif(ENABLE_MPMD)
diff --git a/host/lib/usrp/mpmd/mpmd_find.cpp b/host/lib/usrp/mpmd/mpmd_find.cpp
index 5d2406b30..2b8e1350d 100644
--- a/host/lib/usrp/mpmd/mpmd_find.cpp
+++ b/host/lib/usrp/mpmd/mpmd_find.cpp
@@ -8,6 +8,7 @@
#include "mpmd_devices.hpp"
#include "mpmd_impl.hpp"
+#include <uhdlib/transport/dpdk_common.hpp>
#include <uhd/transport/if_addrs.hpp>
#include <uhd/transport/udp_simple.hpp>
#include <uhd/types/device_addr.hpp>
@@ -193,6 +194,15 @@ device_addrs_t mpmd_find_with_bcast(const device_addr_t& hint)
*/
device_addrs_t mpmd_find(const device_addr_t& hint_)
{
+#ifdef HAVE_DPDK
+ // Start DPDK so links come up
+ if (hint_.has_key("use_dpdk")) {
+ auto& dpdk_ctx = uhd::transport::uhd_dpdk_ctx::get();
+ if (not dpdk_ctx.is_init_done()) {
+ dpdk_ctx.init(hint_);
+ }
+ }
+#endif
device_addrs_t hints = separate_device_addr(hint_);
if (hint_.has_key("type")) {
if (std::find(MPM_DEVICE_TYPES.cbegin(), MPM_DEVICE_TYPES.cend(), hint_["type"])
diff --git a/host/lib/usrp/mpmd/mpmd_xport_ctrl_dpdk_udp.cpp b/host/lib/usrp/mpmd/mpmd_xport_ctrl_dpdk_udp.cpp
new file mode 100644
index 000000000..f739942b4
--- /dev/null
+++ b/host/lib/usrp/mpmd/mpmd_xport_ctrl_dpdk_udp.cpp
@@ -0,0 +1,266 @@
+//
+// Copyright 2017 Ettus Research, National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include "mpmd_impl.hpp"
+#include "mpmd_xport_mgr.hpp"
+#include "mpmd_xport_ctrl_dpdk_udp.hpp"
+#include "../../transport/dpdk_zero_copy.hpp"
+#include <uhd/transport/udp_simple.hpp>
+#include <uhd/transport/udp_constants.hpp>
+#include <arpa/inet.h>
+
+
+using namespace uhd;
+using namespace uhd::mpmd::xport;
+
+namespace {
+constexpr unsigned int MPMD_UDP_RESERVED_FRAME_SIZE = 64;
+
+//! Maximum CHDR packet size in bytes
+const size_t MPMD_10GE_DATA_FRAME_MAX_SIZE = 4000;
+
+//! Number of send/recv frames
+const size_t MPMD_ETH_NUM_SEND_FRAMES = 32;
+const size_t MPMD_ETH_NUM_RECV_FRAMES = 128;
+const size_t MPMD_ETH_NUM_CTRL_FRAMES = 32;
+
+//! For MTU discovery, the time we wait for a packet before calling it
+// oversized (seconds).
+const double MPMD_MTU_DISCOVERY_TIMEOUT = 0.02;
+
+std::vector<std::string> get_addrs_from_mb_args(
+ const uhd::device_addr_t& mb_args
+) {
+ // mb_args must always include addr
+ if (not mb_args.has_key(FIRST_ADDR_KEY)) {
+ throw uhd::runtime_error("The " + FIRST_ADDR_KEY + " key must be specified in "
+ "device args to create an Ethernet transport to an RFNoC block");
+ }
+ std::vector<std::string> addrs{mb_args[FIRST_ADDR_KEY]};
+ if (mb_args.has_key(SECOND_ADDR_KEY)){
+ addrs.push_back(mb_args[SECOND_ADDR_KEY]);
+ }
+ return addrs;
+}
+
+/*! Do a binary search to discover MTU
+ *
+ * Uses the MPM echo service to figure out MTU. We simply send a bunch of
+ * packets and see if they come back until we converged on the path MTU.
+ * The end result must lie between \p min_frame_size and \p max_frame_size.
+ *
+ * \param address IP address
+ * \param port UDP port
+ * \param min_frame_size Minimum frame size, initialize algorithm to start
+ * with this value
+ * \param max_frame_size Maximum frame size, initialize algorithm to start
+ * with this value
+ * \param echo_timeout Timeout value in seconds. For frame sizes that
+ * exceed the MTU, we don't expect a response, and this
+ * is the amount of time we'll wait before we assume
+ * the frame size exceeds the MTU.
+ */
+size_t discover_mtu(
+ const std::string &address,
+ const std::string &port,
+ size_t min_frame_size,
+ size_t max_frame_size,
+ const double echo_timeout = 0.020
+) {
+ const auto &ctx = uhd::transport::uhd_dpdk_ctx::get();
+ const size_t echo_prefix_offset =
+ uhd::mpmd::mpmd_impl::MPM_ECHO_CMD.size();
+ const size_t mtu_hdr_len = echo_prefix_offset + 10;
+ const int port_id = ctx.get_route(address);
+ UHD_ASSERT_THROW(port_id >= 0);
+ UHD_ASSERT_THROW(min_frame_size < max_frame_size);
+ UHD_ASSERT_THROW(min_frame_size % 4 == 0);
+ UHD_ASSERT_THROW(max_frame_size % 4 == 0);
+ UHD_ASSERT_THROW(min_frame_size >= echo_prefix_offset + mtu_hdr_len);
+ using namespace uhd::transport;
+ uhd::transport::zero_copy_xport_params buff_args;
+ buff_args.recv_frame_size = max_frame_size;
+ buff_args.send_frame_size = max_frame_size;
+ buff_args.num_send_frames = 1;
+ buff_args.num_recv_frames = 1;
+ auto dev_addr = uhd::device_addr_t();
+ dpdk_zero_copy::sptr sock = dpdk_zero_copy::make(ctx,
+ (unsigned int) port_id, address, port, "0", buff_args, dev_addr);
+ std::string send_buf(uhd::mpmd::mpmd_impl::MPM_ECHO_CMD);
+ send_buf.resize(max_frame_size, '#');
+ UHD_ASSERT_THROW(send_buf.size() == max_frame_size);
+
+ // Little helper to check returned packets match the sent ones
+ auto require_bufs_match = [&send_buf, mtu_hdr_len](
+ const uint8_t *recv_buf,
+ const size_t len
+ ){
+ if (len < mtu_hdr_len or std::memcmp(
+ (void *) &recv_buf[0],
+ (void *) &send_buf[0],
+ mtu_hdr_len
+ ) != 0) {
+ throw uhd::runtime_error("Unexpected content of MTU "
+ "discovery return packet!");
+ }
+ };
+ UHD_LOG_TRACE("MPMD", "Determining UDP MTU... ");
+ size_t seq_no = 0;
+ while (min_frame_size < max_frame_size) {
+ managed_send_buffer::sptr msbuf = sock->get_send_buff(0);
+ UHD_ASSERT_THROW(msbuf.get() != nullptr);
+ max_frame_size = std::min(msbuf->size(), max_frame_size);
+ // Only test multiples of 4 bytes!
+ const size_t test_frame_size =
+ (max_frame_size/2 + min_frame_size/2 + 3) & ~size_t(3);
+ // Encode sequence number and current size in the string, makes it
+ // easy to debug in code or Wireshark. Is also used for identifying
+ // response packets.
+ std::sprintf(
+ &send_buf[echo_prefix_offset],
+ ";%04lu,%04lu",
+ seq_no++,
+ test_frame_size
+ );
+ // Copy to real buffer
+ UHD_LOG_TRACE("MPMD", "Testing frame size " << test_frame_size);
+ auto *tx_buf = msbuf->cast<uint8_t *>();
+ std::memcpy(tx_buf, &send_buf[0], test_frame_size);
+ msbuf->commit(test_frame_size);
+ msbuf.reset();
+
+ managed_recv_buffer::sptr mrbuf = sock->get_recv_buff(echo_timeout);
+ if (mrbuf.get() == nullptr || mrbuf->size() == 0) {
+ // Nothing received, so this is probably too big
+ max_frame_size = test_frame_size - 4;
+ } else if (mrbuf->size() >= test_frame_size) {
+ // Size went through, so bump the minimum
+ require_bufs_match(mrbuf->cast<uint8_t *>(), mrbuf->size());
+ min_frame_size = test_frame_size;
+ } else if (mrbuf->size() < test_frame_size) {
+ // This is an odd case. Something must have snipped the packet
+ // on the way back. Still, we'll just back off and try
+ // something smaller.
+ UHD_LOG_DEBUG("MPMD",
+ "Unexpected packet truncation during MTU discovery.");
+ require_bufs_match(mrbuf->cast<uint8_t *>(), mrbuf->size());
+ max_frame_size = mrbuf->size();
+ }
+ mrbuf.reset();
+ }
+ UHD_LOG_DEBUG("MPMD",
+ "Path MTU for address " << address << ": " << min_frame_size);
+ return min_frame_size;
+}
+
+}
+
+
+mpmd_xport_ctrl_dpdk_udp::mpmd_xport_ctrl_dpdk_udp(
+ const uhd::device_addr_t& mb_args
+) : _mb_args(mb_args)
+ , _ctx(uhd::transport::uhd_dpdk_ctx::get())
+ , _recv_args(filter_args(mb_args, "recv"))
+ , _send_args(filter_args(mb_args, "send"))
+ , _available_addrs(get_addrs_from_mb_args(mb_args))
+ , _mtu(MPMD_10GE_DATA_FRAME_MAX_SIZE)
+{
+ if (not _ctx.is_init_done()) {
+ _ctx.init(mb_args);
+ }
+ const std::string mpm_discovery_port = _mb_args.get(
+ mpmd_impl::MPM_DISCOVERY_PORT_KEY,
+ std::to_string(mpmd_impl::MPM_DISCOVERY_PORT)
+ );
+ auto discover_mtu_for_ip = [mpm_discovery_port](const std::string &ip_addr){
+ return discover_mtu(
+ ip_addr,
+ mpm_discovery_port,
+ IP_PROTOCOL_MIN_MTU_SIZE-IP_PROTOCOL_UDP_PLUS_IP_HEADER,
+ MPMD_10GE_DATA_FRAME_MAX_SIZE,
+ MPMD_MTU_DISCOVERY_TIMEOUT
+ );
+ };
+
+ for (const auto &ip_addr : _available_addrs) {
+ _mtu = std::min(_mtu, discover_mtu_for_ip(ip_addr));
+ }
+}
+
+uhd::both_xports_t
+mpmd_xport_ctrl_dpdk_udp::make_transport(
+ mpmd_xport_mgr::xport_info_t &xport_info,
+ const usrp::device3_impl::xport_type_t xport_type,
+ const uhd::device_addr_t& xport_args_
+) {
+ auto xport_args = xport_args_;
+
+ transport::zero_copy_xport_params default_buff_args;
+ // Create actual UHD-DPDK UDP transport
+ default_buff_args.recv_frame_size =
+ xport_args.cast<size_t>("recv_frame_size", get_mtu(uhd::RX_DIRECTION));
+ default_buff_args.send_frame_size =
+ xport_args.cast<size_t>("send_frame_size", get_mtu(uhd::TX_DIRECTION));
+ if (xport_type == usrp::device3_impl::ASYNC_MSG or
+ xport_type == usrp::device3_impl::CTRL) {
+ default_buff_args.num_recv_frames =
+ xport_args.cast<size_t>("num_recv_frames", MPMD_ETH_NUM_CTRL_FRAMES);
+ default_buff_args.num_send_frames =
+ xport_args.cast<size_t>("num_send_frames", MPMD_ETH_NUM_CTRL_FRAMES);
+ } else {
+ default_buff_args.num_recv_frames =
+ xport_args.cast<size_t>("num_recv_frames", MPMD_ETH_NUM_RECV_FRAMES);
+ default_buff_args.num_send_frames =
+ xport_args.cast<size_t>("num_send_frames", MPMD_ETH_NUM_SEND_FRAMES);
+ }
+
+ UHD_LOG_TRACE("BUFF", "num_recv_frames=" << default_buff_args.num_recv_frames
+ << ", num_send_frames=" << default_buff_args.num_send_frames
+ << ", recv_frame_size=" << default_buff_args.recv_frame_size
+ << ", send_frame_size=" << default_buff_args.send_frame_size);
+
+ int dpdk_port_id = _ctx.get_route(xport_info["ipv4"]);
+ if (dpdk_port_id < 0) {
+ throw uhd::runtime_error("Could not find a DPDK port with route to " +
+ xport_info["ipv4"]);
+ }
+ auto recv = transport::dpdk_zero_copy::make(
+ _ctx,
+ (const unsigned int) dpdk_port_id,
+ xport_info["ipv4"],
+ xport_info["port"],
+ "0",
+ default_buff_args,
+ xport_args
+ );
+ const uint16_t port = recv->get_local_port();
+ const std::string src_ip_addr = recv->get_local_addr();
+ xport_info["src_port"] = std::to_string(port);
+ xport_info["src_ipv4"] = src_ip_addr;
+
+ // Create both_xports_t object and finish:
+ both_xports_t xports;
+ xports.endianness = uhd::ENDIANNESS_BIG;
+ xports.send_sid = sid_t(xport_info["send_sid"]);
+ xports.recv_sid = xports.send_sid.reversed();
+ xports.recv_buff_size = (default_buff_args.recv_frame_size-MPMD_UDP_RESERVED_FRAME_SIZE)*default_buff_args.num_recv_frames;
+ xports.send_buff_size = (default_buff_args.send_frame_size-MPMD_UDP_RESERVED_FRAME_SIZE)*default_buff_args.num_send_frames;
+ xports.recv = recv; // Note: This is a type cast!
+ xports.send = recv; // This too
+ return xports;
+}
+
+bool mpmd_xport_ctrl_dpdk_udp::is_valid(
+ const mpmd_xport_mgr::xport_info_t& xport_info
+) const {
+ int dpdk_port_id = _ctx.get_route(xport_info.at("ipv4"));
+ return (dpdk_port_id >= 0);
+}
+
+size_t mpmd_xport_ctrl_dpdk_udp::get_mtu(const uhd::direction_t /*dir*/) const
+{
+ return _mtu;
+}
diff --git a/host/lib/usrp/mpmd/mpmd_xport_ctrl_dpdk_udp.hpp b/host/lib/usrp/mpmd/mpmd_xport_ctrl_dpdk_udp.hpp
new file mode 100644
index 000000000..1ee1549e0
--- /dev/null
+++ b/host/lib/usrp/mpmd/mpmd_xport_ctrl_dpdk_udp.hpp
@@ -0,0 +1,55 @@
+//
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_MPMD_XPORT_CTRL_DPDK_UDP_HPP
+#define INCLUDED_MPMD_XPORT_CTRL_DPDK_UDP_HPP
+
+#include "mpmd_xport_ctrl_base.hpp"
+#include "../transport/dpdk_zero_copy.hpp"
+#include <uhd/types/device_addr.hpp>
+#include "../device3/device3_impl.hpp"
+
+namespace uhd { namespace mpmd { namespace xport {
+
+/*! UDP transport manager
+ *
+ * Opens UDP sockets
+ */
+class mpmd_xport_ctrl_dpdk_udp : public mpmd_xport_ctrl_base
+{
+public:
+ mpmd_xport_ctrl_dpdk_udp(
+ const uhd::device_addr_t& mb_args
+ );
+
+ both_xports_t make_transport(
+ mpmd_xport_mgr::xport_info_t& xport_info,
+ const usrp::device3_impl::xport_type_t xport_type,
+ const uhd::device_addr_t& xport_args
+ );
+
+ bool is_valid(
+ const mpmd_xport_mgr::xport_info_t& xport_info
+ ) const;
+
+ size_t get_mtu(
+ const uhd::direction_t dir
+ ) const;
+
+private:
+ const uhd::device_addr_t _mb_args;
+ uhd::transport::uhd_dpdk_ctx &_ctx;
+ const uhd::dict<std::string, std::string> _recv_args;
+ const uhd::dict<std::string, std::string> _send_args;
+ //! A list of IP addresses we can connect our CHDR connections to
+ const std::vector<std::string> _available_addrs;
+ //! MTU
+ size_t _mtu;
+};
+
+}}} /* namespace uhd::mpmd::xport */
+
+#endif /* INCLUDED_MPMD_XPORT_CTRL_DPDK_UDP_HPP */
diff --git a/host/lib/usrp/mpmd/mpmd_xport_mgr.cpp b/host/lib/usrp/mpmd/mpmd_xport_mgr.cpp
index c2200c66a..d3023e3af 100644
--- a/host/lib/usrp/mpmd/mpmd_xport_mgr.cpp
+++ b/host/lib/usrp/mpmd/mpmd_xport_mgr.cpp
@@ -11,6 +11,9 @@
#ifdef HAVE_LIBERIO
# include "mpmd_xport_ctrl_liberio.hpp"
#endif
+#ifdef HAVE_DPDK
+# include "mpmd_xport_ctrl_dpdk_udp.hpp"
+#endif
uhd::dict<std::string, std::string> uhd::mpmd::xport::filter_args(
const uhd::device_addr_t& args, const std::string& prefix)
@@ -114,6 +117,11 @@ private:
const std::string& xport_medium, const uhd::device_addr_t& mb_args) const
{
if (xport_medium == "UDP") {
+#ifdef HAVE_DPDK
+ if (mb_args.has_key("use_dpdk")) {
+ return mpmd_xport_ctrl_base::uptr(new mpmd_xport_ctrl_dpdk_udp(mb_args));
+ }
+#endif
return mpmd_xport_ctrl_base::uptr(new mpmd_xport_ctrl_udp(mb_args));
#ifdef HAVE_LIBERIO
} else if (xport_medium == "liberio") {
diff --git a/host/lib/utils/prefs.cpp b/host/lib/utils/prefs.cpp
index 2ccc538fc..88be300cb 100644
--- a/host/lib/utils/prefs.cpp
+++ b/host/lib/utils/prefs.cpp
@@ -48,6 +48,23 @@ namespace {
uhd::prefs::get_uhd_config().get<std::string>(key_str, key);
}
}
+
+ device_addr_t get_args(
+ const uhd::device_addr_t& user_args,
+ const std::vector<std::string>& keys_to_update_from
+ ) {
+ device_addr_t args;
+ for (const auto& key : keys_to_update_from) {
+ update_from_key(key, user_args.get(key, ""), args);
+ }
+
+ // Finally, copy over the original user args:
+ for (const auto& user_key : user_args.keys()) {
+ args[user_key] = user_args[user_key];
+ }
+
+ return args;
+ }
}
config_parser& uhd::prefs::get_uhd_config()
@@ -81,22 +98,28 @@ config_parser& uhd::prefs::get_uhd_config()
device_addr_t uhd::prefs::get_usrp_args(
const uhd::device_addr_t &user_args
) {
- device_addr_t usrp_args;
const std::vector<std::string> keys_to_update_from = {
"type",
"product",
"serial"
};
+ return get_args(user_args, keys_to_update_from);
+}
- for (const auto& key : keys_to_update_from) {
- update_from_key(key, user_args.get(key, ""), usrp_args);
- }
-
- // Finally, copy over the original user args:
- for (const auto &user_key : user_args.keys()) {
- usrp_args[user_key] = user_args[user_key];
- }
-
- return usrp_args;
+device_addr_t uhd::prefs::get_dpdk_args(
+ const uhd::device_addr_t &user_args
+) {
+ const std::vector<std::string> keys_to_update_from = {
+ "use_dpdk"
+ };
+ return get_args(user_args, keys_to_update_from);
}
+device_addr_t uhd::prefs::get_dpdk_nic_args(
+ const uhd::device_addr_t &user_args
+) {
+ const std::vector<std::string> keys_to_update_from = {
+ "dpdk-mac"
+ };
+ return get_args(user_args, keys_to_update_from);
+}
diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt
index 9eb48eedb..01076540d 100644
--- a/host/tests/CMakeLists.txt
+++ b/host/tests/CMakeLists.txt
@@ -113,6 +113,10 @@ if(ENABLE_DPDK)
include_directories(${DPDK_INCLUDE_DIR})
add_executable(dpdk_test
dpdk_test.cpp
+ ${CMAKE_SOURCE_DIR}/lib/utils/config_parser.cpp
+ ${CMAKE_SOURCE_DIR}/lib/utils/paths.cpp
+ ${CMAKE_SOURCE_DIR}/lib/utils/pathslib.cpp
+ ${CMAKE_SOURCE_DIR}/lib/utils/prefs.cpp
${CMAKE_SOURCE_DIR}/lib/transport/dpdk_zero_copy.cpp
)
target_link_libraries(dpdk_test uhd ${Boost_LIBRARIES} ${DPDK_LIBRARIES})
diff --git a/host/tests/dpdk_test.cpp b/host/tests/dpdk_test.cpp
index 43ef3d388..b506c7b30 100644
--- a/host/tests/dpdk_test.cpp
+++ b/host/tests/dpdk_test.cpp
@@ -43,7 +43,7 @@ struct dpdk_test_args
std::string src_port;
std::string dst_ip;
std::string dst_port;
- pthread_cond_t* cond;
+ pthread_cond_t *cond;
pthread_mutex_t mutex;
bool started;
int cpu;
@@ -61,7 +61,7 @@ struct dpdk_test_stats
};
-static void process_udp(int id, uint32_t* udp_data, struct dpdk_test_stats* stats)
+static void process_udp(int id, uint32_t *udp_data, struct dpdk_test_stats *stats)
{
if (udp_data[0] != stats[id].last_seqno + 1) {
stats[id].lasts[stats[id].dropped_packets & 0xf] = stats[id].last_seqno;
@@ -76,21 +76,21 @@ static void process_udp(int id, uint32_t* udp_data, struct dpdk_test_stats* stat
static void send_udp(uhd::transport::dpdk_zero_copy::sptr& stream,
int id,
bool fc_only,
- struct dpdk_test_stats* stats)
+ struct dpdk_test_stats *stats)
{
uhd::transport::managed_send_buffer::sptr mbuf = stream->get_send_buff(0);
if (mbuf.get() == nullptr) {
printf("Could not get TX buffer!\n");
return;
}
- auto* tx_data = mbuf->cast<uint32_t*>();
+ auto *tx_data = mbuf->cast<uint32_t *>();
tx_data[0] = fc_only ? stats[id].tx_seqno - 1 : stats[id].tx_seqno;
tx_data[1] = stats[id].last_seqno;
if (!fc_only) {
- memset(&tx_data[2], stats[id].last_seqno, 8 * BENCH_SPP);
- stats[id].tx_xfer += 8 * BENCH_SPP;
+ memset(&tx_data[2], stats[id].last_seqno, 8*BENCH_SPP);
+ stats[id].tx_xfer += 8*BENCH_SPP;
}
- size_t num_bytes = 8 + (fc_only ? 0 : 8 * BENCH_SPP);
+ size_t num_bytes = 8 + (fc_only ? 0 : 8*BENCH_SPP);
mbuf->commit(num_bytes);
mbuf.reset();
@@ -100,12 +100,11 @@ static void send_udp(uhd::transport::dpdk_zero_copy::sptr& stream,
}
static void bench(
- uhd::transport::dpdk_zero_copy::sptr* stream, uint32_t nb_ports, double timeout)
+ uhd::transport::dpdk_zero_copy::sptr *stream, uint32_t nb_ports, double timeout)
{
uint64_t total_xfer[NUM_PORTS];
uint32_t id;
- struct dpdk_test_stats* stats =
- (struct dpdk_test_stats*)malloc(sizeof(*stats) * nb_ports);
+ struct dpdk_test_stats *stats = (struct dpdk_test_stats *) malloc(sizeof(*stats)*nb_ports);
for (id = 0; id < nb_ports; id++) {
stats[id].tx_seqno = 1;
stats[id].tx_xfer = 0;
@@ -137,7 +136,7 @@ static void bench(
if (nb_rx <= 0) {
consec_no_rx++;
if (consec_no_rx >= 100000) {
- uint32_t skt_drops = stream[id]->get_drop_count();
+ // uint32_t skt_drops = stream[id]->get_drop_count();
// printf("TX seq %d, TX ack %d, RX seq %d, %d drops!\n",
// stats[id].tx_seqno, stats[id].last_ackno, stats[id].last_seqno,
// skt_drops);
@@ -151,7 +150,7 @@ static void bench(
for (unsigned int buf = 0; buf < nb_rx; buf++) {
total_xfer[id] += bufs[buf]->size();
- auto data = bufs[buf]->cast<uint32_t*>();
+ auto data = bufs[buf]->cast<uint32_t *>();
process_udp(id, data, stats);
}
@@ -185,15 +184,15 @@ static void bench(
printf("Bytes received = %ld\n", total_xfer[id]);
printf("Bytes sent = %ld\n", stats[id].tx_xfer);
printf("Time taken = %ld us\n",
- (bench_end.tv_sec - bench_start.tv_sec) * 1000000
+ (bench_end.tv_sec - bench_start.tv_sec)*1000000
+ (bench_end.tv_usec - bench_start.tv_usec));
- double elapsed_time = (bench_end.tv_sec - bench_start.tv_sec) * 1000000
- + (bench_end.tv_usec - bench_start.tv_usec);
+ double elapsed_time = (bench_end.tv_sec - bench_start.tv_sec)*1000000
+ + (bench_end.tv_usec - bench_start.tv_usec);
elapsed_time *= 1.0e-6;
double elapsed_bytes = total_xfer[id];
- printf("RX Performance = %e Gbps\n", elapsed_bytes * 8.0 / 1.0e9 / elapsed_time);
+ printf("RX Performance = %e Gbps\n", elapsed_bytes*8.0/1.0e9/elapsed_time);
elapsed_bytes = stats[id].tx_xfer;
- printf("TX Performance = %e Gbps\n", elapsed_bytes * 8.0 / 1.0e9 / elapsed_time);
+ printf("TX Performance = %e Gbps\n", elapsed_bytes*8.0/1.0e9/elapsed_time);
uint32_t skt_drops = stream[id]->get_drop_count();
printf("Dropped %d packets\n", stats[id].dropped_packets);
printf("Socket reports dropped %d packets\n", skt_drops);
@@ -221,9 +220,9 @@ static inline void set_cpu(pthread_t t, int cpu)
}
}
-void* prepare_and_bench_blocking(void* arg)
+void *prepare_and_bench_blocking(void *arg)
{
- struct dpdk_test_args* args = (struct dpdk_test_args*)arg;
+ struct dpdk_test_args *args = (struct dpdk_test_args *) arg;
pthread_mutex_lock(&args->mutex);
pthread_t t = pthread_self();
set_cpu(t, args->cpu);
@@ -236,14 +235,16 @@ void* prepare_and_bench_blocking(void* arg)
buff_args.send_frame_size = 8000;
buff_args.num_send_frames = 8;
buff_args.num_recv_frames = 8;
- auto dev_addr = uhd::device_addr_t();
- eth_data[0] = uhd::transport::dpdk_zero_copy::make(ctx,
+ auto dev_addr = uhd::device_addr_t();
+ eth_data[0] = uhd::transport::dpdk_zero_copy::make(
+ ctx,
args->portid,
args->dst_ip,
args->src_port,
args->dst_port,
buff_args,
- dev_addr);
+ dev_addr
+ );
bench(eth_data, 1, 0.1);
return 0;
@@ -274,29 +275,31 @@ void prepare_and_bench_polling(void)
buff_args.num_recv_frames = 8;
auto dev_addr = uhd::device_addr_t();
for (unsigned int i = 0; i < NUM_PORTS; i++) {
- eth_data[i] = uhd::transport::dpdk_zero_copy::make(ctx,
+ eth_data[i] = uhd::transport::dpdk_zero_copy::make(
+ ctx,
bench_args[i].portid,
bench_args[i].dst_ip,
bench_args[i].src_port,
bench_args[i].dst_port,
buff_args,
- dev_addr);
+ dev_addr
+ );
}
bench(eth_data, NUM_PORTS, 0.0);
}
-int main(int argc, char** argv)
+int main(int argc, char **argv)
{
- int retval, io0_cpu = 1, io1_cpu = 1, user0_cpu = 0, user1_cpu = 2;
+ int retval, user0_cpu = 0, user1_cpu = 2;
std::string args, cpusets;
po::options_description desc("Allowed options");
- desc.add_options()("help", "help message")(
- "args", po::value<std::string>(&args)->default_value(""), "UHD-DPDK args")(
- "polling-mode", "Use polling mode (single thread on own core)")("cpusets",
- po::value<std::string>(&cpusets)->default_value(""),
- "which core(s) to use for a given thread (specify something like "
- "\"io0=1,io1=1,user0=0,user1=2\")");
+ desc.add_options()
+ ("help", "help message")
+ ("args", po::value<std::string>(&args)->default_value(""), "UHD-DPDK args")
+ ("polling-mode", "Use polling mode (single thread on own core)")
+ ("cpusets", po::value<std::string>(&cpusets)->default_value(""), "which core(s) to use for a given thread (specify something like \"user0=0,user1=2\")")
+ ;
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);
@@ -317,20 +320,15 @@ int main(int argc, char** argv)
auto cpuset_map = uhd::device_addr_t(cpusets);
for (std::string& key : cpuset_map.keys()) {
- if (key == "io0") {
- io0_cpu = std::stoi(cpuset_map[key], NULL, 0);
- } else if (key == "io1") {
- io1_cpu = std::stoi(cpuset_map[key], NULL, 0);
- } else if (key == "user0") {
+ if (key == "user0") {
user0_cpu = std::stoi(cpuset_map[key], NULL, 0);
} else if (key == "user1") {
user1_cpu = std::stoi(cpuset_map[key], NULL, 0);
}
}
- int port_thread_mapping[2] = {io0_cpu, io1_cpu};
auto& ctx = uhd::transport::uhd_dpdk_ctx::get();
- ctx.init(dpdk_args, 2, &port_thread_mapping[0], NUM_MBUFS, MBUF_CACHE_SIZE, 9000);
+ ctx.init(dpdk_args);
uint32_t eth_ip = htonl(0xc0a80003);
uint32_t eth_mask = htonl(0xffffff00);
@@ -391,12 +389,12 @@ int main(int argc, char** argv)
pthread_cond_broadcast(&cond);
- status = pthread_join(threads[0], (void**)&retval);
+ status = pthread_join(threads[0], (void **) &retval);
if (status) {
perror("Error while joining thread");
return status;
}
- status = pthread_join(threads[1], (void**)&retval);
+ status = pthread_join(threads[1], (void **) &retval);
if (status) {
perror("Error while joining thread");
return status;