aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/CMakeLists.txt1
-rw-r--r--host/lib/transport/dpdk_simple.cpp192
-rw-r--r--host/lib/transport/dpdk_zero_copy.cpp405
-rw-r--r--host/lib/transport/dpdk_zero_copy.hpp86
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk.c49
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h13
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c19
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h3
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c74
-rw-r--r--host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h1
10 files changed, 558 insertions, 285 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index 63d44c80b..7794f0760 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -141,6 +141,7 @@ if(ENABLE_DPDK)
INCLUDE_SUBDIRECTORY(uhd-dpdk)
LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/dpdk_zero_copy.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_simple.cpp
)
endif(ENABLE_DPDK)
diff --git a/host/lib/transport/dpdk_simple.cpp b/host/lib/transport/dpdk_simple.cpp
new file mode 100644
index 000000000..74bb979ef
--- /dev/null
+++ b/host/lib/transport/dpdk_simple.cpp
@@ -0,0 +1,192 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/dpdk_simple.hpp>
+#include <uhdlib/transport/uhd-dpdk.h>
+#include <arpa/inet.h>
+
+namespace uhd { namespace transport {
+
+namespace {
+ constexpr uint64_t USEC = 1000000;
+ // Non-data fields are headers (Ethernet + IPv4 + UDP) + CRC
+ constexpr size_t DPDK_SIMPLE_NONDATA_SIZE = 14 + 20 + 8 + 4;
+}
+
+class dpdk_simple_impl : public dpdk_simple {
+public:
+ dpdk_simple_impl(struct uhd_dpdk_ctx &ctx, const std::string &addr,
+ const std::string &port, bool filter_bcast)
+ {
+ UHD_ASSERT_THROW(ctx.is_init_done());
+
+ // Get NIC that can route to addr
+ int port_id = ctx.get_route(addr);
+ UHD_ASSERT_THROW(port_id >= 0);
+
+ _port_id = port_id;
+ uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str());
+ uint16_t dst_port = htons(std::stoi(port, NULL, 0));
+
+ struct uhd_dpdk_sockarg_udp sockarg = {
+ .is_tx = false,
+ .filter_bcast = filter_bcast,
+ .local_port = 0,
+ .remote_port = dst_port,
+ .dst_addr = dst_ipv4,
+ .num_bufs = 1
+ };
+ _rx_sock = uhd_dpdk_sock_open(_port_id, UHD_DPDK_SOCK_UDP, &sockarg);
+ UHD_ASSERT_THROW(_rx_sock != nullptr);
+
+ // Backfill the local port, in case it was auto-assigned
+ uhd_dpdk_udp_get_info(_rx_sock, &sockarg);
+ sockarg.is_tx = true;
+ sockarg.remote_port = dst_port;
+ sockarg.dst_addr = dst_ipv4;
+ sockarg.num_bufs = 1;
+ _tx_sock = uhd_dpdk_sock_open(_port_id, UHD_DPDK_SOCK_UDP, &sockarg);
+ UHD_ASSERT_THROW(_tx_sock != nullptr);
+ UHD_LOG_TRACE("DPDK", "Created simple transports between " << addr << ":"
+ << ntohs(dst_port) << " and NIC(" << _port_id
+ << "):" << ntohs(sockarg.local_port));
+ }
+
+ ~dpdk_simple_impl(void)
+ {
+ if (_rx_mbuf)
+ uhd_dpdk_free_buf(_rx_mbuf);
+ if (_tx_mbuf)
+ uhd_dpdk_free_buf(_tx_mbuf);
+ }
+
+ /*!
+ * Request a single send buffer of specified size.
+ *
+ * \param buf a pointer to place to write buffer location
+ * \return the maximum length of the buffer
+ */
+ size_t get_tx_buf(void** buf)
+ {
+ UHD_ASSERT_THROW(!_tx_mbuf);
+ int bufs = uhd_dpdk_request_tx_bufs(_tx_sock, &_tx_mbuf, 1, 0);
+ if (bufs != 1 || !_tx_mbuf) {
+ *buf = nullptr;
+ return 0;
+ }
+ *buf = uhd_dpdk_buf_to_data(_tx_sock, _tx_mbuf);
+ return _mtu - DPDK_SIMPLE_NONDATA_SIZE;
+ }
+
+ /*!
+ * Send and release outstanding buffer
+ *
+ * \param length bytes of data to send
+ * \return number of bytes sent (releases buffer if sent)
+ */
+ size_t send(size_t length)
+ {
+ UHD_ASSERT_THROW(_tx_mbuf)
+ _tx_mbuf->pkt_len = length;
+ _tx_mbuf->data_len = length;
+ int num_tx = uhd_dpdk_send(_tx_sock, &_tx_mbuf, 1);
+ if (num_tx == 0)
+ return 0;
+ _tx_mbuf = nullptr;
+ return length;
+ }
+
+ /*!
+ * Receive a single packet.
+ * Buffer provided by transport (must be freed before next operation).
+ *
+ * \param buf a pointer to place to write buffer location
+ * \param timeout the timeout in seconds
+ * \return the number of bytes received or zero on timeout
+ */
+ size_t recv(void **buf, double timeout)
+ {
+ UHD_ASSERT_THROW(!_rx_mbuf);
+ int bufs = uhd_dpdk_recv(_rx_sock, &_rx_mbuf, 1, (int) (timeout*USEC));
+ if (bufs != 1 || _rx_mbuf == nullptr) {
+ *buf = nullptr;
+ return 0;
+ }
+ if ((_tx_mbuf->ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) {
+ uhd_dpdk_free_buf(_rx_mbuf);
+ _rx_mbuf = nullptr;
+ return 0;
+ }
+ uhd_dpdk_get_src_ipv4(_rx_sock, _rx_mbuf, &_last_recv_addr);
+ *buf = uhd_dpdk_buf_to_data(_rx_sock, _rx_mbuf);
+ return uhd_dpdk_get_len(_rx_sock, _rx_mbuf);
+ }
+
+ /*!
+ * Return/free receive buffer
+ * Can also use to free un-sent TX bufs
+ */
+ void put_rx_buf(void)
+ {
+ UHD_ASSERT_THROW(_rx_mbuf)
+ uhd_dpdk_free_buf(_rx_mbuf);
+ }
+
+ /*!
+ * Get the last IP address as seen by recv().
+ * Only use this with the broadcast socket.
+ */
+ std::string get_recv_addr(void)
+ {
+ char addr_str[INET_ADDRSTRLEN];
+ struct in_addr ipv4_addr;
+ ipv4_addr.s_addr = _last_recv_addr;
+ inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str));
+ return std::string(addr_str);
+ }
+
+ /*!
+ * Get the IP address for the destination
+ */
+ std::string get_send_addr(void)
+ {
+ struct in_addr ipv4_addr;
+ int status = uhd_dpdk_get_ipv4_addr(_port_id, &ipv4_addr.s_addr, nullptr);
+ UHD_ASSERT_THROW(status);
+ char addr_str[INET_ADDRSTRLEN];
+ inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str));
+ return std::string(addr_str);
+ }
+private:
+ unsigned int _port_id;
+ size_t _mtu;
+ struct uhd_dpdk_socket *_tx_sock;
+ struct rte_mbuf *_tx_mbuf = nullptr;
+ struct uhd_dpdk_socket *_rx_sock;
+ struct rte_mbuf *_rx_mbuf = nullptr;
+ uint32_t _last_recv_addr;
+};
+
+dpdk_simple::~dpdk_simple(void) {}
+
+/***********************************************************************
+ * DPDK simple transport public make functions
+ **********************************************************************/
+dpdk_simple::sptr dpdk_simple::make_connected(
+ struct uhd_dpdk_ctx &ctx, const std::string &addr, const std::string &port
+){
+ return sptr(new dpdk_simple_impl(ctx, addr, port, true));
+}
+
+dpdk_simple::sptr dpdk_simple::make_broadcast(
+ struct uhd_dpdk_ctx &ctx, const std::string &addr, const std::string &port
+){
+ return sptr(new dpdk_simple_impl(ctx, addr, port, false));
+}
+}} // namespace uhd::transport
+
+
diff --git a/host/lib/transport/dpdk_zero_copy.cpp b/host/lib/transport/dpdk_zero_copy.cpp
index 51c9b9dba..9649c1cfb 100644
--- a/host/lib/transport/dpdk_zero_copy.cpp
+++ b/host/lib/transport/dpdk_zero_copy.cpp
@@ -7,24 +7,28 @@
#include "dpdk_zero_copy.hpp"
#include <uhd/config.hpp>
#include <uhd/utils/log.hpp>
-#include <uhd/utils/static.hpp>
#include <uhdlib/transport/uhd-dpdk.h>
-#include <arpa/inet.h>
-#include <sys/syslog.h>
+#include <uhdlib/utils/prefs.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/format.hpp>
#include <boost/make_shared.hpp>
#include <stack>
+#include <sys/syslog.h>
+#include <arpa/inet.h>
namespace uhd { namespace transport {
namespace {
-static constexpr uint64_t USEC = 1000000;
-// FIXME: Make configurable and have uhd-dpdk library track buffer sizes
-static constexpr size_t DEFAULT_FRAME_SIZE = 8000;
-
-inline char* eal_add_opt(
- std::vector<const char*>& argv, size_t n, char* dst, const char* opt, const char* arg)
+constexpr uint64_t USEC = 1000000;
+constexpr size_t DEFAULT_FRAME_SIZE = 8000;
+constexpr int DEFAULT_NUM_MBUFS = 4095;
+constexpr int DEFAULT_MBUF_CACHE_SIZE = 315;
+constexpr size_t UHD_DPDK_HEADERS_SIZE = 14 + 20 + 8; // Ethernet + IPv4 + UDP
+
+inline char * eal_add_opt(std::vector<const char*> &argv, size_t n,
+ char *dst, const char *opt, const char *arg)
{
- char* ptr = dst;
+ char *ptr = dst;
strncpy(ptr, opt, n);
argv.push_back(ptr);
ptr += strlen(opt) + 1;
@@ -34,6 +38,78 @@ inline char* eal_add_opt(
ptr += strlen(arg) + 1;
return ptr;
}
+
+inline void uhd_dpdk_eal_init(const device_addr_t &eal_args)
+{
+ /* Build up argc and argv */
+ std::vector<const char *> argv;
+ argv.push_back("uhd-dpdk");
+ auto args = new std::array<char, 4096>();
+ char *opt = args->data();
+ char *end = args->data() + args->size();
+ for (std::string &key : eal_args.keys()) {
+ std::string val = eal_args[key];
+ if (key == "dpdk-coremask") {
+ opt = eal_add_opt(argv, end - opt, opt, "-c",
+ val.c_str());
+ } else if (key == "dpdk-corelist") {
+ /* NOTE: This arg may have commas, so limited to config file */
+ opt = eal_add_opt(argv, end - opt, opt, "-l",
+ val.c_str());
+ } else if (key == "dpdk-coremap") {
+ opt = eal_add_opt(argv, end - opt, opt, "--lcores",
+ val.c_str());
+ } else if (key == "dpdk-master-lcore") {
+ opt = eal_add_opt(argv, end - opt, opt, "--master-lcore",
+ val.c_str());
+ } else if (key == "dpdk-pci-blacklist") {
+ opt = eal_add_opt(argv, end - opt, opt, "-b",
+ val.c_str());
+ } else if (key == "dpdk-pci-whitelist") {
+ opt = eal_add_opt(argv, end - opt, opt, "-w",
+ val.c_str());
+ } else if (key == "dpdk-log-level") {
+ opt = eal_add_opt(argv, end - opt, opt, "--log-level",
+ val.c_str());
+ } else if (key == "dpdk-huge-dir") {
+ opt = eal_add_opt(argv, end - opt, opt, "--huge-dir",
+ val.c_str());
+ } else if (key == "dpdk-file-prefix") {
+ opt = eal_add_opt(argv, end - opt, opt, "--file-prefix",
+ val.c_str());
+ } else if (key == "dpdk-driver") {
+ opt = eal_add_opt(argv, end - opt, opt, "-d",
+ val.c_str());
+ }
+ /* TODO: Change where log goes?
+ int rte_openlog_stream( FILE * f)
+ */
+ }
+ /* Init DPDK's EAL */
+ uhd_dpdk_init(argv.size(), argv.data());
+ delete args;
+}
+
+inline std::string eth_addr_to_string(struct eth_addr mac_addr)
+{
+ auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx");
+ mac_stream % (uint32_t) mac_addr.addr[0] % (uint32_t) mac_addr.addr[1]
+ % (uint32_t) mac_addr.addr[2] % (uint32_t) mac_addr.addr[3]
+ % (uint32_t) mac_addr.addr[4] % (uint32_t) mac_addr.addr[5];
+ return mac_stream.str();
+}
+
+inline void separate_ipv4_addr(const std::string ipv4,
+ uint32_t &ipv4_addr, uint32_t &netmask)
+{
+ std::vector<std::string> result;
+ boost::algorithm::split(result, ipv4,
+ [](const char &in) {return in == '/';}, boost::token_compress_on);
+ UHD_ASSERT_THROW(result.size() == 2);
+ ipv4_addr = (uint32_t) inet_addr(result[0].c_str());
+ int netbits = std::atoi(result[1].c_str());
+ netmask = htonl(0xffffffff << (32-netbits));
+}
} // namespace
uhd_dpdk_ctx::uhd_dpdk_ctx(void) : _init_done(false) {}
@@ -41,69 +117,82 @@ uhd_dpdk_ctx::uhd_dpdk_ctx(void) : _init_done(false) {}
uhd_dpdk_ctx::~uhd_dpdk_ctx(void) {}
/* Initialize uhd-dpdk (and do only once) */
-void uhd_dpdk_ctx::init(const dict<std::string, std::string>& eal_args,
- unsigned int num_ports,
- int* port_thread_mapping,
- int num_mbufs,
- int mbuf_cache_size,
- size_t mtu)
+void uhd_dpdk_ctx::init(const device_addr_t &user_args)
{
std::lock_guard<std::mutex> lock(_init_mutex);
if (!_init_done) {
- _mtu = mtu;
- /* Build up argc and argv */
- std::vector<const char*> argv;
- argv.push_back("uhd-dpdk");
- char* args = new char[4096];
- char* opt = args;
- char* end = args + sizeof(args);
- for (std::string& key : eal_args.keys()) {
- std::string val = eal_args[key];
- if (key == "coremask") {
- opt = eal_add_opt(argv, end - opt, opt, "-c", val.c_str());
- } else if (key == "corelist") {
- /* NOTE: This arg may have commas, so limited to config file */
- opt = eal_add_opt(argv, end - opt, opt, "-l", val.c_str());
- } else if (key == "coremap") {
- opt = eal_add_opt(argv, end - opt, opt, "--lcores", val.c_str());
- } else if (key == "master-lcore") {
- opt = eal_add_opt(argv, end - opt, opt, "--master-lcore", val.c_str());
- } else if (key == "pci-blacklist") {
- opt = eal_add_opt(argv, end - opt, opt, "-b", val.c_str());
- } else if (key == "pci-whitelist") {
- opt = eal_add_opt(argv, end - opt, opt, "-w", val.c_str());
- } else if (key == "log-level") {
- opt = eal_add_opt(argv, end - opt, opt, "--log-level", val.c_str());
- } else if (key == "huge-dir") {
- opt = eal_add_opt(argv, end - opt, opt, "--huge-dir", val.c_str());
- } else if (key == "file-prefix") {
- opt = eal_add_opt(argv, end - opt, opt, "--file-prefix", val.c_str());
+ /* Gather global config, build args for EAL, and init UHD-DPDK */
+ const device_addr_t dpdk_args = uhd::prefs::get_dpdk_args(user_args);
+ UHD_LOG_TRACE("DPDK", "Configuration:" << std::endl
+ << dpdk_args.to_pp_string());
+ uhd_dpdk_eal_init(dpdk_args);
+
+ _mtu = dpdk_args.has_key("dpdk-mtu")
+ ? dpdk_args.cast<size_t>("dpdk-mtu", 0)
+ : DEFAULT_FRAME_SIZE;
+ const int num_mbufs = dpdk_args.has_key("dpdk-num-mbufs")
+ ? dpdk_args.cast<int>("dpdk-num-mbufs", 0)
+ : DEFAULT_NUM_MBUFS;
+ const int mbuf_cache_size = dpdk_args.has_key("dpdk-mbuf-cache-size")
+ ? dpdk_args.cast<int>("dpdk-mbuf-cache-size", 0)
+ : DEFAULT_MBUF_CACHE_SIZE;
+
+ /* Get configuration for all the NIC ports */
+ device_addrs_t args = separate_device_addr(user_args);
+ int num_ports = uhd_dpdk_port_count();
+ std::vector<int> io_cpu_map(num_ports);
+ device_addrs_t nics(num_ports);
+ for (ssize_t i = 0; i < num_ports; i++) {
+ struct eth_addr mac_addr = uhd_dpdk_get_eth_addr(i);
+ nics[i]["dpdk-mac"] = eth_addr_to_string(mac_addr);
+ for (const auto &arg: args) {
+ if (arg.has_key("dpdk-mac")
+ && arg["dpdk-mac"] == nics[i]["dpdk-mac"]) {
+ for (const auto& key: arg.keys()) {
+ nics[i][key] = arg[key];
+ }
+ break;
+ }
}
+ nics[i] = uhd::prefs::get_dpdk_nic_args(nics[i]);
+ if (nics[i].has_key("dpdk-ipv4")
+ && nics[i].has_key("dpdk-io-cpu")) {
+ uint32_t ipv4_addr, netmask;
+ io_cpu_map[i] = std::atoi(nics[i]["dpdk-io-cpu"].c_str());
+ separate_ipv4_addr(nics[i]["dpdk-ipv4"], ipv4_addr, netmask);
+ uhd_dpdk_set_ipv4_addr((unsigned int) i, ipv4_addr, netmask);
+ } else {
+ /* Not enough configuration to use NIC */
+ io_cpu_map[i] = -1;
+ }
+ UHD_LOG_TRACE("DPDK", "Found NIC(" << i << "):" << std::endl
+ << nics[i].to_pp_string());
}
- uhd_dpdk_init(argv.size(),
- argv.data(),
- num_ports,
- port_thread_mapping,
- num_mbufs,
- mbuf_cache_size,
- _mtu);
- delete args;
+ uhd_dpdk_start(num_ports, io_cpu_map.data(), num_mbufs,
+ mbuf_cache_size, _mtu);
_init_done = true;
}
}
-int uhd_dpdk_ctx::get_port_id(std::array<uint8_t, 6> mac_addr, unsigned int& port_id)
+size_t uhd_dpdk_ctx::get_mtu(void) const
+{
+ UHD_ASSERT_THROW(is_init_done());
+ return _mtu;
+}
+
+int uhd_dpdk_ctx::get_port_id(std::array<uint8_t, 6> mac_addr,
+ unsigned int &port_id) const
{
UHD_ASSERT_THROW(is_init_done());
int num_ports = uhd_dpdk_port_count();
for (int i = 0; i < num_ports; i++) {
- struct eth_addr port_mac_addr = uhd_dpdk_get_eth_addr((unsigned int)i);
+ struct eth_addr port_mac_addr = uhd_dpdk_get_eth_addr((unsigned int) i);
for (int j = 0; j < 6; j++) {
if (mac_addr[j] != port_mac_addr.addr[j]) {
break;
}
if (j == 5) {
- port_id = (unsigned int)i;
+ port_id = (unsigned int) i;
return 0;
}
}
@@ -111,52 +200,54 @@ int uhd_dpdk_ctx::get_port_id(std::array<uint8_t, 6> mac_addr, unsigned int& por
return -1;
}
-int uhd_dpdk_ctx::get_route(const std::string& addr) const
+int uhd_dpdk_ctx::get_route(const std::string &addr) const
{
- const uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str());
+ const uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str());
const unsigned int num_ports = uhd_dpdk_port_count();
for (unsigned int port = 0; port < num_ports; port++) {
uint32_t src_ipv4;
uint32_t netmask;
+ if (uhd_dpdk_port_link_status(port) < 1)
+ continue;
uhd_dpdk_get_ipv4_addr(port, &src_ipv4, &netmask);
if ((src_ipv4 & netmask) == (dst_ipv4 & netmask)) {
- return (int)port;
- }
+ return (int) port;
+ }
}
return -ENODEV;
}
-int uhd_dpdk_ctx::set_ipv4_addr(
- unsigned int port_id, uint32_t ipv4_addr, uint32_t netmask)
+int uhd_dpdk_ctx::set_ipv4_addr(unsigned int port_id, uint32_t ipv4_addr,
+ uint32_t netmask)
{
return uhd_dpdk_set_ipv4_addr(port_id, ipv4_addr, netmask);
}
-bool uhd_dpdk_ctx::is_init_done(void)
+bool uhd_dpdk_ctx::is_init_done(void) const
{
return _init_done.load();
}
-class dpdk_zero_copy_msb : public managed_send_buffer
-{
+class dpdk_zero_copy_msb : public managed_send_buffer {
public:
- dpdk_zero_copy_msb(struct uhd_dpdk_socket* sock,
- std::stack<dpdk_zero_copy_msb*, std::vector<dpdk_zero_copy_msb*>>& free_bufs)
- : _sock(sock), _buf(nullptr), _free_bufs(free_bufs){};
+ dpdk_zero_copy_msb(struct uhd_dpdk_socket *sock,
+ std::stack<dpdk_zero_copy_msb *, std::vector<dpdk_zero_copy_msb *>> &free_bufs,
+ size_t frame_size)
+ : _sock(sock), _buf(nullptr), _free_bufs(free_bufs),
+ _frame_size(frame_size) {};
~dpdk_zero_copy_msb(void) {}
void release(void)
{
if (_buf) {
- _buf->pkt_len = _length;
+ _buf->pkt_len = _length;
_buf->data_len = _length;
- int num_tx = uhd_dpdk_send(_sock, &_buf, 1);
+ int num_tx = uhd_dpdk_send(_sock, &_buf, 1);
if (num_tx == 0) {
/* Drop packet and free buffer (do not share sockets!) */
- UHD_LOG_ERROR(
- "DPDK", "Invalid shared socket usage detected. Dropping packet...");
+ UHD_LOG_ERROR("DPDK", "Invalid shared socket usage detected. Dropping packet...");
uhd_dpdk_free_buf(_buf);
}
// Push back into pool
@@ -170,21 +261,22 @@ public:
if (bufs != 1 || !_buf)
return sptr();
- return make(this, uhd_dpdk_buf_to_data(_sock, _buf), DEFAULT_FRAME_SIZE);
+ return make(this, uhd_dpdk_buf_to_data(_sock, _buf),
+ _frame_size);
}
private:
- struct uhd_dpdk_socket* _sock;
- struct rte_mbuf* _buf;
- std::stack<dpdk_zero_copy_msb*, std::vector<dpdk_zero_copy_msb*>>& _free_bufs;
+ struct uhd_dpdk_socket *_sock;
+ struct rte_mbuf *_buf;
+ std::stack<dpdk_zero_copy_msb *, std::vector<dpdk_zero_copy_msb *>> &_free_bufs;
+ size_t _frame_size;
};
-class dpdk_zero_copy_mrb : public managed_recv_buffer
-{
+class dpdk_zero_copy_mrb : public managed_recv_buffer {
public:
- dpdk_zero_copy_mrb(struct uhd_dpdk_socket* sock,
- std::stack<dpdk_zero_copy_mrb*, std::vector<dpdk_zero_copy_mrb*>>& free_bufs)
- : _sock(sock), _buf(nullptr), _free_bufs(free_bufs){};
+ dpdk_zero_copy_mrb(struct uhd_dpdk_socket *sock,
+ std::stack<dpdk_zero_copy_mrb*, std::vector<dpdk_zero_copy_mrb*>> &free_bufs)
+ : _sock(sock), _buf(nullptr), _free_bufs(free_bufs) {};
~dpdk_zero_copy_mrb(void) {}
void release(void)
@@ -197,78 +289,78 @@ public:
sptr get_new(double timeout)
{
- int bufs = uhd_dpdk_recv(_sock, &_buf, 1, (int)(timeout * USEC));
+ int bufs = uhd_dpdk_recv(_sock, &_buf, 1, (int) (timeout*USEC));
if (bufs != 1 || _buf == nullptr) {
// Push back into pool if we didn't get a real buffer
_free_bufs.push(this);
return sptr();
}
- if ((_buf->ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) {
- UHD_LOG_WARNING("DPDK", "IP checksum failure detected. Dropping packet...");
- uhd_dpdk_free_buf(_buf);
- _free_bufs.push(this);
- return sptr();
- }
- return make(
- this, uhd_dpdk_buf_to_data(_sock, _buf), uhd_dpdk_get_len(_sock, _buf));
+ return make(this, uhd_dpdk_buf_to_data(_sock, _buf),
+ uhd_dpdk_get_len(_sock, _buf));
}
private:
- struct uhd_dpdk_socket* _sock;
- struct rte_mbuf* _buf;
- std::stack<dpdk_zero_copy_mrb*, std::vector<dpdk_zero_copy_mrb*>>& _free_bufs;
+ struct uhd_dpdk_socket *_sock;
+ struct rte_mbuf *_buf;
+ std::stack<dpdk_zero_copy_mrb*, std::vector<dpdk_zero_copy_mrb*>> &_free_bufs;
};
-class dpdk_zero_copy_impl : public dpdk_zero_copy
-{
+class dpdk_zero_copy_impl : public dpdk_zero_copy {
public:
- dpdk_zero_copy_impl(struct uhd_dpdk_ctx& ctx,
- const unsigned int dpdk_port_id,
- const std::string& addr,
- const std::string& remote_port,
- const std::string& local_port,
- const zero_copy_xport_params& xport_params)
- : _num_send_frames(xport_params.num_send_frames)
- , _send_frame_size(xport_params.send_frame_size)
- , _num_recv_frames(xport_params.num_recv_frames)
- , _recv_frame_size(xport_params.recv_frame_size)
- , _port_id(dpdk_port_id)
- , _rx_empty_count(0)
- , _tx_empty_count(0)
+
+ dpdk_zero_copy_impl(const struct uhd_dpdk_ctx &ctx,
+ const unsigned int dpdk_port_id,
+ const std::string &addr,
+ const std::string &remote_port,
+ const std::string &local_port,
+ const zero_copy_xport_params& xport_params)
+ : _num_send_frames(xport_params.num_send_frames),
+ _send_frame_size(xport_params.send_frame_size),
+ _num_recv_frames(xport_params.num_recv_frames),
+ _recv_frame_size(xport_params.recv_frame_size),
+ _port_id(dpdk_port_id),
+ _rx_empty_count(0),
+ _tx_empty_count(0)
{
- // TODO: Handle xport_params
UHD_ASSERT_THROW(xport_params.recv_frame_size > 0);
UHD_ASSERT_THROW(xport_params.send_frame_size > 0);
UHD_ASSERT_THROW(xport_params.num_send_frames > 0);
UHD_ASSERT_THROW(xport_params.num_recv_frames > 0);
UHD_ASSERT_THROW(ctx.is_init_done());
+ UHD_ASSERT_THROW(xport_params.recv_frame_size < ctx.get_mtu() - UHD_DPDK_HEADERS_SIZE);
+ UHD_ASSERT_THROW(xport_params.send_frame_size < ctx.get_mtu() - UHD_DPDK_HEADERS_SIZE);
const int num_ports = uhd_dpdk_port_count();
UHD_ASSERT_THROW(num_ports > 0);
- UHD_ASSERT_THROW(dpdk_port_id < (unsigned int)num_ports);
+ UHD_ASSERT_THROW(dpdk_port_id < (unsigned int) num_ports);
// Convert ipv4 addr from string to uint32_t, network format
- uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str());
+ uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str());
// Convert port from string to uint16_t, network format
uint16_t dst_port = htons(std::stoi(remote_port, NULL, 0));
uint16_t src_port = htons(std::stoi(local_port, NULL, 0));
// Create RX socket first
- struct uhd_dpdk_sockarg_udp sockarg = {.is_tx = false,
- .local_port = src_port,
- .remote_port = dst_port,
- .dst_addr = dst_ipv4};
+ struct uhd_dpdk_sockarg_udp sockarg = {
+ .is_tx = false,
+ .filter_bcast = true,
+ .local_port = src_port,
+ .remote_port = dst_port,
+ .dst_addr = dst_ipv4,
+ .num_bufs = _num_recv_frames
+ };
_rx_sock = uhd_dpdk_sock_open(dpdk_port_id, UHD_DPDK_SOCK_UDP, &sockarg);
UHD_ASSERT_THROW(_rx_sock != nullptr);
// Backfill the local port, in case it was auto-assigned
uhd_dpdk_udp_get_info(_rx_sock, &sockarg);
- sockarg.is_tx = true;
+ sockarg.is_tx = true;
+ sockarg.num_bufs = _num_send_frames;
sockarg.remote_port = dst_port;
- sockarg.dst_addr = dst_ipv4;
+ sockarg.dst_addr = dst_ipv4;
_tx_sock = uhd_dpdk_sock_open(dpdk_port_id, UHD_DPDK_SOCK_UDP, &sockarg);
UHD_ASSERT_THROW(_tx_sock != nullptr);
@@ -277,13 +369,12 @@ public:
_mrb_pool.push(new dpdk_zero_copy_mrb(_rx_sock, _mrb_pool));
}
for (size_t i = 0; i < _num_send_frames; i++) {
- _msb_pool.push(new dpdk_zero_copy_msb(_tx_sock, _msb_pool));
+ _msb_pool.push(new dpdk_zero_copy_msb(_tx_sock, _msb_pool, _send_frame_size));
}
- UHD_LOG_TRACE("DPDK",
- "Created transports between " << addr << ":" << remote_port << " and NIC("
- << dpdk_port_id
- << "):" << ntohs(sockarg.local_port));
+ UHD_LOG_TRACE("DPDK", "Created transports between " << addr << ":"
+ << remote_port << " and NIC(" << dpdk_port_id
+ << "):" << ntohs(sockarg.local_port));
}
~dpdk_zero_copy_impl(void)
@@ -292,23 +383,18 @@ public:
size_t count;
uhd_dpdk_udp_get_info(_rx_sock, &sockarg);
uhd_dpdk_get_drop_count(_rx_sock, &count);
- UHD_LOG_TRACE("DPDK",
- "Closing transports between " << sockarg.dst_addr << ":"
- << ntohs(sockarg.remote_port)
- << " and local:" << ntohs(sockarg.local_port));
- UHD_LOG_TRACE("DPDK",
- "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
- << " Dropped " << count << " packets");
+ UHD_LOG_TRACE("DPDK", "Closing transports between " << sockarg.dst_addr << ":"
+ << ntohs(sockarg.remote_port) << " and local:"
+ << ntohs(sockarg.local_port));
+ UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
+ << " Dropped "<< count << " packets");
uhd_dpdk_get_xfer_count(_rx_sock, &count);
- UHD_LOG_TRACE("DPDK",
- "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
- << " Received " << count << " packets");
- UHD_LOG_TRACE("DPDK",
- "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
- << "RX empty count is " << _rx_empty_count);
- UHD_LOG_TRACE("DPDK",
- "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
- << "TX empty count is " << _tx_empty_count);
+ UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
+ << " Received "<< count << " packets");
+ UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
+ << "RX empty count is " << _rx_empty_count);
+ UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") "
+ << "TX empty count is " << _tx_empty_count);
uhd_dpdk_sock_close(_rx_sock);
uhd_dpdk_sock_close(_tx_sock);
}
@@ -320,7 +406,7 @@ public:
return managed_recv_buffer::sptr();
}
- dpdk_zero_copy_mrb* mrb = _mrb_pool.top();
+ dpdk_zero_copy_mrb *mrb = _mrb_pool.top();
_mrb_pool.pop();
managed_recv_buffer::sptr buff = mrb->get_new(timeout);
if (!buff)
@@ -345,7 +431,7 @@ public:
return managed_send_buffer::sptr();
}
- dpdk_zero_copy_msb* msb = _msb_pool.top();
+ dpdk_zero_copy_msb *msb = _msb_pool.top();
_msb_pool.pop();
managed_send_buffer::sptr buff = msb->get_new(timeout);
if (!buff)
@@ -367,18 +453,18 @@ public:
{
struct uhd_dpdk_sockarg_udp sockarg;
int status = uhd_dpdk_udp_get_info(_rx_sock, &sockarg);
+ UHD_ASSERT_THROW(status == 0);
return ntohs(sockarg.local_port);
}
std::string get_local_addr(void) const
{
- uint32_t ipv4_addr;
- int status = uhd_dpdk_get_ipv4_addr(_port_id, &ipv4_addr, NULL);
- auto retval = std::to_string(ipv4_addr >> 0 & 0xff) + "."
- + std::to_string(ipv4_addr >> 8 & 0xff) + "."
- + std::to_string(ipv4_addr >> 16 & 0xff) + "."
- + std::to_string(ipv4_addr >> 24 & 0xff);
- return retval;
+ struct in_addr ipv4_addr;
+ int status = uhd_dpdk_get_ipv4_addr(_port_id, &ipv4_addr.s_addr, NULL);
+ UHD_ASSERT_THROW(status == 0);
+ char addr_str[INET_ADDRSTRLEN];
+ inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str));
+ return std::string(addr_str);
}
uint32_t get_drop_count(void) const
@@ -387,10 +473,9 @@ public:
uhd_dpdk_get_drop_count(_rx_sock, &drop_count);
return drop_count;
}
-
private:
- struct uhd_dpdk_socket* _rx_sock;
- struct uhd_dpdk_socket* _tx_sock;
+ struct uhd_dpdk_socket *_rx_sock;
+ struct uhd_dpdk_socket *_tx_sock;
const size_t _num_send_frames;
const size_t _send_frame_size;
const size_t _num_recv_frames;
@@ -399,20 +484,22 @@ private:
unsigned int _rx_empty_count;
unsigned int _tx_empty_count;
- std::stack<dpdk_zero_copy_mrb*, std::vector<dpdk_zero_copy_mrb*>> _mrb_pool;
- std::stack<dpdk_zero_copy_msb*, std::vector<dpdk_zero_copy_msb*>> _msb_pool;
+ std::stack<dpdk_zero_copy_mrb *, std::vector<dpdk_zero_copy_mrb *>> _mrb_pool;
+ std::stack<dpdk_zero_copy_msb *, std::vector<dpdk_zero_copy_msb *>> _msb_pool;
};
-dpdk_zero_copy::sptr dpdk_zero_copy::make(struct uhd_dpdk_ctx& ctx,
+dpdk_zero_copy::sptr dpdk_zero_copy::make(
+ const struct uhd_dpdk_ctx &ctx,
const unsigned int dpdk_port_id,
- const std::string& addr,
- const std::string& remote_port,
- const std::string& local_port,
- const zero_copy_xport_params& default_buff_args,
- const device_addr_t& hints)
+ const std::string &addr,
+ const std::string &remote_port,
+ const std::string &local_port,
+ const zero_copy_xport_params &default_buff_args,
+ const device_addr_t &/*hints*/)
{
- return dpdk_zero_copy::sptr(new dpdk_zero_copy_impl(
- ctx, dpdk_port_id, addr, remote_port, local_port, default_buff_args));
+ return dpdk_zero_copy::sptr(
+ new dpdk_zero_copy_impl(ctx, dpdk_port_id, addr, remote_port, local_port, default_buff_args)
+ );
}
}} // namespace uhd::transport
diff --git a/host/lib/transport/dpdk_zero_copy.hpp b/host/lib/transport/dpdk_zero_copy.hpp
index f90e73d0e..8dcce6eee 100644
--- a/host/lib/transport/dpdk_zero_copy.hpp
+++ b/host/lib/transport/dpdk_zero_copy.hpp
@@ -7,95 +7,31 @@
#ifndef DPDK_ZERO_COPY_HPP
#define DPDK_ZERO_COPY_HPP
-#include <uhd/config.hpp>
-#include <uhd/transport/zero_copy.hpp>
+#include <uhdlib/transport/dpdk_common.hpp>
#include <uhd/types/device_addr.hpp>
-#include <uhd/utils/log.hpp>
-#include <uhd/utils/static.hpp>
+#include <uhd/transport/zero_copy.hpp>
#include <boost/shared_ptr.hpp>
-#include <mutex>
#include <string>
-#include <vector>
namespace uhd { namespace transport {
-class uhd_dpdk_ctx : boost::noncopyable
-{
-public:
- UHD_SINGLETON_FCN(uhd_dpdk_ctx, get);
-
- ~uhd_dpdk_ctx(void);
-
- /*!
- * Initialize uhd-dpdk (and do only once)
- * \param eal_args Arguments to pass to DPDK rte_eal_init() function
- * \param num_ports Size of port_thread_mapping array (also number in use)
- * \param port_thread_mapping Map NICs to threads: index=port, value=thread
- * \param num_mbufs Number of packet buffers for each port's memory pool
- * \param mbuf_cache_size Size of per-core packet buffer cache from mempool
- * \param mtu MTU of NIC ports
- */
- void init(const dict<std::string, std::string>& eal_args,
- unsigned int num_ports,
- int* port_thread_mapping,
- int num_mbufs,
- int mbuf_cache_size,
- size_t mtu);
-
- /*!
- * Get port ID from provided MAC address
- * \param mac_addr MAC address
- * \param port_id Int to write ID of port corresponding to MAC address
- * \return 0 if match found, else no match
- */
- int get_port_id(std::array<uint8_t, 6> mac_addr, unsigned int& port_id);
-
- /*!
- * Get port ID for routing packet destined for given address
- * \param addr Destination address
- * \return port ID from routing table
- */
- int get_route(const std::string& addr) const;
-
- /*!
- * Set IPv4 address and subnet mask of given NIC port
- * Not thread-safe. Should only be written before ports are in use.
- * \param port_id NIC port ID
- * \param ipv4_addr IPv4 address to write
- * \param netmask Subnet mask identifying network number in ipv4_addr
- * \return 0 if successful, else error
- */
- int set_ipv4_addr(unsigned int port_id, uint32_t ipv4_addr, uint32_t netmask);
-
- /*!
- * \return whether init() has been called
- */
- bool is_init_done(void);
-
-private:
- uhd_dpdk_ctx(void);
-
- size_t _mtu;
- std::mutex _init_mutex;
- std::atomic<bool> _init_done;
-};
-
/*!
* A zero copy transport interface to the dpdk DMA library.
*/
-class dpdk_zero_copy : public virtual zero_copy_if
-{
+class dpdk_zero_copy : public virtual zero_copy_if {
public:
typedef boost::shared_ptr<dpdk_zero_copy> sptr;
- static sptr make(struct uhd_dpdk_ctx& ctx,
+ static sptr make(
+ const struct uhd_dpdk_ctx &ctx,
const unsigned int dpdk_port_id,
- const std::string& addr,
- const std::string& remote_port,
- const std::string& local_port, /* 0 = auto-assign */
- const zero_copy_xport_params& default_buff_args,
- const device_addr_t& hints);
+ const std::string &addr,
+ const std::string &remote_port,
+ const std::string &local_port, /* 0 = auto-assign */
+ const zero_copy_xport_params &default_buff_args,
+ const device_addr_t &hints
+ );
virtual uint16_t get_local_port(void) const = 0;
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk.c b/host/lib/transport/uhd-dpdk/uhd_dpdk.c
index 1be6b2335..09897eb11 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk.c
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk.c
@@ -34,6 +34,20 @@ int uhd_dpdk_port_count(void)
return ctx->num_ports;
}
+int uhd_dpdk_port_link_status(unsigned int portid)
+{
+ if (!ctx)
+ return -ENODEV;
+
+ struct uhd_dpdk_port *p = find_port(portid);
+ if (p) {
+ struct rte_eth_link link;
+ rte_eth_link_get_nowait(p->id, &link);
+ return link.link_status;
+ }
+ return -ENODEV;
+}
+
struct eth_addr uhd_dpdk_get_eth_addr(unsigned int portid)
{
struct eth_addr retval;
@@ -43,6 +57,7 @@ struct eth_addr uhd_dpdk_get_eth_addr(unsigned int portid)
if (p) {
memcpy(retval.addr, p->mac_addr.addr_bytes, ETHER_ADDR_LEN);
}
+
return retval;
}
@@ -211,19 +226,12 @@ static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int lco
return 0;
}
-
-int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports,
- int *port_thread_mapping, int num_mbufs, int mbuf_cache_size,
- int mtu)
+int uhd_dpdk_init(int argc, const char **argv)
{
/* Init context only once */
if (ctx)
return 1;
- if ((num_ports == 0) || (port_thread_mapping == NULL)) {
- return -EINVAL;
- }
-
/* Grabs arguments intended for DPDK's EAL */
int ret = rte_eal_init(argc, (char **) argv);
if (ret < 0)
@@ -241,8 +249,6 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports,
ctx->num_ports = rte_eth_dev_count();
if (ctx->num_ports < 1)
rte_exit(EXIT_FAILURE, "Error: Found no ports\n");
- if (ctx->num_ports < num_ports)
- rte_exit(EXIT_FAILURE, "Error: User requested more ports than available\n");
/* Get memory for thread and port data structures */
ctx->threads = rte_zmalloc("uhd_dpdk_thread", RTE_MAX_LCORE*sizeof(struct uhd_dpdk_thread), 0);
@@ -252,6 +258,28 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports,
if (!ctx->ports)
rte_exit(EXIT_FAILURE, "Error: Could not allocate memory for port data\n");
+ for (size_t i = 0; i < ctx->num_ports; i++) {
+ struct uhd_dpdk_port *port = &ctx->ports[i];
+ port->id = i;
+ rte_eth_macaddr_get(port->id, &port->mac_addr);
+ }
+
+ return 0;
+}
+
+int uhd_dpdk_start(unsigned int num_ports, int *port_thread_mapping,
+ int num_mbufs, int mbuf_cache_size, int mtu)
+{
+ if (!ctx)
+ return -EIO;
+
+ if ((num_ports == 0) || (port_thread_mapping == NULL)) {
+ return -EINVAL;
+ }
+
+ if (ctx->num_ports < num_ports)
+ rte_exit(EXIT_FAILURE, "Error: User requested more ports than available\n");
+
/* Initialize the thread data structures */
for (int i = rte_get_next_lcore(-1, 1, 0);
(i < RTE_MAX_LCORE);
@@ -306,7 +334,6 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports,
rte_exit(EXIT_FAILURE, "Requested inactive lcore %u for port %u\n", (unsigned int) thread_id, i);
struct uhd_dpdk_port *port = &ctx->ports[i];
- port->id = i;
port->parent = &ctx->threads[thread_id];
ctx->threads[thread_id].num_ports++;
LIST_INSERT_HEAD(&ctx->threads[thread_id].port_list, port, port_entry);
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h
index d497e5d5a..6f43ae1cf 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h
@@ -24,7 +24,7 @@
#define UHD_DPDK_MAX_WAITERS UHD_DPDK_MAX_SOCKET_CNT
#define UHD_DPDK_TXQ_SIZE 64
#define UHD_DPDK_TX_BURST_SIZE (UHD_DPDK_TXQ_SIZE - 1)
-#define UHD_DPDK_RXQ_SIZE 64
+#define UHD_DPDK_RXQ_SIZE 128
#define UHD_DPDK_RX_BURST_SIZE (UHD_DPDK_RXQ_SIZE - 1)
struct uhd_dpdk_port;
@@ -256,13 +256,10 @@ static inline struct uhd_dpdk_port * find_port(unsigned int portid)
if (!ctx)
return NULL;
- for (unsigned int i = 0; i < ctx->num_threads; i++) {
- struct uhd_dpdk_thread *t = &ctx->threads[i];
- struct uhd_dpdk_port *p;
- LIST_FOREACH(p, &t->port_list, port_entry) {
- if (p->id == portid) {
- return p;
- }
+ for (unsigned int i = 0; i < ctx->num_ports; i++) {
+ struct uhd_dpdk_port *p = &ctx->ports[i];
+ if (p->id == portid) {
+ return p;
}
}
return NULL;
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
index f603f1f8f..61ed836df 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c
@@ -43,7 +43,6 @@ int _uhd_dpdk_arp_reply(struct uhd_dpdk_port *port, struct arp_hdr *arp_req)
mbuf->pkt_len = 42;
mbuf->data_len = 42;
- mbuf->ol_flags = PKT_TX_IP_CKSUM;
if (rte_eth_tx_burst(port->id, 0, &mbuf, 1) != 1) {
RTE_LOG(WARNING, RING, "%s: TX descriptor ring is full\n", __func__);
@@ -125,7 +124,6 @@ int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port, uint32_t ip)
mbuf->pkt_len = 42;
mbuf->data_len = 42;
- mbuf->ol_flags = PKT_TX_IP_CKSUM;
if (rte_eth_tx_burst(port->id, 0, &mbuf, 1) != 1) {
RTE_LOG(WARNING, RING, "%s: TX descriptor ring is full\n", __func__);
@@ -135,7 +133,8 @@ int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port, uint32_t ip)
return 0;
}
-int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt)
+int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf,
+ struct udp_hdr *pkt, bool bcast)
{
int status = 0;
struct uhd_dpdk_ipv4_5tuple ht_key = {
@@ -155,6 +154,10 @@ int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, str
}
struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) entry->sock->priv;
+ if (bcast && pdata->filter_bcast) {
+ // Filter broadcast packets if not listening
+ goto udp_rx_drop;
+ }
status = rte_ring_enqueue(entry->sock->rx_ring, mbuf);
if (entry->waiter) {
_uhd_dpdk_waiter_wake(entry->waiter, port->parent);
@@ -172,14 +175,16 @@ udp_rx_drop:
return status;
}
-int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt)
+int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf,
+ struct ipv4_hdr *pkt)
{
- if (pkt->dst_addr != port->ipv4_addr) {
+ bool bcast = is_broadcast(port, pkt->dst_addr);
+ if (pkt->dst_addr != port->ipv4_addr && !bcast) {
rte_pktmbuf_free(mbuf);
return -ENODEV;
}
if (pkt->next_proto_id == 0x11) {
- return _uhd_dpdk_process_udp(port, mbuf, (struct udp_hdr *) &pkt[1]);
+ return _uhd_dpdk_process_udp(port, mbuf, (struct udp_hdr *) &pkt[1], bcast);
}
rte_pktmbuf_free(mbuf);
return -EINVAL;
@@ -417,7 +422,7 @@ static inline void _uhd_dpdk_rx_burst(struct uhd_dpdk_port *port)
rte_pktmbuf_free(bufs[buf]);
break;
case ETHER_TYPE_IPv4:
- if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */
+ if (ol_flags & PKT_RX_IP_CKSUM_BAD) {
RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf);
} else {
_uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data);
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h
index b0d3e42cd..f94a678ba 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h
@@ -20,7 +20,8 @@ static inline bool is_broadcast(struct uhd_dpdk_port *port, uint32_t dst_ipv4_ad
int _uhd_dpdk_process_arp(struct uhd_dpdk_port *port, struct arp_hdr *arp_frame);
-int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt);
+int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf,
+ struct udp_hdr *pkt, bool bcast);
int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt);
int _uhd_dpdk_send_udp(struct uhd_dpdk_port *port,
struct uhd_dpdk_socket *sock,
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
index 6ea77b930..4fc375b77 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c
@@ -19,7 +19,8 @@
* I/O thread ONLY
*/
-static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk_tx_queue **queue)
+static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid,
+ struct uhd_dpdk_tx_queue **queue, size_t num_bufs)
{
*queue = NULL;
struct uhd_dpdk_tx_queue *q = rte_zmalloc(NULL, sizeof(*q), 0);
@@ -31,25 +32,25 @@ static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk
LIST_INIT(&q->tx_list);
char name[32];
- snprintf(name, sizeof(name), "tx_ring_udp_%u.%u", port->id, tid);
+ snprintf(name, sizeof(name), "tx_q%u.%0lx", port->id, (unsigned long) q);
q->queue = rte_ring_create(
name,
- UHD_DPDK_TXQ_SIZE,
+ num_bufs,
rte_socket_id(),
RING_F_SC_DEQ | RING_F_SP_ENQ
);
- snprintf(name, sizeof(name), "buffer_ring_udp_%u.%u", port->id, tid);
+ snprintf(name, sizeof(name), "free_q%u.%0lx", port->id, (unsigned long) q);
q->freebufs = rte_ring_create(
name,
- UHD_DPDK_TXQ_SIZE,
+ num_bufs,
rte_socket_id(),
RING_F_SC_DEQ | RING_F_SP_ENQ
);
/* Set up retry queue */
- snprintf(name, sizeof(name), "retry_queue_%u", port->id);
+ snprintf(name, sizeof(name), "redo_q%u.%0lx", port->id, (unsigned long) q);
q->retry_queue = rte_ring_create(
name,
- UHD_DPDK_TXQ_SIZE,
+ num_bufs,
rte_socket_id(),
RING_F_SC_DEQ | RING_F_SP_ENQ
);
@@ -65,24 +66,39 @@ static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk
rte_free(q);
return -ENOMEM;
}
- struct rte_mbuf *bufs[UHD_DPDK_TXQ_SIZE];
- unsigned int num_bufs = rte_ring_free_count(q->freebufs);
- int buf_stat = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, bufs, num_bufs);
- if (buf_stat) {
- rte_ring_free(q->freebufs);
- rte_ring_free(q->queue);
- rte_ring_free(q->retry_queue);
- rte_free(q);
- RTE_LOG(ERR, USER1, "%s: Cannot allocate packet buffers\n", __func__);
- return -ENOENT;
- }
- unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) bufs, num_bufs, NULL);
- if (enqd != num_bufs) {
- RTE_LOG(ERR, USER1, "%s: Cannot enqueue freebufs\n", __func__);
- }
+
+ do {
+ struct rte_mbuf *bufs[UHD_DPDK_TXQ_SIZE];
+ num_bufs = rte_ring_free_count(q->freebufs);
+ if (num_bufs > 0) {
+ num_bufs = num_bufs > UHD_DPDK_TXQ_SIZE ? UHD_DPDK_TXQ_SIZE : num_bufs;
+ int buf_stat = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, bufs, num_bufs);
+ if (buf_stat) {
+ RTE_LOG(ERR, USER1, "%s: Cannot allocate packet buffers\n", __func__);
+ goto unwind_txq;
+ }
+ unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) bufs, num_bufs, NULL);
+ if (enqd != num_bufs) {
+ RTE_LOG(ERR, USER1, "%s: Cannot enqueue freebufs\n", __func__);
+ goto unwind_txq;
+ }
+ }
+ } while (num_bufs > 0);
LIST_INSERT_HEAD(&port->txq_list, q, entry);
*queue = q;
return 0;
+
+unwind_txq:
+ while (!rte_ring_empty(q->freebufs)) {
+ struct rte_mbuf *buf;
+ if (rte_ring_dequeue(q->freebufs, (void **) &buf) == 0)
+ rte_free(buf);
+ }
+ rte_ring_free(q->freebufs);
+ rte_ring_free(q->queue);
+ rte_ring_free(q->retry_queue);
+ rte_free(q);
+ return -ENOENT;
}
/* Finish setting up UDP socket (unless ARP needs to be done)
@@ -134,11 +150,14 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req)
return -EADDRINUSE;
}
+ size_t num_bufs = (pdata->xferd_pkts < (UHD_DPDK_RX_BURST_SIZE + 1)) ?
+ UHD_DPDK_RX_BURST_SIZE + 1 : pdata->xferd_pkts;
+ pdata->xferd_pkts = 0;
char name[32];
snprintf(name, sizeof(name), "rx_ring_udp_%u.%u", port->id, ntohs(pdata->dst_port));
sock->rx_ring = rte_ring_create(
name,
- UHD_DPDK_RXQ_SIZE,
+ num_bufs,
rte_socket_id(),
RING_F_SC_DEQ | RING_F_SP_ENQ
);
@@ -172,6 +191,9 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req)
/* Are we doing TX? */
if (sock->tx_queue) {
+ size_t num_bufs = (pdata->xferd_pkts < (UHD_DPDK_TX_BURST_SIZE + 1)) ?
+ UHD_DPDK_TX_BURST_SIZE + 1 : pdata->xferd_pkts;
+ pdata->xferd_pkts = 0;
sock->tx_queue = NULL;
struct uhd_dpdk_tx_queue *q = NULL;
// FIXME Not sharing txq across all thread's sockets for now
@@ -184,7 +206,7 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req)
// }
//}
if (!sock->tx_queue) {
- retval = _alloc_txq(port, sock->tid, &q);
+ retval = _alloc_txq(port, sock->tid, &q, num_bufs);
if (retval) {
_uhd_dpdk_config_req_compl(req, retval);
return retval;
@@ -385,10 +407,13 @@ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req,
data->src_port = arg->local_port;
data->dst_port = arg->remote_port;
sock->tx_queue = (struct uhd_dpdk_tx_queue *) sock;
+ data->xferd_pkts = arg->num_bufs;
} else {
data->src_port = arg->remote_port;
data->dst_port = arg->local_port;
sock->rx_ring = (struct rte_ring *) sock;
+ data->xferd_pkts = arg->num_bufs;
+ data->filter_bcast = arg->filter_bcast;
}
/* TODO: Add support for I/O thread calling (skip locking and sleep) */
@@ -433,6 +458,7 @@ static void uhd_dpdk_ipv4_prep(struct uhd_dpdk_port *port,
ip_hdr->time_to_live = 64;
ip_hdr->next_proto_id = proto_id;
ip_hdr->hdr_checksum = 0; /* FIXME: Assuming hardware can offload */
+ mbuf->ol_flags |= PKT_TX_IP_CKSUM;
ip_hdr->src_addr = port->ipv4_addr;
ip_hdr->dst_addr = dst_ipv4_addr;
diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h
index 39fcb8597..d7ca5609b 100644
--- a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h
+++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h
@@ -15,6 +15,7 @@ struct uhd_dpdk_udp_priv {
uint32_t dst_ipv4_addr;
size_t dropped_pkts;
size_t xferd_pkts;
+ bool filter_bcast;
/* TODO: Cache destination address ptr to avoid ARP table lookup cost? */
//struct uhd_dpdk_arp_entry *arp_entry;
};