author    | Alex Williams <alex.williams@ni.com> | 2019-12-01 21:58:13 -0800
committer | Brent Stapleton <brent.stapleton@ettus.com> | 2019-12-20 16:32:22 -0800
commit    | 4e38eef817813c1bbd8a9cf972e4cf0134d24308 (patch)
tree      | f6200a048a7da5b7b588a4a9aae881ce7551825e /host/lib/transport
parent    | 797d54bc2573688eebcb2c639cb07e4ab6d5ab9d (diff)
dpdk: Add new DPDK stack to integrate with I/O services
docs: Update DPDK docs with new parameters:
Parameter names now use underscores instead of hyphens, and the
I/O CPU argument is now named after the lcores, reflecting the
naming used by DPDK itself.
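As a rough illustration of the renamed arguments, here is a sketch only: the key names are taken from this changeset, the values are placeholders, and per-NIC keys such as dpdk_ipv4 and dpdk_lcore are normally supplied through the UHD config file (see uhd::prefs::get_dpdk_nic_args) rather than device args.

    #include <uhd/types/device_addr.hpp>

    // Hypothetical device-argument set using the new underscore-style names.
    uhd::device_addr_t make_dpdk_args()
    {
        uhd::device_addr_t args;
        args["dpdk_coremask"]        = "0x3";  // EAL core mask (placeholder value)
        args["dpdk_num_mbufs"]       = "4096"; // size of the packet buffer pools
        args["dpdk_mbuf_cache_size"] = "64";   // per-lcore mbuf cache
        return args;
    }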
transport: Add new udp_dpdk_link, built on top of the new APIs:
This link is tightly coupled with the DPDK I/O service. The link class
carries all the address information needed to communicate with the
other host, and it can send packets directly through the DPDK NIC ports.
For receiving, however, the I/O service must pull packets from the
DMA queue and attach them to the appropriate link object. The link
object merely formats the underlying frame_buff, which is embedded
in the rte_mbuf container. In get_recv_buff, the link pulls buffers
only from its internal queue (the one filled by the I/O service).
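To make the receive handoff concrete, here is a minimal sketch using the udp_dpdk_link methods added in this changeset (enqueue_recv_mbuf, get_recv_buff, release_recv_buff). It is illustrative only; in practice only the DPDK I/O service calls enqueue_recv_mbuf().

    #include <uhdlib/transport/udp_dpdk_link.hpp>
    #include <rte_mbuf.h>
    #include <utility>

    using namespace uhd::transport;

    // Sketch: deliver one DMA'd packet to a link, then consume it.
    void deliver_one(udp_dpdk_link* link, struct rte_mbuf* mbuf)
    {
        link->enqueue_recv_mbuf(mbuf);             // I/O service: attach packet to the link
        auto buff = link->get_recv_buff(0 /*ms*/); // consumer: pops the internal queue only
        if (buff) {
            // ... inspect buff->data() / buff->packet_size() here ...
            link->release_recv_buff(std::move(buff)); // frees the underlying rte_mbuf
        }
    }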
transport: Add DPDK-specific I/O service:
The I/O service is split into two parts: the user threads and the
I/O worker threads. The user threads submit requests through the
appropriate queues, and the I/O worker threads perform all the
I/O on their behalf. This includes routing UDP packets to the
correct receiver and getting the MAC address of a destination (by
performing the ARP request and handling the ARP replies).
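The routing step can be summarized with a condensed, stand-alone restatement of dpdk_io_service::_process_udp from this changeset. This is a sketch, not the literal member function: the rx_table parameter stands in for the service's private _rx_table, and the include paths are assumptions.

    #include <uhdlib/transport/dpdk/udp.hpp>               // dpdk::ipv4_5tuple (assumed location)
    #include <uhdlib/transport/dpdk_io_service_client.hpp> // dpdk_io_if (assumed location)
    #include <rte_hash.h>
    #include <list>

    using namespace uhd::transport;

    // Look up which clients are attached to the destination UDP port.
    std::list<dpdk_io_if*>* route_udp(
        const struct rte_hash* rx_table, uint32_t dst_ip, uint16_t dst_port_be)
    {
        struct dpdk::ipv4_5tuple key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP,
            .src_ip   = 0,
            .dst_ip   = dst_ip,
            .src_port = 0,
            .dst_port = dst_port_be};
        void* entry = nullptr;
        if (rte_hash_lookup_data(rx_table, &key, &entry) < 0) {
            return nullptr; // no client is listening on this port: drop the packet
        }
        return static_cast<std::list<dpdk_io_if*>*>(entry);
    }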
The DPDK context (dpdk_ctx) stores the I/O services: it spawns them
all in init(), and an individual I/O service can be fetched from the
dpdk_ctx object by port ID.
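For example, the wiring performed by dpdk_simple_impl's constructor (see the dpdk_simple.cpp changes below) boils down to roughly the following. This is a sketch of that usage; the attach step is normally handled by the I/O service manager.

    #include <uhd/exception.hpp>
    #include <uhdlib/transport/dpdk/common.hpp>
    #include <uhdlib/transport/dpdk_io_service.hpp>
    #include <uhdlib/transport/udp_dpdk_link.hpp>

    using namespace uhd::transport;

    // Fetch the I/O service that owns a link's NIC port and attach the link.
    dpdk_io_service::sptr get_service_for(udp_dpdk_link::sptr link)
    {
        auto ctx = dpdk::dpdk_ctx::get();  // global context; spawns I/O services in init()
        UHD_ASSERT_THROW(ctx->is_init_done());
        auto io_srv = ctx->get_io_service(link->get_port()->get_port_id());
        io_srv->attach_recv_link(link);    // register for RX routing
        io_srv->attach_send_link(link);    // resolves the remote MAC (ARP) as needed
        return io_srv;
    }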
I/O service clients:
The clients have two lockless ring buffers. One is to get a buffer
from the I/O service; the other is to release a buffer back to the
I/O service. Threads sleeping on buffer I/O are kept in a separate
list from the service queue and are processed in the course of doing
RX or TX.
The list nodes are embedded in the dpdk_io_if, and the head of the
list lives on the dpdk_io_service. The I/O service moves the
embedded wait_req onto this list when it cannot acquire the mutex
needed to complete the wake condition.
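From a client's point of view the cycle looks roughly like the sketch below, which mirrors how dpdk_simple.cpp (in the diff below) uses the recv/send clients returned by make_recv_client() and make_send_client(); buffer methods and timeout units are taken from that file, and this is illustrative only.

    #include <uhdlib/transport/dpdk_io_service_client.hpp>
    #include <cstring>
    #include <utility>

    using namespace uhd::transport;

    // Receive one packet and echo its payload back out.
    size_t echo_once(recv_io_if::sptr recv_io, send_io_if::sptr send_io, int32_t timeout_ms)
    {
        auto in = recv_io->get_recv_buff(timeout_ms);  // pop from the recv ring
        if (!in) {
            return 0;                                  // timed out waiting on the I/O service
        }
        auto out = send_io->get_send_buff(timeout_ms); // pop a free TX frame
        if (!out) {
            recv_io->release_recv_buff(std::move(in)); // give the RX buffer back for reuse
            return 0;
        }
        const size_t nbytes = in->packet_size();       // real code must respect the send frame size
        std::memcpy(out->data(), in->data(), nbytes);
        out->set_packet_size(nbytes);
        send_io->release_send_buff(std::move(out));    // hands the frame to the I/O worker for TX
        recv_io->release_recv_buff(std::move(in));     // returns the RX buffer via the release ring
        return nbytes;
    }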
Co-authored-by: Martin Braun <martin.braun@ettus.com>
Co-authored-by: Ciro Nishiguchi <ciro.nishiguchi@ni.com>
Co-authored-by: Brent Stapleton <brent.stapleton@ettus.com>
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/CMakeLists.txt               |   9
-rw-r--r-- | host/lib/transport/dpdk_simple.cpp              | 272
-rw-r--r-- | host/lib/transport/udp_dpdk_link.cpp            | 198
-rw-r--r-- | host/lib/transport/uhd-dpdk/CMakeLists.txt      |   2
-rw-r--r-- | host/lib/transport/uhd-dpdk/dpdk_common.cpp     | 258
-rw-r--r-- | host/lib/transport/uhd-dpdk/dpdk_io_service.cpp | 950
6 files changed, 1436 insertions, 253 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 646b2837e..d88631ae3 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -143,5 +143,14 @@ endif(ENABLE_LIBERIO) if(ENABLE_DPDK) INCLUDE_SUBDIRECTORY(uhd-dpdk) + + LIBUHD_APPEND_SOURCES( + ${CMAKE_CURRENT_SOURCE_DIR}/udp_dpdk_link.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_simple.cpp + ) + set_source_files_properties( + ${CMAKE_CURRENT_SOURCE_DIR}/udp_dpdk_link.cpp + PROPERTIES COMPILE_FLAGS "${UHD_DPDK_CFLAGS} -D_GNU_SOURCE" + ) endif(ENABLE_DPDK) diff --git a/host/lib/transport/dpdk_simple.cpp b/host/lib/transport/dpdk_simple.cpp index 001775934..50855f36a 100644 --- a/host/lib/transport/dpdk_simple.cpp +++ b/host/lib/transport/dpdk_simple.cpp @@ -1,179 +1,203 @@ // -// Copyright 2019 Ettus Research, a National Instruments Company +// Copyright 2019 Ettus Research, a National Instruments Brand // // SPDX-License-Identifier: GPL-3.0-or-later // +#include <uhd/transport/frame_buff.hpp> #include <uhd/utils/log.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/dpdk_io_service.hpp> +#include <uhdlib/transport/dpdk_io_service_client.hpp> #include <uhdlib/transport/dpdk_simple.hpp> -#include <uhdlib/transport/uhd-dpdk.h> +#include <uhdlib/transport/links.hpp> +#include <uhdlib/transport/udp_dpdk_link.hpp> #include <arpa/inet.h> + namespace uhd { namespace transport { namespace { - constexpr uint64_t USEC = 1000000; - // Non-data fields are headers (Ethernet + IPv4 + UDP) + CRC - constexpr size_t DPDK_SIMPLE_NONDATA_SIZE = 14 + 20 + 8 + 4; +constexpr double SEND_TIMEOUT_MS = 500; // seconds } class dpdk_simple_impl : public dpdk_simple { public: - dpdk_simple_impl(struct uhd_dpdk_ctx &ctx, const std::string &addr, - const std::string &port, bool filter_bcast) + dpdk_simple_impl(const std::string& addr, const std::string& port) { - UHD_ASSERT_THROW(ctx.is_init_done()); - - // Get NIC that can route to addr - int port_id = ctx.get_route(addr); - UHD_ASSERT_THROW(port_id >= 0); - - _port_id = port_id; - uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str()); - uint16_t dst_port = htons(std::stoi(port, NULL, 0)); - - struct uhd_dpdk_sockarg_udp sockarg = { - .is_tx = false, - .filter_bcast = filter_bcast, - .local_port = 0, - .remote_port = dst_port, - .dst_addr = dst_ipv4, - .num_bufs = 1 + link_params_t link_params = _get_default_link_params(); + _link = + uhd::transport::udp_dpdk_link::make(addr, port, link_params); + UHD_LOG_TRACE("DPDK::SIMPLE", + "Creating simple UDP object for " << addr << ":" << port + << ", DPDK port index " + << _link->get_port()->get_port_id()); + // The context must be initialized, or we'd never get here + auto ctx = uhd::transport::dpdk::dpdk_ctx::get(); + UHD_ASSERT_THROW(ctx->is_init_done()); + + // Init I/O service + _port_id = _link->get_port()->get_port_id(); + _io_service = ctx->get_io_service(_port_id); + // This is normally done by the I/O service manager, but with DPDK, this + // is all it does so we skip that step + UHD_LOG_TRACE("DPDK::SIMPLE", "Attaching link to I/O service..."); + _io_service->attach_recv_link(_link); + _io_service->attach_send_link(_link); + + auto recv_cb = [this](buff_t::uptr& buff, recv_link_if*, send_link_if*) { + return this->_recv_callback(buff); + }; + + auto fc_cb = [this](buff_t::uptr buff, recv_link_if*, send_link_if*) { + this->_recv_fc_callback(std::move(buff)); }; - _rx_sock = uhd_dpdk_sock_open(_port_id, UHD_DPDK_SOCK_UDP, &sockarg); - UHD_ASSERT_THROW(_rx_sock != nullptr); - - // 
Backfill the local port, in case it was auto-assigned - uhd_dpdk_udp_get_info(_rx_sock, &sockarg); - sockarg.is_tx = true; - sockarg.remote_port = dst_port; - sockarg.dst_addr = dst_ipv4; - sockarg.num_bufs = 1; - _tx_sock = uhd_dpdk_sock_open(_port_id, UHD_DPDK_SOCK_UDP, &sockarg); - UHD_ASSERT_THROW(_tx_sock != nullptr); - UHD_LOG_TRACE("DPDK", "Created simple transports between " << addr << ":" - << ntohs(dst_port) << " and NIC(" << _port_id - << "):" << ntohs(sockarg.local_port)); + + _recv_io = _io_service->make_recv_client(_link, + link_params.num_recv_frames, + recv_cb, + nullptr, // No send/fc link + 0, // No send frames + fc_cb); + + auto send_cb = [this](buff_t::uptr buff, transport::send_link_if*) { + this->_send_callback(std::move(buff)); + }; + _send_io = _io_service->make_send_client(_link, + link_params.num_send_frames, + send_cb, + nullptr, // no FC link + 0, + nullptr, // No receive callback necessary + [](const size_t) { return true; } // We can always send + ); + UHD_LOG_TRACE("DPDK::SIMPLE", "Constructor complete"); } - ~dpdk_simple_impl(void) {} + ~dpdk_simple_impl(void) + { + UHD_LOG_TRACE("DPDK::SIMPLE", + "~dpdk_simple_impl(), DPDK port index " << _link->get_port()->get_port_id()); + // Disconnect the clients from the I/O service + _send_io.reset(); + _recv_io.reset(); + // Disconnect the link from the I/O service + _io_service->detach_recv_link(_link); + _io_service->detach_send_link(_link); + } - /*! - * Send and release outstanding buffer + /*! Send and release outstanding buffer * * \param length bytes of data to send * \return number of bytes sent (releases buffer if sent) */ - size_t send(const boost::asio::const_buffer& buff) + size_t send(const boost::asio::const_buffer& user_buff) { - struct rte_mbuf* tx_mbuf; - size_t frame_size = _get_tx_buf(&tx_mbuf); - UHD_ASSERT_THROW(tx_mbuf) - size_t nbytes = boost::asio::buffer_size(buff); - UHD_ASSERT_THROW(nbytes <= frame_size) - const uint8_t* user_data = boost::asio::buffer_cast<const uint8_t*>(buff); - - uint8_t* pkt_data = (uint8_t*) uhd_dpdk_buf_to_data(_tx_sock, tx_mbuf); - std::memcpy(pkt_data, user_data, nbytes); - tx_mbuf->pkt_len = nbytes; - tx_mbuf->data_len = nbytes; - - int num_tx = uhd_dpdk_send(_tx_sock, &tx_mbuf, 1); - if (num_tx == 0) - return 0; + // Extract buff and sanity check + const size_t nbytes = boost::asio::buffer_size(user_buff); + UHD_ASSERT_THROW(nbytes <= _link->get_send_frame_size()) + const uint8_t* user_data = boost::asio::buffer_cast<const uint8_t*>(user_buff); + + // Get send buff + auto buff = _send_io->get_send_buff(SEND_TIMEOUT_MS); + UHD_ASSERT_THROW(buff); + buff->set_packet_size(nbytes); + std::memcpy(buff->data(), user_data, nbytes); + + // Release send buff (send the packet) + _send_io->release_send_buff(std::move(buff)); return nbytes; } - /*! - * Receive a single packet. + /*! Receive a single packet. + * * Buffer provided by transport (must be freed before next operation). 
* * \param buf a pointer to place to write buffer location * \param timeout the timeout in seconds * \return the number of bytes received or zero on timeout */ - size_t recv(const boost::asio::mutable_buffer& buff, double timeout) + size_t recv(const boost::asio::mutable_buffer& user_buff, double timeout) { - struct rte_mbuf *rx_mbuf; - size_t buff_size = boost::asio::buffer_size(buff); - uint8_t* user_data = boost::asio::buffer_cast<uint8_t*>(buff); + size_t user_buff_size = boost::asio::buffer_size(user_buff); + uint8_t* user_data = boost::asio::buffer_cast<uint8_t*>(user_buff); - int bufs = uhd_dpdk_recv(_rx_sock, &rx_mbuf, 1, (int) (timeout*USEC)); - if (bufs != 1 || rx_mbuf == nullptr) { - return 0; - } - if ((rx_mbuf->ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) { - uhd_dpdk_free_buf(rx_mbuf); + auto buff = _recv_io->get_recv_buff(static_cast<int32_t>(timeout * 1e3)); + if (!buff) { return 0; } - uhd_dpdk_get_src_ipv4(_rx_sock, rx_mbuf, &_last_recv_addr); - const size_t nbytes = uhd_dpdk_get_len(_rx_sock, rx_mbuf); - UHD_ASSERT_THROW(nbytes <= buff_size); + // Extract the sender's address. This is only possible because we know + // the memory layout of the buff + struct udp_hdr* udp_hdr_end = (struct udp_hdr*)buff->data(); + struct ipv4_hdr* ip_hdr_end = (struct ipv4_hdr*)(&udp_hdr_end[-1]); + struct ipv4_hdr* ip_hdr = (struct ipv4_hdr*)(&ip_hdr_end[-1]); + _last_recv_addr = ip_hdr->src_addr; + + // Extract the buffer data + const size_t copy_len = std::min(user_buff_size, buff->packet_size()); + if (copy_len < buff->packet_size()) { + UHD_LOG_WARNING("DPDK", "Truncating recv packet"); + } + std::memcpy(user_data, buff->data(), copy_len); - uint8_t* pkt_data = (uint8_t*) uhd_dpdk_buf_to_data(_rx_sock, rx_mbuf); - std::memcpy(user_data, pkt_data, nbytes); - _put_rx_buf(rx_mbuf); - return nbytes; + // Housekeeping + _recv_io->release_recv_buff(std::move(buff)); + return copy_len; } - /*! - * Get the last IP address as seen by recv(). - * Only use this with the broadcast socket. - */ std::string get_recv_addr(void) { - char addr_str[INET_ADDRSTRLEN]; - struct in_addr ipv4_addr; - ipv4_addr.s_addr = _last_recv_addr; - inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str)); - return std::string(addr_str); + return dpdk::ipv4_num_to_str(_last_recv_addr); } - /*! - * Get the IP address for the destination - */ std::string get_send_addr(void) { - struct in_addr ipv4_addr; - int status = uhd_dpdk_get_ipv4_addr(_port_id, &ipv4_addr.s_addr, nullptr); - UHD_ASSERT_THROW(status); - char addr_str[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str)); - return std::string(addr_str); + return dpdk::ipv4_num_to_str(_link->get_remote_ipv4()); } + private: - /*! - * Request a single send buffer of specified size. - * - * \param buf a pointer to place to write buffer location - * \return the maximum length of the buffer - */ - size_t _get_tx_buf(struct rte_mbuf** buf) + using buff_t = frame_buff; + + link_params_t _get_default_link_params() { - int bufs = uhd_dpdk_request_tx_bufs(_tx_sock, buf, 1, 0); - if (bufs != 1) { - *buf = nullptr; - return 0; - } - return _mtu - DPDK_SIMPLE_NONDATA_SIZE; + link_params_t link_params; + link_params.recv_frame_size = 8000; + link_params.send_frame_size = 8000; + link_params.num_recv_frames = 1; + link_params.num_send_frames = 1; + link_params.recv_buff_size = 8000; + link_params.send_buff_size = 8000; + return link_params; } - /*! 
- * Return/free receive buffer - * Can also use to free un-sent TX bufs - */ - void _put_rx_buf(struct rte_mbuf *rx_mbuf) + void _send_callback(buff_t::uptr buff) + { + _link->release_send_buff(std::move(buff)); + } + + bool _recv_callback(buff_t::uptr&) { - UHD_ASSERT_THROW(rx_mbuf) - uhd_dpdk_free_buf(rx_mbuf); + // Queue it up + return true; } + void _recv_fc_callback(buff_t::uptr buff) + { + _link->release_recv_buff(std::move(buff)); + } + + /*** Attributes **********************************************************/ unsigned int _port_id; - size_t _mtu; - struct uhd_dpdk_socket *_tx_sock; - struct uhd_dpdk_socket *_rx_sock; uint32_t _last_recv_addr; + + udp_dpdk_link::sptr _link; + + dpdk_io_service::sptr _io_service; + + send_io_if::sptr _send_io; + + recv_io_if::sptr _recv_io; }; dpdk_simple::~dpdk_simple(void) {} @@ -182,16 +206,16 @@ dpdk_simple::~dpdk_simple(void) {} * DPDK simple transport public make functions **********************************************************************/ udp_simple::sptr dpdk_simple::make_connected( - struct uhd_dpdk_ctx &ctx, const std::string &addr, const std::string &port -){ - return udp_simple::sptr(new dpdk_simple_impl(ctx, addr, port, true)); + const std::string& addr, const std::string& port) +{ + return udp_simple::sptr(new dpdk_simple_impl(addr, port)); } +// For DPDK, this is not special and the same as make_connected udp_simple::sptr dpdk_simple::make_broadcast( - struct uhd_dpdk_ctx &ctx, const std::string &addr, const std::string &port -){ - return udp_simple::sptr(new dpdk_simple_impl(ctx, addr, port, false)); + const std::string& addr, const std::string& port) +{ + return udp_simple::sptr(new dpdk_simple_impl(addr, port)); } }} // namespace uhd::transport - diff --git a/host/lib/transport/udp_dpdk_link.cpp b/host/lib/transport/udp_dpdk_link.cpp new file mode 100644 index 000000000..dc56de43c --- /dev/null +++ b/host/lib/transport/udp_dpdk_link.cpp @@ -0,0 +1,198 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/config.hpp> +#include <uhd/utils/log.hpp> +#include <uhd/utils/static.hpp> +#include <uhdlib/transport/adapter.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/udp_dpdk_link.hpp> +#include <arpa/inet.h> +#include <memory> + +using namespace uhd::transport; +using namespace uhd::transport::dpdk; + +udp_dpdk_link::udp_dpdk_link(dpdk::port_id_t port_id, + const std::string& remote_addr, + const std::string& remote_port, + const std::string& local_port, + const link_params_t& params) + : _num_recv_frames(params.num_recv_frames) + , _recv_frame_size(params.recv_frame_size) + , _num_send_frames(params.num_send_frames) + , _send_frame_size(params.send_frame_size) +{ + // Get a reference to the context, since this class manages DPDK memory + _ctx = dpdk_ctx::get(); + UHD_ASSERT_THROW(_ctx); + + // Fill in remote IPv4 address and UDP port + // NOTE: Remote MAC address is filled in later by I/O service + int status = inet_pton(AF_INET, remote_addr.c_str(), &_remote_ipv4); + if (status != 1) { + UHD_LOG_ERROR("DPDK", std::string("Invalid destination address ") + remote_addr); + throw uhd::runtime_error( + std::string("DPDK: Invalid destination address ") + remote_addr); + } + _remote_port = rte_cpu_to_be_16(std::stoul(remote_port)); + + // Grab the port with a route to the remote host + _port = _ctx->get_port(port_id); + + uint16_t local_port_num = rte_cpu_to_be_16(std::stoul(local_port)); + // Get an unused UDP port for 
listening + _local_port = _port->alloc_udp_port(local_port_num); + + // Validate params + const size_t max_frame_size = _port->get_mtu() - dpdk::HDR_SIZE_UDP_IPV4; + UHD_ASSERT_THROW(params.send_frame_size <= max_frame_size); + UHD_ASSERT_THROW(params.recv_frame_size <= max_frame_size); + + // Register the adapter + auto info = _port->get_adapter_info(); + auto& adap_ctx = adapter_ctx::get(); + _adapter_id = adap_ctx.register_adapter(info); + UHD_LOGGER_TRACE("DPDK") << boost::format("Created udp_dpdk_link to (%s:%s)") + % remote_addr % remote_port; + UHD_LOGGER_TRACE("DPDK") + << boost::format("num_recv_frames=%d, recv_frame_size=%d, num_send_frames=%d, " + "send_frame_size=%d") + % params.num_recv_frames % params.recv_frame_size % params.num_send_frames + % params.send_frame_size; +} + +udp_dpdk_link::~udp_dpdk_link() {} + +udp_dpdk_link::sptr udp_dpdk_link::make(const std::string& remote_addr, + const std::string& remote_port, + const link_params_t& params) +{ + auto ctx = dpdk::dpdk_ctx::get(); + auto port = ctx->get_route(remote_addr); + if (!port) { + UHD_LOG_ERROR("DPDK", + std::string("Could not find route to destination address ") + remote_addr); + throw uhd::runtime_error( + std::string("DPDK: Could not find route to destination address ") + + remote_addr); + } + return make(port->get_port_id(), remote_addr, remote_port, "0", params); +} + +udp_dpdk_link::sptr udp_dpdk_link::make(const dpdk::port_id_t port_id, + const std::string& remote_addr, + const std::string& remote_port, + const std::string& local_port, + const link_params_t& params) +{ + UHD_ASSERT_THROW(params.recv_frame_size > 0); + UHD_ASSERT_THROW(params.send_frame_size > 0); + UHD_ASSERT_THROW(params.num_send_frames > 0); + UHD_ASSERT_THROW(params.num_recv_frames > 0); + + return std::make_shared<udp_dpdk_link>( + port_id, remote_addr, remote_port, local_port, params); +} + +void udp_dpdk_link::enqueue_recv_mbuf(struct rte_mbuf* mbuf) +{ + // Get packet size + struct udp_hdr* hdr = rte_pktmbuf_mtod_offset( + mbuf, struct udp_hdr*, sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr)); + size_t packet_size = rte_be_to_cpu_16(hdr->dgram_len) - sizeof(struct udp_hdr); + // Prepare the dpdk_frame_buff + auto buff = new (rte_mbuf_to_priv(mbuf)) dpdk_frame_buff(mbuf); + buff->header_jump( + sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + sizeof(struct udp_hdr)); + buff->set_packet_size(packet_size); + // Add the dpdk_frame_buff to the list + if (_recv_buff_head) { + buff->prev = _recv_buff_head->prev; + buff->next = _recv_buff_head; + _recv_buff_head->prev->next = buff; + _recv_buff_head->prev = buff; + } else { + _recv_buff_head = buff; + buff->next = buff; + buff->prev = buff; + } +} + +frame_buff::uptr udp_dpdk_link::get_recv_buff(int32_t /*timeout_ms*/) +{ + auto buff = _recv_buff_head; + if (buff) { + if (_recv_buff_head->next == buff) { + /* Only had the one buff, so the list is empty */ + _recv_buff_head = nullptr; + } else { + /* Make the next buff the new list head */ + _recv_buff_head->next->prev = _recv_buff_head->prev; + _recv_buff_head->prev->next = _recv_buff_head->next; + _recv_buff_head = _recv_buff_head->next; + } + buff->next = nullptr; + buff->prev = nullptr; + return frame_buff::uptr(buff); + } + return frame_buff::uptr(); +} + +void udp_dpdk_link::release_recv_buff(frame_buff::uptr buff) +{ + dpdk_frame_buff* buff_ptr = (dpdk_frame_buff*)buff.release(); + assert(buff_ptr); + rte_pktmbuf_free(buff_ptr->get_pktmbuf()); +} + +frame_buff::uptr udp_dpdk_link::get_send_buff(int32_t /*timeout_ms*/) +{ + 
auto mbuf = rte_pktmbuf_alloc(_port->get_tx_pktbuf_pool()); + if (mbuf) { + auto buff = new (rte_mbuf_to_priv(mbuf)) dpdk_frame_buff(mbuf); + buff->header_jump( + sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + sizeof(struct udp_hdr)); + return frame_buff::uptr(buff); + } + return frame_buff::uptr(); +} + +void udp_dpdk_link::release_send_buff(frame_buff::uptr buff) +{ + dpdk_frame_buff* buff_ptr = (dpdk_frame_buff*)buff.release(); + assert(buff_ptr); + auto mbuf = buff_ptr->get_pktmbuf(); + if (buff_ptr->packet_size()) { + // Fill in L2 header + auto local_mac = _port->get_mac_addr(); + struct ether_hdr* l2_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*); + ether_addr_copy(&_remote_mac, &l2_hdr->d_addr); + ether_addr_copy(&local_mac, &l2_hdr->s_addr); + l2_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + // Fill in L3 and L4 headers + dpdk::fill_udp_hdr(mbuf, + _port, + _remote_ipv4, + _local_port, + _remote_port, + buff_ptr->packet_size()); + // Prepare the packet buffer and send it out + int status = rte_eth_tx_prepare(_port->get_port_id(), _queue, &mbuf, 1); + if (status != 1) { + throw uhd::runtime_error("DPDK: Failed to prepare TX buffer for send"); + } + status = rte_eth_tx_burst(_port->get_port_id(), _queue, &mbuf, 1); + while (status != 1) { + status = rte_eth_tx_burst(_port->get_port_id(), _queue, &mbuf, 1); + // FIXME: Should we make available retrying? + // throw uhd::runtime_error("DPDK: Failed to send TX buffer"); + } + } else { + // Release the buffer if there is nothing in it + rte_pktmbuf_free(mbuf); + } +} diff --git a/host/lib/transport/uhd-dpdk/CMakeLists.txt b/host/lib/transport/uhd-dpdk/CMakeLists.txt index ea78aa1e8..a13886653 100644 --- a/host/lib/transport/uhd-dpdk/CMakeLists.txt +++ b/host/lib/transport/uhd-dpdk/CMakeLists.txt @@ -19,9 +19,11 @@ if(ENABLE_DPDK) LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_common.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_io_service.cpp ) set_source_files_properties( ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_common.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_io_service.cpp PROPERTIES COMPILE_FLAGS "${UHD_DPDK_CFLAGS} -D_GNU_SOURCE" ) include_directories(${DPDK_INCLUDE_DIR}) diff --git a/host/lib/transport/uhd-dpdk/dpdk_common.cpp b/host/lib/transport/uhd-dpdk/dpdk_common.cpp index 43a1507bb..46818e973 100644 --- a/host/lib/transport/uhd-dpdk/dpdk_common.cpp +++ b/host/lib/transport/uhd-dpdk/dpdk_common.cpp @@ -3,8 +3,13 @@ // // SPDX-License-Identifier: GPL-3.0-or-later // + +#include <uhd/utils/algorithm.hpp> #include <uhd/utils/log.hpp> +#include <uhdlib/transport/dpdk/arp.hpp> #include <uhdlib/transport/dpdk/common.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/dpdk_io_service.hpp> #include <uhdlib/utils/prefs.hpp> #include <arpa/inet.h> #include <rte_arp.h> @@ -23,6 +28,7 @@ constexpr uint16_t DPDK_DEFAULT_RING_SIZE = 512; inline char* eal_add_opt( std::vector<const char*>& argv, size_t n, char* dst, const char* opt, const char* arg) { + UHD_LOG_TRACE("DPDK", opt << " " << arg); char* ptr = dst; strncpy(ptr, opt, n); argv.push_back(ptr); @@ -34,15 +40,6 @@ inline char* eal_add_opt( return ptr; } -inline std::string eth_addr_to_string(const struct ether_addr mac_addr) -{ - auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx"); - mac_stream % (uint32_t)mac_addr.addr_bytes[0] % (uint32_t)mac_addr.addr_bytes[1] - % (uint32_t)mac_addr.addr_bytes[2] % (uint32_t)mac_addr.addr_bytes[3] - % (uint32_t)mac_addr.addr_bytes[4] % (uint32_t)mac_addr.addr_bytes[5]; - return 
mac_stream.str(); -} - inline void separate_ipv4_addr( const std::string ipv4, uint32_t& ipv4_addr, uint32_t& netmask) { @@ -59,28 +56,29 @@ inline void separate_ipv4_addr( dpdk_port::uptr dpdk_port::make(port_id_t port, size_t mtu, uint16_t num_queues, - size_t num_mbufs, + uint16_t num_desc, struct rte_mempool* rx_pktbuf_pool, struct rte_mempool* tx_pktbuf_pool, std::string ipv4_address) { return std::make_unique<dpdk_port>( - port, mtu, num_queues, num_mbufs, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address); + port, mtu, num_queues, num_desc, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address); } dpdk_port::dpdk_port(port_id_t port, size_t mtu, uint16_t num_queues, - size_t num_mbufs, + uint16_t num_desc, struct rte_mempool* rx_pktbuf_pool, struct rte_mempool* tx_pktbuf_pool, std::string ipv4_address) : _port(port) , _mtu(mtu) + , _num_queues(num_queues) , _rx_pktbuf_pool(rx_pktbuf_pool) , _tx_pktbuf_pool(tx_pktbuf_pool) { - /* 1. Set MTU and IPv4 address */ + /* Set MTU and IPv4 address */ int retval; retval = rte_eth_dev_set_mtu(_port, _mtu); @@ -96,7 +94,7 @@ dpdk_port::dpdk_port(port_id_t port, separate_ipv4_addr(ipv4_address, _ipv4, _netmask); - /* 2. Set hardware offloads */ + /* Set hardware offloads */ struct rte_eth_dev_info dev_info; rte_eth_dev_info_get(_port, &dev_info); uint64_t rx_offloads = DEV_RX_OFFLOAD_IPV4_CKSUM; @@ -132,13 +130,13 @@ dpdk_port::dpdk_port(port_id_t port, throw uhd::runtime_error("DPDK: Failed to configure the device"); } - /* 3. Set descriptor ring sizes */ - uint16_t rx_desc = num_mbufs; + /* Set descriptor ring sizes */ + uint16_t rx_desc = num_desc; if (dev_info.rx_desc_lim.nb_max < rx_desc || dev_info.rx_desc_lim.nb_min > rx_desc || (dev_info.rx_desc_lim.nb_align - 1) & rx_desc) { UHD_LOGGER_ERROR("DPDK") << boost::format("%d: %d RX descriptors requested, but must be in [%d,%d]") - % _port % num_mbufs % dev_info.rx_desc_lim.nb_min + % _port % num_desc % dev_info.rx_desc_lim.nb_min % dev_info.rx_desc_lim.nb_max; UHD_LOGGER_ERROR("DPDK") << boost::format("Num RX descriptors must also be aligned to 0x%x") @@ -146,12 +144,12 @@ dpdk_port::dpdk_port(port_id_t port, throw uhd::runtime_error("DPDK: Failed to allocate RX descriptors"); } - uint16_t tx_desc = num_mbufs; + uint16_t tx_desc = num_desc; if (dev_info.tx_desc_lim.nb_max < tx_desc || dev_info.tx_desc_lim.nb_min > tx_desc || (dev_info.tx_desc_lim.nb_align - 1) & tx_desc) { UHD_LOGGER_ERROR("DPDK") << boost::format("%d: %d TX descriptors requested, but must be in [%d,%d]") - % _port % num_mbufs % dev_info.tx_desc_lim.nb_min + % _port % num_desc % dev_info.tx_desc_lim.nb_min % dev_info.tx_desc_lim.nb_max; UHD_LOGGER_ERROR("DPDK") << boost::format("Num TX descriptors must also be aligned to 0x%x") @@ -165,7 +163,7 @@ dpdk_port::dpdk_port(port_id_t port, throw uhd::runtime_error("DPDK: Failed to configure the DMA queues"); } - /* 4. Set up the RX and TX DMA queues (May not be generally supported after + /* Set up the RX and TX DMA queues (May not be generally supported after * eth_dev_start) */ unsigned int cpu_socket = rte_eth_dev_socket_id(_port); for (uint16_t i = 0; i < _num_queues; i++) { @@ -187,36 +185,9 @@ dpdk_port::dpdk_port(port_id_t port, } } - /* 5. 
Set up initial flow table */ + /* TODO: Enable multiple queues (only support 1 right now) */ - /* Append all free queues except 0, which is reserved for ARP */ - _free_queues.reserve(_num_queues - 1); - for (unsigned int i = 1; i < _num_queues; i++) { - _free_queues.push_back(i); - } - - // struct rte_flow_attr flow_attr; - // flow_attr.group = 0; - // flow_attr.priority = 1; - // flow_attr.ingress = 1; - // flow_attr.egress = 0; - // flow_attr.transfer = 0; - // flow_attr.reserved = 0; - - // struct rte_flow_item[] flow_pattern = { - //}; - // int rte_flow_validate(uint16_t port_id, - // const struct rte_flow_attr *attr, - // const struct rte_flow_item pattern[], - // const struct rte_flow_action actions[], - // struct rte_flow_error *error); - // struct rte_flow * rte_flow_create(uint16_t port_id, - // const struct rte_flow_attr *attr, - // const struct rte_flow_item pattern[], - // const struct rte_flow_action *actions[], - // struct rte_flow_error *error); - - /* 6. Start the Ethernet device */ + /* Start the Ethernet device */ retval = rte_eth_dev_start(_port); if (retval < 0) { UHD_LOGGER_ERROR("DPDK") @@ -230,39 +201,60 @@ dpdk_port::dpdk_port(port_id_t port, << " MAC: " << eth_addr_to_string(_mac_addr); } -/* TODO: Do flow directions */ -queue_id_t dpdk_port::alloc_queue(struct rte_flow_pattern recv_pattern[]) +dpdk_port::~dpdk_port() { - std::lock_guard<std::mutex> lock(_mutex); - UHD_ASSERT_THROW(_free_queues.size() != 0); - auto queue = _free_queues.back(); - _free_queues.pop_back(); - return queue; + rte_eth_dev_stop(_port); + rte_spinlock_lock(&_spinlock); + for (auto kv : _arp_table) { + for (auto req : kv.second->reqs) { + req->cond.notify_one(); + } + rte_free(kv.second); + } + _arp_table.clear(); + rte_spinlock_unlock(&_spinlock); } -void dpdk_port::free_queue(queue_id_t queue) +uint16_t dpdk_port::alloc_udp_port(uint16_t udp_port) { + uint16_t port_selected; std::lock_guard<std::mutex> lock(_mutex); - auto flow = _flow_rules.at(queue); - int status = rte_flow_destroy(_port, flow, NULL); - if (status) { - UHD_LOGGER_ERROR("DPDK") - << boost::format("Failed to destroy flow rule on port %u, queue %u") % _port - % queue; - throw uhd::runtime_error("DPDK: Failed to destroy flow rule"); + if (udp_port) { + if (_udp_ports.count(rte_be_to_cpu_16(udp_port))) { + return 0; + } + port_selected = rte_be_to_cpu_16(udp_port); } else { - _flow_rules.erase(queue); + if (_udp_ports.size() >= 65535) { + UHD_LOG_WARNING("DPDK", "Attempted to allocate UDP port, but none remain"); + return 0; + } + port_selected = _next_udp_port; + while (true) { + if (port_selected == 0) { + continue; + } + if (_udp_ports.count(port_selected) == 0) { + _next_udp_port = port_selected - 1; + break; + } + if (port_selected - 1 == _next_udp_port) { + return 0; + } + port_selected--; + } } - _free_queues.push_back(queue); + _udp_ports.insert(port_selected); + return rte_cpu_to_be_16(port_selected); } -int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* arp_req) +int dpdk_port::_arp_reply(queue_id_t queue_id, struct arp_hdr* arp_req) { struct rte_mbuf* mbuf; struct ether_hdr* hdr; struct arp_hdr* arp_frame; - mbuf = rte_pktmbuf_alloc(tx_pktbuf_pool); + mbuf = rte_pktmbuf_alloc(_tx_pktbuf_pool); if (unlikely(mbuf == NULL)) { UHD_LOG_WARNING("DPDK", "Could not allocate packet buffer for ARP response"); return -ENOMEM; @@ -288,8 +280,7 @@ int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* ar mbuf->pkt_len = 42; mbuf->data_len = 42; - // ARP replies always on queue 
0 - if (rte_eth_tx_burst(_port, 0, &mbuf, 1) != 1) { + if (rte_eth_tx_burst(_port, queue_id, &mbuf, 1) != 1) { UHD_LOGGER_WARNING("DPDK") << boost::format("%s: TX descriptor ring is full") % __func__; rte_pktmbuf_free(mbuf); @@ -298,61 +289,16 @@ int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* ar return 0; } -// TODO: ARP processing for queue 0 -// int dpdk_port::process_arp(struct rte_mempool *tx_pktbuf_pool, struct arp_hdr -// *arp_frame) -//{ -// std::lock_guard<std::mutex> lock(_mutex); -// uint32_t dest_ip = arp_frame->arp_data.arp_sip; -// struct ether_addr dest_addr = arp_frame->arp_data.arp_sha; -// -// /* Add entry to ARP table */ -// struct uhd_dpdk_arp_entry *entry = NULL; -// rte_hash_lookup_data(_arp_table, &dest_ip, (void **) &entry); -// if (!entry) { -// entry = rte_zmalloc(NULL, sizeof(*entry), 0); -// if (!entry) { -// return -ENOMEM; -// } -// LIST_INIT(&entry->pending_list); -// ether_addr_copy(&dest_addr, &entry->mac_addr); -// if (rte_hash_add_key_data(port->arp_table, &dest_ip, entry) < 0) { -// rte_free(entry); -// return -ENOSPC; -// } -// } else { -// struct uhd_dpdk_config_req *req = NULL; -// ether_addr_copy(&dest_addr, &entry->mac_addr); -// /* Now wake any config reqs waiting for the ARP */ -// LIST_FOREACH(req, &entry->pending_list, entry) { -// _uhd_dpdk_config_req_compl(req, 0); -// } -// while (entry->pending_list.lh_first != NULL) { -// LIST_REMOVE(entry->pending_list.lh_first, entry); -// } -// } -// /* Respond if this was an ARP request */ -// if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST) && -// arp_frame->arp_data.arp_tip == port->ipv4_addr) { -// _arp_reply(tx_pktbuf_pool, arp_frame); -// } -// -// return 0; -// -//} - -static dpdk_ctx* global_ctx = nullptr; +static dpdk_ctx::sptr global_ctx = nullptr; static std::mutex global_ctx_mutex; dpdk_ctx::sptr dpdk_ctx::get() { std::lock_guard<std::mutex> lock(global_ctx_mutex); if (!global_ctx) { - auto new_ctx = std::make_shared<dpdk_ctx>(); - global_ctx = new_ctx.get(); - return new_ctx; + global_ctx = std::make_shared<dpdk_ctx>(); } - return global_ctx->shared_from_this(); + return global_ctx; } dpdk_ctx::dpdk_ctx(void) : _init_done(false) {} @@ -384,6 +330,7 @@ void dpdk_ctx::_eal_init(const device_addr_t& eal_args) auto args = new std::array<char, 4096>(); char* opt = args->data(); char* end = args->data() + args->size(); + UHD_LOG_TRACE("DPDK", "EAL init options: "); for (std::string& key : eal_args.keys()) { std::string val = eal_args[key]; if (key == "dpdk_coremask") { @@ -452,10 +399,16 @@ void dpdk_ctx::init(const device_addr_t& user_args) _num_mbufs = dpdk_args.cast<int>("dpdk_num_mbufs", DEFAULT_NUM_MBUFS); _mbuf_cache_size = dpdk_args.cast<int>("dpdk_mbuf_cache_size", DEFAULT_MBUF_CACHE_SIZE); + UHD_LOG_TRACE("DPDK", + "mtu: " << _mtu << " num_mbufs: " << _num_mbufs + << " mbuf_cache_size: " << _mbuf_cache_size); /* Get device info for all the NIC ports */ int num_dpdk_ports = rte_eth_dev_count_avail(); - UHD_ASSERT_THROW(num_dpdk_ports > 0); + if (num_dpdk_ports == 0) { + UHD_LOG_ERROR("DPDK", "No available DPDK devices (ports) found!"); + throw uhd::runtime_error("No available DPDK devices (ports) found!"); + } device_addrs_t nics(num_dpdk_ports); RTE_ETH_FOREACH_DEV(i) { @@ -480,6 +433,8 @@ void dpdk_ctx::init(const device_addr_t& user_args) } /* Now combine user args with conf file */ auto conf = uhd::prefs::get_dpdk_nic_args(nic); + // TODO: Enable the use of multiple DMA queues + conf["dpdk_num_queues"] = "1"; /* Update config, and remove ports 
that aren't fully configured */ if (conf.has_key("dpdk_ipv4")) { @@ -491,10 +446,17 @@ void dpdk_ctx::init(const device_addr_t& user_args) } } + std::map<size_t, std::vector<size_t>> lcore_to_port_id_map; RTE_ETH_FOREACH_DEV(i) { auto& conf = nics.at(i); if (conf.has_key("dpdk_ipv4")) { + UHD_ASSERT_THROW(conf.has_key("dpdk_lcore")); + const size_t lcore_id = conf.cast<size_t>("dpdk_lcore", 0); + if (!lcore_to_port_id_map.count(lcore_id)) { + lcore_to_port_id_map.insert({lcore_id, {}}); + } + // Allocating enough buffers for all DMA queues for each CPU socket // - This is a bit inefficient for larger systems, since NICs may not // all be on one socket @@ -507,10 +469,13 @@ void dpdk_ctx::init(const device_addr_t& user_args) _ports[i] = dpdk_port::make(i, _mtu, conf.cast<uint16_t>("dpdk_num_queues", rte_lcore_count()), - _num_mbufs, + conf.cast<uint16_t>("dpdk_num_desc", DPDK_DEFAULT_RING_SIZE), rx_pool, tx_pool, conf["dpdk_ipv4"]); + + // Remember all port IDs that map to an lcore + lcore_to_port_id_map.at(lcore_id).push_back(i); } } @@ -522,12 +487,29 @@ void dpdk_ctx::init(const device_addr_t& user_args) rte_eth_link_get(portid, &link); unsigned int link_status = link.link_status; unsigned int link_speed = link.link_speed; - UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps\n") - % portid % link_status % link_speed; + UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps") % portid + % link_status % link_speed; } - UHD_LOG_TRACE("DPDK", "Init DONE!"); - + UHD_LOG_TRACE("DPDK", "Init done -- spawning IO services"); _init_done = true; + + // Links are up, now create one IO service per lcore + for (auto& lcore_portids_pair : lcore_to_port_id_map) { + const size_t lcore_id = lcore_portids_pair.first; + std::vector<dpdk_port*> dpdk_ports; + dpdk_ports.reserve(lcore_portids_pair.second.size()); + for (const size_t port_id : lcore_portids_pair.second) { + dpdk_ports.push_back(get_port(port_id)); + } + const size_t servq_depth = 32; // FIXME + UHD_LOG_TRACE("DPDK", + "Creating I/O service for lcore " + << lcore_id << ", servicing " << dpdk_ports.size() + << " ports, service queue depth " << servq_depth); + _io_srv_portid_map.insert( + {uhd::transport::dpdk_io_service::make(lcore_id, dpdk_ports, servq_depth), + lcore_portids_pair.second}); + } } } @@ -577,7 +559,7 @@ int dpdk_ctx::get_port_link_status(port_id_t portid) const return link.link_status; } -int dpdk_ctx::get_route(const std::string& addr) const +dpdk_port* dpdk_ctx::get_route(const std::string& addr) const { const uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str()); for (const auto& port : _ports) { @@ -586,10 +568,10 @@ int dpdk_ctx::get_route(const std::string& addr) const uint32_t src_ipv4 = port.second->get_ipv4(); uint32_t netmask = port.second->get_netmask(); if ((src_ipv4 & netmask) == (dst_ipv4 & netmask)) { - return (int)port.first; + return port.second.get(); } } - return -ENODEV; + return NULL; } @@ -598,6 +580,20 @@ bool dpdk_ctx::is_init_done(void) const return _init_done.load(); } +uhd::transport::dpdk_io_service::sptr dpdk_ctx::get_io_service(const size_t port_id) +{ + for (auto& io_srv_portid_pair : _io_srv_portid_map) { + if (uhd::has(io_srv_portid_pair.second, port_id)) { + return io_srv_portid_pair.first; + } + } + + std::string err_msg = std::string("Cannot look up I/O service for port ID: ") + + std::to_string(port_id) + ". 
No such port ID!"; + UHD_LOG_ERROR("DPDK", err_msg); + throw uhd::lookup_error(err_msg); +} + struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool( unsigned int cpu_socket, size_t num_bufs) { @@ -605,8 +601,12 @@ struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool( const int mbuf_size = _mtu + RTE_PKTMBUF_HEADROOM; char name[32]; snprintf(name, sizeof(name), "rx_mbuf_pool_%u", cpu_socket); - _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create( - name, num_bufs, _mbuf_cache_size, 0, mbuf_size, SOCKET_ID_ANY); + _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create(name, + num_bufs, + _mbuf_cache_size, + DPDK_MBUF_PRIV_SIZE, + mbuf_size, + SOCKET_ID_ANY); if (!_rx_pktbuf_pools.at(cpu_socket)) { UHD_LOG_ERROR("DPDK", "Could not allocate RX pktbuf pool"); throw uhd::runtime_error("DPDK: Could not allocate RX pktbuf pool"); diff --git a/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp b/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp new file mode 100644 index 000000000..1fcedca51 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp @@ -0,0 +1,950 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/utils/log.hpp> +#include <uhd/utils/thread.hpp> +#include <uhdlib/transport/dpdk/arp.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/dpdk_io_service_client.hpp> +#include <uhdlib/utils/narrow.hpp> +#include <cmath> + +/* + * Memory management + * + * Every object that allocates and frees DPDK memory has a reference to the + * dpdk_ctx. + * + * Ownership hierarchy: + * + * dpdk_io_service_mgr (1) => + * dpdk_ctx::sptr + * dpdk_io_service::sptr + * + * xport (1) => + * dpdk_send_io::sptr + * dpdk_recv_io::sptr + * + * usrp link_mgr (1) => + * udp_dpdk_link::sptr + * + * dpdk_send_io (2) => + * dpdk_ctx::sptr + * dpdk_io_service::sptr + * + * dpdk_recv_io (2) => + * dpdk_ctx::sptr + * dpdk_io_service::sptr + * + * dpdk_io_service (3) => + * dpdk_ctx::wptr (weak_ptr) + * udp_dpdk_link::sptr + * + * udp_dpdk_link (4) => + * dpdk_ctx::sptr + */ + +using namespace uhd::transport; + +dpdk_io_service::dpdk_io_service( + unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth) + : _ctx(dpdk::dpdk_ctx::get()) + , _lcore_id(lcore_id) + , _ports(ports) + , _servq(servq_depth, lcore_id) +{ + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Launching I/O service for lcore " << lcore_id); + for (auto port : _ports) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "lcore_id " << lcore_id << ": Adding port index " << port->get_port_id()); + _tx_queues[port->get_port_id()] = std::list<dpdk_send_io*>(); + _recv_xport_map[port->get_port_id()] = std::list<dpdk_recv_io*>(); + } + int status = rte_eal_remote_launch(_io_worker, this, lcore_id); + if (status) { + throw uhd::runtime_error("DPDK: I/O service cannot launch on busy lcore"); + } +} + +dpdk_io_service::sptr dpdk_io_service::make( + unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth) +{ + return dpdk_io_service::sptr(new dpdk_io_service(lcore_id, ports, servq_depth)); +} + +dpdk_io_service::~dpdk_io_service() +{ + UHD_LOG_TRACE( + "DPDK::IO_SERVICE", "Shutting down I/O service for lcore " << _lcore_id); + dpdk::wait_req* req = dpdk::wait_req_alloc(dpdk::wait_type::WAIT_LCORE_TERM, NULL); + if (!req) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", + "Could not allocate request for lcore termination for lcore " << _lcore_id); + return; + } + dpdk::wait_req_get(req); + _servq.submit(req, std::chrono::microseconds(-1)); + 
dpdk::wait_req_put(req); +} + +void dpdk_io_service::attach_recv_link(recv_link_if::sptr link) +{ + struct dpdk_flow_data data; + data.link = dynamic_cast<udp_dpdk_link*>(link.get()); + data.is_recv = true; + assert(data.link); + auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data); + if (!req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req to attach recv_link"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach recv_link"); + } + _servq.submit(req, std::chrono::microseconds(-1)); + dpdk::wait_req_put(req); + { + std::lock_guard<std::mutex> lock(_mutex); + _recv_links.push_back(link); + } +} + +void dpdk_io_service::attach_send_link(send_link_if::sptr link) +{ + udp_dpdk_link* dpdk_link = dynamic_cast<udp_dpdk_link*>(link.get()); + assert(dpdk_link); + + // First, fill in destination MAC address + struct dpdk::arp_request arp_data; + arp_data.tpa = dpdk_link->get_remote_ipv4(); + arp_data.port = dpdk_link->get_port()->get_port_id(); + if (dpdk_link->get_port()->dst_is_broadcast(arp_data.tpa)) { + // If a broadcast IP, skip the ARP and fill with broadcast MAC addr + memset(arp_data.tha.addr_bytes, 0xFF, 6); + } else { + auto arp_req = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data); + if (!arp_req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req for ARP request"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req for ARP request"); + } + if (_servq.submit(arp_req, std::chrono::microseconds(3000000))) { + // Try one more time... + auto arp_req2 = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data); + if (_servq.submit(arp_req2, std::chrono::microseconds(30000000))) { + wait_req_put(arp_req); + wait_req_put(arp_req2); + throw uhd::io_error("DPDK: Could not reach host"); + } + wait_req_put(arp_req2); + } + wait_req_put(arp_req); + } + dpdk_link->set_remote_mac(arp_data.tha); + + // Then, submit the link to the I/O service thread + struct dpdk_flow_data data; + data.link = dpdk_link; + data.is_recv = false; + auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data); + if (!req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req to attach send_link"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach send_link"); + } + _servq.submit(req, std::chrono::microseconds(-1)); + wait_req_put(req); + { + std::lock_guard<std::mutex> lock(_mutex); + _send_links.push_back(link); + } +} + +void dpdk_io_service::detach_recv_link(recv_link_if::sptr link) +{ + auto link_ptr = link.get(); + struct dpdk_flow_data data; + data.link = dynamic_cast<udp_dpdk_link*>(link_ptr); + data.is_recv = true; + auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data); + if (!req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req to detach recv_link"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach recv_link"); + } + _servq.submit(req, std::chrono::microseconds(-1)); + wait_req_put(req); + { + std::lock_guard<std::mutex> lock(_mutex); + _recv_links.remove_if( + [link_ptr](recv_link_if::sptr& item) { return item.get() == link_ptr; }); + } +} + +void dpdk_io_service::detach_send_link(send_link_if::sptr link) +{ + auto link_ptr = link.get(); + struct dpdk_flow_data data; + data.link = dynamic_cast<udp_dpdk_link*>(link_ptr); + data.is_recv = false; + auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data); + if (!req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate 
wait_req to detach send_link"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach send_link"); + } + _servq.submit(req, std::chrono::microseconds(-1)); + wait_req_put(req); + { + std::lock_guard<std::mutex> lock(_mutex); + _send_links.remove_if( + [link_ptr](send_link_if::sptr& item) { return item.get() == link_ptr; }); + } +} + +recv_io_if::sptr dpdk_io_service::make_recv_client(recv_link_if::sptr data_link, + size_t num_recv_frames, + recv_callback_t cb, + send_link_if::sptr /*fc_link*/, + size_t num_send_frames, + recv_io_if::fc_callback_t fc_cb) +{ + auto link = dynamic_cast<udp_dpdk_link*>(data_link.get()); + auto recv_io = std::make_shared<dpdk_recv_io>( + shared_from_this(), link, num_recv_frames, cb, num_send_frames, fc_cb); + + // Register with I/O service + recv_io->_dpdk_io_if.io_client = static_cast<void*>(recv_io.get()); + auto xport_req = dpdk::wait_req_alloc( + dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&recv_io->_dpdk_io_if); + _servq.submit(xport_req, std::chrono::microseconds(-1)); + wait_req_put(xport_req); + return recv_io; +} + +send_io_if::sptr dpdk_io_service::make_send_client(send_link_if::sptr send_link, + size_t num_send_frames, + send_io_if::send_callback_t send_cb, + recv_link_if::sptr /*recv_link*/, + size_t num_recv_frames, + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb) +{ + auto link = dynamic_cast<udp_dpdk_link*>(send_link.get()); + auto send_io = std::make_shared<dpdk_send_io>(shared_from_this(), + link, + num_send_frames, + send_cb, + num_recv_frames, + recv_cb, + fc_cb); + + // Register with I/O service + send_io->_dpdk_io_if.io_client = static_cast<void*>(send_io.get()); + auto xport_req = dpdk::wait_req_alloc( + dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&send_io->_dpdk_io_if); + _servq.submit(xport_req, std::chrono::microseconds(-1)); + wait_req_put(xport_req); + return send_io; +} + + +int dpdk_io_service::_io_worker(void* arg) +{ + if (!arg) + return -EINVAL; + dpdk_io_service* srv = (dpdk_io_service*)arg; + + /* Check that this is a valid lcore */ + unsigned int lcore_id = rte_lcore_id(); + if (lcore_id == LCORE_ID_ANY) + return -ENODEV; + + /* Check that this lcore has ports */ + if (srv->_ports.size() == 0) + return -ENODEV; + + char name[16]; + snprintf(name, sizeof(name), "dpdk-io_%hu", (uint16_t)lcore_id); + rte_thread_setname(pthread_self(), name); + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "I/O service thread '" << name << "' started on lcore " << lcore_id); + + uhd::set_thread_priority_safe(); + + snprintf(name, sizeof(name), "rx-tbl_%hu", (uint16_t)lcore_id); + struct rte_hash_parameters hash_params = {.name = name, + .entries = MAX_FLOWS, + .reserved = 0, + .key_len = sizeof(struct dpdk::ipv4_5tuple), + .hash_func = NULL, + .hash_func_init_val = 0, + .socket_id = uhd::narrow_cast<int>(rte_socket_id()), + .extra_flag = 0}; + srv->_rx_table = rte_hash_create(&hash_params); + if (srv->_rx_table == NULL) { + return rte_errno; + } + + int status = 0; + while (!status) { + /* For each port, attempt to receive packets and process */ + for (auto port : srv->_ports) { + srv->_rx_burst(port, 0); + } + /* For each port's TX queues, do TX */ + for (auto port : srv->_ports) { + srv->_tx_burst(port); + } + /* For each port's RX release queues, release buffers */ + for (auto port : srv->_ports) { + srv->_rx_release(port); + } + /* Retry waking clients */ + if (srv->_retry_head) { + dpdk_io_if* node = srv->_retry_head; + dpdk_io_if* end = srv->_retry_head->prev; + while (true) { + dpdk_io_if* next = node->next; + 
srv->_wake_client(node); + if (node == end) { + break; + } else { + node = next; + next = node->next; + } + } + } + /* Check for open()/close()/term() requests and service 1 at a time + * Leave this last so we immediately terminate if requested + */ + status = srv->_service_requests(); + } + + return status; +} + +int dpdk_io_service::_service_requests() +{ + for (int i = 0; i < MAX_PENDING_SERVICE_REQS; i++) { + /* Dequeue */ + dpdk::wait_req* req = _servq.pop(); + if (!req) { + break; + } + switch (req->reason) { + case dpdk::wait_type::WAIT_SIMPLE: + while (_servq.complete(req) == -ENOBUFS) + ; + break; + case dpdk::wait_type::WAIT_RX: + case dpdk::wait_type::WAIT_TX_BUF: + throw uhd::not_implemented_error( + "DPDK: _service_requests(): DPDK is still a WIP"); + case dpdk::wait_type::WAIT_FLOW_OPEN: + _service_flow_open(req); + break; + case dpdk::wait_type::WAIT_FLOW_CLOSE: + _service_flow_close(req); + break; + case dpdk::wait_type::WAIT_XPORT_CONNECT: + _service_xport_connect(req); + break; + case dpdk::wait_type::WAIT_XPORT_DISCONNECT: + _service_xport_disconnect(req); + break; + case dpdk::wait_type::WAIT_ARP: { + assert(req->data != NULL); + int arp_status = _service_arp_request(req); + assert(arp_status != -ENOMEM); + if (arp_status == 0) { + while (_servq.complete(req) == -ENOBUFS) + ; + } + break; + } + case dpdk::wait_type::WAIT_LCORE_TERM: + rte_free(_rx_table); + while (_servq.complete(req) == -ENOBUFS) + ; + // Return a positive value to indicate we should terminate + return 1; + default: + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Invalid reason associated with wait request"); + while (_servq.complete(req) == -ENOBUFS) + ; + break; + } + } + return 0; +} + +void dpdk_io_service::_service_flow_open(dpdk::wait_req* req) +{ + auto flow_req_data = (struct dpdk_flow_data*)req->data; + assert(flow_req_data); + if (flow_req_data->is_recv) { + // If RX, add to RX table. Currently, nothing to do for TX. + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = flow_req_data->link->get_port()->get_ipv4(), + .src_port = 0, + .dst_port = flow_req_data->link->get_local_port()}; + // Check the UDP port isn't in use + if (rte_hash_lookup(_rx_table, &ht_key) > 0) { + req->retval = -EADDRINUSE; + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add to RX table"); + while (_servq.complete(req) == -ENOBUFS) + ; + return; + } + // Add xport list for this UDP port + auto rx_entry = new std::list<dpdk_io_if*>(); + if (rte_hash_add_key_data(_rx_table, &ht_key, rx_entry)) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Could not add new RX list to table"); + delete rx_entry; + req->retval = -ENOMEM; + while (_servq.complete(req) == -ENOBUFS) + ; + return; + } + } + while (_servq.complete(req) == -ENOBUFS) + ; +} + +void dpdk_io_service::_service_flow_close(dpdk::wait_req* req) +{ + auto flow_req_data = (struct dpdk_flow_data*)req->data; + assert(flow_req_data); + if (flow_req_data->is_recv) { + // If RX, remove from RX table. Currently, nothing to do for TX. 
+ struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = flow_req_data->link->get_port()->get_ipv4(), + .src_port = 0, + .dst_port = flow_req_data->link->get_local_port()}; + std::list<dpdk_io_if*>* xport_list; + + if (rte_hash_lookup_data(_rx_table, &ht_key, (void**)&xport_list) > 0) { + UHD_ASSERT_THROW(xport_list->empty()); + delete xport_list; + rte_hash_del_key(_rx_table, &ht_key); + while (_servq.complete(req) == -ENOBUFS) + ; + return; + } + } + while (_servq.complete(req) == -ENOBUFS) + ; +} + +void dpdk_io_service::_service_xport_connect(dpdk::wait_req* req) +{ + auto dpdk_io = static_cast<dpdk_io_if*>(req->data); + UHD_ASSERT_THROW(dpdk_io); + auto port = dpdk_io->link->get_port(); + if (dpdk_io->recv_cb) { + // Add to RX table only if have a callback. + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = port->get_ipv4(), + .src_port = 0, + .dst_port = dpdk_io->link->get_local_port()}; + void* hash_data; + if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) < 0) { + req->retval = -ENOENT; + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add xport to RX table"); + while (_servq.complete(req) == -ENOBUFS) + ; + return; + } + // Add to xport list for this UDP port + auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); + rx_entry->push_back(dpdk_io); + } + if (dpdk_io->is_recv) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX connect request..."); + // Add to xport list for this NIC port + auto& xport_list = _recv_xport_map.at(port->get_port_id()); + xport_list.push_back((dpdk_recv_io*)dpdk_io->io_client); + } else { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX connect request..."); + dpdk_send_io* send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client); + // Add to xport list for this NIC port + auto& xport_list = _tx_queues.at(port->get_port_id()); + xport_list.push_back(send_io); + for (size_t i = 0; i < send_io->_num_send_frames; i++) { + auto buff_ptr = + (dpdk::dpdk_frame_buff*)dpdk_io->link->get_send_buff(0).release(); + if (!buff_ptr) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", + "TX mempool out of memory. Please increase dpdk_num_mbufs."); + break; + } + if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) { + rte_pktmbuf_free(buff_ptr->get_pktmbuf()); + break; + } + send_io->_num_frames_in_use++; + } + } + while (_servq.complete(req) == -ENOBUFS) + ; +} + +void dpdk_io_service::_service_xport_disconnect(dpdk::wait_req* req) +{ + auto dpdk_io = (struct dpdk_io_if*)req->data; + assert(dpdk_io); + auto port = dpdk_io->link->get_port(); + if (dpdk_io->recv_cb) { + // Remove from RX table only if have a callback. 
+ struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = port->get_ipv4(), + .src_port = 0, + .dst_port = dpdk_io->link->get_local_port()}; + void* hash_data; + if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) >= 0) { + // Remove from xport list for this UDP port + auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); + rx_entry->remove(dpdk_io); + } else { + req->retval = -EINVAL; + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot remove xport from RX table"); + } + } + if (dpdk_io->is_recv) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX disconnect request..."); + dpdk_recv_io* recv_client = static_cast<dpdk_recv_io*>(dpdk_io->io_client); + // Remove from xport list for this NIC port + auto& xport_list = _recv_xport_map.at(port->get_port_id()); + xport_list.remove(recv_client); + while (!rte_ring_empty(recv_client->_recv_queue)) { + frame_buff* buff_ptr; + rte_ring_dequeue(recv_client->_recv_queue, (void**)&buff_ptr); + dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr)); + } + while (!rte_ring_empty(recv_client->_release_queue)) { + frame_buff* buff_ptr; + rte_ring_dequeue(recv_client->_release_queue, (void**)&buff_ptr); + dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr)); + } + } else { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX disconnect request..."); + dpdk_send_io* send_client = static_cast<dpdk_send_io*>(dpdk_io->io_client); + // Remove from xport list for this NIC port + auto& xport_list = _tx_queues.at(port->get_port_id()); + xport_list.remove(send_client); + while (!rte_ring_empty(send_client->_send_queue)) { + frame_buff* buff_ptr; + rte_ring_dequeue(send_client->_send_queue, (void**)&buff_ptr); + dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr)); + } + while (!rte_ring_empty(send_client->_buffer_queue)) { + frame_buff* buff_ptr; + rte_ring_dequeue(send_client->_buffer_queue, (void**)&buff_ptr); + dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr)); + } + } + // Now remove the node if it's on the retry list + if ((_retry_head == dpdk_io) && (dpdk_io->next == dpdk_io)) { + _retry_head = NULL; + } else if (_retry_head) { + dpdk_io_if* node = _retry_head->next; + while (node != _retry_head) { + if (node == dpdk_io) { + dpdk_io->prev->next = dpdk_io->next; + dpdk_io->next->prev = dpdk_io->prev; + break; + } + node = node->next; + } + } + while (_servq.complete(req) == -ENOBUFS) + ; +} + +int dpdk_io_service::_service_arp_request(dpdk::wait_req* req) +{ + int status = 0; + auto arp_req_data = (struct dpdk::arp_request*)req->data; + dpdk::ipv4_addr dst_addr = arp_req_data->tpa; + auto ctx_sptr = _ctx.lock(); + UHD_ASSERT_THROW(ctx_sptr); + dpdk::dpdk_port* port = ctx_sptr->get_port(arp_req_data->port); + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "ARP: Requesting address for " << dpdk::ipv4_num_to_str(dst_addr)); + + rte_spinlock_lock(&port->_spinlock); + struct dpdk::arp_entry* entry = NULL; + if (port->_arp_table.count(dst_addr) == 0) { + entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0); + if (!entry) { + status = -ENOMEM; + goto arp_end; + } + entry = new (entry) dpdk::arp_entry(); + entry->reqs.push_back(req); + port->_arp_table[dst_addr] = entry; + status = -EAGAIN; + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Address not in table. 
Sending ARP request."); + _send_arp_request(port, 0, arp_req_data->tpa); + } else { + entry = port->_arp_table.at(dst_addr); + if (is_zero_ether_addr(&entry->mac_addr)) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "ARP: Address in table, but not populated yet. Resending ARP request."); + port->_arp_table.at(dst_addr)->reqs.push_back(req); + status = -EAGAIN; + _send_arp_request(port, 0, arp_req_data->tpa); + } else { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "ARP: Address in table."); + ether_addr_copy(&entry->mac_addr, &arp_req_data->tha); + status = 0; + } + } +arp_end: + rte_spinlock_unlock(&port->_spinlock); + return status; +} + +int dpdk_io_service::_send_arp_request( + dpdk::dpdk_port* port, dpdk::queue_id_t queue, dpdk::ipv4_addr ip) +{ + struct rte_mbuf* mbuf; + struct ether_hdr* hdr; + struct arp_hdr* arp_frame; + + mbuf = rte_pktmbuf_alloc(port->get_tx_pktbuf_pool()); + if (unlikely(mbuf == NULL)) { + UHD_LOG_WARNING( + "DPDK::IO_SERVICE", "Could not allocate packet buffer for ARP request"); + return -ENOMEM; + } + + hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*); + arp_frame = (struct arp_hdr*)&hdr[1]; + + memset(hdr->d_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN); + hdr->s_addr = port->get_mac_addr(); + hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP); + + arp_frame->arp_hrd = rte_cpu_to_be_16(ARP_HRD_ETHER); + arp_frame->arp_pro = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + arp_frame->arp_hln = 6; + arp_frame->arp_pln = 4; + arp_frame->arp_op = rte_cpu_to_be_16(ARP_OP_REQUEST); + arp_frame->arp_data.arp_sha = port->get_mac_addr(); + arp_frame->arp_data.arp_sip = port->get_ipv4(); + memset(arp_frame->arp_data.arp_tha.addr_bytes, 0x00, ETHER_ADDR_LEN); + arp_frame->arp_data.arp_tip = ip; + + mbuf->pkt_len = 42; + mbuf->data_len = 42; + + if (rte_eth_tx_burst(port->get_port_id(), queue, &mbuf, 1) != 1) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "ARP request not sent: Descriptor ring full"); + rte_pktmbuf_free(mbuf); + return -EAGAIN; + } + return 0; +} + +/* Do a burst of RX on port */ +int dpdk_io_service::_rx_burst(dpdk::dpdk_port* port, dpdk::queue_id_t queue) +{ + struct ether_hdr* hdr; + char* l2_data; + struct rte_mbuf* bufs[RX_BURST_SIZE]; + const uint16_t num_rx = + rte_eth_rx_burst(port->get_port_id(), queue, bufs, RX_BURST_SIZE); + if (unlikely(num_rx == 0)) { + return 0; + } + + for (int buf = 0; buf < num_rx; buf++) { + uint64_t ol_flags = bufs[buf]->ol_flags; + hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr*); + l2_data = (char*)&hdr[1]; + switch (rte_be_to_cpu_16(hdr->ether_type)) { + case ETHER_TYPE_ARP: + _process_arp(port, queue, (struct arp_hdr*)l2_data); + rte_pktmbuf_free(bufs[buf]); + break; + case ETHER_TYPE_IPv4: + if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet has bad IP cksum"); + } else if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_NONE) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet missing IP cksum"); + } else { + _process_ipv4(port, bufs[buf], (struct ipv4_hdr*)l2_data); + } + break; + default: + rte_pktmbuf_free(bufs[buf]); + break; + } + } + return num_rx; +} + +int dpdk_io_service::_process_arp( + dpdk::dpdk_port* port, dpdk::queue_id_t queue_id, struct arp_hdr* arp_frame) +{ + uint32_t dest_ip = arp_frame->arp_data.arp_sip; + struct ether_addr dest_addr = arp_frame->arp_data.arp_sha; + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "Processing ARP packet: " << dpdk::ipv4_num_to_str(dest_ip) << " -> " + << dpdk::eth_addr_to_string(dest_addr)); + /* Add entry to ARP table */ + 
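+    /* The table is guarded by the port's spinlock. If requesters are already
+     * waiting on this IP, the reply's MAC address is copied into each pending
+     * arp_request and those requests are completed, waking the callers blocked
+     * in _service_arp_request(). */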
rte_spinlock_lock(&port->_spinlock); + struct dpdk::arp_entry* entry = NULL; + if (port->_arp_table.count(dest_ip) == 0) { + entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0); + if (!entry) { + return -ENOMEM; + } + entry = new (entry) dpdk::arp_entry(); + ether_addr_copy(&dest_addr, &entry->mac_addr); + port->_arp_table[dest_ip] = entry; + } else { + entry = port->_arp_table.at(dest_ip); + ether_addr_copy(&dest_addr, &entry->mac_addr); + for (auto req : entry->reqs) { + auto arp_data = (struct dpdk::arp_request*)req->data; + ether_addr_copy(&dest_addr, &arp_data->tha); + while (_servq.complete(req) == -ENOBUFS) + ; + } + entry->reqs.clear(); + } + rte_spinlock_unlock(&port->_spinlock); + + /* Respond if this was an ARP request */ + if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST) + && arp_frame->arp_data.arp_tip == port->get_ipv4()) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Sending ARP reply."); + port->_arp_reply(queue_id, arp_frame); + } + + return 0; +} + +int dpdk_io_service::_process_ipv4( + dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct ipv4_hdr* pkt) +{ + bool bcast = port->dst_is_broadcast(pkt->dst_addr); + if (pkt->dst_addr != port->get_ipv4() && !bcast) { + rte_pktmbuf_free(mbuf); + return -ENODEV; + } + if (pkt->next_proto_id == IPPROTO_UDP) { + return _process_udp(port, mbuf, (struct udp_hdr*)&pkt[1], bcast); + } + rte_pktmbuf_free(mbuf); + return -EINVAL; +} + + +int dpdk_io_service::_process_udp( + dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct udp_hdr* pkt, bool /*bcast*/) +{ + // Get the link + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = port->get_ipv4(), + .src_port = 0, + .dst_port = pkt->dst_port}; + void* hash_data; + if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) < 0) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No link entry in rx table"); + rte_pktmbuf_free(mbuf); + return -ENOENT; + } + // Get xport list for this UDP port + auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); + if (rx_entry->empty()) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No xports for link"); + rte_pktmbuf_free(mbuf); + return -ENOENT; + } + // Turn rte_mbuf -> dpdk_frame_buff + auto link = rx_entry->front()->link; + link->enqueue_recv_mbuf(mbuf); + auto buff = link->get_recv_buff(0); + bool rcvr_found = false; + for (auto client_if : *rx_entry) { + // Check all the muxed receivers... 
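+        // The first receiver whose recv_cb() claims the packet wins: if the
+        // callback leaves the buffer in place, it is enqueued on that client's
+        // recv ring and the client is woken; if the callback has already
+        // consumed the buffer, nothing further is queued.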
+ if (client_if->recv_cb(buff, link, link)) { + rcvr_found = true; + if (buff) { + assert(client_if->is_recv); + auto recv_io = (dpdk_recv_io*)client_if->io_client; + auto buff_ptr = (dpdk::dpdk_frame_buff*)buff.release(); + if (rte_ring_enqueue(recv_io->_recv_queue, buff_ptr)) { + rte_pktmbuf_free(buff_ptr->get_pktmbuf()); + UHD_LOG_WARNING( + "DPDK::IO_SERVICE", "Dropping packet: No space in recv queue"); + } else { + recv_io->_num_frames_in_use++; + assert(recv_io->_num_frames_in_use <= recv_io->_num_recv_frames); + _wake_client(client_if); + } + } + break; + } + } + if (!rcvr_found) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No receiver xport found"); + // Release the buffer if no receiver found + link->release_recv_buff(std::move(buff)); + return -ENOENT; + } + return 0; +} + +/* Do a burst of TX on port's tx queues */ +int dpdk_io_service::_tx_burst(dpdk::dpdk_port* port) +{ + unsigned int total_tx = 0; + auto& queues = _tx_queues.at(port->get_port_id()); + + for (auto& send_io : queues) { + unsigned int num_tx = rte_ring_count(send_io->_send_queue); + num_tx = (num_tx < TX_BURST_SIZE) ? num_tx : TX_BURST_SIZE; + bool replaced_buffers = false; + for (unsigned int i = 0; i < num_tx; i++) { + size_t frame_size = send_io->_dpdk_io_if.link->get_send_frame_size(); + if (send_io->_fc_cb && !send_io->_fc_cb(frame_size)) { + break; + } + dpdk::dpdk_frame_buff* buff_ptr; + int status = rte_ring_dequeue(send_io->_send_queue, (void**)&buff_ptr); + if (status) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", "TX Q Count doesn't match actual"); + break; + } + send_io->_send_cb(frame_buff::uptr(buff_ptr), send_io->_dpdk_io_if.link); + // Attempt to replace buffer + buff_ptr = (dpdk::dpdk_frame_buff*)send_io->_dpdk_io_if.link->get_send_buff(0) + .release(); + if (!buff_ptr) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", + "TX mempool out of memory. Please increase dpdk_num_mbufs."); + send_io->_num_frames_in_use--; + } else if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) { + rte_pktmbuf_free(buff_ptr->get_pktmbuf()); + send_io->_num_frames_in_use--; + } else { + replaced_buffers = true; + } + } + if (replaced_buffers) { + _wake_client(&send_io->_dpdk_io_if); + } + total_tx += num_tx; + } + + return total_tx; +} + +int dpdk_io_service::_rx_release(dpdk::dpdk_port* port) +{ + unsigned int total_bufs = 0; + auto& queues = _recv_xport_map.at(port->get_port_id()); + + for (auto& recv_io : queues) { + unsigned int num_buf = rte_ring_count(recv_io->_release_queue); + num_buf = (num_buf < RX_BURST_SIZE) ? 
num_buf : RX_BURST_SIZE; + for (unsigned int i = 0; i < num_buf; i++) { + dpdk::dpdk_frame_buff* buff_ptr; + int status = rte_ring_dequeue(recv_io->_release_queue, (void**)&buff_ptr); + if (status) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", "RX Q Count doesn't match actual"); + break; + } + recv_io->_fc_cb(frame_buff::uptr(buff_ptr), + recv_io->_dpdk_io_if.link, + recv_io->_dpdk_io_if.link); + recv_io->_num_frames_in_use--; + } + total_bufs += num_buf; + } + + return total_bufs; +} + +uint16_t dpdk_io_service::_get_unique_client_id() +{ + std::lock_guard<std::mutex> lock(_mutex); + if (_client_id_set.size() >= MAX_CLIENTS) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Exceeded maximum number of clients"); + throw uhd::runtime_error("DPDK::IO_SERVICE: Exceeded maximum number of clients"); + } + + uint16_t id = _next_client_id++; + while (_client_id_set.count(id)) { + id = _next_client_id++; + } + _client_id_set.insert(id); + return id; +} + +void dpdk_io_service::_wake_client(dpdk_io_if* dpdk_io) +{ + dpdk::wait_req* req; + if (dpdk_io->is_recv) { + auto recv_io = static_cast<dpdk_recv_io*>(dpdk_io->io_client); + req = recv_io->_waiter; + } else { + auto send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client); + req = send_io->_waiter; + } + bool stat = req->mutex.try_lock(); + if (stat) { + bool active_req = !req->complete; + if (dpdk_io->next) { + // On the list: Take it off + if (dpdk_io->next == dpdk_io) { + // Only node on the list + _retry_head = NULL; + } else { + // Other nodes are on the list + if (_retry_head == dpdk_io) { + // Move list head to next + _retry_head = dpdk_io->next; + } + dpdk_io->next->prev = dpdk_io->prev; + dpdk_io->prev->next = dpdk_io->next; + } + dpdk_io->next = NULL; + dpdk_io->prev = NULL; + } + if (active_req) { + req->complete = true; + req->cond.notify_one(); + } + req->mutex.unlock(); + if (active_req) { + wait_req_put(req); + } + } else { + // Put on the retry list, if it isn't already + if (!dpdk_io->next) { + if (_retry_head) { + dpdk_io->next = _retry_head; + dpdk_io->prev = _retry_head->prev; + _retry_head->prev->next = dpdk_io; + _retry_head->prev = dpdk_io; + } else { + _retry_head = dpdk_io; + dpdk_io->next = dpdk_io; + dpdk_io->prev = dpdk_io; + } + } + } +} |
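The subtlest piece of the service above is the retry list used by _wake_client() and _service_xport_disconnect(): clients whose wait_req mutex could not be taken are kept on an intrusive circular doubly-linked list threaded through the dpdk_io_if next/prev pointers, with _retry_head pointing at one node of the ring and a lone node linking to itself. Below is a minimal, self-contained sketch of that list pattern only; the names (retry_node, retry_list, push, remove) are illustrative and are not part of UHD or DPDK.

// Sketch of the intrusive circular doubly-linked list pattern used for the
// retry list: next/prev are non-NULL only while a node is on the list, and a
// single node links to itself.
#include <cassert>

struct retry_node
{
    retry_node* next = nullptr; // nullptr <=> not on the list
    retry_node* prev = nullptr;
};

struct retry_list
{
    retry_node* head = nullptr;

    void push(retry_node* n)
    {
        if (n->next) {
            return; // already queued for retry
        }
        if (head) {
            // Splice in just behind head (i.e. at the tail of the ring)
            n->next          = head;
            n->prev          = head->prev;
            head->prev->next = n;
            head->prev       = n;
        } else {
            head    = n;
            n->next = n->prev = n; // lone node links to itself
        }
    }

    void remove(retry_node* n)
    {
        if (!n->next) {
            return; // not on the list
        }
        if (n->next == n) {
            head = nullptr; // last node on the ring
        } else {
            if (head == n) {
                head = n->next;
            }
            n->prev->next = n->next;
            n->next->prev = n->prev;
        }
        n->next = n->prev = nullptr;
    }
};

int main()
{
    retry_list list;
    retry_node a, b;
    list.push(&a);
    list.push(&b);
    list.remove(&a);
    assert(list.head == &b && b.next == &b);
    list.remove(&b);
    assert(list.head == nullptr);
    return 0;
}

Keeping next equal to NULL for off-list nodes is what lets _wake_client() test list membership with a single pointer check before splicing a node out or appending it for retry.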