From ad2720d7188aece35e8aa4c65118a33b8b9ae690 Mon Sep 17 00:00:00 2001 From: Alex Williams Date: Tue, 25 Sep 2018 14:49:45 -0700 Subject: mpmd,transport,prefs: Add xport_mgr for dpdk_zero_copy Add configuration sections to the UHD config file for NIC entries. Keys are based on MAC addresses, and the entries beneath the section describe which CPU and I/O thread to use for the NIC and its IPv4 address. Make ring sizes configurable for uhd-dpdk. Ring size is now an argument for packet buffers. Note that the maximum number of available buffers is still determined at init! Add ability to receive broadcasts to uhd-dpdk. This is controllable by a boolean in the sockarg during socket creation. dpdk_zero_copy will filter broadcast packets out. Add dpdk_simple transport (to mirror udp_simple). This transport allows receiving from broadcast addresses, but it only permits one outstanding buffer at a time. Fix IP checksum handling in UHD-DPDK. TX checksums were not being calculated in the NIC, and in RX, the check for IP checksums allowed values of zero (reported as none). Now packets with bad IP checksums will be dropped. --- host/lib/transport/CMakeLists.txt | 1 + host/lib/transport/dpdk_simple.cpp | 192 ++++++++++++ host/lib/transport/dpdk_zero_copy.cpp | 405 ++++++++++++++++---------- host/lib/transport/dpdk_zero_copy.hpp | 86 +----- host/lib/transport/uhd-dpdk/uhd_dpdk.c | 49 +++- host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h | 13 +- host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c | 19 +- host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h | 3 +- host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c | 74 +++-- host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h | 1 + 10 files changed, 558 insertions(+), 285 deletions(-) create mode 100644 host/lib/transport/dpdk_simple.cpp (limited to 'host/lib/transport') diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 63d44c80b..7794f0760 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -141,6 +141,7 @@ if(ENABLE_DPDK) INCLUDE_SUBDIRECTORY(uhd-dpdk) LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_zero_copy.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_simple.cpp ) endif(ENABLE_DPDK) diff --git a/host/lib/transport/dpdk_simple.cpp b/host/lib/transport/dpdk_simple.cpp new file mode 100644 index 000000000..74bb979ef --- /dev/null +++ b/host/lib/transport/dpdk_simple.cpp @@ -0,0 +1,192 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include +#include +#include +#include + +namespace uhd { namespace transport { + +namespace { + constexpr uint64_t USEC = 1000000; + // Non-data fields are headers (Ethernet + IPv4 + UDP) + CRC + constexpr size_t DPDK_SIMPLE_NONDATA_SIZE = 14 + 20 + 8 + 4; +} + +class dpdk_simple_impl : public dpdk_simple { +public: + dpdk_simple_impl(struct uhd_dpdk_ctx &ctx, const std::string &addr, + const std::string &port, bool filter_bcast) + { + UHD_ASSERT_THROW(ctx.is_init_done()); + + // Get NIC that can route to addr + int port_id = ctx.get_route(addr); + UHD_ASSERT_THROW(port_id >= 0); + + _port_id = port_id; + uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str()); + uint16_t dst_port = htons(std::stoi(port, NULL, 0)); + + struct uhd_dpdk_sockarg_udp sockarg = { + .is_tx = false, + .filter_bcast = filter_bcast, + .local_port = 0, + .remote_port = dst_port, + .dst_addr = dst_ipv4, + .num_bufs = 1 + }; + _rx_sock = uhd_dpdk_sock_open(_port_id, UHD_DPDK_SOCK_UDP, &sockarg); + UHD_ASSERT_THROW(_rx_sock != nullptr); + + // Backfill the local port, in case it was auto-assigned + uhd_dpdk_udp_get_info(_rx_sock, &sockarg); + sockarg.is_tx = true; + sockarg.remote_port = dst_port; + sockarg.dst_addr = dst_ipv4; + sockarg.num_bufs = 1; + _tx_sock = uhd_dpdk_sock_open(_port_id, UHD_DPDK_SOCK_UDP, &sockarg); + UHD_ASSERT_THROW(_tx_sock != nullptr); + UHD_LOG_TRACE("DPDK", "Created simple transports between " << addr << ":" + << ntohs(dst_port) << " and NIC(" << _port_id + << "):" << ntohs(sockarg.local_port)); + } + + ~dpdk_simple_impl(void) + { + if (_rx_mbuf) + uhd_dpdk_free_buf(_rx_mbuf); + if (_tx_mbuf) + uhd_dpdk_free_buf(_tx_mbuf); + } + + /*! + * Request a single send buffer of specified size. + * + * \param buf a pointer to place to write buffer location + * \return the maximum length of the buffer + */ + size_t get_tx_buf(void** buf) + { + UHD_ASSERT_THROW(!_tx_mbuf); + int bufs = uhd_dpdk_request_tx_bufs(_tx_sock, &_tx_mbuf, 1, 0); + if (bufs != 1 || !_tx_mbuf) { + *buf = nullptr; + return 0; + } + *buf = uhd_dpdk_buf_to_data(_tx_sock, _tx_mbuf); + return _mtu - DPDK_SIMPLE_NONDATA_SIZE; + } + + /*! + * Send and release outstanding buffer + * + * \param length bytes of data to send + * \return number of bytes sent (releases buffer if sent) + */ + size_t send(size_t length) + { + UHD_ASSERT_THROW(_tx_mbuf) + _tx_mbuf->pkt_len = length; + _tx_mbuf->data_len = length; + int num_tx = uhd_dpdk_send(_tx_sock, &_tx_mbuf, 1); + if (num_tx == 0) + return 0; + _tx_mbuf = nullptr; + return length; + } + + /*! + * Receive a single packet. + * Buffer provided by transport (must be freed before next operation). + * + * \param buf a pointer to place to write buffer location + * \param timeout the timeout in seconds + * \return the number of bytes received or zero on timeout + */ + size_t recv(void **buf, double timeout) + { + UHD_ASSERT_THROW(!_rx_mbuf); + int bufs = uhd_dpdk_recv(_rx_sock, &_rx_mbuf, 1, (int) (timeout*USEC)); + if (bufs != 1 || _rx_mbuf == nullptr) { + *buf = nullptr; + return 0; + } + if ((_tx_mbuf->ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) { + uhd_dpdk_free_buf(_rx_mbuf); + _rx_mbuf = nullptr; + return 0; + } + uhd_dpdk_get_src_ipv4(_rx_sock, _rx_mbuf, &_last_recv_addr); + *buf = uhd_dpdk_buf_to_data(_rx_sock, _rx_mbuf); + return uhd_dpdk_get_len(_rx_sock, _rx_mbuf); + } + + /*! + * Return/free receive buffer + * Can also use to free un-sent TX bufs + */ + void put_rx_buf(void) + { + UHD_ASSERT_THROW(_rx_mbuf) + uhd_dpdk_free_buf(_rx_mbuf); + } + + /*! + * Get the last IP address as seen by recv(). + * Only use this with the broadcast socket. + */ + std::string get_recv_addr(void) + { + char addr_str[INET_ADDRSTRLEN]; + struct in_addr ipv4_addr; + ipv4_addr.s_addr = _last_recv_addr; + inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str)); + return std::string(addr_str); + } + + /*! + * Get the IP address for the destination + */ + std::string get_send_addr(void) + { + struct in_addr ipv4_addr; + int status = uhd_dpdk_get_ipv4_addr(_port_id, &ipv4_addr.s_addr, nullptr); + UHD_ASSERT_THROW(status); + char addr_str[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str)); + return std::string(addr_str); + } +private: + unsigned int _port_id; + size_t _mtu; + struct uhd_dpdk_socket *_tx_sock; + struct rte_mbuf *_tx_mbuf = nullptr; + struct uhd_dpdk_socket *_rx_sock; + struct rte_mbuf *_rx_mbuf = nullptr; + uint32_t _last_recv_addr; +}; + +dpdk_simple::~dpdk_simple(void) {} + +/*********************************************************************** + * DPDK simple transport public make functions + **********************************************************************/ +dpdk_simple::sptr dpdk_simple::make_connected( + struct uhd_dpdk_ctx &ctx, const std::string &addr, const std::string &port +){ + return sptr(new dpdk_simple_impl(ctx, addr, port, true)); +} + +dpdk_simple::sptr dpdk_simple::make_broadcast( + struct uhd_dpdk_ctx &ctx, const std::string &addr, const std::string &port +){ + return sptr(new dpdk_simple_impl(ctx, addr, port, false)); +} +}} // namespace uhd::transport + + diff --git a/host/lib/transport/dpdk_zero_copy.cpp b/host/lib/transport/dpdk_zero_copy.cpp index 51c9b9dba..9649c1cfb 100644 --- a/host/lib/transport/dpdk_zero_copy.cpp +++ b/host/lib/transport/dpdk_zero_copy.cpp @@ -7,24 +7,28 @@ #include "dpdk_zero_copy.hpp" #include #include -#include #include -#include -#include +#include +#include +#include #include #include +#include +#include namespace uhd { namespace transport { namespace { -static constexpr uint64_t USEC = 1000000; -// FIXME: Make configurable and have uhd-dpdk library track buffer sizes -static constexpr size_t DEFAULT_FRAME_SIZE = 8000; - -inline char* eal_add_opt( - std::vector& argv, size_t n, char* dst, const char* opt, const char* arg) +constexpr uint64_t USEC = 1000000; +constexpr size_t DEFAULT_FRAME_SIZE = 8000; +constexpr int DEFAULT_NUM_MBUFS = 4095; +constexpr int DEFAULT_MBUF_CACHE_SIZE = 315; +constexpr size_t UHD_DPDK_HEADERS_SIZE = 14 + 20 + 8; // Ethernet + IPv4 + UDP + +inline char * eal_add_opt(std::vector &argv, size_t n, + char *dst, const char *opt, const char *arg) { - char* ptr = dst; + char *ptr = dst; strncpy(ptr, opt, n); argv.push_back(ptr); ptr += strlen(opt) + 1; @@ -34,6 +38,78 @@ inline char* eal_add_opt( ptr += strlen(arg) + 1; return ptr; } + +inline void uhd_dpdk_eal_init(const device_addr_t &eal_args) +{ + /* Build up argc and argv */ + std::vector argv; + argv.push_back("uhd-dpdk"); + auto args = new std::array(); + char *opt = args->data(); + char *end = args->data() + args->size(); + for (std::string &key : eal_args.keys()) { + std::string val = eal_args[key]; + if (key == "dpdk-coremask") { + opt = eal_add_opt(argv, end - opt, opt, "-c", + val.c_str()); + } else if (key == "dpdk-corelist") { + /* NOTE: This arg may have commas, so limited to config file */ + opt = eal_add_opt(argv, end - opt, opt, "-l", + val.c_str()); + } else if (key == "dpdk-coremap") { + opt = eal_add_opt(argv, end - opt, opt, "--lcores", + val.c_str()); + } else if (key == "dpdk-master-lcore") { + opt = eal_add_opt(argv, end - opt, opt, "--master-lcore", + val.c_str()); + } else if (key == "dpdk-pci-blacklist") { + opt = eal_add_opt(argv, end - opt, opt, "-b", + val.c_str()); + } else if (key == "dpdk-pci-whitelist") { + opt = eal_add_opt(argv, end - opt, opt, "-w", + val.c_str()); + } else if (key == "dpdk-log-level") { + opt = eal_add_opt(argv, end - opt, opt, "--log-level", + val.c_str()); + } else if (key == "dpdk-huge-dir") { + opt = eal_add_opt(argv, end - opt, opt, "--huge-dir", + val.c_str()); + } else if (key == "dpdk-file-prefix") { + opt = eal_add_opt(argv, end - opt, opt, "--file-prefix", + val.c_str()); + } else if (key == "dpdk-driver") { + opt = eal_add_opt(argv, end - opt, opt, "-d", + val.c_str()); + } + /* TODO: Change where log goes? + int rte_openlog_stream( FILE * f) + */ + } + /* Init DPDK's EAL */ + uhd_dpdk_init(argv.size(), argv.data()); + delete args; +} + +inline std::string eth_addr_to_string(struct eth_addr mac_addr) +{ + auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx"); + mac_stream % (uint32_t) mac_addr.addr[0] % (uint32_t) mac_addr.addr[1] + % (uint32_t) mac_addr.addr[2] % (uint32_t) mac_addr.addr[3] + % (uint32_t) mac_addr.addr[4] % (uint32_t) mac_addr.addr[5]; + return mac_stream.str(); +} + +inline void separate_ipv4_addr(const std::string ipv4, + uint32_t &ipv4_addr, uint32_t &netmask) +{ + std::vector result; + boost::algorithm::split(result, ipv4, + [](const char &in) {return in == '/';}, boost::token_compress_on); + UHD_ASSERT_THROW(result.size() == 2); + ipv4_addr = (uint32_t) inet_addr(result[0].c_str()); + int netbits = std::atoi(result[1].c_str()); + netmask = htonl(0xffffffff << (32-netbits)); +} } // namespace uhd_dpdk_ctx::uhd_dpdk_ctx(void) : _init_done(false) {} @@ -41,69 +117,82 @@ uhd_dpdk_ctx::uhd_dpdk_ctx(void) : _init_done(false) {} uhd_dpdk_ctx::~uhd_dpdk_ctx(void) {} /* Initialize uhd-dpdk (and do only once) */ -void uhd_dpdk_ctx::init(const dict& eal_args, - unsigned int num_ports, - int* port_thread_mapping, - int num_mbufs, - int mbuf_cache_size, - size_t mtu) +void uhd_dpdk_ctx::init(const device_addr_t &user_args) { std::lock_guard lock(_init_mutex); if (!_init_done) { - _mtu = mtu; - /* Build up argc and argv */ - std::vector argv; - argv.push_back("uhd-dpdk"); - char* args = new char[4096]; - char* opt = args; - char* end = args + sizeof(args); - for (std::string& key : eal_args.keys()) { - std::string val = eal_args[key]; - if (key == "coremask") { - opt = eal_add_opt(argv, end - opt, opt, "-c", val.c_str()); - } else if (key == "corelist") { - /* NOTE: This arg may have commas, so limited to config file */ - opt = eal_add_opt(argv, end - opt, opt, "-l", val.c_str()); - } else if (key == "coremap") { - opt = eal_add_opt(argv, end - opt, opt, "--lcores", val.c_str()); - } else if (key == "master-lcore") { - opt = eal_add_opt(argv, end - opt, opt, "--master-lcore", val.c_str()); - } else if (key == "pci-blacklist") { - opt = eal_add_opt(argv, end - opt, opt, "-b", val.c_str()); - } else if (key == "pci-whitelist") { - opt = eal_add_opt(argv, end - opt, opt, "-w", val.c_str()); - } else if (key == "log-level") { - opt = eal_add_opt(argv, end - opt, opt, "--log-level", val.c_str()); - } else if (key == "huge-dir") { - opt = eal_add_opt(argv, end - opt, opt, "--huge-dir", val.c_str()); - } else if (key == "file-prefix") { - opt = eal_add_opt(argv, end - opt, opt, "--file-prefix", val.c_str()); + /* Gather global config, build args for EAL, and init UHD-DPDK */ + const device_addr_t dpdk_args = uhd::prefs::get_dpdk_args(user_args); + UHD_LOG_TRACE("DPDK", "Configuration:" << std::endl + << dpdk_args.to_pp_string()); + uhd_dpdk_eal_init(dpdk_args); + + _mtu = dpdk_args.has_key("dpdk-mtu") + ? dpdk_args.cast("dpdk-mtu", 0) + : DEFAULT_FRAME_SIZE; + const int num_mbufs = dpdk_args.has_key("dpdk-num-mbufs") + ? dpdk_args.cast("dpdk-num-mbufs", 0) + : DEFAULT_NUM_MBUFS; + const int mbuf_cache_size = dpdk_args.has_key("dpdk-mbuf-cache-size") + ? dpdk_args.cast("dpdk-mbuf-cache-size", 0) + : DEFAULT_MBUF_CACHE_SIZE; + + /* Get configuration for all the NIC ports */ + device_addrs_t args = separate_device_addr(user_args); + int num_ports = uhd_dpdk_port_count(); + std::vector io_cpu_map(num_ports); + device_addrs_t nics(num_ports); + for (ssize_t i = 0; i < num_ports; i++) { + struct eth_addr mac_addr = uhd_dpdk_get_eth_addr(i); + nics[i]["dpdk-mac"] = eth_addr_to_string(mac_addr); + for (const auto &arg: args) { + if (arg.has_key("dpdk-mac") + && arg["dpdk-mac"] == nics[i]["dpdk-mac"]) { + for (const auto& key: arg.keys()) { + nics[i][key] = arg[key]; + } + break; + } } + nics[i] = uhd::prefs::get_dpdk_nic_args(nics[i]); + if (nics[i].has_key("dpdk-ipv4") + && nics[i].has_key("dpdk-io-cpu")) { + uint32_t ipv4_addr, netmask; + io_cpu_map[i] = std::atoi(nics[i]["dpdk-io-cpu"].c_str()); + separate_ipv4_addr(nics[i]["dpdk-ipv4"], ipv4_addr, netmask); + uhd_dpdk_set_ipv4_addr((unsigned int) i, ipv4_addr, netmask); + } else { + /* Not enough configuration to use NIC */ + io_cpu_map[i] = -1; + } + UHD_LOG_TRACE("DPDK", "Found NIC(" << i << "):" << std::endl + << nics[i].to_pp_string()); } - uhd_dpdk_init(argv.size(), - argv.data(), - num_ports, - port_thread_mapping, - num_mbufs, - mbuf_cache_size, - _mtu); - delete args; + uhd_dpdk_start(num_ports, io_cpu_map.data(), num_mbufs, + mbuf_cache_size, _mtu); _init_done = true; } } -int uhd_dpdk_ctx::get_port_id(std::array mac_addr, unsigned int& port_id) +size_t uhd_dpdk_ctx::get_mtu(void) const +{ + UHD_ASSERT_THROW(is_init_done()); + return _mtu; +} + +int uhd_dpdk_ctx::get_port_id(std::array mac_addr, + unsigned int &port_id) const { UHD_ASSERT_THROW(is_init_done()); int num_ports = uhd_dpdk_port_count(); for (int i = 0; i < num_ports; i++) { - struct eth_addr port_mac_addr = uhd_dpdk_get_eth_addr((unsigned int)i); + struct eth_addr port_mac_addr = uhd_dpdk_get_eth_addr((unsigned int) i); for (int j = 0; j < 6; j++) { if (mac_addr[j] != port_mac_addr.addr[j]) { break; } if (j == 5) { - port_id = (unsigned int)i; + port_id = (unsigned int) i; return 0; } } @@ -111,52 +200,54 @@ int uhd_dpdk_ctx::get_port_id(std::array mac_addr, unsigned int& por return -1; } -int uhd_dpdk_ctx::get_route(const std::string& addr) const +int uhd_dpdk_ctx::get_route(const std::string &addr) const { - const uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str()); + const uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str()); const unsigned int num_ports = uhd_dpdk_port_count(); for (unsigned int port = 0; port < num_ports; port++) { uint32_t src_ipv4; uint32_t netmask; + if (uhd_dpdk_port_link_status(port) < 1) + continue; uhd_dpdk_get_ipv4_addr(port, &src_ipv4, &netmask); if ((src_ipv4 & netmask) == (dst_ipv4 & netmask)) { - return (int)port; - } + return (int) port; + } } return -ENODEV; } -int uhd_dpdk_ctx::set_ipv4_addr( - unsigned int port_id, uint32_t ipv4_addr, uint32_t netmask) +int uhd_dpdk_ctx::set_ipv4_addr(unsigned int port_id, uint32_t ipv4_addr, + uint32_t netmask) { return uhd_dpdk_set_ipv4_addr(port_id, ipv4_addr, netmask); } -bool uhd_dpdk_ctx::is_init_done(void) +bool uhd_dpdk_ctx::is_init_done(void) const { return _init_done.load(); } -class dpdk_zero_copy_msb : public managed_send_buffer -{ +class dpdk_zero_copy_msb : public managed_send_buffer { public: - dpdk_zero_copy_msb(struct uhd_dpdk_socket* sock, - std::stack>& free_bufs) - : _sock(sock), _buf(nullptr), _free_bufs(free_bufs){}; + dpdk_zero_copy_msb(struct uhd_dpdk_socket *sock, + std::stack> &free_bufs, + size_t frame_size) + : _sock(sock), _buf(nullptr), _free_bufs(free_bufs), + _frame_size(frame_size) {}; ~dpdk_zero_copy_msb(void) {} void release(void) { if (_buf) { - _buf->pkt_len = _length; + _buf->pkt_len = _length; _buf->data_len = _length; - int num_tx = uhd_dpdk_send(_sock, &_buf, 1); + int num_tx = uhd_dpdk_send(_sock, &_buf, 1); if (num_tx == 0) { /* Drop packet and free buffer (do not share sockets!) */ - UHD_LOG_ERROR( - "DPDK", "Invalid shared socket usage detected. Dropping packet..."); + UHD_LOG_ERROR("DPDK", "Invalid shared socket usage detected. Dropping packet..."); uhd_dpdk_free_buf(_buf); } // Push back into pool @@ -170,21 +261,22 @@ public: if (bufs != 1 || !_buf) return sptr(); - return make(this, uhd_dpdk_buf_to_data(_sock, _buf), DEFAULT_FRAME_SIZE); + return make(this, uhd_dpdk_buf_to_data(_sock, _buf), + _frame_size); } private: - struct uhd_dpdk_socket* _sock; - struct rte_mbuf* _buf; - std::stack>& _free_bufs; + struct uhd_dpdk_socket *_sock; + struct rte_mbuf *_buf; + std::stack> &_free_bufs; + size_t _frame_size; }; -class dpdk_zero_copy_mrb : public managed_recv_buffer -{ +class dpdk_zero_copy_mrb : public managed_recv_buffer { public: - dpdk_zero_copy_mrb(struct uhd_dpdk_socket* sock, - std::stack>& free_bufs) - : _sock(sock), _buf(nullptr), _free_bufs(free_bufs){}; + dpdk_zero_copy_mrb(struct uhd_dpdk_socket *sock, + std::stack> &free_bufs) + : _sock(sock), _buf(nullptr), _free_bufs(free_bufs) {}; ~dpdk_zero_copy_mrb(void) {} void release(void) @@ -197,78 +289,78 @@ public: sptr get_new(double timeout) { - int bufs = uhd_dpdk_recv(_sock, &_buf, 1, (int)(timeout * USEC)); + int bufs = uhd_dpdk_recv(_sock, &_buf, 1, (int) (timeout*USEC)); if (bufs != 1 || _buf == nullptr) { // Push back into pool if we didn't get a real buffer _free_bufs.push(this); return sptr(); } - if ((_buf->ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) { - UHD_LOG_WARNING("DPDK", "IP checksum failure detected. Dropping packet..."); - uhd_dpdk_free_buf(_buf); - _free_bufs.push(this); - return sptr(); - } - return make( - this, uhd_dpdk_buf_to_data(_sock, _buf), uhd_dpdk_get_len(_sock, _buf)); + return make(this, uhd_dpdk_buf_to_data(_sock, _buf), + uhd_dpdk_get_len(_sock, _buf)); } private: - struct uhd_dpdk_socket* _sock; - struct rte_mbuf* _buf; - std::stack>& _free_bufs; + struct uhd_dpdk_socket *_sock; + struct rte_mbuf *_buf; + std::stack> &_free_bufs; }; -class dpdk_zero_copy_impl : public dpdk_zero_copy -{ +class dpdk_zero_copy_impl : public dpdk_zero_copy { public: - dpdk_zero_copy_impl(struct uhd_dpdk_ctx& ctx, - const unsigned int dpdk_port_id, - const std::string& addr, - const std::string& remote_port, - const std::string& local_port, - const zero_copy_xport_params& xport_params) - : _num_send_frames(xport_params.num_send_frames) - , _send_frame_size(xport_params.send_frame_size) - , _num_recv_frames(xport_params.num_recv_frames) - , _recv_frame_size(xport_params.recv_frame_size) - , _port_id(dpdk_port_id) - , _rx_empty_count(0) - , _tx_empty_count(0) + + dpdk_zero_copy_impl(const struct uhd_dpdk_ctx &ctx, + const unsigned int dpdk_port_id, + const std::string &addr, + const std::string &remote_port, + const std::string &local_port, + const zero_copy_xport_params& xport_params) + : _num_send_frames(xport_params.num_send_frames), + _send_frame_size(xport_params.send_frame_size), + _num_recv_frames(xport_params.num_recv_frames), + _recv_frame_size(xport_params.recv_frame_size), + _port_id(dpdk_port_id), + _rx_empty_count(0), + _tx_empty_count(0) { - // TODO: Handle xport_params UHD_ASSERT_THROW(xport_params.recv_frame_size > 0); UHD_ASSERT_THROW(xport_params.send_frame_size > 0); UHD_ASSERT_THROW(xport_params.num_send_frames > 0); UHD_ASSERT_THROW(xport_params.num_recv_frames > 0); UHD_ASSERT_THROW(ctx.is_init_done()); + UHD_ASSERT_THROW(xport_params.recv_frame_size < ctx.get_mtu() - UHD_DPDK_HEADERS_SIZE); + UHD_ASSERT_THROW(xport_params.send_frame_size < ctx.get_mtu() - UHD_DPDK_HEADERS_SIZE); const int num_ports = uhd_dpdk_port_count(); UHD_ASSERT_THROW(num_ports > 0); - UHD_ASSERT_THROW(dpdk_port_id < (unsigned int)num_ports); + UHD_ASSERT_THROW(dpdk_port_id < (unsigned int) num_ports); // Convert ipv4 addr from string to uint32_t, network format - uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str()); + uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str()); // Convert port from string to uint16_t, network format uint16_t dst_port = htons(std::stoi(remote_port, NULL, 0)); uint16_t src_port = htons(std::stoi(local_port, NULL, 0)); // Create RX socket first - struct uhd_dpdk_sockarg_udp sockarg = {.is_tx = false, - .local_port = src_port, - .remote_port = dst_port, - .dst_addr = dst_ipv4}; + struct uhd_dpdk_sockarg_udp sockarg = { + .is_tx = false, + .filter_bcast = true, + .local_port = src_port, + .remote_port = dst_port, + .dst_addr = dst_ipv4, + .num_bufs = _num_recv_frames + }; _rx_sock = uhd_dpdk_sock_open(dpdk_port_id, UHD_DPDK_SOCK_UDP, &sockarg); UHD_ASSERT_THROW(_rx_sock != nullptr); // Backfill the local port, in case it was auto-assigned uhd_dpdk_udp_get_info(_rx_sock, &sockarg); - sockarg.is_tx = true; + sockarg.is_tx = true; + sockarg.num_bufs = _num_send_frames; sockarg.remote_port = dst_port; - sockarg.dst_addr = dst_ipv4; + sockarg.dst_addr = dst_ipv4; _tx_sock = uhd_dpdk_sock_open(dpdk_port_id, UHD_DPDK_SOCK_UDP, &sockarg); UHD_ASSERT_THROW(_tx_sock != nullptr); @@ -277,13 +369,12 @@ public: _mrb_pool.push(new dpdk_zero_copy_mrb(_rx_sock, _mrb_pool)); } for (size_t i = 0; i < _num_send_frames; i++) { - _msb_pool.push(new dpdk_zero_copy_msb(_tx_sock, _msb_pool)); + _msb_pool.push(new dpdk_zero_copy_msb(_tx_sock, _msb_pool, _send_frame_size)); } - UHD_LOG_TRACE("DPDK", - "Created transports between " << addr << ":" << remote_port << " and NIC(" - << dpdk_port_id - << "):" << ntohs(sockarg.local_port)); + UHD_LOG_TRACE("DPDK", "Created transports between " << addr << ":" + << remote_port << " and NIC(" << dpdk_port_id + << "):" << ntohs(sockarg.local_port)); } ~dpdk_zero_copy_impl(void) @@ -292,23 +383,18 @@ public: size_t count; uhd_dpdk_udp_get_info(_rx_sock, &sockarg); uhd_dpdk_get_drop_count(_rx_sock, &count); - UHD_LOG_TRACE("DPDK", - "Closing transports between " << sockarg.dst_addr << ":" - << ntohs(sockarg.remote_port) - << " and local:" << ntohs(sockarg.local_port)); - UHD_LOG_TRACE("DPDK", - "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") " - << " Dropped " << count << " packets"); + UHD_LOG_TRACE("DPDK", "Closing transports between " << sockarg.dst_addr << ":" + << ntohs(sockarg.remote_port) << " and local:" + << ntohs(sockarg.local_port)); + UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") " + << " Dropped "<< count << " packets"); uhd_dpdk_get_xfer_count(_rx_sock, &count); - UHD_LOG_TRACE("DPDK", - "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") " - << " Received " << count << " packets"); - UHD_LOG_TRACE("DPDK", - "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") " - << "RX empty count is " << _rx_empty_count); - UHD_LOG_TRACE("DPDK", - "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") " - << "TX empty count is " << _tx_empty_count); + UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") " + << " Received "<< count << " packets"); + UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") " + << "RX empty count is " << _rx_empty_count); + UHD_LOG_TRACE("DPDK", "(" << ntohs(sockarg.remote_port) << "," << ntohs(sockarg.local_port) << ") " + << "TX empty count is " << _tx_empty_count); uhd_dpdk_sock_close(_rx_sock); uhd_dpdk_sock_close(_tx_sock); } @@ -320,7 +406,7 @@ public: return managed_recv_buffer::sptr(); } - dpdk_zero_copy_mrb* mrb = _mrb_pool.top(); + dpdk_zero_copy_mrb *mrb = _mrb_pool.top(); _mrb_pool.pop(); managed_recv_buffer::sptr buff = mrb->get_new(timeout); if (!buff) @@ -345,7 +431,7 @@ public: return managed_send_buffer::sptr(); } - dpdk_zero_copy_msb* msb = _msb_pool.top(); + dpdk_zero_copy_msb *msb = _msb_pool.top(); _msb_pool.pop(); managed_send_buffer::sptr buff = msb->get_new(timeout); if (!buff) @@ -367,18 +453,18 @@ public: { struct uhd_dpdk_sockarg_udp sockarg; int status = uhd_dpdk_udp_get_info(_rx_sock, &sockarg); + UHD_ASSERT_THROW(status == 0); return ntohs(sockarg.local_port); } std::string get_local_addr(void) const { - uint32_t ipv4_addr; - int status = uhd_dpdk_get_ipv4_addr(_port_id, &ipv4_addr, NULL); - auto retval = std::to_string(ipv4_addr >> 0 & 0xff) + "." - + std::to_string(ipv4_addr >> 8 & 0xff) + "." - + std::to_string(ipv4_addr >> 16 & 0xff) + "." - + std::to_string(ipv4_addr >> 24 & 0xff); - return retval; + struct in_addr ipv4_addr; + int status = uhd_dpdk_get_ipv4_addr(_port_id, &ipv4_addr.s_addr, NULL); + UHD_ASSERT_THROW(status == 0); + char addr_str[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str)); + return std::string(addr_str); } uint32_t get_drop_count(void) const @@ -387,10 +473,9 @@ public: uhd_dpdk_get_drop_count(_rx_sock, &drop_count); return drop_count; } - private: - struct uhd_dpdk_socket* _rx_sock; - struct uhd_dpdk_socket* _tx_sock; + struct uhd_dpdk_socket *_rx_sock; + struct uhd_dpdk_socket *_tx_sock; const size_t _num_send_frames; const size_t _send_frame_size; const size_t _num_recv_frames; @@ -399,20 +484,22 @@ private: unsigned int _rx_empty_count; unsigned int _tx_empty_count; - std::stack> _mrb_pool; - std::stack> _msb_pool; + std::stack> _mrb_pool; + std::stack> _msb_pool; }; -dpdk_zero_copy::sptr dpdk_zero_copy::make(struct uhd_dpdk_ctx& ctx, +dpdk_zero_copy::sptr dpdk_zero_copy::make( + const struct uhd_dpdk_ctx &ctx, const unsigned int dpdk_port_id, - const std::string& addr, - const std::string& remote_port, - const std::string& local_port, - const zero_copy_xport_params& default_buff_args, - const device_addr_t& hints) + const std::string &addr, + const std::string &remote_port, + const std::string &local_port, + const zero_copy_xport_params &default_buff_args, + const device_addr_t &/*hints*/) { - return dpdk_zero_copy::sptr(new dpdk_zero_copy_impl( - ctx, dpdk_port_id, addr, remote_port, local_port, default_buff_args)); + return dpdk_zero_copy::sptr( + new dpdk_zero_copy_impl(ctx, dpdk_port_id, addr, remote_port, local_port, default_buff_args) + ); } }} // namespace uhd::transport diff --git a/host/lib/transport/dpdk_zero_copy.hpp b/host/lib/transport/dpdk_zero_copy.hpp index f90e73d0e..8dcce6eee 100644 --- a/host/lib/transport/dpdk_zero_copy.hpp +++ b/host/lib/transport/dpdk_zero_copy.hpp @@ -7,95 +7,31 @@ #ifndef DPDK_ZERO_COPY_HPP #define DPDK_ZERO_COPY_HPP -#include -#include +#include #include -#include -#include +#include #include -#include #include -#include namespace uhd { namespace transport { -class uhd_dpdk_ctx : boost::noncopyable -{ -public: - UHD_SINGLETON_FCN(uhd_dpdk_ctx, get); - - ~uhd_dpdk_ctx(void); - - /*! - * Initialize uhd-dpdk (and do only once) - * \param eal_args Arguments to pass to DPDK rte_eal_init() function - * \param num_ports Size of port_thread_mapping array (also number in use) - * \param port_thread_mapping Map NICs to threads: index=port, value=thread - * \param num_mbufs Number of packet buffers for each port's memory pool - * \param mbuf_cache_size Size of per-core packet buffer cache from mempool - * \param mtu MTU of NIC ports - */ - void init(const dict& eal_args, - unsigned int num_ports, - int* port_thread_mapping, - int num_mbufs, - int mbuf_cache_size, - size_t mtu); - - /*! - * Get port ID from provided MAC address - * \param mac_addr MAC address - * \param port_id Int to write ID of port corresponding to MAC address - * \return 0 if match found, else no match - */ - int get_port_id(std::array mac_addr, unsigned int& port_id); - - /*! - * Get port ID for routing packet destined for given address - * \param addr Destination address - * \return port ID from routing table - */ - int get_route(const std::string& addr) const; - - /*! - * Set IPv4 address and subnet mask of given NIC port - * Not thread-safe. Should only be written before ports are in use. - * \param port_id NIC port ID - * \param ipv4_addr IPv4 address to write - * \param netmask Subnet mask identifying network number in ipv4_addr - * \return 0 if successful, else error - */ - int set_ipv4_addr(unsigned int port_id, uint32_t ipv4_addr, uint32_t netmask); - - /*! - * \return whether init() has been called - */ - bool is_init_done(void); - -private: - uhd_dpdk_ctx(void); - - size_t _mtu; - std::mutex _init_mutex; - std::atomic _init_done; -}; - /*! * A zero copy transport interface to the dpdk DMA library. */ -class dpdk_zero_copy : public virtual zero_copy_if -{ +class dpdk_zero_copy : public virtual zero_copy_if { public: typedef boost::shared_ptr sptr; - static sptr make(struct uhd_dpdk_ctx& ctx, + static sptr make( + const struct uhd_dpdk_ctx &ctx, const unsigned int dpdk_port_id, - const std::string& addr, - const std::string& remote_port, - const std::string& local_port, /* 0 = auto-assign */ - const zero_copy_xport_params& default_buff_args, - const device_addr_t& hints); + const std::string &addr, + const std::string &remote_port, + const std::string &local_port, /* 0 = auto-assign */ + const zero_copy_xport_params &default_buff_args, + const device_addr_t &hints + ); virtual uint16_t get_local_port(void) const = 0; diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk.c b/host/lib/transport/uhd-dpdk/uhd_dpdk.c index 1be6b2335..09897eb11 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk.c +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk.c @@ -34,6 +34,20 @@ int uhd_dpdk_port_count(void) return ctx->num_ports; } +int uhd_dpdk_port_link_status(unsigned int portid) +{ + if (!ctx) + return -ENODEV; + + struct uhd_dpdk_port *p = find_port(portid); + if (p) { + struct rte_eth_link link; + rte_eth_link_get_nowait(p->id, &link); + return link.link_status; + } + return -ENODEV; +} + struct eth_addr uhd_dpdk_get_eth_addr(unsigned int portid) { struct eth_addr retval; @@ -43,6 +57,7 @@ struct eth_addr uhd_dpdk_get_eth_addr(unsigned int portid) if (p) { memcpy(retval.addr, p->mac_addr.addr_bytes, ETHER_ADDR_LEN); } + return retval; } @@ -211,19 +226,12 @@ static int uhd_dpdk_thread_init(struct uhd_dpdk_thread *thread, unsigned int lco return 0; } - -int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports, - int *port_thread_mapping, int num_mbufs, int mbuf_cache_size, - int mtu) +int uhd_dpdk_init(int argc, const char **argv) { /* Init context only once */ if (ctx) return 1; - if ((num_ports == 0) || (port_thread_mapping == NULL)) { - return -EINVAL; - } - /* Grabs arguments intended for DPDK's EAL */ int ret = rte_eal_init(argc, (char **) argv); if (ret < 0) @@ -241,8 +249,6 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports, ctx->num_ports = rte_eth_dev_count(); if (ctx->num_ports < 1) rte_exit(EXIT_FAILURE, "Error: Found no ports\n"); - if (ctx->num_ports < num_ports) - rte_exit(EXIT_FAILURE, "Error: User requested more ports than available\n"); /* Get memory for thread and port data structures */ ctx->threads = rte_zmalloc("uhd_dpdk_thread", RTE_MAX_LCORE*sizeof(struct uhd_dpdk_thread), 0); @@ -252,6 +258,28 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports, if (!ctx->ports) rte_exit(EXIT_FAILURE, "Error: Could not allocate memory for port data\n"); + for (size_t i = 0; i < ctx->num_ports; i++) { + struct uhd_dpdk_port *port = &ctx->ports[i]; + port->id = i; + rte_eth_macaddr_get(port->id, &port->mac_addr); + } + + return 0; +} + +int uhd_dpdk_start(unsigned int num_ports, int *port_thread_mapping, + int num_mbufs, int mbuf_cache_size, int mtu) +{ + if (!ctx) + return -EIO; + + if ((num_ports == 0) || (port_thread_mapping == NULL)) { + return -EINVAL; + } + + if (ctx->num_ports < num_ports) + rte_exit(EXIT_FAILURE, "Error: User requested more ports than available\n"); + /* Initialize the thread data structures */ for (int i = rte_get_next_lcore(-1, 1, 0); (i < RTE_MAX_LCORE); @@ -306,7 +334,6 @@ int uhd_dpdk_init(int argc, const char **argv, unsigned int num_ports, rte_exit(EXIT_FAILURE, "Requested inactive lcore %u for port %u\n", (unsigned int) thread_id, i); struct uhd_dpdk_port *port = &ctx->ports[i]; - port->id = i; port->parent = &ctx->threads[thread_id]; ctx->threads[thread_id].num_ports++; LIST_INSERT_HEAD(&ctx->threads[thread_id].port_list, port, port_entry); diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h index d497e5d5a..6f43ae1cf 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_ctx.h @@ -24,7 +24,7 @@ #define UHD_DPDK_MAX_WAITERS UHD_DPDK_MAX_SOCKET_CNT #define UHD_DPDK_TXQ_SIZE 64 #define UHD_DPDK_TX_BURST_SIZE (UHD_DPDK_TXQ_SIZE - 1) -#define UHD_DPDK_RXQ_SIZE 64 +#define UHD_DPDK_RXQ_SIZE 128 #define UHD_DPDK_RX_BURST_SIZE (UHD_DPDK_RXQ_SIZE - 1) struct uhd_dpdk_port; @@ -256,13 +256,10 @@ static inline struct uhd_dpdk_port * find_port(unsigned int portid) if (!ctx) return NULL; - for (unsigned int i = 0; i < ctx->num_threads; i++) { - struct uhd_dpdk_thread *t = &ctx->threads[i]; - struct uhd_dpdk_port *p; - LIST_FOREACH(p, &t->port_list, port_entry) { - if (p->id == portid) { - return p; - } + for (unsigned int i = 0; i < ctx->num_ports; i++) { + struct uhd_dpdk_port *p = &ctx->ports[i]; + if (p->id == portid) { + return p; } } return NULL; diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c index f603f1f8f..61ed836df 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.c @@ -43,7 +43,6 @@ int _uhd_dpdk_arp_reply(struct uhd_dpdk_port *port, struct arp_hdr *arp_req) mbuf->pkt_len = 42; mbuf->data_len = 42; - mbuf->ol_flags = PKT_TX_IP_CKSUM; if (rte_eth_tx_burst(port->id, 0, &mbuf, 1) != 1) { RTE_LOG(WARNING, RING, "%s: TX descriptor ring is full\n", __func__); @@ -125,7 +124,6 @@ int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port, uint32_t ip) mbuf->pkt_len = 42; mbuf->data_len = 42; - mbuf->ol_flags = PKT_TX_IP_CKSUM; if (rte_eth_tx_burst(port->id, 0, &mbuf, 1) != 1) { RTE_LOG(WARNING, RING, "%s: TX descriptor ring is full\n", __func__); @@ -135,7 +133,8 @@ int _uhd_dpdk_arp_request(struct uhd_dpdk_port *port, uint32_t ip) return 0; } -int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt) +int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, + struct udp_hdr *pkt, bool bcast) { int status = 0; struct uhd_dpdk_ipv4_5tuple ht_key = { @@ -155,6 +154,10 @@ int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, str } struct uhd_dpdk_udp_priv *pdata = (struct uhd_dpdk_udp_priv *) entry->sock->priv; + if (bcast && pdata->filter_bcast) { + // Filter broadcast packets if not listening + goto udp_rx_drop; + } status = rte_ring_enqueue(entry->sock->rx_ring, mbuf); if (entry->waiter) { _uhd_dpdk_waiter_wake(entry->waiter, port->parent); @@ -172,14 +175,16 @@ udp_rx_drop: return status; } -int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt) +int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, + struct ipv4_hdr *pkt) { - if (pkt->dst_addr != port->ipv4_addr) { + bool bcast = is_broadcast(port, pkt->dst_addr); + if (pkt->dst_addr != port->ipv4_addr && !bcast) { rte_pktmbuf_free(mbuf); return -ENODEV; } if (pkt->next_proto_id == 0x11) { - return _uhd_dpdk_process_udp(port, mbuf, (struct udp_hdr *) &pkt[1]); + return _uhd_dpdk_process_udp(port, mbuf, (struct udp_hdr *) &pkt[1], bcast); } rte_pktmbuf_free(mbuf); return -EINVAL; @@ -417,7 +422,7 @@ static inline void _uhd_dpdk_rx_burst(struct uhd_dpdk_port *port) rte_pktmbuf_free(bufs[buf]); break; case ETHER_TYPE_IPv4: - if (ol_flags == PKT_RX_IP_CKSUM_BAD) { /* TODO: Track IP cksum errors? */ + if (ol_flags & PKT_RX_IP_CKSUM_BAD) { RTE_LOG(WARNING, RING, "Buf %d: Bad IP cksum\n", buf); } else { _uhd_dpdk_process_ipv4(port, bufs[buf], (struct ipv4_hdr *) l2_data); diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h index b0d3e42cd..f94a678ba 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_driver.h @@ -20,7 +20,8 @@ static inline bool is_broadcast(struct uhd_dpdk_port *port, uint32_t dst_ipv4_ad int _uhd_dpdk_process_arp(struct uhd_dpdk_port *port, struct arp_hdr *arp_frame); -int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct udp_hdr *pkt); +int _uhd_dpdk_process_udp(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, + struct udp_hdr *pkt, bool bcast); int _uhd_dpdk_process_ipv4(struct uhd_dpdk_port *port, struct rte_mbuf *mbuf, struct ipv4_hdr *pkt); int _uhd_dpdk_send_udp(struct uhd_dpdk_port *port, struct uhd_dpdk_socket *sock, diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c index 6ea77b930..4fc375b77 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.c @@ -19,7 +19,8 @@ * I/O thread ONLY */ -static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk_tx_queue **queue) +static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, + struct uhd_dpdk_tx_queue **queue, size_t num_bufs) { *queue = NULL; struct uhd_dpdk_tx_queue *q = rte_zmalloc(NULL, sizeof(*q), 0); @@ -31,25 +32,25 @@ static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk LIST_INIT(&q->tx_list); char name[32]; - snprintf(name, sizeof(name), "tx_ring_udp_%u.%u", port->id, tid); + snprintf(name, sizeof(name), "tx_q%u.%0lx", port->id, (unsigned long) q); q->queue = rte_ring_create( name, - UHD_DPDK_TXQ_SIZE, + num_bufs, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ ); - snprintf(name, sizeof(name), "buffer_ring_udp_%u.%u", port->id, tid); + snprintf(name, sizeof(name), "free_q%u.%0lx", port->id, (unsigned long) q); q->freebufs = rte_ring_create( name, - UHD_DPDK_TXQ_SIZE, + num_bufs, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ ); /* Set up retry queue */ - snprintf(name, sizeof(name), "retry_queue_%u", port->id); + snprintf(name, sizeof(name), "redo_q%u.%0lx", port->id, (unsigned long) q); q->retry_queue = rte_ring_create( name, - UHD_DPDK_TXQ_SIZE, + num_bufs, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ ); @@ -65,24 +66,39 @@ static int _alloc_txq(struct uhd_dpdk_port *port, pthread_t tid, struct uhd_dpdk rte_free(q); return -ENOMEM; } - struct rte_mbuf *bufs[UHD_DPDK_TXQ_SIZE]; - unsigned int num_bufs = rte_ring_free_count(q->freebufs); - int buf_stat = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, bufs, num_bufs); - if (buf_stat) { - rte_ring_free(q->freebufs); - rte_ring_free(q->queue); - rte_ring_free(q->retry_queue); - rte_free(q); - RTE_LOG(ERR, USER1, "%s: Cannot allocate packet buffers\n", __func__); - return -ENOENT; - } - unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) bufs, num_bufs, NULL); - if (enqd != num_bufs) { - RTE_LOG(ERR, USER1, "%s: Cannot enqueue freebufs\n", __func__); - } + + do { + struct rte_mbuf *bufs[UHD_DPDK_TXQ_SIZE]; + num_bufs = rte_ring_free_count(q->freebufs); + if (num_bufs > 0) { + num_bufs = num_bufs > UHD_DPDK_TXQ_SIZE ? UHD_DPDK_TXQ_SIZE : num_bufs; + int buf_stat = rte_pktmbuf_alloc_bulk(port->parent->tx_pktbuf_pool, bufs, num_bufs); + if (buf_stat) { + RTE_LOG(ERR, USER1, "%s: Cannot allocate packet buffers\n", __func__); + goto unwind_txq; + } + unsigned int enqd = rte_ring_enqueue_bulk(q->freebufs, (void **) bufs, num_bufs, NULL); + if (enqd != num_bufs) { + RTE_LOG(ERR, USER1, "%s: Cannot enqueue freebufs\n", __func__); + goto unwind_txq; + } + } + } while (num_bufs > 0); LIST_INSERT_HEAD(&port->txq_list, q, entry); *queue = q; return 0; + +unwind_txq: + while (!rte_ring_empty(q->freebufs)) { + struct rte_mbuf *buf; + if (rte_ring_dequeue(q->freebufs, (void **) &buf) == 0) + rte_free(buf); + } + rte_ring_free(q->freebufs); + rte_ring_free(q->queue); + rte_ring_free(q->retry_queue); + rte_free(q); + return -ENOENT; } /* Finish setting up UDP socket (unless ARP needs to be done) @@ -134,11 +150,14 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) return -EADDRINUSE; } + size_t num_bufs = (pdata->xferd_pkts < (UHD_DPDK_RX_BURST_SIZE + 1)) ? + UHD_DPDK_RX_BURST_SIZE + 1 : pdata->xferd_pkts; + pdata->xferd_pkts = 0; char name[32]; snprintf(name, sizeof(name), "rx_ring_udp_%u.%u", port->id, ntohs(pdata->dst_port)); sock->rx_ring = rte_ring_create( name, - UHD_DPDK_RXQ_SIZE, + num_bufs, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ ); @@ -172,6 +191,9 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) /* Are we doing TX? */ if (sock->tx_queue) { + size_t num_bufs = (pdata->xferd_pkts < (UHD_DPDK_TX_BURST_SIZE + 1)) ? + UHD_DPDK_TX_BURST_SIZE + 1 : pdata->xferd_pkts; + pdata->xferd_pkts = 0; sock->tx_queue = NULL; struct uhd_dpdk_tx_queue *q = NULL; // FIXME Not sharing txq across all thread's sockets for now @@ -184,7 +206,7 @@ int _uhd_dpdk_udp_setup(struct uhd_dpdk_config_req *req) // } //} if (!sock->tx_queue) { - retval = _alloc_txq(port, sock->tid, &q); + retval = _alloc_txq(port, sock->tid, &q, num_bufs); if (retval) { _uhd_dpdk_config_req_compl(req, retval); return retval; @@ -385,10 +407,13 @@ void uhd_dpdk_udp_open(struct uhd_dpdk_config_req *req, data->src_port = arg->local_port; data->dst_port = arg->remote_port; sock->tx_queue = (struct uhd_dpdk_tx_queue *) sock; + data->xferd_pkts = arg->num_bufs; } else { data->src_port = arg->remote_port; data->dst_port = arg->local_port; sock->rx_ring = (struct rte_ring *) sock; + data->xferd_pkts = arg->num_bufs; + data->filter_bcast = arg->filter_bcast; } /* TODO: Add support for I/O thread calling (skip locking and sleep) */ @@ -433,6 +458,7 @@ static void uhd_dpdk_ipv4_prep(struct uhd_dpdk_port *port, ip_hdr->time_to_live = 64; ip_hdr->next_proto_id = proto_id; ip_hdr->hdr_checksum = 0; /* FIXME: Assuming hardware can offload */ + mbuf->ol_flags |= PKT_TX_IP_CKSUM; ip_hdr->src_addr = port->ipv4_addr; ip_hdr->dst_addr = dst_ipv4_addr; diff --git a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h index 39fcb8597..d7ca5609b 100644 --- a/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h +++ b/host/lib/transport/uhd-dpdk/uhd_dpdk_udp.h @@ -15,6 +15,7 @@ struct uhd_dpdk_udp_priv { uint32_t dst_ipv4_addr; size_t dropped_pkts; size_t xferd_pkts; + bool filter_bcast; /* TODO: Cache destination address ptr to avoid ARP table lookup cost? */ //struct uhd_dpdk_arp_entry *arp_entry; }; -- cgit v1.2.3