aboutsummaryrefslogtreecommitdiffstats
path: root/host
diff options
context:
space:
mode:
authorAlex Williams <alex.williams@ni.com>2019-12-01 21:58:13 -0800
committerBrent Stapleton <brent.stapleton@ettus.com>2019-12-20 16:32:22 -0800
commit4e38eef817813c1bbd8a9cf972e4cf0134d24308 (patch)
treef6200a048a7da5b7b588a4a9aae881ce7551825e /host
parent797d54bc2573688eebcb2c639cb07e4ab6d5ab9d (diff)
downloaduhd-4e38eef817813c1bbd8a9cf972e4cf0134d24308.tar.gz
uhd-4e38eef817813c1bbd8a9cf972e4cf0134d24308.tar.bz2
uhd-4e38eef817813c1bbd8a9cf972e4cf0134d24308.zip
dpdk: Add new DPDK stack to integrate with I/O services
docs: Update DPDK docs with new parameters: Parameter names have had their hyphens changed to underscores, and the I/O CPU argument is now named after the lcores and reflects the naming used by DPDK. transport: Add new udp_dpdk_link, based atop the new APIs: This link is tightly coupled with the DPDK I/O service. The link class carries all the address information to communicate with the other host, and it can send packets directly through the DPDK NIC ports. However, for receiving packets, the I/O service must pull the packets from the DMA queue and attach them to the appropriate link object. The link object merely formats the frame_buff object underneath, which is embedded in the rte_mbuf container. For get_recv_buff, the link will pull buffers only from its internal queue (the one filled by the I/O service). transport: Add DPDK-specific I/O service: The I/O service is split into two parts, the user threads and the I/O worker threads. The user threads submit requests through various appropriate queues, and the I/O threads perform all the I/O on their behalf. This includes routing UDP packets to the correct receiver and getting the MAC address of a destination (by performing the ARP request and handling the ARP replies). The DPDK context stores I/O services. The context spawns all I/O services on init(), and I/O services can be fetched from the dpdk_ctx object by using a port ID. I/O service clients: The clients have two lockless ring buffers. One is to get a buffer from the I/O service; the other is to release a buffer back to the I/O service. Threads sleeping on buffer I/O are kept in a separate list from the service queue and are processed in the course of doing RX or TX. The list nodes are embedded in the dpdk_io_if, and the head of the list is on the dpdk_io_service. The I/O service will transfer the embedded wait_req to the list if it cannot acquire the mutex to complete the condition for waking. Co-authored-by: Martin Braun <martin.braun@ettus.com> Co-authored-by: Ciro Nishiguchi <ciro.nishiguchi@ni.com> Co-authored-by: Brent Stapleton <brent.stapleton@ettus.com>
Diffstat (limited to 'host')
-rw-r--r--host/docs/dpdk.dox38
-rw-r--r--host/lib/include/uhdlib/transport/dpdk/arp.hpp29
-rw-r--r--host/lib/include/uhdlib/transport/dpdk/common.hpp202
-rw-r--r--host/lib/include/uhdlib/transport/dpdk/service_queue.hpp30
-rw-r--r--host/lib/include/uhdlib/transport/dpdk/udp.hpp115
-rw-r--r--host/lib/include/uhdlib/transport/dpdk_io_service.hpp246
-rw-r--r--host/lib/include/uhdlib/transport/dpdk_io_service_client.hpp285
-rw-r--r--host/lib/include/uhdlib/transport/dpdk_simple.hpp33
-rw-r--r--host/lib/include/uhdlib/transport/link_base.hpp4
-rw-r--r--host/lib/include/uhdlib/transport/udp_dpdk_link.hpp267
-rw-r--r--host/lib/transport/CMakeLists.txt9
-rw-r--r--host/lib/transport/dpdk_simple.cpp272
-rw-r--r--host/lib/transport/udp_dpdk_link.cpp198
-rw-r--r--host/lib/transport/uhd-dpdk/CMakeLists.txt2
-rw-r--r--host/lib/transport/uhd-dpdk/dpdk_common.cpp258
-rw-r--r--host/lib/transport/uhd-dpdk/dpdk_io_service.cpp950
-rw-r--r--host/tests/CMakeLists.txt5
-rw-r--r--host/tests/dpdk_port_test.cpp42
18 files changed, 2633 insertions, 352 deletions
diff --git a/host/docs/dpdk.dox b/host/docs/dpdk.dox
index e840fc235..3f71231a5 100644
--- a/host/docs/dpdk.dox
+++ b/host/docs/dpdk.dox
@@ -75,7 +75,23 @@ load that driver with the following command:
modprobe vfio-pci
For NICs that require vfio-pci (like Intel's X520), you'll want to use the
-`dpdk-devbind.py` script to the vfio-pci driver.
+`dpdk-devbind.py` script to the vfio-pci driver. This script is shipped with
+DPDK and installed to `$prefix/share/dpdk/usertools`. If the NIC uses the
+vfio-pci driver, and the package was installed apt-get, then a typical invocation
+might be
+
+ /usr/share/dpdk/usertools/dpdk-devbind.py --bind=vfio-pci ens6f0
+
+If successful, the script might provide an updated status like this:
+
+ /usr/share/dpdk/usertools/dpdk-devbind.py -s
+
+ Network devices using DPDK-compatible driver
+ ============================================
+ 0000:02:00.0 '82599ES 10-Gigabit SFI/SFP+ Network Connection 10fb' drv=vfio-pci unused=ixgbe
+ [...]
+
+
See https://doc.dpdk.org/guides-18.11/linux_gsg/linux_drivers.html#binding-and-unbinding-network-ports-to-from-the-kernel-modules
for more details.
@@ -102,7 +118,7 @@ options:
;instead and swap between them
[use_dpdk=1]
;dpdk_mtu is the NIC's MTU setting
- ;This is separate from MPM's maximum packet size--tops out at 4000
+ ;This is separate from MPM's maximum packet size
dpdk_mtu=9000
;dpdk_driver is the -d flag for the DPDK EAL. If DPDK doesn't pick up the driver for your NIC
;automatically, you may need this argument to point it to the folder where it can find the drivers
@@ -115,8 +131,8 @@ options:
;dpdk_num_mbufs is the total number of packet buffers allocated
;to each direction's packet buffer pool
;This will be multiplied by the number of NICs, but NICs on the same
- ;CPU socket share a pool
- dpdk_num_mbufs=512
+ ;CPU socket share a pool.
+ dpdk_num_mbufs=4096
;dpdk_mbuf_cache_size is the number of buffers to cache for a CPU
;The cache reduces the interaction with the global pool
dpdk_mbuf_cache_size=64
@@ -127,20 +143,24 @@ address, and it must be in a particular format. Hex digits must all be lower
case, and octets must be separated by colons. Here is an example:
[dpdk_mac=3c:fd:fe:a2:a9:09]
- ;dpdk_io_cpu selects the CPU that this NIC's driver will run on
- ;Multiple NICs may occupy one CPU, but the I/O thread will completely
- ;consume that CPU. Also, 0 is reserved for the master thread (i.e.
+ ;dpdk_lcore selects the lcore that this NIC's driver will run on
+ ;Multiple NICs may occupy one lcore, but the I/O thread will completely
+ ;consume that lcore's CPU. Also, 0 is reserved for the master thread (i.e.
;the initial UHD thread that calls init() for DPDK). Attempting to
;use it as an I/O thread will only result in hanging.
- dpdk_io_cpu = 1
+ ;Note also that by default, the lcore ID will be the same as the CPU ID.
+ dpdk_lcore = 1
;dpdk_ipv4 specifies the IPv4 address, and both the address and
;subnet mask are required (and in this format!). DPDK uses the
;netmask to create a basic routing table. Routing to other networks
;(i.e. via gateways) is not permitted.
dpdk_ipv4 = 192.168.10.1/24
+ ;dpdk_num_desc is the number of descriptors in each DMA ring.
+ ;Must be a power of 2.
+ dpdk_num_desc=4096
[dpdk_mac=3c:fd:fe:a2:a9:0a]
- dpdk_io_cpu = 1
+ dpdk_lcore = 1
dpdk_ipv4 = 192.168.20.1/24
\section dpdk_using Using DPDK in UHD
diff --git a/host/lib/include/uhdlib/transport/dpdk/arp.hpp b/host/lib/include/uhdlib/transport/dpdk/arp.hpp
new file mode 100644
index 000000000..e71119bb3
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/dpdk/arp.hpp
@@ -0,0 +1,29 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_ARP_HPP_
+#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_ARP_HPP_
+
+#include <uhdlib/transport/dpdk/common.hpp>
+#include <uhdlib/transport/dpdk/service_queue.hpp>
+#include <rte_arp.h>
+
+namespace uhd { namespace transport { namespace dpdk {
+
+struct arp_request
+{
+ struct ether_addr tha;
+ port_id_t port;
+ ipv4_addr tpa;
+};
+
+struct arp_entry
+{
+ struct ether_addr mac_addr;
+ std::vector<wait_req*> reqs;
+};
+
+}}} /* namespace uhd::transport::dpdk */
+#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_ARP_HPP_ */
diff --git a/host/lib/include/uhdlib/transport/dpdk/common.hpp b/host/lib/include/uhdlib/transport/dpdk/common.hpp
index 1f4466a0f..ac3526e49 100644
--- a/host/lib/include/uhdlib/transport/dpdk/common.hpp
+++ b/host/lib/include/uhdlib/transport/dpdk/common.hpp
@@ -1,44 +1,130 @@
//
-// Copyright 2019 Ettus Research, a National Instruments brand
+// Copyright 2019 Ettus Research, a National Instruments Brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//
+
#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_
#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_
#include <uhd/config.hpp>
+#include <uhd/transport/frame_buff.hpp>
#include <uhd/types/device_addr.hpp>
-#include <uhd/utils/noncopyable.hpp>
-#include <uhd/utils/static.hpp>
+#include <uhdlib/transport/adapter_info.hpp>
#include <rte_ethdev.h>
#include <rte_ether.h>
#include <rte_flow.h>
#include <rte_mbuf.h>
#include <rte_mempool.h>
+#include <rte_spinlock.h>
#include <rte_version.h>
#include <unordered_map>
#include <array>
#include <atomic>
#include <mutex>
+#include <set>
#include <string>
-/* NOTE: There are changes to rte_eth_addr in 19.x */
+/* NOTE: There are changes to all the network standard fields in 19.x */
+
+
+namespace uhd { namespace transport {
+
+class dpdk_io_service;
+
+namespace dpdk {
-namespace uhd { namespace transport { namespace dpdk {
+struct arp_entry;
using queue_id_t = uint16_t;
using port_id_t = uint16_t;
using ipv4_addr = uint32_t;
+class dpdk_adapter_info : public adapter_info
+{
+public:
+ dpdk_adapter_info(port_id_t port) : _port(port) {}
+ ~dpdk_adapter_info() {}
+
+ std::string to_string()
+ {
+ return std::string("DPDK:") + std::to_string(_port);
+ }
+
+ bool operator==(const dpdk_adapter_info& rhs) const
+ {
+ return (_port == rhs._port);
+ }
+
+private:
+ // Port ID
+ port_id_t _port;
+};
+
+
+/*!
+ * Packet/Frame buffer class for DPDK
+ *
+ * This class is intended to be placed in the private area of the rte_mbuf, and
+ * its memory is part of the rte_mbuf, so its life is tied to the underlying
+ * buffer (or more precisely, the encapsulating one).
+ */
+class dpdk_frame_buff : public frame_buff
+{
+public:
+ dpdk_frame_buff(struct rte_mbuf* mbuf) : _mbuf(mbuf)
+ {
+ _data = rte_pktmbuf_mtod(mbuf, void*);
+ _packet_size = 0;
+ }
+
+ ~dpdk_frame_buff() = default;
+
+ /*!
+ * Simple getter for the underlying rte_mbuf.
+ * The rte_mbuf may need further modification before sending packets,
+ * like adjusting the IP and UDP lengths.
+ */
+ inline struct rte_mbuf* get_pktmbuf()
+ {
+ return _mbuf;
+ }
+
+ /*!
+ * Move the data pointer by the indicated size, to some desired
+ * encapsulated frame.
+ *
+ * \param hdr_size Size (in bytes) of the headers to skip. Can be negative
+ * to pull the header back.
+ */
+ inline void header_jump(ssize_t hdr_size)
+ {
+ _data = (void*)((uint8_t*)_data + hdr_size);
+ }
+
+ //! Embedded list node's next ptr
+ dpdk_frame_buff* next = nullptr;
+ //! Embedded list node's prev ptr
+ dpdk_frame_buff* prev = nullptr;
+
+private:
+ struct rte_mbuf* _mbuf;
+};
+
+
+/*!
+ * The size (in bytes) of the private area reserved within the rte_mbuf.
+ * This portion of the rte_mbuf is used for the embedded dpdk_frame_buff data
+ * structure.
+ */
+constexpr size_t DPDK_MBUF_PRIV_SIZE =
+ RTE_ALIGN(sizeof(struct dpdk_frame_buff), RTE_MBUF_PRIV_ALIGN);
+
/*!
* Class representing a DPDK NIC port
*
* The dpdk_port object possesses all the data needed to send and receive
- * packets between this port and a remote host. A logical link should specify
- * which packets are destined for it and allocate a DMA queue with the
- * dpdk_port::alloc_queue() function. A logical link should not, however,
- * specify ARP packets for its set of received packets. That functionality is
- * reserved for the special queue 0.
+ * packets between this port and a remote host.
*
* The logical link can then get the packet buffer pools associated with this
* NIC port and use them to send and receive packets.
@@ -55,7 +141,7 @@ public:
* \param port The port ID
* \param mtu The intended MTU for the port
* \param num_queues Number of DMA queues to reserve for this port
- * \param num_mbufs The number of packet buffers per queue
+ * \param num_desc The number of descriptors per DMA queue
* \param rx_pktbuf_pool A pointer to the port's RX packet buffer pool
* \param tx_pktbuf_pool A pointer to the port's TX packet buffer pool
* \param ipv4_address The IPv4 network address (w/ netmask)
@@ -64,7 +150,7 @@ public:
static dpdk_port::uptr make(port_id_t port,
size_t mtu,
uint16_t num_queues,
- size_t num_mbufs,
+ uint16_t num_desc,
struct rte_mempool* rx_pktbuf_pool,
struct rte_mempool* tx_pktbuf_pool,
std::string ipv4_address);
@@ -72,11 +158,13 @@ public:
dpdk_port(port_id_t port,
size_t mtu,
uint16_t num_queues,
- size_t num_mbufs,
+ uint16_t num_desc,
struct rte_mempool* rx_pktbuf_pool,
struct rte_mempool* tx_pktbuf_pool,
std::string ipv4_address);
+ ~dpdk_port();
+
/*! Getter for this port's ID
* \return this port's ID
*/
@@ -85,6 +173,19 @@ public:
return _port;
}
+ inline dpdk_adapter_info get_adapter_info() const
+ {
+ return dpdk_adapter_info(_port);
+ }
+
+ /*! Getter for this port's MAC address
+ * \return this port's MAC address
+ */
+ inline ether_addr get_mac_addr() const
+ {
+ return _mac_addr;
+ }
+
/*! Getter for this port's MTU
* \return this port's MTU
*/
@@ -141,68 +242,45 @@ public:
* \param dst_ipv4_addr The destination IPv4 address (in network order)
* \return whether the destination address matches this port's broadcast address
*/
- inline bool dst_is_broadcast(const uint32_t dst_ipv4_addr) const
+ inline bool dst_is_broadcast(const ipv4_addr dst_ipv4_addr) const
{
uint32_t network = _netmask | ((~_netmask) & dst_ipv4_addr);
return (network == 0xffffffff);
}
- /*! Allocate a DMA queue (TX/RX pair) and use the specified flow pattern
- * to route packets to the RX queue.
- *
- * \pattern recv_pattern The flow pattern to use for directing traffic to
- * the allocated RX queue.
- * \return The queue ID for the allocated queue
- * \throw uhd::runtime_error when there are no free queues
- */
- queue_id_t alloc_queue(struct rte_flow_pattern recv_pattern[]);
-
- /*! Free a previously allocated queue and tear down the associated flow rule
- * \param queue The ID of the queue to free
- * \throw std::out_of_range when the queue ID is not currently allocated
- */
- void free_queue(queue_id_t queue);
-
/*!
- * Process ARP request/reply
+ * Allocate a UDP port and return it in network order
+ *
+ * \param udp_port UDP port to attempt to allocate. Use 0 for no preference.
+ * \return 0 for failure, else the allocated UDP port in network order.
*/
- // int process_arp(struct rte_mempool *tx_pktbuf_pool, struct arp_hdr *arp_frame);
+ uint16_t alloc_udp_port(uint16_t udp_port);
private:
+ friend uhd::transport::dpdk_io_service;
+
/*!
* Construct and transmit an ARP reply (for the given ARP request)
*/
- int _arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* arp_req);
+ int _arp_reply(queue_id_t queue_id, struct arp_hdr* arp_req);
port_id_t _port;
size_t _mtu;
+ size_t _num_queues;
struct rte_mempool* _rx_pktbuf_pool;
struct rte_mempool* _tx_pktbuf_pool;
struct ether_addr _mac_addr;
ipv4_addr _ipv4;
ipv4_addr _netmask;
- size_t _num_queues;
- std::vector<queue_id_t> _free_queues;
- std::unordered_map<queue_id_t, struct rte_flow*> _flow_rules;
- /* Need ARP table
- * To implement ARP service, maybe create ARP xport
- * Then need dpdk_udp_link and dpdk_raw_link
- *
- * ...Or just do it inline with dpdk_ctx
- *
- * And link can just save the result (do it during constructor)
- *
- *
- * But what about the service that _responds_ to ARP requests?!
- *
- * Maybe have to connect a DPDK link in stages:
- * First, create the ARP service and attach it to the dpdk_ctx
- * dpdk_ctx must own the links...?
- * Or! Always burn a DMA engine for ARP
- *
- * Maybe have a shared_ptr to an ARP service here?
- */
+
+ // Structures protected by mutex
std::mutex _mutex;
+ std::set<uint16_t> _udp_ports;
+ uint16_t _next_udp_port = 0xffff;
+
+ // Structures protected by spin lock
+ rte_spinlock_t _spinlock = RTE_SPINLOCK_INITIALIZER;
+ std::unordered_map<ipv4_addr, struct arp_entry*> _arp_table;
};
@@ -267,17 +345,21 @@ public:
int get_port_link_status(port_id_t portid) const;
/*!
- * Get port ID for routing packet destined for given address
+ * Get port for routing packet destined for given address
* \param addr Destination address
- * \return port ID from routing table
+ * \return pointer to the port from routing table
*/
- int get_route(const std::string& addr) const;
+ dpdk_port* get_route(const std::string& addr) const;
/*!
* \return whether init() has been called
*/
bool is_init_done(void) const;
+ /*! Return a reference to an IO service given a port ID
+ */
+ std::shared_ptr<uhd::transport::dpdk_io_service> get_io_service(const size_t port_id);
+
private:
/*! Convert the args to DPDK's EAL args and Initialize the EAL
*
@@ -312,8 +394,12 @@ private:
std::unordered_map<port_id_t, dpdk_port::uptr> _ports;
std::vector<struct rte_mempool*> _rx_pktbuf_pools;
std::vector<struct rte_mempool*> _tx_pktbuf_pools;
+ // Store all the I/O services, and also store the corresponding port ID
+ std::map<std::shared_ptr<uhd::transport::dpdk_io_service>, std::vector<size_t>>
+ _io_srv_portid_map;
};
-}}} // namespace uhd::transport::dpdk
+} // namespace dpdk
+}} // namespace uhd::transport
#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_COMMON_HPP_ */
diff --git a/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp b/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp
index 7c9917079..c95786864 100644
--- a/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp
+++ b/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp
@@ -28,6 +28,14 @@ enum wait_type {
WAIT_FLOW_OPEN,
//! Wake once the flow/socket is destroyed
WAIT_FLOW_CLOSE,
+ //! Wake once the new transport is connected
+ WAIT_XPORT_CONNECT,
+ //! Wake once the transport is disconnected
+ WAIT_XPORT_DISCONNECT,
+ //! Wake when MAC address found for IP address
+ WAIT_ARP,
+ //! Wake once the I/O worker terminates
+ WAIT_LCORE_TERM,
//! Number of possible reasons for waiting
WAIT_TYPE_COUNT
};
@@ -50,6 +58,8 @@ struct wait_req
std::mutex mutex;
//! Whether the request was completed
bool complete;
+ //! The status or error code associated with the request
+ int retval;
//! An atomic reference counter for managing this request object's memory
rte_atomic32_t refcnt;
};
@@ -110,7 +120,7 @@ class service_queue
public:
/*!
* Create a service queue
- * \param depth Must be a power of 2
+ * \param depth Must be a power of 2. Actual size is less.
* \param lcore_id The DPDK lcore_id associated with this service queue
*/
service_queue(size_t depth, unsigned int lcore_id)
@@ -227,6 +237,24 @@ public:
return stat;
}
+ /*!
+ * Get the size of the service queue
+ * \return size of the service queue
+ */
+ inline size_t get_size()
+ {
+ return rte_ring_get_size(_waiter_ring);
+ }
+
+ /*!
+ * Get the capacity of the service queue
+ * \return capacity of the service queue
+ */
+ inline size_t get_capacity()
+ {
+ return rte_ring_get_capacity(_waiter_ring);
+ }
+
private:
//! Multi-producer, single-consumer ring for requests
struct rte_ring* _waiter_ring;
diff --git a/host/lib/include/uhdlib/transport/dpdk/udp.hpp b/host/lib/include/uhdlib/transport/dpdk/udp.hpp
new file mode 100644
index 000000000..65e561315
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/dpdk/udp.hpp
@@ -0,0 +1,115 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_UDP_HPP_
+#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_UDP_HPP_
+
+#include <uhdlib/transport/dpdk/common.hpp>
+#include <arpa/inet.h>
+#include <netinet/udp.h>
+#include <rte_ip.h>
+#include <rte_udp.h>
+#include <boost/format.hpp>
+
+namespace uhd { namespace transport { namespace dpdk {
+
+constexpr size_t HDR_SIZE_UDP_IPV4 =
+ (sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + sizeof(struct udp_hdr));
+
+/*!
+ * An enumerated type representing the type of flow for an IPv4 client
+ * Currently, only UDP is supported (FLOW_TYPE_UDP)
+ */
+enum flow_type {
+ FLOW_TYPE_UDP,
+ FLOW_TYPE_COUNT,
+};
+
+/*!
+ * A tuple for IPv4 flows that can be used for hashing
+ */
+struct ipv4_5tuple
+{
+ enum flow_type flow_type;
+ ipv4_addr src_ip;
+ ipv4_addr dst_ip;
+ uint16_t src_port;
+ uint16_t dst_port;
+};
+
+inline void fill_ipv4_hdr(struct rte_mbuf* mbuf,
+ const dpdk_port* port,
+ uint32_t dst_ipv4_addr,
+ uint8_t proto_id,
+ uint32_t payload_len)
+{
+ struct ether_hdr* eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*);
+ struct ipv4_hdr* ip_hdr = (struct ipv4_hdr*)&eth_hdr[1];
+
+ ip_hdr->version_ihl = 0x40 | 5;
+ ip_hdr->type_of_service = 0;
+ ip_hdr->total_length = rte_cpu_to_be_16(20 + payload_len);
+ ip_hdr->packet_id = 0;
+ ip_hdr->fragment_offset = rte_cpu_to_be_16(IPV4_HDR_DF_FLAG);
+ ip_hdr->time_to_live = 64;
+ ip_hdr->next_proto_id = proto_id;
+ ip_hdr->hdr_checksum = 0; // Require HW offload
+ ip_hdr->src_addr = port->get_ipv4();
+ ip_hdr->dst_addr = dst_ipv4_addr;
+
+ mbuf->ol_flags = PKT_TX_IP_CKSUM | PKT_TX_IPV4;
+ mbuf->l2_len = sizeof(struct ether_hdr);
+ mbuf->l3_len = sizeof(struct ipv4_hdr);
+ mbuf->pkt_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len;
+ mbuf->data_len = sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + payload_len;
+}
+
+/* All values except payload length must be in network order */
+inline void fill_udp_hdr(struct rte_mbuf* mbuf,
+ const dpdk_port* port,
+ uint32_t dst_ipv4_addr,
+ uint16_t src_port,
+ uint16_t dst_port,
+ uint32_t payload_len)
+{
+ struct ether_hdr* eth_hdr;
+ struct ipv4_hdr* ip_hdr;
+ struct udp_hdr* tx_hdr;
+
+ fill_ipv4_hdr(
+ mbuf, port, dst_ipv4_addr, IPPROTO_UDP, sizeof(struct udp_hdr) + payload_len);
+
+ eth_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*);
+ ip_hdr = (struct ipv4_hdr*)&eth_hdr[1];
+ tx_hdr = (struct udp_hdr*)&ip_hdr[1];
+
+ tx_hdr->src_port = src_port;
+ tx_hdr->dst_port = dst_port;
+ tx_hdr->dgram_len = rte_cpu_to_be_16(8 + payload_len);
+ tx_hdr->dgram_cksum = 0;
+ mbuf->l4_len = sizeof(struct udp_hdr);
+}
+
+//! Return an IPv4 address (numeric, in network order) into a string
+inline std::string ipv4_num_to_str(const uint32_t ip_addr)
+{
+ char addr_str[INET_ADDRSTRLEN];
+ struct in_addr ipv4_addr;
+ ipv4_addr.s_addr = ip_addr;
+ inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str));
+ return std::string(addr_str);
+}
+
+inline std::string eth_addr_to_string(const struct ether_addr mac_addr)
+{
+ auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx");
+ mac_stream % (uint32_t)mac_addr.addr_bytes[0] % (uint32_t)mac_addr.addr_bytes[1]
+ % (uint32_t)mac_addr.addr_bytes[2] % (uint32_t)mac_addr.addr_bytes[3]
+ % (uint32_t)mac_addr.addr_bytes[4] % (uint32_t)mac_addr.addr_bytes[5];
+ return mac_stream.str();
+}
+
+}}} /* namespace uhd::transport::dpdk */
+#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_UDP_HPP_ */
diff --git a/host/lib/include/uhdlib/transport/dpdk_io_service.hpp b/host/lib/include/uhdlib/transport/dpdk_io_service.hpp
new file mode 100644
index 000000000..8e1fb29d0
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/dpdk_io_service.hpp
@@ -0,0 +1,246 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_HPP_
+#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_HPP_
+
+#include <uhdlib/transport/dpdk/common.hpp>
+#include <uhdlib/transport/dpdk/service_queue.hpp>
+#include <uhdlib/transport/io_service.hpp>
+#include <rte_arp.h>
+#include <rte_hash.h>
+#include <rte_ip.h>
+#include <rte_mbuf.h>
+#include <rte_udp.h>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+class dpdk_send_io;
+class dpdk_recv_io;
+struct dpdk_io_if;
+
+class dpdk_io_service : public virtual io_service,
+ public std::enable_shared_from_this<dpdk_io_service>
+{
+public:
+ using sptr = std::shared_ptr<dpdk_io_service>;
+
+ static sptr make(
+ unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth);
+
+ ~dpdk_io_service();
+
+ // Add entry to RX flow table
+ // This yields a link, which is then used for attaching to a buffer
+ // We yank from the link immediately following, then process at the transport level
+ // (so two tables here, one for transports, one for links)
+ void attach_recv_link(recv_link_if::sptr link);
+
+ // Create object to hold set of queues, to go in TX table
+ void attach_send_link(send_link_if::sptr link);
+
+ void detach_recv_link(recv_link_if::sptr link);
+
+ void detach_send_link(send_link_if::sptr link);
+
+ recv_io_if::sptr make_recv_client(recv_link_if::sptr data_link,
+ size_t num_recv_frames,
+ recv_callback_t cb,
+ send_link_if::sptr fc_link,
+ size_t num_send_frames,
+ recv_io_if::fc_callback_t fc_cb);
+
+ send_io_if::sptr make_send_client(send_link_if::sptr send_link,
+ size_t num_send_frames,
+ send_io_if::send_callback_t send_cb,
+ recv_link_if::sptr recv_link,
+ size_t num_recv_frames,
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb);
+
+
+private:
+ friend class dpdk_recv_io;
+ friend class dpdk_send_io;
+
+ dpdk_io_service(
+ unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth);
+ dpdk_io_service(const dpdk_io_service&) = delete;
+
+ /*!
+ * I/O worker function to be passed to the DPDK lcore
+ *
+ * The argument must be a pointer to *this* dpdk_io_service
+ *
+ * \param arg a pointer to this dpdk_io_service
+ * \return 0 for normal termination, else nonzero
+ */
+ static int _io_worker(void* arg);
+
+ /*!
+ * Helper function for I/O thread to process requests on its service queue
+ */
+ int _service_requests();
+
+ /*!
+ * Helper function for I/O thread to service a WAIT_FLOW_OPEN request
+ *
+ * \param req The requester's wait_req object
+ */
+ void _service_flow_open(dpdk::wait_req* req);
+
+ /*!
+ * Helper function for I/O thread to service a WAIT_FLOW_CLOSE request
+ *
+ * \param req The requester's wait_req object
+ */
+ void _service_flow_close(dpdk::wait_req* req);
+
+ /*!
+ * Helper function for I/O thread to service a WAIT_XPORT_CONNECT request
+ *
+ * \param req The requester's wait_req object
+ */
+ void _service_xport_connect(dpdk::wait_req* req);
+
+ /*!
+ * Helper function for I/O thread to service a WAIT_XPORT_DISCONNECT request
+ *
+ * \param req The requester's wait_req object
+ */
+ void _service_xport_disconnect(dpdk::wait_req* req);
+
+ /*!
+ * Get Ethernet MAC address for the given IPv4 address, and wake the
+ * requester when finished.
+ * This may only be called by an I/O service, on behalf of a requester's
+ * WAIT_ARP request.
+ *
+ * \param req The requester's wait_req object
+ * \return 0 if address was written, -EAGAIN if request was queued for
+ * later completion, -ENOMEM if ran out of memory to complete
+ * request
+ */
+ int _service_arp_request(dpdk::wait_req* req);
+
+ /*!
+ * Helper function for I/O thread to do a burst of packet retrieval and
+ * processing on an RX queue
+ *
+ * \param port the DPDK NIC port used for RX
+ * \param queue the DMA queue on the port to recv from
+ */
+ int _rx_burst(dpdk::dpdk_port* port, dpdk::queue_id_t queue);
+
+ /*!
+ * Helper function for I/O thread to do a burst of packet transmission on a
+ * TX queue
+ *
+ * \param port the DPDK NIC port used for TX
+ * \return number of buffers transmitted
+ */
+ int _tx_burst(dpdk::dpdk_port* port);
+
+ /*!
+ * Helper function for I/O thread to release a burst of buffers from an RX
+ * release queue
+ *
+ * \param port the DPDK NIC port used for RX
+ * \return number of buffers released
+ */
+ int _rx_release(dpdk::dpdk_port* port);
+
+ /*!
+ * Helper function for I/O thread to do send an ARP request
+ *
+ * \param port the DPDK NIC port to send the ARP request through
+ * \param queue the DMA queue on the port to send to
+ * \param ip the IPv4 address for which the caller is seeking a MAC address
+ */
+ int _send_arp_request(
+ dpdk::dpdk_port* port, dpdk::queue_id_t queue, dpdk::ipv4_addr ip);
+
+ /*!
+ * Helper function for I/O thread to process an ARP request/reply
+ *
+ * \param port the DPDK NIC port to send any ARP replies from
+ * \param queue the DMA queue on the port to send ARP replies to
+ * \param arp_frame a pointer to the ARP frame
+ */
+ int _process_arp(
+ dpdk::dpdk_port* port, dpdk::queue_id_t queue_id, struct arp_hdr* arp_frame);
+
+ /*!
+ * Helper function for I/O thread to process an IPv4 packet
+ *
+ * \param port the DPDK NIC port to send any ARP replies from
+ * \param mbuf a pointer to the packet buffer container
+ * \param pkt a pointer to the IPv4 header of the packet
+ */
+ int _process_ipv4(dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct ipv4_hdr* pkt);
+
+ /*!
+ * Helper function for I/O thread to process an IPv4 packet
+ *
+ * \param port the DPDK NIC port to send any ARP replies from
+ * \param mbuf a pointer to the packet buffer container
+ * \param pkt a pointer to the UDP header of the packet
+ * \param bcast whether this packet was destined for the port's broadcast
+ * IPv4 address
+ */
+ int _process_udp(
+ dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct udp_hdr* pkt, bool bcast);
+
+ /*!
+ * Helper function to get a unique client ID
+ *
+ * \return a unique client ID
+ */
+ uint16_t _get_unique_client_id();
+
+ /*!
+ * Attempt to wake client
+ */
+ void _wake_client(dpdk_io_if* dpdk_io);
+
+ //! The reference to the DPDK context
+ std::weak_ptr<dpdk::dpdk_ctx> _ctx;
+ //! The lcore running this dpdk_io_service's work routine
+ unsigned int _lcore_id;
+ //! The NIC ports served by this dpdk_io_service
+ std::vector<dpdk::dpdk_port*> _ports;
+ //! The set of TX queues associated with a given port
+ std::unordered_map<dpdk::port_id_t, std::list<dpdk_send_io*>> _tx_queues;
+ //! The list of recv_io for each port
+ std::unordered_map<dpdk::port_id_t, std::list<dpdk_recv_io*>> _recv_xport_map;
+ //! The RX table, which provides lists of dpdk_recv_io for an IPv4 tuple
+ struct rte_hash* _rx_table;
+ //! Service queue for clients to make requests
+ dpdk::service_queue _servq;
+ //! Retry list for waking clients
+ dpdk_io_if* _retry_head = NULL;
+
+ //! Mutex to protect below data structures
+ std::mutex _mutex;
+ //! The recv links attached to this I/O service (managed client side)
+ std::list<recv_link_if::sptr> _recv_links;
+ //! The send links attached to this I/O service (managed client side)
+ std::list<send_link_if::sptr> _send_links;
+ //! Set of IDs for new clients
+ std::set<uint16_t> _client_id_set;
+ //! Next ID to try
+ uint16_t _next_client_id;
+
+ static constexpr int MAX_PENDING_SERVICE_REQS = 32;
+ static constexpr int MAX_FLOWS = 128;
+ static constexpr int MAX_CLIENTS = 2048;
+ static constexpr int RX_BURST_SIZE = 16;
+ static constexpr int TX_BURST_SIZE = 16;
+};
+
+}} // namespace uhd::transport
+
+#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_HPP_ */
diff --git a/host/lib/include/uhdlib/transport/dpdk_io_service_client.hpp b/host/lib/include/uhdlib/transport/dpdk_io_service_client.hpp
new file mode 100644
index 000000000..451cc1531
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/dpdk_io_service_client.hpp
@@ -0,0 +1,285 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_CLIENT_HPP_
+#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_CLIENT_HPP_
+
+#include <uhdlib/transport/dpdk/common.hpp>
+#include <uhdlib/transport/dpdk/service_queue.hpp>
+#include <uhdlib/transport/dpdk/udp.hpp>
+#include <uhdlib/transport/dpdk_io_service.hpp>
+#include <uhdlib/transport/udp_dpdk_link.hpp>
+#include <rte_ring.h>
+#include <chrono>
+
+namespace uhd { namespace transport {
+
+struct dpdk_flow_data
+{
+ //! The UDP DPDK link
+ udp_dpdk_link* link;
+ //! Is it a recv_link_if? Or a send_link_if?
+ bool is_recv;
+};
+
+/*! DPDK I/O interface for service requests
+ *
+ * This is used to pass around information about the I/O clients. It is what is
+ * passed into the data portion of a request, for connect and disconnect
+ * requests.
+ *
+ */
+struct dpdk_io_if
+{
+ dpdk_io_if(bool is_recv,
+ udp_dpdk_link* link,
+ dpdk_io_service::sptr io_srv,
+ recv_callback_t recv_cb)
+ : is_recv(is_recv), link(link), io_srv(io_srv), recv_cb(recv_cb)
+ {
+ }
+
+ bool is_recv;
+ udp_dpdk_link* link;
+ dpdk_io_service::sptr io_srv;
+ recv_callback_t recv_cb;
+ void* io_client;
+ //! Embedded list node
+ dpdk_io_if* next = NULL;
+ dpdk_io_if* prev = NULL;
+};
+
+// This must be tied to a link for frame_buffs
+// so need map of dpdk_send_io to udp_dpdk_link
+// Have queue pair: buffs to send + free buffs
+// There used to be a retry queue: Still needed? (Maybe 1 per port?)
+class dpdk_send_io : public virtual send_io_if
+{
+public:
+ using sptr = std::shared_ptr<dpdk_send_io>;
+
+ dpdk_send_io(dpdk_io_service::sptr io_srv,
+ udp_dpdk_link* link,
+ size_t num_send_frames,
+ send_callback_t send_cb,
+ size_t num_recv_frames,
+ recv_callback_t recv_cb,
+ fc_callback_t fc_cb)
+ : _dpdk_io_if(false, link, io_srv, recv_cb)
+ , _servq(io_srv->_servq)
+ , _send_cb(send_cb)
+ , _fc_cb(fc_cb)
+ {
+ // Get reference to DPDK context, since this owns some DPDK memory
+ _ctx = dpdk::dpdk_ctx::get();
+ _num_send_frames = num_send_frames;
+ _num_recv_frames = num_recv_frames;
+
+ // Create the free buffer and send queues
+ // Must be power of 2, and add one since usable ring is size-1
+ size_t queue_size = (size_t)exp2(ceil(log2(num_send_frames + 1)));
+ dpdk::port_id_t nic_port = link->get_port()->get_port_id();
+ uint16_t id = io_srv->_get_unique_client_id();
+ char name[16];
+ snprintf(name, sizeof(name), "tx%hu-%hu", nic_port, id);
+ _buffer_queue = rte_ring_create(
+ name, queue_size, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
+ snprintf(name, sizeof(name), "~tx%hu-%hu", nic_port, id);
+ _send_queue = rte_ring_create(
+ name, queue_size, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
+ UHD_LOG_TRACE("DPDK::SEND_IO", "dpdk_send_io() " << _buffer_queue->name);
+
+ // Create the wait_request object that gets passed around
+ _waiter = dpdk::wait_req_alloc(dpdk::wait_type::WAIT_TX_BUF, (void*)&_dpdk_io_if);
+ _waiter->complete = true;
+ }
+
+ ~dpdk_send_io()
+ {
+ UHD_LOG_TRACE("DPDK::SEND_IO", "~dpdk_send_io() " << _buffer_queue->name);
+ // Deregister with I/O service
+ auto xport_req = dpdk::wait_req_alloc(
+ dpdk::wait_type::WAIT_XPORT_DISCONNECT, (void*)&_dpdk_io_if);
+ _servq.submit(xport_req, std::chrono::microseconds(-1));
+
+ // Clean up
+ wait_req_put(xport_req);
+ rte_ring_free(_send_queue);
+ rte_ring_free(_buffer_queue);
+ wait_req_put(_waiter);
+ }
+
+ bool wait_for_dest_ready(size_t /*num_bytes*/, int32_t /*timeout_ms*/)
+ {
+ // For this I/O service, the destination is the queue to the offload
+ // thread. The queue is always able to accomodate new packets since it
+ // is sized to fit all the frames reserved from the link.
+ return true;
+ }
+
+ frame_buff::uptr get_send_buff(int32_t timeout_ms)
+ {
+ frame_buff* buff_ptr;
+ if (rte_ring_dequeue(_buffer_queue, (void**)&buff_ptr)) {
+ if (!timeout_ms) {
+ return frame_buff::uptr();
+ }
+ // Nothing in the queue. Try waiting if there is a timeout.
+ auto timeout_point =
+ std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
+ std::unique_lock<std::mutex> lock(_waiter->mutex);
+ wait_req_get(_waiter);
+ _waiter->complete = false;
+ auto is_complete = [this] { return !rte_ring_empty(_buffer_queue); };
+ if (timeout_ms < 0) {
+ _waiter->cond.wait(lock, is_complete);
+ } else {
+ auto status = _waiter->cond.wait_until(lock, timeout_point, is_complete);
+ if (!status) {
+ return frame_buff::uptr();
+ }
+ }
+ if (rte_ring_dequeue(_buffer_queue, (void**)&buff_ptr)) {
+ return frame_buff::uptr();
+ }
+ }
+ return frame_buff::uptr(buff_ptr);
+ }
+
+ void release_send_buff(frame_buff::uptr buff)
+ {
+ auto buff_ptr = (dpdk::dpdk_frame_buff*)buff.release();
+ assert(buff_ptr);
+ int status = rte_ring_enqueue(_send_queue, buff_ptr);
+ if (status != 0) {
+ assert(false);
+ }
+ // TODO: Should we retry if it failed?
+ }
+
+private:
+ friend class dpdk_io_service;
+
+ dpdk_io_if _dpdk_io_if;
+ size_t _num_frames_in_use = 0;
+
+ dpdk::service_queue& _servq;
+ dpdk::dpdk_ctx::sptr _ctx;
+ struct rte_ring* _buffer_queue;
+ struct rte_ring* _send_queue;
+ dpdk::wait_req* _waiter;
+ send_callback_t _send_cb;
+ fc_callback_t _fc_cb;
+};
+
+class dpdk_recv_io : public virtual recv_io_if
+{
+public:
+ using sptr = std::shared_ptr<dpdk_recv_io>;
+
+ dpdk_recv_io(dpdk_io_service::sptr io_srv,
+ udp_dpdk_link* link,
+ size_t num_recv_frames,
+ recv_callback_t recv_cb,
+ size_t num_send_frames,
+ fc_callback_t fc_cb)
+ : _dpdk_io_if(true, link, io_srv, recv_cb)
+ , _servq(io_srv->_servq)
+ , _fc_cb(fc_cb) // Call on release
+ {
+ // Get reference to DPDK context, since this owns some DPDK memory
+ _ctx = dpdk::dpdk_ctx::get();
+ _num_send_frames = num_send_frames;
+ _num_recv_frames = num_recv_frames;
+ // Create the recv and release queues
+ // Must be power of 2, and add one since usable ring is size-1
+ size_t queue_size = (size_t)exp2(ceil(log2(num_recv_frames + 1)));
+ dpdk::port_id_t nic_port = link->get_port()->get_port_id();
+ uint16_t id = io_srv->_get_unique_client_id();
+ UHD_LOG_DEBUG("DPDK::IO_SERVICE", "Creating recv client with queue size of " << queue_size);
+ char name[16];
+ snprintf(name, sizeof(name), "rx%hu-%hu", nic_port, id);
+ _recv_queue = rte_ring_create(
+ name, queue_size, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
+ snprintf(name, sizeof(name), "~rx%hu-%hu", nic_port, id);
+ _release_queue = rte_ring_create(
+ name, queue_size, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
+ UHD_LOG_TRACE("DPDK::RECV_IO", "dpdk_recv_io() " << _recv_queue->name);
+ // Create the wait_request object that gets passed around
+ _waiter = dpdk::wait_req_alloc(dpdk::wait_type::WAIT_RX, (void*)&_dpdk_io_if);
+ _waiter->complete = true;
+ }
+
+ ~dpdk_recv_io()
+ {
+ // Deregister with I/O service
+ UHD_LOG_TRACE("DPDK::RECV_IO", "~dpdk_recv_io() " << _recv_queue->name);
+ auto xport_req = dpdk::wait_req_alloc(
+ dpdk::wait_type::WAIT_XPORT_DISCONNECT, (void*)&_dpdk_io_if);
+ _servq.submit(xport_req, std::chrono::microseconds(-1));
+
+ // Clean up
+ wait_req_put(xport_req);
+ rte_ring_free(_recv_queue);
+ rte_ring_free(_release_queue);
+ wait_req_put(_waiter);
+ }
+
+ frame_buff::uptr get_recv_buff(int32_t timeout_ms)
+ {
+ frame_buff* buff_ptr;
+ if (rte_ring_dequeue(_recv_queue, (void**)&buff_ptr)) {
+ if (!timeout_ms) {
+ return frame_buff::uptr();
+ }
+ // Nothing in the queue. Try waiting if there is a timeout.
+ auto timeout_point =
+ std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
+ std::unique_lock<std::mutex> lock(_waiter->mutex);
+ wait_req_get(_waiter);
+ _waiter->complete = false;
+ auto is_complete = [this] { return !rte_ring_empty(_recv_queue); };
+ if (timeout_ms < 0) {
+ _waiter->cond.wait(lock, is_complete);
+ } else {
+ auto status = _waiter->cond.wait_until(lock, timeout_point, is_complete);
+ if (!status) {
+ return frame_buff::uptr();
+ }
+ }
+ if (rte_ring_dequeue(_recv_queue, (void**)&buff_ptr)) {
+ return frame_buff::uptr();
+ }
+ }
+ return frame_buff::uptr(buff_ptr);
+ }
+
+ void release_recv_buff(frame_buff::uptr buff)
+ {
+ frame_buff* buff_ptr = buff.release();
+ int status = rte_ring_enqueue(_release_queue, buff_ptr);
+ if (status != 0) {
+ assert(false);
+ }
+ }
+
+private:
+ friend class dpdk_io_service;
+
+ dpdk_io_if _dpdk_io_if;
+ size_t _num_frames_in_use = 0;
+
+ dpdk::service_queue& _servq;
+ dpdk::dpdk_ctx::sptr _ctx;
+ struct rte_ring* _recv_queue;
+ struct rte_ring* _release_queue;
+ dpdk::wait_req* _waiter;
+ fc_callback_t _fc_cb;
+};
+
+
+}} // namespace uhd::transport
+
+#endif /* _INCLUDED_UHDLIB_TRANSPORT_DPDK_IO_SERVICE_CLIENT_HPP_ */
diff --git a/host/lib/include/uhdlib/transport/dpdk_simple.hpp b/host/lib/include/uhdlib/transport/dpdk_simple.hpp
index 86abaeff8..8420510ea 100644
--- a/host/lib/include/uhdlib/transport/dpdk_simple.hpp
+++ b/host/lib/include/uhdlib/transport/dpdk_simple.hpp
@@ -8,7 +8,6 @@
#define INCLUDED_DPDK_SIMPLE_HPP
#include <uhd/transport/udp_simple.hpp>
-#include <uhdlib/transport/dpdk_common.hpp>
namespace uhd { namespace transport {
@@ -17,35 +16,11 @@ class dpdk_simple : public udp_simple
public:
virtual ~dpdk_simple(void) = 0;
- /*!
- * Make a new connected dpdk transport:
- * This transport is for sending and receiving
- * between this host and a single endpoint.
- * The primary usage for this transport will be control transactions.
- *
- * The address must be an ipv4 address.
- * The port must be a number.
- *
- * \param addr a string representing the destination address
- * \param port a string representing the destination port
- */
- static udp_simple::sptr make_connected(struct uhd_dpdk_ctx &ctx,
- const std::string &addr, const std::string &port);
+ static udp_simple::sptr make_connected(
+ const std::string& addr, const std::string& port);
- /*!
- * Make a new broadcasting dpdk transport:
- * This transport can send broadcast datagrams
- * and receive datagrams from multiple sources.
- * The primary usage for this transport will be to discover devices.
- *
- * The address must be an ipv4 address.
- * The port must be a number.
- *
- * \param addr a string representing the destination address
- * \param port a string representing the destination port
- */
- static udp_simple::sptr make_broadcast(struct uhd_dpdk_ctx &ctx,
- const std::string &addr, const std::string &port);
+ static udp_simple::sptr make_broadcast(
+ const std::string& addr, const std::string& port);
/*!
* Send a single buffer.
diff --git a/host/lib/include/uhdlib/transport/link_base.hpp b/host/lib/include/uhdlib/transport/link_base.hpp
index e4d329e2a..a57b681ca 100644
--- a/host/lib/include/uhdlib/transport/link_base.hpp
+++ b/host/lib/include/uhdlib/transport/link_base.hpp
@@ -56,7 +56,7 @@ private:
* the link interface methods.
*
* This template requires the following methods in the derived class:
- * bool get_send_buf_derived(frame_buff& buf, int32_t timeout_ms);
+ * bool get_send_buff_derived(frame_buff& buf, int32_t timeout_ms);
* void release_send_buf_derived(frame_buff& buf);
*
* Additionally, the subclass must call preload_free_buf for each frame_buff
@@ -145,7 +145,7 @@ private:
* the link interface methods.
*
* This template requires the following methods in the derived class:
- * bool get_recv_buff_derived(frame_buff& buff, int32_t timeout_ms);
+ * size_t get_recv_buff_derived(frame_buff& buff, int32_t timeout_ms);
* void release_recv_buff_derived(frame_buff& buff);
*
* Additionally, the subclass must call preload_free_buff for each
diff --git a/host/lib/include/uhdlib/transport/udp_dpdk_link.hpp b/host/lib/include/uhdlib/transport/udp_dpdk_link.hpp
new file mode 100644
index 000000000..eaf3cf7c4
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/udp_dpdk_link.hpp
@@ -0,0 +1,267 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_UHDLIB_TRANSPORT_UDP_DPDK_LINK_HPP
+#define INCLUDED_UHDLIB_TRANSPORT_UDP_DPDK_LINK_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/transport/buffer_pool.hpp>
+#include <uhd/types/device_addr.hpp>
+#include <uhdlib/transport/dpdk/common.hpp>
+#include <uhdlib/transport/link_if.hpp>
+#include <uhdlib/transport/links.hpp>
+#include <rte_udp.h>
+#include <cassert>
+#include <string>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+/*!
+ * A zero copy transport interface to the dpdk DMA library.
+ */
+class udp_dpdk_link : public virtual recv_link_if, public virtual send_link_if
+{
+public:
+ using sptr = std::shared_ptr<udp_dpdk_link>;
+
+ udp_dpdk_link(const dpdk::port_id_t port_id,
+ const std::string& remote_addr,
+ const std::string& remote_port,
+ const std::string& local_port,
+ const link_params_t& params);
+
+ ~udp_dpdk_link();
+
+ /*!
+ * Make a new dpdk link. Get port ID from routing table.
+ *
+ * \param remote_addr Remote IP address
+ * \param remote_port Remote UDP port
+ * \param params Values for frame sizes, num frames, and buffer sizes
+ * \return a shared_ptr to a new udp dpdk link
+ */
+ static sptr make(const std::string& remote_addr,
+ const std::string& remote_port,
+ const link_params_t& params);
+
+ /*!
+ * Make a new dpdk link. User specifies DPDK port ID directly.
+ *
+ * \param port_id DPDK port ID to use for communication
+ * \param remote_addr Remote IP address
+ * \param remote_port Remote UDP port
+ * \param local_port Local UDP port
+ * \param params Values for frame sizes, num frames, and buffer sizes
+ * \return a shared_ptr to a new udp dpdk link
+ */
+ static sptr make(const dpdk::port_id_t port_id,
+ const std::string& remote_addr,
+ const std::string& remote_port,
+ const std::string& local_port,
+ const link_params_t& params);
+
+ /*!
+ * Get the associated dpdk_port
+ *
+ * \return a pointer to the dpdk_port used by this link
+ */
+ inline dpdk::dpdk_port* get_port()
+ {
+ return _port;
+ }
+
+ /*!
+ * Get the DMA queue associated with this link
+ *
+ * \return the queue ID for this link's DMA queue
+ */
+ inline dpdk::queue_id_t get_queue_id()
+ {
+ return _queue;
+ }
+
+ /*!
+ * Get the local UDP port used by this link
+ *
+ * \return the local UDP port, in network order
+ */
+ inline uint16_t get_local_port()
+ {
+ return _local_port;
+ }
+
+ /*!
+ * Get the remote UDP port used by this link
+ *
+ * \return the remote UDP port, in network order
+ */
+ inline uint16_t get_remote_port()
+ {
+ return _remote_port;
+ }
+
+ /*!
+ * Get the remote IPv4 address used by this link
+ *
+ * \return the remote IPv4 address, in network order
+ */
+ inline uint32_t get_remote_ipv4()
+ {
+ return _remote_ipv4;
+ }
+
+ /*!
+ * Set the remote host's MAC address
+ * This MAC address must be filled in for the remote IPv4 address before
+ * the link can reach its destination.
+ *
+ * \param mac the remote host's MAC address
+ */
+ inline void set_remote_mac(struct ether_addr& mac)
+ {
+ ether_addr_copy(&mac, &_remote_mac);
+ }
+
+ /*!
+ * Get the remote host's MAC address
+ *
+ * \param mac Where to write the MAC address
+ */
+ inline void get_remote_mac(struct ether_addr& dst)
+ {
+ ether_addr_copy(&_remote_mac, &dst);
+ }
+
+ /*!
+ * Get the number of frame buffers that can be queued by this link.
+ */
+ size_t get_num_send_frames() const
+ {
+ return _num_send_frames;
+ }
+
+ /*!
+ * Get the maximum capacity of a frame buffer.
+ */
+ size_t get_send_frame_size() const
+ {
+ return _send_frame_size;
+ }
+
+ /*!
+ * Get the physical adapter ID used for this link
+ */
+ inline adapter_id_t get_send_adapter_id() const
+ {
+ return _adapter_id;
+ }
+
+ /*!
+ * Get the number of frame buffers that can be queued by this link.
+ */
+ size_t get_num_recv_frames() const
+ {
+ return _num_recv_frames;
+ }
+
+ /*!
+ * Get the maximum capacity of a frame buffer.
+ */
+ size_t get_recv_frame_size() const
+ {
+ return _recv_frame_size;
+ }
+
+ /*!
+ * Get the physical adapter ID used for this link
+ */
+ inline adapter_id_t get_recv_adapter_id() const
+ {
+ return _adapter_id;
+ }
+
+ /*!
+ * Enqueue a received mbuf, which can be pulled via get_recv_buff()
+ */
+ void enqueue_recv_mbuf(struct rte_mbuf* mbuf);
+
+ /*!
+ * Receive a packet and return a frame buffer containing the packet data.
+ * The timeout argument is ignored.
+ *
+ * Received buffers are pulled from the frame buffer list. No buffers can
+ * be retrieved unless the corresponding rte_mbufs were placed in the list
+ * via the enqueue_recv_mbuf() method.
+ *
+ * \return a frame buffer, or null uptr if timeout occurs
+ */
+ frame_buff::uptr get_recv_buff(int32_t /*timeout_ms*/);
+
+ /*!
+ * Release a frame buffer, allowing the link driver to reuse it.
+ *
+ * \param buffer frame buffer to release for reuse by the link
+ */
+ void release_recv_buff(frame_buff::uptr buff);
+
+ /*!
+ * Get an empty frame buffer in which to write packet contents.
+ *
+ * \param timeout_ms a positive timeout value specifies the maximum number
+ of ms to wait, a negative value specifies to block
+ until successful, and a value of 0 specifies no wait.
+ * \return a frame buffer, or null uptr if timeout occurs
+ */
+ frame_buff::uptr get_send_buff(int32_t /*timeout_ms*/);
+
+ /*!
+ * Send a packet with the contents of the frame buffer and release the
+ * buffer, allowing the link driver to reuse it. If the size of the frame
+ * buffer is 0, the buffer is released with no packet being sent.
+ *
+ * Note that this function will only fill in the L2 header and send the
+ * mbuf. The L3 and L4 headers, in addition to the lengths in the rte_mbuf
+ * fields, must be set in the I/O service.
+ *
+ * \param buffer frame buffer containing packet data
+ *
+ * Throws an exception if an I/O error occurs while sending
+ */
+ void release_send_buff(frame_buff::uptr buff);
+
+private:
+ //! A reference to the DPDK context
+ dpdk::dpdk_ctx::sptr _ctx;
+ //! The DPDK NIC port used by this link
+ dpdk::dpdk_port* _port;
+ //! Local UDP port, in network order
+ uint16_t _local_port;
+ //! Remote UDP port, in network order
+ uint16_t _remote_port;
+ //! Remote IPv4 address, in network order
+ uint32_t _remote_ipv4;
+ //! Remote host's MAC address
+ struct ether_addr _remote_mac;
+ //! Number of recv frames is not validated
+ size_t _num_recv_frames;
+ //! Maximum bytes of UDP payload data in recv frame
+ size_t _recv_frame_size;
+ //! Number of send frames is not validated
+ size_t _num_send_frames;
+ //! Maximum bytes of UDP payload data in send frame
+ size_t _send_frame_size;
+ //! Registered adapter ID for this link's DPDK NIC port
+ adapter_id_t _adapter_id;
+ //! The RX frame buff list head
+ dpdk::dpdk_frame_buff* _recv_buff_head = nullptr;
+ // TODO: Implement ability to use multiple queues
+ dpdk::queue_id_t _queue = 0;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_UHDLIB_TRANSPORT_UDP_DPDK_LINK_HPP */
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index 646b2837e..d88631ae3 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -143,5 +143,14 @@ endif(ENABLE_LIBERIO)
if(ENABLE_DPDK)
INCLUDE_SUBDIRECTORY(uhd-dpdk)
+
+ LIBUHD_APPEND_SOURCES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_dpdk_link.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_simple.cpp
+ )
+ set_source_files_properties(
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_dpdk_link.cpp
+ PROPERTIES COMPILE_FLAGS "${UHD_DPDK_CFLAGS} -D_GNU_SOURCE"
+ )
endif(ENABLE_DPDK)
diff --git a/host/lib/transport/dpdk_simple.cpp b/host/lib/transport/dpdk_simple.cpp
index 001775934..50855f36a 100644
--- a/host/lib/transport/dpdk_simple.cpp
+++ b/host/lib/transport/dpdk_simple.cpp
@@ -1,179 +1,203 @@
//
-// Copyright 2019 Ettus Research, a National Instruments Company
+// Copyright 2019 Ettus Research, a National Instruments Brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//
+#include <uhd/transport/frame_buff.hpp>
#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/dpdk/udp.hpp>
+#include <uhdlib/transport/dpdk_io_service.hpp>
+#include <uhdlib/transport/dpdk_io_service_client.hpp>
#include <uhdlib/transport/dpdk_simple.hpp>
-#include <uhdlib/transport/uhd-dpdk.h>
+#include <uhdlib/transport/links.hpp>
+#include <uhdlib/transport/udp_dpdk_link.hpp>
#include <arpa/inet.h>
+
namespace uhd { namespace transport {
namespace {
- constexpr uint64_t USEC = 1000000;
- // Non-data fields are headers (Ethernet + IPv4 + UDP) + CRC
- constexpr size_t DPDK_SIMPLE_NONDATA_SIZE = 14 + 20 + 8 + 4;
+constexpr double SEND_TIMEOUT_MS = 500; // seconds
}
class dpdk_simple_impl : public dpdk_simple {
public:
- dpdk_simple_impl(struct uhd_dpdk_ctx &ctx, const std::string &addr,
- const std::string &port, bool filter_bcast)
+ dpdk_simple_impl(const std::string& addr, const std::string& port)
{
- UHD_ASSERT_THROW(ctx.is_init_done());
-
- // Get NIC that can route to addr
- int port_id = ctx.get_route(addr);
- UHD_ASSERT_THROW(port_id >= 0);
-
- _port_id = port_id;
- uint32_t dst_ipv4 = (uint32_t) inet_addr(addr.c_str());
- uint16_t dst_port = htons(std::stoi(port, NULL, 0));
-
- struct uhd_dpdk_sockarg_udp sockarg = {
- .is_tx = false,
- .filter_bcast = filter_bcast,
- .local_port = 0,
- .remote_port = dst_port,
- .dst_addr = dst_ipv4,
- .num_bufs = 1
+ link_params_t link_params = _get_default_link_params();
+ _link =
+ uhd::transport::udp_dpdk_link::make(addr, port, link_params);
+ UHD_LOG_TRACE("DPDK::SIMPLE",
+ "Creating simple UDP object for " << addr << ":" << port
+ << ", DPDK port index "
+ << _link->get_port()->get_port_id());
+ // The context must be initialized, or we'd never get here
+ auto ctx = uhd::transport::dpdk::dpdk_ctx::get();
+ UHD_ASSERT_THROW(ctx->is_init_done());
+
+ // Init I/O service
+ _port_id = _link->get_port()->get_port_id();
+ _io_service = ctx->get_io_service(_port_id);
+ // This is normally done by the I/O service manager, but with DPDK, this
+ // is all it does so we skip that step
+ UHD_LOG_TRACE("DPDK::SIMPLE", "Attaching link to I/O service...");
+ _io_service->attach_recv_link(_link);
+ _io_service->attach_send_link(_link);
+
+ auto recv_cb = [this](buff_t::uptr& buff, recv_link_if*, send_link_if*) {
+ return this->_recv_callback(buff);
+ };
+
+ auto fc_cb = [this](buff_t::uptr buff, recv_link_if*, send_link_if*) {
+ this->_recv_fc_callback(std::move(buff));
};
- _rx_sock = uhd_dpdk_sock_open(_port_id, UHD_DPDK_SOCK_UDP, &sockarg);
- UHD_ASSERT_THROW(_rx_sock != nullptr);
-
- // Backfill the local port, in case it was auto-assigned
- uhd_dpdk_udp_get_info(_rx_sock, &sockarg);
- sockarg.is_tx = true;
- sockarg.remote_port = dst_port;
- sockarg.dst_addr = dst_ipv4;
- sockarg.num_bufs = 1;
- _tx_sock = uhd_dpdk_sock_open(_port_id, UHD_DPDK_SOCK_UDP, &sockarg);
- UHD_ASSERT_THROW(_tx_sock != nullptr);
- UHD_LOG_TRACE("DPDK", "Created simple transports between " << addr << ":"
- << ntohs(dst_port) << " and NIC(" << _port_id
- << "):" << ntohs(sockarg.local_port));
+
+ _recv_io = _io_service->make_recv_client(_link,
+ link_params.num_recv_frames,
+ recv_cb,
+ nullptr, // No send/fc link
+ 0, // No send frames
+ fc_cb);
+
+ auto send_cb = [this](buff_t::uptr buff, transport::send_link_if*) {
+ this->_send_callback(std::move(buff));
+ };
+ _send_io = _io_service->make_send_client(_link,
+ link_params.num_send_frames,
+ send_cb,
+ nullptr, // no FC link
+ 0,
+ nullptr, // No receive callback necessary
+ [](const size_t) { return true; } // We can always send
+ );
+ UHD_LOG_TRACE("DPDK::SIMPLE", "Constructor complete");
}
- ~dpdk_simple_impl(void) {}
+ ~dpdk_simple_impl(void)
+ {
+ UHD_LOG_TRACE("DPDK::SIMPLE",
+ "~dpdk_simple_impl(), DPDK port index " << _link->get_port()->get_port_id());
+ // Disconnect the clients from the I/O service
+ _send_io.reset();
+ _recv_io.reset();
+ // Disconnect the link from the I/O service
+ _io_service->detach_recv_link(_link);
+ _io_service->detach_send_link(_link);
+ }
- /*!
- * Send and release outstanding buffer
+ /*! Send and release outstanding buffer
*
* \param length bytes of data to send
* \return number of bytes sent (releases buffer if sent)
*/
- size_t send(const boost::asio::const_buffer& buff)
+ size_t send(const boost::asio::const_buffer& user_buff)
{
- struct rte_mbuf* tx_mbuf;
- size_t frame_size = _get_tx_buf(&tx_mbuf);
- UHD_ASSERT_THROW(tx_mbuf)
- size_t nbytes = boost::asio::buffer_size(buff);
- UHD_ASSERT_THROW(nbytes <= frame_size)
- const uint8_t* user_data = boost::asio::buffer_cast<const uint8_t*>(buff);
-
- uint8_t* pkt_data = (uint8_t*) uhd_dpdk_buf_to_data(_tx_sock, tx_mbuf);
- std::memcpy(pkt_data, user_data, nbytes);
- tx_mbuf->pkt_len = nbytes;
- tx_mbuf->data_len = nbytes;
-
- int num_tx = uhd_dpdk_send(_tx_sock, &tx_mbuf, 1);
- if (num_tx == 0)
- return 0;
+ // Extract buff and sanity check
+ const size_t nbytes = boost::asio::buffer_size(user_buff);
+ UHD_ASSERT_THROW(nbytes <= _link->get_send_frame_size())
+ const uint8_t* user_data = boost::asio::buffer_cast<const uint8_t*>(user_buff);
+
+ // Get send buff
+ auto buff = _send_io->get_send_buff(SEND_TIMEOUT_MS);
+ UHD_ASSERT_THROW(buff);
+ buff->set_packet_size(nbytes);
+ std::memcpy(buff->data(), user_data, nbytes);
+
+ // Release send buff (send the packet)
+ _send_io->release_send_buff(std::move(buff));
return nbytes;
}
- /*!
- * Receive a single packet.
+ /*! Receive a single packet.
+ *
* Buffer provided by transport (must be freed before next operation).
*
* \param buf a pointer to place to write buffer location
* \param timeout the timeout in seconds
* \return the number of bytes received or zero on timeout
*/
- size_t recv(const boost::asio::mutable_buffer& buff, double timeout)
+ size_t recv(const boost::asio::mutable_buffer& user_buff, double timeout)
{
- struct rte_mbuf *rx_mbuf;
- size_t buff_size = boost::asio::buffer_size(buff);
- uint8_t* user_data = boost::asio::buffer_cast<uint8_t*>(buff);
+ size_t user_buff_size = boost::asio::buffer_size(user_buff);
+ uint8_t* user_data = boost::asio::buffer_cast<uint8_t*>(user_buff);
- int bufs = uhd_dpdk_recv(_rx_sock, &rx_mbuf, 1, (int) (timeout*USEC));
- if (bufs != 1 || rx_mbuf == nullptr) {
- return 0;
- }
- if ((rx_mbuf->ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) {
- uhd_dpdk_free_buf(rx_mbuf);
+ auto buff = _recv_io->get_recv_buff(static_cast<int32_t>(timeout * 1e3));
+ if (!buff) {
return 0;
}
- uhd_dpdk_get_src_ipv4(_rx_sock, rx_mbuf, &_last_recv_addr);
- const size_t nbytes = uhd_dpdk_get_len(_rx_sock, rx_mbuf);
- UHD_ASSERT_THROW(nbytes <= buff_size);
+ // Extract the sender's address. This is only possible because we know
+ // the memory layout of the buff
+ struct udp_hdr* udp_hdr_end = (struct udp_hdr*)buff->data();
+ struct ipv4_hdr* ip_hdr_end = (struct ipv4_hdr*)(&udp_hdr_end[-1]);
+ struct ipv4_hdr* ip_hdr = (struct ipv4_hdr*)(&ip_hdr_end[-1]);
+ _last_recv_addr = ip_hdr->src_addr;
+
+ // Extract the buffer data
+ const size_t copy_len = std::min(user_buff_size, buff->packet_size());
+ if (copy_len < buff->packet_size()) {
+ UHD_LOG_WARNING("DPDK", "Truncating recv packet");
+ }
+ std::memcpy(user_data, buff->data(), copy_len);
- uint8_t* pkt_data = (uint8_t*) uhd_dpdk_buf_to_data(_rx_sock, rx_mbuf);
- std::memcpy(user_data, pkt_data, nbytes);
- _put_rx_buf(rx_mbuf);
- return nbytes;
+ // Housekeeping
+ _recv_io->release_recv_buff(std::move(buff));
+ return copy_len;
}
- /*!
- * Get the last IP address as seen by recv().
- * Only use this with the broadcast socket.
- */
std::string get_recv_addr(void)
{
- char addr_str[INET_ADDRSTRLEN];
- struct in_addr ipv4_addr;
- ipv4_addr.s_addr = _last_recv_addr;
- inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str));
- return std::string(addr_str);
+ return dpdk::ipv4_num_to_str(_last_recv_addr);
}
- /*!
- * Get the IP address for the destination
- */
std::string get_send_addr(void)
{
- struct in_addr ipv4_addr;
- int status = uhd_dpdk_get_ipv4_addr(_port_id, &ipv4_addr.s_addr, nullptr);
- UHD_ASSERT_THROW(status);
- char addr_str[INET_ADDRSTRLEN];
- inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str));
- return std::string(addr_str);
+ return dpdk::ipv4_num_to_str(_link->get_remote_ipv4());
}
+
private:
- /*!
- * Request a single send buffer of specified size.
- *
- * \param buf a pointer to place to write buffer location
- * \return the maximum length of the buffer
- */
- size_t _get_tx_buf(struct rte_mbuf** buf)
+ using buff_t = frame_buff;
+
+ link_params_t _get_default_link_params()
{
- int bufs = uhd_dpdk_request_tx_bufs(_tx_sock, buf, 1, 0);
- if (bufs != 1) {
- *buf = nullptr;
- return 0;
- }
- return _mtu - DPDK_SIMPLE_NONDATA_SIZE;
+ link_params_t link_params;
+ link_params.recv_frame_size = 8000;
+ link_params.send_frame_size = 8000;
+ link_params.num_recv_frames = 1;
+ link_params.num_send_frames = 1;
+ link_params.recv_buff_size = 8000;
+ link_params.send_buff_size = 8000;
+ return link_params;
}
- /*!
- * Return/free receive buffer
- * Can also use to free un-sent TX bufs
- */
- void _put_rx_buf(struct rte_mbuf *rx_mbuf)
+ void _send_callback(buff_t::uptr buff)
+ {
+ _link->release_send_buff(std::move(buff));
+ }
+
+ bool _recv_callback(buff_t::uptr&)
{
- UHD_ASSERT_THROW(rx_mbuf)
- uhd_dpdk_free_buf(rx_mbuf);
+ // Queue it up
+ return true;
}
+ void _recv_fc_callback(buff_t::uptr buff)
+ {
+ _link->release_recv_buff(std::move(buff));
+ }
+
+ /*** Attributes **********************************************************/
unsigned int _port_id;
- size_t _mtu;
- struct uhd_dpdk_socket *_tx_sock;
- struct uhd_dpdk_socket *_rx_sock;
uint32_t _last_recv_addr;
+
+ udp_dpdk_link::sptr _link;
+
+ dpdk_io_service::sptr _io_service;
+
+ send_io_if::sptr _send_io;
+
+ recv_io_if::sptr _recv_io;
};
dpdk_simple::~dpdk_simple(void) {}
@@ -182,16 +206,16 @@ dpdk_simple::~dpdk_simple(void) {}
* DPDK simple transport public make functions
**********************************************************************/
udp_simple::sptr dpdk_simple::make_connected(
- struct uhd_dpdk_ctx &ctx, const std::string &addr, const std::string &port
-){
- return udp_simple::sptr(new dpdk_simple_impl(ctx, addr, port, true));
+ const std::string& addr, const std::string& port)
+{
+ return udp_simple::sptr(new dpdk_simple_impl(addr, port));
}
+// For DPDK, this is not special and the same as make_connected
udp_simple::sptr dpdk_simple::make_broadcast(
- struct uhd_dpdk_ctx &ctx, const std::string &addr, const std::string &port
-){
- return udp_simple::sptr(new dpdk_simple_impl(ctx, addr, port, false));
+ const std::string& addr, const std::string& port)
+{
+ return udp_simple::sptr(new dpdk_simple_impl(addr, port));
}
}} // namespace uhd::transport
-
diff --git a/host/lib/transport/udp_dpdk_link.cpp b/host/lib/transport/udp_dpdk_link.cpp
new file mode 100644
index 000000000..dc56de43c
--- /dev/null
+++ b/host/lib/transport/udp_dpdk_link.cpp
@@ -0,0 +1,198 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/config.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhd/utils/static.hpp>
+#include <uhdlib/transport/adapter.hpp>
+#include <uhdlib/transport/dpdk/udp.hpp>
+#include <uhdlib/transport/udp_dpdk_link.hpp>
+#include <arpa/inet.h>
+#include <memory>
+
+using namespace uhd::transport;
+using namespace uhd::transport::dpdk;
+
+udp_dpdk_link::udp_dpdk_link(dpdk::port_id_t port_id,
+ const std::string& remote_addr,
+ const std::string& remote_port,
+ const std::string& local_port,
+ const link_params_t& params)
+ : _num_recv_frames(params.num_recv_frames)
+ , _recv_frame_size(params.recv_frame_size)
+ , _num_send_frames(params.num_send_frames)
+ , _send_frame_size(params.send_frame_size)
+{
+ // Get a reference to the context, since this class manages DPDK memory
+ _ctx = dpdk_ctx::get();
+ UHD_ASSERT_THROW(_ctx);
+
+ // Fill in remote IPv4 address and UDP port
+ // NOTE: Remote MAC address is filled in later by I/O service
+ int status = inet_pton(AF_INET, remote_addr.c_str(), &_remote_ipv4);
+ if (status != 1) {
+ UHD_LOG_ERROR("DPDK", std::string("Invalid destination address ") + remote_addr);
+ throw uhd::runtime_error(
+ std::string("DPDK: Invalid destination address ") + remote_addr);
+ }
+ _remote_port = rte_cpu_to_be_16(std::stoul(remote_port));
+
+ // Grab the port with a route to the remote host
+ _port = _ctx->get_port(port_id);
+
+ uint16_t local_port_num = rte_cpu_to_be_16(std::stoul(local_port));
+ // Get an unused UDP port for listening
+ _local_port = _port->alloc_udp_port(local_port_num);
+
+ // Validate params
+ const size_t max_frame_size = _port->get_mtu() - dpdk::HDR_SIZE_UDP_IPV4;
+ UHD_ASSERT_THROW(params.send_frame_size <= max_frame_size);
+ UHD_ASSERT_THROW(params.recv_frame_size <= max_frame_size);
+
+ // Register the adapter
+ auto info = _port->get_adapter_info();
+ auto& adap_ctx = adapter_ctx::get();
+ _adapter_id = adap_ctx.register_adapter(info);
+ UHD_LOGGER_TRACE("DPDK") << boost::format("Created udp_dpdk_link to (%s:%s)")
+ % remote_addr % remote_port;
+ UHD_LOGGER_TRACE("DPDK")
+ << boost::format("num_recv_frames=%d, recv_frame_size=%d, num_send_frames=%d, "
+ "send_frame_size=%d")
+ % params.num_recv_frames % params.recv_frame_size % params.num_send_frames
+ % params.send_frame_size;
+}
+
+udp_dpdk_link::~udp_dpdk_link() {}
+
+udp_dpdk_link::sptr udp_dpdk_link::make(const std::string& remote_addr,
+ const std::string& remote_port,
+ const link_params_t& params)
+{
+ auto ctx = dpdk::dpdk_ctx::get();
+ auto port = ctx->get_route(remote_addr);
+ if (!port) {
+ UHD_LOG_ERROR("DPDK",
+ std::string("Could not find route to destination address ") + remote_addr);
+ throw uhd::runtime_error(
+ std::string("DPDK: Could not find route to destination address ")
+ + remote_addr);
+ }
+ return make(port->get_port_id(), remote_addr, remote_port, "0", params);
+}
+
+udp_dpdk_link::sptr udp_dpdk_link::make(const dpdk::port_id_t port_id,
+ const std::string& remote_addr,
+ const std::string& remote_port,
+ const std::string& local_port,
+ const link_params_t& params)
+{
+ UHD_ASSERT_THROW(params.recv_frame_size > 0);
+ UHD_ASSERT_THROW(params.send_frame_size > 0);
+ UHD_ASSERT_THROW(params.num_send_frames > 0);
+ UHD_ASSERT_THROW(params.num_recv_frames > 0);
+
+ return std::make_shared<udp_dpdk_link>(
+ port_id, remote_addr, remote_port, local_port, params);
+}
+
+void udp_dpdk_link::enqueue_recv_mbuf(struct rte_mbuf* mbuf)
+{
+ // Get packet size
+ struct udp_hdr* hdr = rte_pktmbuf_mtod_offset(
+ mbuf, struct udp_hdr*, sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr));
+ size_t packet_size = rte_be_to_cpu_16(hdr->dgram_len) - sizeof(struct udp_hdr);
+ // Prepare the dpdk_frame_buff
+ auto buff = new (rte_mbuf_to_priv(mbuf)) dpdk_frame_buff(mbuf);
+ buff->header_jump(
+ sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + sizeof(struct udp_hdr));
+ buff->set_packet_size(packet_size);
+ // Add the dpdk_frame_buff to the list
+ if (_recv_buff_head) {
+ buff->prev = _recv_buff_head->prev;
+ buff->next = _recv_buff_head;
+ _recv_buff_head->prev->next = buff;
+ _recv_buff_head->prev = buff;
+ } else {
+ _recv_buff_head = buff;
+ buff->next = buff;
+ buff->prev = buff;
+ }
+}
+
+frame_buff::uptr udp_dpdk_link::get_recv_buff(int32_t /*timeout_ms*/)
+{
+ auto buff = _recv_buff_head;
+ if (buff) {
+ if (_recv_buff_head->next == buff) {
+ /* Only had the one buff, so the list is empty */
+ _recv_buff_head = nullptr;
+ } else {
+ /* Make the next buff the new list head */
+ _recv_buff_head->next->prev = _recv_buff_head->prev;
+ _recv_buff_head->prev->next = _recv_buff_head->next;
+ _recv_buff_head = _recv_buff_head->next;
+ }
+ buff->next = nullptr;
+ buff->prev = nullptr;
+ return frame_buff::uptr(buff);
+ }
+ return frame_buff::uptr();
+}
+
+void udp_dpdk_link::release_recv_buff(frame_buff::uptr buff)
+{
+ dpdk_frame_buff* buff_ptr = (dpdk_frame_buff*)buff.release();
+ assert(buff_ptr);
+ rte_pktmbuf_free(buff_ptr->get_pktmbuf());
+}
+
+frame_buff::uptr udp_dpdk_link::get_send_buff(int32_t /*timeout_ms*/)
+{
+ auto mbuf = rte_pktmbuf_alloc(_port->get_tx_pktbuf_pool());
+ if (mbuf) {
+ auto buff = new (rte_mbuf_to_priv(mbuf)) dpdk_frame_buff(mbuf);
+ buff->header_jump(
+ sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + sizeof(struct udp_hdr));
+ return frame_buff::uptr(buff);
+ }
+ return frame_buff::uptr();
+}
+
+void udp_dpdk_link::release_send_buff(frame_buff::uptr buff)
+{
+ dpdk_frame_buff* buff_ptr = (dpdk_frame_buff*)buff.release();
+ assert(buff_ptr);
+ auto mbuf = buff_ptr->get_pktmbuf();
+ if (buff_ptr->packet_size()) {
+ // Fill in L2 header
+ auto local_mac = _port->get_mac_addr();
+ struct ether_hdr* l2_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*);
+ ether_addr_copy(&_remote_mac, &l2_hdr->d_addr);
+ ether_addr_copy(&local_mac, &l2_hdr->s_addr);
+ l2_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4);
+ // Fill in L3 and L4 headers
+ dpdk::fill_udp_hdr(mbuf,
+ _port,
+ _remote_ipv4,
+ _local_port,
+ _remote_port,
+ buff_ptr->packet_size());
+ // Prepare the packet buffer and send it out
+ int status = rte_eth_tx_prepare(_port->get_port_id(), _queue, &mbuf, 1);
+ if (status != 1) {
+ throw uhd::runtime_error("DPDK: Failed to prepare TX buffer for send");
+ }
+ status = rte_eth_tx_burst(_port->get_port_id(), _queue, &mbuf, 1);
+ while (status != 1) {
+ status = rte_eth_tx_burst(_port->get_port_id(), _queue, &mbuf, 1);
+ // FIXME: Should we make available retrying?
+ // throw uhd::runtime_error("DPDK: Failed to send TX buffer");
+ }
+ } else {
+ // Release the buffer if there is nothing in it
+ rte_pktmbuf_free(mbuf);
+ }
+}
diff --git a/host/lib/transport/uhd-dpdk/CMakeLists.txt b/host/lib/transport/uhd-dpdk/CMakeLists.txt
index ea78aa1e8..a13886653 100644
--- a/host/lib/transport/uhd-dpdk/CMakeLists.txt
+++ b/host/lib/transport/uhd-dpdk/CMakeLists.txt
@@ -19,9 +19,11 @@ if(ENABLE_DPDK)
LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/dpdk_common.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_io_service.cpp
)
set_source_files_properties(
${CMAKE_CURRENT_SOURCE_DIR}/dpdk_common.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/dpdk_io_service.cpp
PROPERTIES COMPILE_FLAGS "${UHD_DPDK_CFLAGS} -D_GNU_SOURCE"
)
include_directories(${DPDK_INCLUDE_DIR})
diff --git a/host/lib/transport/uhd-dpdk/dpdk_common.cpp b/host/lib/transport/uhd-dpdk/dpdk_common.cpp
index 43a1507bb..46818e973 100644
--- a/host/lib/transport/uhd-dpdk/dpdk_common.cpp
+++ b/host/lib/transport/uhd-dpdk/dpdk_common.cpp
@@ -3,8 +3,13 @@
//
// SPDX-License-Identifier: GPL-3.0-or-later
//
+
+#include <uhd/utils/algorithm.hpp>
#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/dpdk/arp.hpp>
#include <uhdlib/transport/dpdk/common.hpp>
+#include <uhdlib/transport/dpdk/udp.hpp>
+#include <uhdlib/transport/dpdk_io_service.hpp>
#include <uhdlib/utils/prefs.hpp>
#include <arpa/inet.h>
#include <rte_arp.h>
@@ -23,6 +28,7 @@ constexpr uint16_t DPDK_DEFAULT_RING_SIZE = 512;
inline char* eal_add_opt(
std::vector<const char*>& argv, size_t n, char* dst, const char* opt, const char* arg)
{
+ UHD_LOG_TRACE("DPDK", opt << " " << arg);
char* ptr = dst;
strncpy(ptr, opt, n);
argv.push_back(ptr);
@@ -34,15 +40,6 @@ inline char* eal_add_opt(
return ptr;
}
-inline std::string eth_addr_to_string(const struct ether_addr mac_addr)
-{
- auto mac_stream = boost::format("%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx");
- mac_stream % (uint32_t)mac_addr.addr_bytes[0] % (uint32_t)mac_addr.addr_bytes[1]
- % (uint32_t)mac_addr.addr_bytes[2] % (uint32_t)mac_addr.addr_bytes[3]
- % (uint32_t)mac_addr.addr_bytes[4] % (uint32_t)mac_addr.addr_bytes[5];
- return mac_stream.str();
-}
-
inline void separate_ipv4_addr(
const std::string ipv4, uint32_t& ipv4_addr, uint32_t& netmask)
{
@@ -59,28 +56,29 @@ inline void separate_ipv4_addr(
dpdk_port::uptr dpdk_port::make(port_id_t port,
size_t mtu,
uint16_t num_queues,
- size_t num_mbufs,
+ uint16_t num_desc,
struct rte_mempool* rx_pktbuf_pool,
struct rte_mempool* tx_pktbuf_pool,
std::string ipv4_address)
{
return std::make_unique<dpdk_port>(
- port, mtu, num_queues, num_mbufs, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address);
+ port, mtu, num_queues, num_desc, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address);
}
dpdk_port::dpdk_port(port_id_t port,
size_t mtu,
uint16_t num_queues,
- size_t num_mbufs,
+ uint16_t num_desc,
struct rte_mempool* rx_pktbuf_pool,
struct rte_mempool* tx_pktbuf_pool,
std::string ipv4_address)
: _port(port)
, _mtu(mtu)
+ , _num_queues(num_queues)
, _rx_pktbuf_pool(rx_pktbuf_pool)
, _tx_pktbuf_pool(tx_pktbuf_pool)
{
- /* 1. Set MTU and IPv4 address */
+ /* Set MTU and IPv4 address */
int retval;
retval = rte_eth_dev_set_mtu(_port, _mtu);
@@ -96,7 +94,7 @@ dpdk_port::dpdk_port(port_id_t port,
separate_ipv4_addr(ipv4_address, _ipv4, _netmask);
- /* 2. Set hardware offloads */
+ /* Set hardware offloads */
struct rte_eth_dev_info dev_info;
rte_eth_dev_info_get(_port, &dev_info);
uint64_t rx_offloads = DEV_RX_OFFLOAD_IPV4_CKSUM;
@@ -132,13 +130,13 @@ dpdk_port::dpdk_port(port_id_t port,
throw uhd::runtime_error("DPDK: Failed to configure the device");
}
- /* 3. Set descriptor ring sizes */
- uint16_t rx_desc = num_mbufs;
+ /* Set descriptor ring sizes */
+ uint16_t rx_desc = num_desc;
if (dev_info.rx_desc_lim.nb_max < rx_desc || dev_info.rx_desc_lim.nb_min > rx_desc
|| (dev_info.rx_desc_lim.nb_align - 1) & rx_desc) {
UHD_LOGGER_ERROR("DPDK")
<< boost::format("%d: %d RX descriptors requested, but must be in [%d,%d]")
- % _port % num_mbufs % dev_info.rx_desc_lim.nb_min
+ % _port % num_desc % dev_info.rx_desc_lim.nb_min
% dev_info.rx_desc_lim.nb_max;
UHD_LOGGER_ERROR("DPDK")
<< boost::format("Num RX descriptors must also be aligned to 0x%x")
@@ -146,12 +144,12 @@ dpdk_port::dpdk_port(port_id_t port,
throw uhd::runtime_error("DPDK: Failed to allocate RX descriptors");
}
- uint16_t tx_desc = num_mbufs;
+ uint16_t tx_desc = num_desc;
if (dev_info.tx_desc_lim.nb_max < tx_desc || dev_info.tx_desc_lim.nb_min > tx_desc
|| (dev_info.tx_desc_lim.nb_align - 1) & tx_desc) {
UHD_LOGGER_ERROR("DPDK")
<< boost::format("%d: %d TX descriptors requested, but must be in [%d,%d]")
- % _port % num_mbufs % dev_info.tx_desc_lim.nb_min
+ % _port % num_desc % dev_info.tx_desc_lim.nb_min
% dev_info.tx_desc_lim.nb_max;
UHD_LOGGER_ERROR("DPDK")
<< boost::format("Num TX descriptors must also be aligned to 0x%x")
@@ -165,7 +163,7 @@ dpdk_port::dpdk_port(port_id_t port,
throw uhd::runtime_error("DPDK: Failed to configure the DMA queues");
}
- /* 4. Set up the RX and TX DMA queues (May not be generally supported after
+ /* Set up the RX and TX DMA queues (May not be generally supported after
* eth_dev_start) */
unsigned int cpu_socket = rte_eth_dev_socket_id(_port);
for (uint16_t i = 0; i < _num_queues; i++) {
@@ -187,36 +185,9 @@ dpdk_port::dpdk_port(port_id_t port,
}
}
- /* 5. Set up initial flow table */
+ /* TODO: Enable multiple queues (only support 1 right now) */
- /* Append all free queues except 0, which is reserved for ARP */
- _free_queues.reserve(_num_queues - 1);
- for (unsigned int i = 1; i < _num_queues; i++) {
- _free_queues.push_back(i);
- }
-
- // struct rte_flow_attr flow_attr;
- // flow_attr.group = 0;
- // flow_attr.priority = 1;
- // flow_attr.ingress = 1;
- // flow_attr.egress = 0;
- // flow_attr.transfer = 0;
- // flow_attr.reserved = 0;
-
- // struct rte_flow_item[] flow_pattern = {
- //};
- // int rte_flow_validate(uint16_t port_id,
- // const struct rte_flow_attr *attr,
- // const struct rte_flow_item pattern[],
- // const struct rte_flow_action actions[],
- // struct rte_flow_error *error);
- // struct rte_flow * rte_flow_create(uint16_t port_id,
- // const struct rte_flow_attr *attr,
- // const struct rte_flow_item pattern[],
- // const struct rte_flow_action *actions[],
- // struct rte_flow_error *error);
-
- /* 6. Start the Ethernet device */
+ /* Start the Ethernet device */
retval = rte_eth_dev_start(_port);
if (retval < 0) {
UHD_LOGGER_ERROR("DPDK")
@@ -230,39 +201,60 @@ dpdk_port::dpdk_port(port_id_t port,
<< " MAC: " << eth_addr_to_string(_mac_addr);
}
-/* TODO: Do flow directions */
-queue_id_t dpdk_port::alloc_queue(struct rte_flow_pattern recv_pattern[])
+dpdk_port::~dpdk_port()
{
- std::lock_guard<std::mutex> lock(_mutex);
- UHD_ASSERT_THROW(_free_queues.size() != 0);
- auto queue = _free_queues.back();
- _free_queues.pop_back();
- return queue;
+ rte_eth_dev_stop(_port);
+ rte_spinlock_lock(&_spinlock);
+ for (auto kv : _arp_table) {
+ for (auto req : kv.second->reqs) {
+ req->cond.notify_one();
+ }
+ rte_free(kv.second);
+ }
+ _arp_table.clear();
+ rte_spinlock_unlock(&_spinlock);
}
-void dpdk_port::free_queue(queue_id_t queue)
+uint16_t dpdk_port::alloc_udp_port(uint16_t udp_port)
{
+ uint16_t port_selected;
std::lock_guard<std::mutex> lock(_mutex);
- auto flow = _flow_rules.at(queue);
- int status = rte_flow_destroy(_port, flow, NULL);
- if (status) {
- UHD_LOGGER_ERROR("DPDK")
- << boost::format("Failed to destroy flow rule on port %u, queue %u") % _port
- % queue;
- throw uhd::runtime_error("DPDK: Failed to destroy flow rule");
+ if (udp_port) {
+ if (_udp_ports.count(rte_be_to_cpu_16(udp_port))) {
+ return 0;
+ }
+ port_selected = rte_be_to_cpu_16(udp_port);
} else {
- _flow_rules.erase(queue);
+ if (_udp_ports.size() >= 65535) {
+ UHD_LOG_WARNING("DPDK", "Attempted to allocate UDP port, but none remain");
+ return 0;
+ }
+ port_selected = _next_udp_port;
+ while (true) {
+ if (port_selected == 0) {
+ continue;
+ }
+ if (_udp_ports.count(port_selected) == 0) {
+ _next_udp_port = port_selected - 1;
+ break;
+ }
+ if (port_selected - 1 == _next_udp_port) {
+ return 0;
+ }
+ port_selected--;
+ }
}
- _free_queues.push_back(queue);
+ _udp_ports.insert(port_selected);
+ return rte_cpu_to_be_16(port_selected);
}
-int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* arp_req)
+int dpdk_port::_arp_reply(queue_id_t queue_id, struct arp_hdr* arp_req)
{
struct rte_mbuf* mbuf;
struct ether_hdr* hdr;
struct arp_hdr* arp_frame;
- mbuf = rte_pktmbuf_alloc(tx_pktbuf_pool);
+ mbuf = rte_pktmbuf_alloc(_tx_pktbuf_pool);
if (unlikely(mbuf == NULL)) {
UHD_LOG_WARNING("DPDK", "Could not allocate packet buffer for ARP response");
return -ENOMEM;
@@ -288,8 +280,7 @@ int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* ar
mbuf->pkt_len = 42;
mbuf->data_len = 42;
- // ARP replies always on queue 0
- if (rte_eth_tx_burst(_port, 0, &mbuf, 1) != 1) {
+ if (rte_eth_tx_burst(_port, queue_id, &mbuf, 1) != 1) {
UHD_LOGGER_WARNING("DPDK")
<< boost::format("%s: TX descriptor ring is full") % __func__;
rte_pktmbuf_free(mbuf);
@@ -298,61 +289,16 @@ int dpdk_port::_arp_reply(struct rte_mempool* tx_pktbuf_pool, struct arp_hdr* ar
return 0;
}
-// TODO: ARP processing for queue 0
-// int dpdk_port::process_arp(struct rte_mempool *tx_pktbuf_pool, struct arp_hdr
-// *arp_frame)
-//{
-// std::lock_guard<std::mutex> lock(_mutex);
-// uint32_t dest_ip = arp_frame->arp_data.arp_sip;
-// struct ether_addr dest_addr = arp_frame->arp_data.arp_sha;
-//
-// /* Add entry to ARP table */
-// struct uhd_dpdk_arp_entry *entry = NULL;
-// rte_hash_lookup_data(_arp_table, &dest_ip, (void **) &entry);
-// if (!entry) {
-// entry = rte_zmalloc(NULL, sizeof(*entry), 0);
-// if (!entry) {
-// return -ENOMEM;
-// }
-// LIST_INIT(&entry->pending_list);
-// ether_addr_copy(&dest_addr, &entry->mac_addr);
-// if (rte_hash_add_key_data(port->arp_table, &dest_ip, entry) < 0) {
-// rte_free(entry);
-// return -ENOSPC;
-// }
-// } else {
-// struct uhd_dpdk_config_req *req = NULL;
-// ether_addr_copy(&dest_addr, &entry->mac_addr);
-// /* Now wake any config reqs waiting for the ARP */
-// LIST_FOREACH(req, &entry->pending_list, entry) {
-// _uhd_dpdk_config_req_compl(req, 0);
-// }
-// while (entry->pending_list.lh_first != NULL) {
-// LIST_REMOVE(entry->pending_list.lh_first, entry);
-// }
-// }
-// /* Respond if this was an ARP request */
-// if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST) &&
-// arp_frame->arp_data.arp_tip == port->ipv4_addr) {
-// _arp_reply(tx_pktbuf_pool, arp_frame);
-// }
-//
-// return 0;
-//
-//}
-
-static dpdk_ctx* global_ctx = nullptr;
+static dpdk_ctx::sptr global_ctx = nullptr;
static std::mutex global_ctx_mutex;
dpdk_ctx::sptr dpdk_ctx::get()
{
std::lock_guard<std::mutex> lock(global_ctx_mutex);
if (!global_ctx) {
- auto new_ctx = std::make_shared<dpdk_ctx>();
- global_ctx = new_ctx.get();
- return new_ctx;
+ global_ctx = std::make_shared<dpdk_ctx>();
}
- return global_ctx->shared_from_this();
+ return global_ctx;
}
dpdk_ctx::dpdk_ctx(void) : _init_done(false) {}
@@ -384,6 +330,7 @@ void dpdk_ctx::_eal_init(const device_addr_t& eal_args)
auto args = new std::array<char, 4096>();
char* opt = args->data();
char* end = args->data() + args->size();
+ UHD_LOG_TRACE("DPDK", "EAL init options: ");
for (std::string& key : eal_args.keys()) {
std::string val = eal_args[key];
if (key == "dpdk_coremask") {
@@ -452,10 +399,16 @@ void dpdk_ctx::init(const device_addr_t& user_args)
_num_mbufs = dpdk_args.cast<int>("dpdk_num_mbufs", DEFAULT_NUM_MBUFS);
_mbuf_cache_size =
dpdk_args.cast<int>("dpdk_mbuf_cache_size", DEFAULT_MBUF_CACHE_SIZE);
+ UHD_LOG_TRACE("DPDK",
+ "mtu: " << _mtu << " num_mbufs: " << _num_mbufs
+ << " mbuf_cache_size: " << _mbuf_cache_size);
/* Get device info for all the NIC ports */
int num_dpdk_ports = rte_eth_dev_count_avail();
- UHD_ASSERT_THROW(num_dpdk_ports > 0);
+ if (num_dpdk_ports == 0) {
+ UHD_LOG_ERROR("DPDK", "No available DPDK devices (ports) found!");
+ throw uhd::runtime_error("No available DPDK devices (ports) found!");
+ }
device_addrs_t nics(num_dpdk_ports);
RTE_ETH_FOREACH_DEV(i)
{
@@ -480,6 +433,8 @@ void dpdk_ctx::init(const device_addr_t& user_args)
}
/* Now combine user args with conf file */
auto conf = uhd::prefs::get_dpdk_nic_args(nic);
+ // TODO: Enable the use of multiple DMA queues
+ conf["dpdk_num_queues"] = "1";
/* Update config, and remove ports that aren't fully configured */
if (conf.has_key("dpdk_ipv4")) {
@@ -491,10 +446,17 @@ void dpdk_ctx::init(const device_addr_t& user_args)
}
}
+ std::map<size_t, std::vector<size_t>> lcore_to_port_id_map;
RTE_ETH_FOREACH_DEV(i)
{
auto& conf = nics.at(i);
if (conf.has_key("dpdk_ipv4")) {
+ UHD_ASSERT_THROW(conf.has_key("dpdk_lcore"));
+ const size_t lcore_id = conf.cast<size_t>("dpdk_lcore", 0);
+ if (!lcore_to_port_id_map.count(lcore_id)) {
+ lcore_to_port_id_map.insert({lcore_id, {}});
+ }
+
// Allocating enough buffers for all DMA queues for each CPU socket
// - This is a bit inefficient for larger systems, since NICs may not
// all be on one socket
@@ -507,10 +469,13 @@ void dpdk_ctx::init(const device_addr_t& user_args)
_ports[i] = dpdk_port::make(i,
_mtu,
conf.cast<uint16_t>("dpdk_num_queues", rte_lcore_count()),
- _num_mbufs,
+ conf.cast<uint16_t>("dpdk_num_desc", DPDK_DEFAULT_RING_SIZE),
rx_pool,
tx_pool,
conf["dpdk_ipv4"]);
+
+ // Remember all port IDs that map to an lcore
+ lcore_to_port_id_map.at(lcore_id).push_back(i);
}
}
@@ -522,12 +487,29 @@ void dpdk_ctx::init(const device_addr_t& user_args)
rte_eth_link_get(portid, &link);
unsigned int link_status = link.link_status;
unsigned int link_speed = link.link_speed;
- UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps\n")
- % portid % link_status % link_speed;
+ UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps") % portid
+ % link_status % link_speed;
}
- UHD_LOG_TRACE("DPDK", "Init DONE!");
-
+ UHD_LOG_TRACE("DPDK", "Init done -- spawning IO services");
_init_done = true;
+
+ // Links are up, now create one IO service per lcore
+ for (auto& lcore_portids_pair : lcore_to_port_id_map) {
+ const size_t lcore_id = lcore_portids_pair.first;
+ std::vector<dpdk_port*> dpdk_ports;
+ dpdk_ports.reserve(lcore_portids_pair.second.size());
+ for (const size_t port_id : lcore_portids_pair.second) {
+ dpdk_ports.push_back(get_port(port_id));
+ }
+ const size_t servq_depth = 32; // FIXME
+ UHD_LOG_TRACE("DPDK",
+ "Creating I/O service for lcore "
+ << lcore_id << ", servicing " << dpdk_ports.size()
+ << " ports, service queue depth " << servq_depth);
+ _io_srv_portid_map.insert(
+ {uhd::transport::dpdk_io_service::make(lcore_id, dpdk_ports, servq_depth),
+ lcore_portids_pair.second});
+ }
}
}
@@ -577,7 +559,7 @@ int dpdk_ctx::get_port_link_status(port_id_t portid) const
return link.link_status;
}
-int dpdk_ctx::get_route(const std::string& addr) const
+dpdk_port* dpdk_ctx::get_route(const std::string& addr) const
{
const uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str());
for (const auto& port : _ports) {
@@ -586,10 +568,10 @@ int dpdk_ctx::get_route(const std::string& addr) const
uint32_t src_ipv4 = port.second->get_ipv4();
uint32_t netmask = port.second->get_netmask();
if ((src_ipv4 & netmask) == (dst_ipv4 & netmask)) {
- return (int)port.first;
+ return port.second.get();
}
}
- return -ENODEV;
+ return NULL;
}
@@ -598,6 +580,20 @@ bool dpdk_ctx::is_init_done(void) const
return _init_done.load();
}
+uhd::transport::dpdk_io_service::sptr dpdk_ctx::get_io_service(const size_t port_id)
+{
+ for (auto& io_srv_portid_pair : _io_srv_portid_map) {
+ if (uhd::has(io_srv_portid_pair.second, port_id)) {
+ return io_srv_portid_pair.first;
+ }
+ }
+
+ std::string err_msg = std::string("Cannot look up I/O service for port ID: ")
+ + std::to_string(port_id) + ". No such port ID!";
+ UHD_LOG_ERROR("DPDK", err_msg);
+ throw uhd::lookup_error(err_msg);
+}
+
struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool(
unsigned int cpu_socket, size_t num_bufs)
{
@@ -605,8 +601,12 @@ struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool(
const int mbuf_size = _mtu + RTE_PKTMBUF_HEADROOM;
char name[32];
snprintf(name, sizeof(name), "rx_mbuf_pool_%u", cpu_socket);
- _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create(
- name, num_bufs, _mbuf_cache_size, 0, mbuf_size, SOCKET_ID_ANY);
+ _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create(name,
+ num_bufs,
+ _mbuf_cache_size,
+ DPDK_MBUF_PRIV_SIZE,
+ mbuf_size,
+ SOCKET_ID_ANY);
if (!_rx_pktbuf_pools.at(cpu_socket)) {
UHD_LOG_ERROR("DPDK", "Could not allocate RX pktbuf pool");
throw uhd::runtime_error("DPDK: Could not allocate RX pktbuf pool");
diff --git a/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp b/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp
new file mode 100644
index 000000000..1fcedca51
--- /dev/null
+++ b/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp
@@ -0,0 +1,950 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/utils/log.hpp>
+#include <uhd/utils/thread.hpp>
+#include <uhdlib/transport/dpdk/arp.hpp>
+#include <uhdlib/transport/dpdk/udp.hpp>
+#include <uhdlib/transport/dpdk_io_service_client.hpp>
+#include <uhdlib/utils/narrow.hpp>
+#include <cmath>
+
+/*
+ * Memory management
+ *
+ * Every object that allocates and frees DPDK memory has a reference to the
+ * dpdk_ctx.
+ *
+ * Ownership hierarchy:
+ *
+ * dpdk_io_service_mgr (1) =>
+ * dpdk_ctx::sptr
+ * dpdk_io_service::sptr
+ *
+ * xport (1) =>
+ * dpdk_send_io::sptr
+ * dpdk_recv_io::sptr
+ *
+ * usrp link_mgr (1) =>
+ * udp_dpdk_link::sptr
+ *
+ * dpdk_send_io (2) =>
+ * dpdk_ctx::sptr
+ * dpdk_io_service::sptr
+ *
+ * dpdk_recv_io (2) =>
+ * dpdk_ctx::sptr
+ * dpdk_io_service::sptr
+ *
+ * dpdk_io_service (3) =>
+ * dpdk_ctx::wptr (weak_ptr)
+ * udp_dpdk_link::sptr
+ *
+ * udp_dpdk_link (4) =>
+ * dpdk_ctx::sptr
+ */
+
+using namespace uhd::transport;
+
+dpdk_io_service::dpdk_io_service(
+ unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth)
+ : _ctx(dpdk::dpdk_ctx::get())
+ , _lcore_id(lcore_id)
+ , _ports(ports)
+ , _servq(servq_depth, lcore_id)
+{
+ UHD_LOG_TRACE("DPDK::IO_SERVICE", "Launching I/O service for lcore " << lcore_id);
+ for (auto port : _ports) {
+ UHD_LOG_TRACE("DPDK::IO_SERVICE",
+ "lcore_id " << lcore_id << ": Adding port index " << port->get_port_id());
+ _tx_queues[port->get_port_id()] = std::list<dpdk_send_io*>();
+ _recv_xport_map[port->get_port_id()] = std::list<dpdk_recv_io*>();
+ }
+ int status = rte_eal_remote_launch(_io_worker, this, lcore_id);
+ if (status) {
+ throw uhd::runtime_error("DPDK: I/O service cannot launch on busy lcore");
+ }
+}
+
+dpdk_io_service::sptr dpdk_io_service::make(
+ unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth)
+{
+ return dpdk_io_service::sptr(new dpdk_io_service(lcore_id, ports, servq_depth));
+}
+
+dpdk_io_service::~dpdk_io_service()
+{
+ UHD_LOG_TRACE(
+ "DPDK::IO_SERVICE", "Shutting down I/O service for lcore " << _lcore_id);
+ dpdk::wait_req* req = dpdk::wait_req_alloc(dpdk::wait_type::WAIT_LCORE_TERM, NULL);
+ if (!req) {
+ UHD_LOG_ERROR("DPDK::IO_SERVICE",
+ "Could not allocate request for lcore termination for lcore " << _lcore_id);
+ return;
+ }
+ dpdk::wait_req_get(req);
+ _servq.submit(req, std::chrono::microseconds(-1));
+ dpdk::wait_req_put(req);
+}
+
+void dpdk_io_service::attach_recv_link(recv_link_if::sptr link)
+{
+ struct dpdk_flow_data data;
+ data.link = dynamic_cast<udp_dpdk_link*>(link.get());
+ data.is_recv = true;
+ assert(data.link);
+ auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data);
+ if (!req) {
+ UHD_LOG_ERROR(
+ "DPDK::IO_SERVICE", "Could not allocate wait_req to attach recv_link");
+ throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach recv_link");
+ }
+ _servq.submit(req, std::chrono::microseconds(-1));
+ dpdk::wait_req_put(req);
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+ _recv_links.push_back(link);
+ }
+}
+
+void dpdk_io_service::attach_send_link(send_link_if::sptr link)
+{
+ udp_dpdk_link* dpdk_link = dynamic_cast<udp_dpdk_link*>(link.get());
+ assert(dpdk_link);
+
+ // First, fill in destination MAC address
+ struct dpdk::arp_request arp_data;
+ arp_data.tpa = dpdk_link->get_remote_ipv4();
+ arp_data.port = dpdk_link->get_port()->get_port_id();
+ if (dpdk_link->get_port()->dst_is_broadcast(arp_data.tpa)) {
+ // If a broadcast IP, skip the ARP and fill with broadcast MAC addr
+ memset(arp_data.tha.addr_bytes, 0xFF, 6);
+ } else {
+ auto arp_req = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data);
+ if (!arp_req) {
+ UHD_LOG_ERROR(
+ "DPDK::IO_SERVICE", "Could not allocate wait_req for ARP request");
+ throw uhd::runtime_error("DPDK: Could not allocate wait_req for ARP request");
+ }
+ if (_servq.submit(arp_req, std::chrono::microseconds(3000000))) {
+ // Try one more time...
+ auto arp_req2 = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data);
+ if (_servq.submit(arp_req2, std::chrono::microseconds(30000000))) {
+ wait_req_put(arp_req);
+ wait_req_put(arp_req2);
+ throw uhd::io_error("DPDK: Could not reach host");
+ }
+ wait_req_put(arp_req2);
+ }
+ wait_req_put(arp_req);
+ }
+ dpdk_link->set_remote_mac(arp_data.tha);
+
+ // Then, submit the link to the I/O service thread
+ struct dpdk_flow_data data;
+ data.link = dpdk_link;
+ data.is_recv = false;
+ auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data);
+ if (!req) {
+ UHD_LOG_ERROR(
+ "DPDK::IO_SERVICE", "Could not allocate wait_req to attach send_link");
+ throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach send_link");
+ }
+ _servq.submit(req, std::chrono::microseconds(-1));
+ wait_req_put(req);
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+ _send_links.push_back(link);
+ }
+}
+
+void dpdk_io_service::detach_recv_link(recv_link_if::sptr link)
+{
+ auto link_ptr = link.get();
+ struct dpdk_flow_data data;
+ data.link = dynamic_cast<udp_dpdk_link*>(link_ptr);
+ data.is_recv = true;
+ auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data);
+ if (!req) {
+ UHD_LOG_ERROR(
+ "DPDK::IO_SERVICE", "Could not allocate wait_req to detach recv_link");
+ throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach recv_link");
+ }
+ _servq.submit(req, std::chrono::microseconds(-1));
+ wait_req_put(req);
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+ _recv_links.remove_if(
+ [link_ptr](recv_link_if::sptr& item) { return item.get() == link_ptr; });
+ }
+}
+
+void dpdk_io_service::detach_send_link(send_link_if::sptr link)
+{
+ auto link_ptr = link.get();
+ struct dpdk_flow_data data;
+ data.link = dynamic_cast<udp_dpdk_link*>(link_ptr);
+ data.is_recv = false;
+ auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data);
+ if (!req) {
+ UHD_LOG_ERROR(
+ "DPDK::IO_SERVICE", "Could not allocate wait_req to detach send_link");
+ throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach send_link");
+ }
+ _servq.submit(req, std::chrono::microseconds(-1));
+ wait_req_put(req);
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+ _send_links.remove_if(
+ [link_ptr](send_link_if::sptr& item) { return item.get() == link_ptr; });
+ }
+}
+
+recv_io_if::sptr dpdk_io_service::make_recv_client(recv_link_if::sptr data_link,
+ size_t num_recv_frames,
+ recv_callback_t cb,
+ send_link_if::sptr /*fc_link*/,
+ size_t num_send_frames,
+ recv_io_if::fc_callback_t fc_cb)
+{
+ auto link = dynamic_cast<udp_dpdk_link*>(data_link.get());
+ auto recv_io = std::make_shared<dpdk_recv_io>(
+ shared_from_this(), link, num_recv_frames, cb, num_send_frames, fc_cb);
+
+ // Register with I/O service
+ recv_io->_dpdk_io_if.io_client = static_cast<void*>(recv_io.get());
+ auto xport_req = dpdk::wait_req_alloc(
+ dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&recv_io->_dpdk_io_if);
+ _servq.submit(xport_req, std::chrono::microseconds(-1));
+ wait_req_put(xport_req);
+ return recv_io;
+}
+
+send_io_if::sptr dpdk_io_service::make_send_client(send_link_if::sptr send_link,
+ size_t num_send_frames,
+ send_io_if::send_callback_t send_cb,
+ recv_link_if::sptr /*recv_link*/,
+ size_t num_recv_frames,
+ recv_callback_t recv_cb,
+ send_io_if::fc_callback_t fc_cb)
+{
+ auto link = dynamic_cast<udp_dpdk_link*>(send_link.get());
+ auto send_io = std::make_shared<dpdk_send_io>(shared_from_this(),
+ link,
+ num_send_frames,
+ send_cb,
+ num_recv_frames,
+ recv_cb,
+ fc_cb);
+
+ // Register with I/O service
+ send_io->_dpdk_io_if.io_client = static_cast<void*>(send_io.get());
+ auto xport_req = dpdk::wait_req_alloc(
+ dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&send_io->_dpdk_io_if);
+ _servq.submit(xport_req, std::chrono::microseconds(-1));
+ wait_req_put(xport_req);
+ return send_io;
+}
+
+
+int dpdk_io_service::_io_worker(void* arg)
+{
+ if (!arg)
+ return -EINVAL;
+ dpdk_io_service* srv = (dpdk_io_service*)arg;
+
+ /* Check that this is a valid lcore */
+ unsigned int lcore_id = rte_lcore_id();
+ if (lcore_id == LCORE_ID_ANY)
+ return -ENODEV;
+
+ /* Check that this lcore has ports */
+ if (srv->_ports.size() == 0)
+ return -ENODEV;
+
+ char name[16];
+ snprintf(name, sizeof(name), "dpdk-io_%hu", (uint16_t)lcore_id);
+ rte_thread_setname(pthread_self(), name);
+ UHD_LOG_TRACE("DPDK::IO_SERVICE",
+ "I/O service thread '" << name << "' started on lcore " << lcore_id);
+
+ uhd::set_thread_priority_safe();
+
+ snprintf(name, sizeof(name), "rx-tbl_%hu", (uint16_t)lcore_id);
+ struct rte_hash_parameters hash_params = {.name = name,
+ .entries = MAX_FLOWS,
+ .reserved = 0,
+ .key_len = sizeof(struct dpdk::ipv4_5tuple),
+ .hash_func = NULL,
+ .hash_func_init_val = 0,
+ .socket_id = uhd::narrow_cast<int>(rte_socket_id()),
+ .extra_flag = 0};
+ srv->_rx_table = rte_hash_create(&hash_params);
+ if (srv->_rx_table == NULL) {
+ return rte_errno;
+ }
+
+ int status = 0;
+ while (!status) {
+ /* For each port, attempt to receive packets and process */
+ for (auto port : srv->_ports) {
+ srv->_rx_burst(port, 0);
+ }
+ /* For each port's TX queues, do TX */
+ for (auto port : srv->_ports) {
+ srv->_tx_burst(port);
+ }
+ /* For each port's RX release queues, release buffers */
+ for (auto port : srv->_ports) {
+ srv->_rx_release(port);
+ }
+ /* Retry waking clients */
+ if (srv->_retry_head) {
+ dpdk_io_if* node = srv->_retry_head;
+ dpdk_io_if* end = srv->_retry_head->prev;
+ while (true) {
+ dpdk_io_if* next = node->next;
+ srv->_wake_client(node);
+ if (node == end) {
+ break;
+ } else {
+ node = next;
+ next = node->next;
+ }
+ }
+ }
+ /* Check for open()/close()/term() requests and service 1 at a time
+ * Leave this last so we immediately terminate if requested
+ */
+ status = srv->_service_requests();
+ }
+
+ return status;
+}
+
+int dpdk_io_service::_service_requests()
+{
+ for (int i = 0; i < MAX_PENDING_SERVICE_REQS; i++) {
+ /* Dequeue */
+ dpdk::wait_req* req = _servq.pop();
+ if (!req) {
+ break;
+ }
+ switch (req->reason) {
+ case dpdk::wait_type::WAIT_SIMPLE:
+ while (_servq.complete(req) == -ENOBUFS)
+ ;
+ break;
+ case dpdk::wait_type::WAIT_RX:
+ case dpdk::wait_type::WAIT_TX_BUF:
+ throw uhd::not_implemented_error(
+ "DPDK: _service_requests(): DPDK is still a WIP");
+ case dpdk::wait_type::WAIT_FLOW_OPEN:
+ _service_flow_open(req);
+ break;
+ case dpdk::wait_type::WAIT_FLOW_CLOSE:
+ _service_flow_close(req);
+ break;
+ case dpdk::wait_type::WAIT_XPORT_CONNECT:
+ _service_xport_connect(req);
+ break;
+ case dpdk::wait_type::WAIT_XPORT_DISCONNECT:
+ _service_xport_disconnect(req);
+ break;
+ case dpdk::wait_type::WAIT_ARP: {
+ assert(req->data != NULL);
+ int arp_status = _service_arp_request(req);
+ assert(arp_status != -ENOMEM);
+ if (arp_status == 0) {
+ while (_servq.complete(req) == -ENOBUFS)
+ ;
+ }
+ break;
+ }
+ case dpdk::wait_type::WAIT_LCORE_TERM:
+ rte_free(_rx_table);
+ while (_servq.complete(req) == -ENOBUFS)
+ ;
+ // Return a positive value to indicate we should terminate
+ return 1;
+ default:
+ UHD_LOG_ERROR(
+ "DPDK::IO_SERVICE", "Invalid reason associated with wait request");
+ while (_servq.complete(req) == -ENOBUFS)
+ ;
+ break;
+ }
+ }
+ return 0;
+}
+
+void dpdk_io_service::_service_flow_open(dpdk::wait_req* req)
+{
+ auto flow_req_data = (struct dpdk_flow_data*)req->data;
+ assert(flow_req_data);
+ if (flow_req_data->is_recv) {
+ // If RX, add to RX table. Currently, nothing to do for TX.
+ struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP,
+ .src_ip = 0,
+ .dst_ip = flow_req_data->link->get_port()->get_ipv4(),
+ .src_port = 0,
+ .dst_port = flow_req_data->link->get_local_port()};
+ // Check the UDP port isn't in use
+ if (rte_hash_lookup(_rx_table, &ht_key) > 0) {
+ req->retval = -EADDRINUSE;
+ UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add to RX table");
+ while (_servq.complete(req) == -ENOBUFS)
+ ;
+ return;
+ }
+ // Add xport list for this UDP port
+ auto rx_entry = new std::list<dpdk_io_if*>();
+ if (rte_hash_add_key_data(_rx_table, &ht_key, rx_entry)) {
+ UHD_LOG_ERROR("DPDK::IO_SERVICE", "Could not add new RX list to table");
+ delete rx_entry;
+ req->retval = -ENOMEM;
+ while (_servq.complete(req) == -ENOBUFS)
+ ;
+ return;
+ }
+ }
+ while (_servq.complete(req) == -ENOBUFS)
+ ;
+}
+
+void dpdk_io_service::_service_flow_close(dpdk::wait_req* req)
+{
+ auto flow_req_data = (struct dpdk_flow_data*)req->data;
+ assert(flow_req_data);
+ if (flow_req_data->is_recv) {
+ // If RX, remove from RX table. Currently, nothing to do for TX.
+ struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP,
+ .src_ip = 0,
+ .dst_ip = flow_req_data->link->get_port()->get_ipv4(),
+ .src_port = 0,
+ .dst_port = flow_req_data->link->get_local_port()};
+ std::list<dpdk_io_if*>* xport_list;
+
+ if (rte_hash_lookup_data(_rx_table, &ht_key, (void**)&xport_list) > 0) {
+ UHD_ASSERT_THROW(xport_list->empty());
+ delete xport_list;
+ rte_hash_del_key(_rx_table, &ht_key);
+ while (_servq.complete(req) == -ENOBUFS)
+ ;
+ return;
+ }
+ }
+ while (_servq.complete(req) == -ENOBUFS)
+ ;
+}
+
+void dpdk_io_service::_service_xport_connect(dpdk::wait_req* req)
+{
+ auto dpdk_io = static_cast<dpdk_io_if*>(req->data);
+ UHD_ASSERT_THROW(dpdk_io);
+ auto port = dpdk_io->link->get_port();
+ if (dpdk_io->recv_cb) {
+ // Add to RX table only if have a callback.
+ struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP,
+ .src_ip = 0,
+ .dst_ip = port->get_ipv4(),
+ .src_port = 0,
+ .dst_port = dpdk_io->link->get_local_port()};
+ void* hash_data;
+ if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) < 0) {
+ req->retval = -ENOENT;
+ UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add xport to RX table");
+ while (_servq.complete(req) == -ENOBUFS)
+ ;
+ return;
+ }
+ // Add to xport list for this UDP port
+ auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data);
+ rx_entry->push_back(dpdk_io);
+ }
+ if (dpdk_io->is_recv) {
+ UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX connect request...");
+ // Add to xport list for this NIC port
+ auto& xport_list = _recv_xport_map.at(port->get_port_id());
+ xport_list.push_back((dpdk_recv_io*)dpdk_io->io_client);
+ } else {
+ UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX connect request...");
+ dpdk_send_io* send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client);
+ // Add to xport list for this NIC port
+ auto& xport_list = _tx_queues.at(port->get_port_id());
+ xport_list.push_back(send_io);
+ for (size_t i = 0; i < send_io->_num_send_frames; i++) {
+ auto buff_ptr =
+ (dpdk::dpdk_frame_buff*)dpdk_io->link->get_send_buff(0).release();
+ if (!buff_ptr) {
+ UHD_LOG_ERROR("DPDK::IO_SERVICE",
+ "TX mempool out of memory. Please increase dpdk_num_mbufs.");
+ break;
+ }
+ if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) {
+ rte_pktmbuf_free(buff_ptr->get_pktmbuf());
+ break;
+ }
+ send_io->_num_frames_in_use++;
+ }
+ }
+ while (_servq.complete(req) == -ENOBUFS)
+ ;
+}
+
+void dpdk_io_service::_service_xport_disconnect(dpdk::wait_req* req)
+{
+ auto dpdk_io = (struct dpdk_io_if*)req->data;
+ assert(dpdk_io);
+ auto port = dpdk_io->link->get_port();
+ if (dpdk_io->recv_cb) {
+ // Remove from RX table only if have a callback.
+ struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP,
+ .src_ip = 0,
+ .dst_ip = port->get_ipv4(),
+ .src_port = 0,
+ .dst_port = dpdk_io->link->get_local_port()};
+ void* hash_data;
+ if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) >= 0) {
+ // Remove from xport list for this UDP port
+ auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data);
+ rx_entry->remove(dpdk_io);
+ } else {
+ req->retval = -EINVAL;
+ UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot remove xport from RX table");
+ }
+ }
+ if (dpdk_io->is_recv) {
+ UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX disconnect request...");
+ dpdk_recv_io* recv_client = static_cast<dpdk_recv_io*>(dpdk_io->io_client);
+ // Remove from xport list for this NIC port
+ auto& xport_list = _recv_xport_map.at(port->get_port_id());
+ xport_list.remove(recv_client);
+ while (!rte_ring_empty(recv_client->_recv_queue)) {
+ frame_buff* buff_ptr;
+ rte_ring_dequeue(recv_client->_recv_queue, (void**)&buff_ptr);
+ dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr));
+ }
+ while (!rte_ring_empty(recv_client->_release_queue)) {
+ frame_buff* buff_ptr;
+ rte_ring_dequeue(recv_client->_release_queue, (void**)&buff_ptr);
+ dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr));
+ }
+ } else {
+ UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX disconnect request...");
+ dpdk_send_io* send_client = static_cast<dpdk_send_io*>(dpdk_io->io_client);
+ // Remove from xport list for this NIC port
+ auto& xport_list = _tx_queues.at(port->get_port_id());
+ xport_list.remove(send_client);
+ while (!rte_ring_empty(send_client->_send_queue)) {
+ frame_buff* buff_ptr;
+ rte_ring_dequeue(send_client->_send_queue, (void**)&buff_ptr);
+ dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr));
+ }
+ while (!rte_ring_empty(send_client->_buffer_queue)) {
+ frame_buff* buff_ptr;
+ rte_ring_dequeue(send_client->_buffer_queue, (void**)&buff_ptr);
+ dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr));
+ }
+ }
+ // Now remove the node if it's on the retry list
+ if ((_retry_head == dpdk_io) && (dpdk_io->next == dpdk_io)) {
+ _retry_head = NULL;
+ } else if (_retry_head) {
+ dpdk_io_if* node = _retry_head->next;
+ while (node != _retry_head) {
+ if (node == dpdk_io) {
+ dpdk_io->prev->next = dpdk_io->next;
+ dpdk_io->next->prev = dpdk_io->prev;
+ break;
+ }
+ node = node->next;
+ }
+ }
+ while (_servq.complete(req) == -ENOBUFS)
+ ;
+}
+
+int dpdk_io_service::_service_arp_request(dpdk::wait_req* req)
+{
+ int status = 0;
+ auto arp_req_data = (struct dpdk::arp_request*)req->data;
+ dpdk::ipv4_addr dst_addr = arp_req_data->tpa;
+ auto ctx_sptr = _ctx.lock();
+ UHD_ASSERT_THROW(ctx_sptr);
+ dpdk::dpdk_port* port = ctx_sptr->get_port(arp_req_data->port);
+ UHD_LOG_TRACE("DPDK::IO_SERVICE",
+ "ARP: Requesting address for " << dpdk::ipv4_num_to_str(dst_addr));
+
+ rte_spinlock_lock(&port->_spinlock);
+ struct dpdk::arp_entry* entry = NULL;
+ if (port->_arp_table.count(dst_addr) == 0) {
+ entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0);
+ if (!entry) {
+ status = -ENOMEM;
+ goto arp_end;
+ }
+ entry = new (entry) dpdk::arp_entry();
+ entry->reqs.push_back(req);
+ port->_arp_table[dst_addr] = entry;
+ status = -EAGAIN;
+ UHD_LOG_TRACE("DPDK::IO_SERVICE", "Address not in table. Sending ARP request.");
+ _send_arp_request(port, 0, arp_req_data->tpa);
+ } else {
+ entry = port->_arp_table.at(dst_addr);
+ if (is_zero_ether_addr(&entry->mac_addr)) {
+ UHD_LOG_TRACE("DPDK::IO_SERVICE",
+ "ARP: Address in table, but not populated yet. Resending ARP request.");
+ port->_arp_table.at(dst_addr)->reqs.push_back(req);
+ status = -EAGAIN;
+ _send_arp_request(port, 0, arp_req_data->tpa);
+ } else {
+ UHD_LOG_TRACE("DPDK::IO_SERVICE", "ARP: Address in table.");
+ ether_addr_copy(&entry->mac_addr, &arp_req_data->tha);
+ status = 0;
+ }
+ }
+arp_end:
+ rte_spinlock_unlock(&port->_spinlock);
+ return status;
+}
+
+int dpdk_io_service::_send_arp_request(
+ dpdk::dpdk_port* port, dpdk::queue_id_t queue, dpdk::ipv4_addr ip)
+{
+ struct rte_mbuf* mbuf;
+ struct ether_hdr* hdr;
+ struct arp_hdr* arp_frame;
+
+ mbuf = rte_pktmbuf_alloc(port->get_tx_pktbuf_pool());
+ if (unlikely(mbuf == NULL)) {
+ UHD_LOG_WARNING(
+ "DPDK::IO_SERVICE", "Could not allocate packet buffer for ARP request");
+ return -ENOMEM;
+ }
+
+ hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*);
+ arp_frame = (struct arp_hdr*)&hdr[1];
+
+ memset(hdr->d_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN);
+ hdr->s_addr = port->get_mac_addr();
+ hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP);
+
+ arp_frame->arp_hrd = rte_cpu_to_be_16(ARP_HRD_ETHER);
+ arp_frame->arp_pro = rte_cpu_to_be_16(ETHER_TYPE_IPv4);
+ arp_frame->arp_hln = 6;
+ arp_frame->arp_pln = 4;
+ arp_frame->arp_op = rte_cpu_to_be_16(ARP_OP_REQUEST);
+ arp_frame->arp_data.arp_sha = port->get_mac_addr();
+ arp_frame->arp_data.arp_sip = port->get_ipv4();
+ memset(arp_frame->arp_data.arp_tha.addr_bytes, 0x00, ETHER_ADDR_LEN);
+ arp_frame->arp_data.arp_tip = ip;
+
+ mbuf->pkt_len = 42;
+ mbuf->data_len = 42;
+
+ if (rte_eth_tx_burst(port->get_port_id(), queue, &mbuf, 1) != 1) {
+ UHD_LOG_WARNING("DPDK::IO_SERVICE", "ARP request not sent: Descriptor ring full");
+ rte_pktmbuf_free(mbuf);
+ return -EAGAIN;
+ }
+ return 0;
+}
+
+/* Do a burst of RX on port */
+int dpdk_io_service::_rx_burst(dpdk::dpdk_port* port, dpdk::queue_id_t queue)
+{
+ struct ether_hdr* hdr;
+ char* l2_data;
+ struct rte_mbuf* bufs[RX_BURST_SIZE];
+ const uint16_t num_rx =
+ rte_eth_rx_burst(port->get_port_id(), queue, bufs, RX_BURST_SIZE);
+ if (unlikely(num_rx == 0)) {
+ return 0;
+ }
+
+ for (int buf = 0; buf < num_rx; buf++) {
+ uint64_t ol_flags = bufs[buf]->ol_flags;
+ hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr*);
+ l2_data = (char*)&hdr[1];
+ switch (rte_be_to_cpu_16(hdr->ether_type)) {
+ case ETHER_TYPE_ARP:
+ _process_arp(port, queue, (struct arp_hdr*)l2_data);
+ rte_pktmbuf_free(bufs[buf]);
+ break;
+ case ETHER_TYPE_IPv4:
+ if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) {
+ UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet has bad IP cksum");
+ } else if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_NONE) {
+ UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet missing IP cksum");
+ } else {
+ _process_ipv4(port, bufs[buf], (struct ipv4_hdr*)l2_data);
+ }
+ break;
+ default:
+ rte_pktmbuf_free(bufs[buf]);
+ break;
+ }
+ }
+ return num_rx;
+}
+
+int dpdk_io_service::_process_arp(
+ dpdk::dpdk_port* port, dpdk::queue_id_t queue_id, struct arp_hdr* arp_frame)
+{
+ uint32_t dest_ip = arp_frame->arp_data.arp_sip;
+ struct ether_addr dest_addr = arp_frame->arp_data.arp_sha;
+ UHD_LOG_TRACE("DPDK::IO_SERVICE",
+ "Processing ARP packet: " << dpdk::ipv4_num_to_str(dest_ip) << " -> "
+ << dpdk::eth_addr_to_string(dest_addr));
+ /* Add entry to ARP table */
+ rte_spinlock_lock(&port->_spinlock);
+ struct dpdk::arp_entry* entry = NULL;
+ if (port->_arp_table.count(dest_ip) == 0) {
+ entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0);
+ if (!entry) {
+ return -ENOMEM;
+ }
+ entry = new (entry) dpdk::arp_entry();
+ ether_addr_copy(&dest_addr, &entry->mac_addr);
+ port->_arp_table[dest_ip] = entry;
+ } else {
+ entry = port->_arp_table.at(dest_ip);
+ ether_addr_copy(&dest_addr, &entry->mac_addr);
+ for (auto req : entry->reqs) {
+ auto arp_data = (struct dpdk::arp_request*)req->data;
+ ether_addr_copy(&dest_addr, &arp_data->tha);
+ while (_servq.complete(req) == -ENOBUFS)
+ ;
+ }
+ entry->reqs.clear();
+ }
+ rte_spinlock_unlock(&port->_spinlock);
+
+ /* Respond if this was an ARP request */
+ if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST)
+ && arp_frame->arp_data.arp_tip == port->get_ipv4()) {
+ UHD_LOG_TRACE("DPDK::IO_SERVICE", "Sending ARP reply.");
+ port->_arp_reply(queue_id, arp_frame);
+ }
+
+ return 0;
+}
+
+int dpdk_io_service::_process_ipv4(
+ dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct ipv4_hdr* pkt)
+{
+ bool bcast = port->dst_is_broadcast(pkt->dst_addr);
+ if (pkt->dst_addr != port->get_ipv4() && !bcast) {
+ rte_pktmbuf_free(mbuf);
+ return -ENODEV;
+ }
+ if (pkt->next_proto_id == IPPROTO_UDP) {
+ return _process_udp(port, mbuf, (struct udp_hdr*)&pkt[1], bcast);
+ }
+ rte_pktmbuf_free(mbuf);
+ return -EINVAL;
+}
+
+
+int dpdk_io_service::_process_udp(
+ dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct udp_hdr* pkt, bool /*bcast*/)
+{
+ // Get the link
+ struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP,
+ .src_ip = 0,
+ .dst_ip = port->get_ipv4(),
+ .src_port = 0,
+ .dst_port = pkt->dst_port};
+ void* hash_data;
+ if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) < 0) {
+ UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No link entry in rx table");
+ rte_pktmbuf_free(mbuf);
+ return -ENOENT;
+ }
+ // Get xport list for this UDP port
+ auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data);
+ if (rx_entry->empty()) {
+ UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No xports for link");
+ rte_pktmbuf_free(mbuf);
+ return -ENOENT;
+ }
+ // Turn rte_mbuf -> dpdk_frame_buff
+ auto link = rx_entry->front()->link;
+ link->enqueue_recv_mbuf(mbuf);
+ auto buff = link->get_recv_buff(0);
+ bool rcvr_found = false;
+ for (auto client_if : *rx_entry) {
+ // Check all the muxed receivers...
+ if (client_if->recv_cb(buff, link, link)) {
+ rcvr_found = true;
+ if (buff) {
+ assert(client_if->is_recv);
+ auto recv_io = (dpdk_recv_io*)client_if->io_client;
+ auto buff_ptr = (dpdk::dpdk_frame_buff*)buff.release();
+ if (rte_ring_enqueue(recv_io->_recv_queue, buff_ptr)) {
+ rte_pktmbuf_free(buff_ptr->get_pktmbuf());
+ UHD_LOG_WARNING(
+ "DPDK::IO_SERVICE", "Dropping packet: No space in recv queue");
+ } else {
+ recv_io->_num_frames_in_use++;
+ assert(recv_io->_num_frames_in_use <= recv_io->_num_recv_frames);
+ _wake_client(client_if);
+ }
+ }
+ break;
+ }
+ }
+ if (!rcvr_found) {
+ UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No receiver xport found");
+ // Release the buffer if no receiver found
+ link->release_recv_buff(std::move(buff));
+ return -ENOENT;
+ }
+ return 0;
+}
+
+/* Do a burst of TX on port's tx queues */
+int dpdk_io_service::_tx_burst(dpdk::dpdk_port* port)
+{
+ unsigned int total_tx = 0;
+ auto& queues = _tx_queues.at(port->get_port_id());
+
+ for (auto& send_io : queues) {
+ unsigned int num_tx = rte_ring_count(send_io->_send_queue);
+ num_tx = (num_tx < TX_BURST_SIZE) ? num_tx : TX_BURST_SIZE;
+ bool replaced_buffers = false;
+ for (unsigned int i = 0; i < num_tx; i++) {
+ size_t frame_size = send_io->_dpdk_io_if.link->get_send_frame_size();
+ if (send_io->_fc_cb && !send_io->_fc_cb(frame_size)) {
+ break;
+ }
+ dpdk::dpdk_frame_buff* buff_ptr;
+ int status = rte_ring_dequeue(send_io->_send_queue, (void**)&buff_ptr);
+ if (status) {
+ UHD_LOG_ERROR("DPDK::IO_SERVICE", "TX Q Count doesn't match actual");
+ break;
+ }
+ send_io->_send_cb(frame_buff::uptr(buff_ptr), send_io->_dpdk_io_if.link);
+ // Attempt to replace buffer
+ buff_ptr = (dpdk::dpdk_frame_buff*)send_io->_dpdk_io_if.link->get_send_buff(0)
+ .release();
+ if (!buff_ptr) {
+ UHD_LOG_ERROR("DPDK::IO_SERVICE",
+ "TX mempool out of memory. Please increase dpdk_num_mbufs.");
+ send_io->_num_frames_in_use--;
+ } else if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) {
+ rte_pktmbuf_free(buff_ptr->get_pktmbuf());
+ send_io->_num_frames_in_use--;
+ } else {
+ replaced_buffers = true;
+ }
+ }
+ if (replaced_buffers) {
+ _wake_client(&send_io->_dpdk_io_if);
+ }
+ total_tx += num_tx;
+ }
+
+ return total_tx;
+}
+
+int dpdk_io_service::_rx_release(dpdk::dpdk_port* port)
+{
+ unsigned int total_bufs = 0;
+ auto& queues = _recv_xport_map.at(port->get_port_id());
+
+ for (auto& recv_io : queues) {
+ unsigned int num_buf = rte_ring_count(recv_io->_release_queue);
+ num_buf = (num_buf < RX_BURST_SIZE) ? num_buf : RX_BURST_SIZE;
+ for (unsigned int i = 0; i < num_buf; i++) {
+ dpdk::dpdk_frame_buff* buff_ptr;
+ int status = rte_ring_dequeue(recv_io->_release_queue, (void**)&buff_ptr);
+ if (status) {
+ UHD_LOG_ERROR("DPDK::IO_SERVICE", "RX Q Count doesn't match actual");
+ break;
+ }
+ recv_io->_fc_cb(frame_buff::uptr(buff_ptr),
+ recv_io->_dpdk_io_if.link,
+ recv_io->_dpdk_io_if.link);
+ recv_io->_num_frames_in_use--;
+ }
+ total_bufs += num_buf;
+ }
+
+ return total_bufs;
+}
+
+uint16_t dpdk_io_service::_get_unique_client_id()
+{
+ std::lock_guard<std::mutex> lock(_mutex);
+ if (_client_id_set.size() >= MAX_CLIENTS) {
+ UHD_LOG_ERROR("DPDK::IO_SERVICE", "Exceeded maximum number of clients");
+ throw uhd::runtime_error("DPDK::IO_SERVICE: Exceeded maximum number of clients");
+ }
+
+ uint16_t id = _next_client_id++;
+ while (_client_id_set.count(id)) {
+ id = _next_client_id++;
+ }
+ _client_id_set.insert(id);
+ return id;
+}
+
+void dpdk_io_service::_wake_client(dpdk_io_if* dpdk_io)
+{
+ dpdk::wait_req* req;
+ if (dpdk_io->is_recv) {
+ auto recv_io = static_cast<dpdk_recv_io*>(dpdk_io->io_client);
+ req = recv_io->_waiter;
+ } else {
+ auto send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client);
+ req = send_io->_waiter;
+ }
+ bool stat = req->mutex.try_lock();
+ if (stat) {
+ bool active_req = !req->complete;
+ if (dpdk_io->next) {
+ // On the list: Take it off
+ if (dpdk_io->next == dpdk_io) {
+ // Only node on the list
+ _retry_head = NULL;
+ } else {
+ // Other nodes are on the list
+ if (_retry_head == dpdk_io) {
+ // Move list head to next
+ _retry_head = dpdk_io->next;
+ }
+ dpdk_io->next->prev = dpdk_io->prev;
+ dpdk_io->prev->next = dpdk_io->next;
+ }
+ dpdk_io->next = NULL;
+ dpdk_io->prev = NULL;
+ }
+ if (active_req) {
+ req->complete = true;
+ req->cond.notify_one();
+ }
+ req->mutex.unlock();
+ if (active_req) {
+ wait_req_put(req);
+ }
+ } else {
+ // Put on the retry list, if it isn't already
+ if (!dpdk_io->next) {
+ if (_retry_head) {
+ dpdk_io->next = _retry_head;
+ dpdk_io->prev = _retry_head->prev;
+ _retry_head->prev->next = dpdk_io;
+ _retry_head->prev = dpdk_io;
+ } else {
+ _retry_head = dpdk_io;
+ dpdk_io->next = dpdk_io;
+ dpdk_io->prev = dpdk_io;
+ }
+ }
+ }
+}
diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt
index 8e58dd591..2c53e4905 100644
--- a/host/tests/CMakeLists.txt
+++ b/host/tests/CMakeLists.txt
@@ -124,7 +124,10 @@ if(ENABLE_DPDK)
${CMAKE_SOURCE_DIR}/lib/utils/paths.cpp
${CMAKE_SOURCE_DIR}/lib/utils/pathslib.cpp
${CMAKE_SOURCE_DIR}/lib/utils/prefs.cpp
+ ${CMAKE_SOURCE_DIR}/lib/transport/adapter.cpp
${CMAKE_SOURCE_DIR}/lib/transport/uhd-dpdk/dpdk_common.cpp
+ ${CMAKE_SOURCE_DIR}/lib/transport/uhd-dpdk/dpdk_io_service.cpp
+ ${CMAKE_SOURCE_DIR}/lib/transport/udp_dpdk_link.cpp
INCLUDE_DIRS
${DPDK_INCLUDE_DIR}
EXTRA_LIBS ${DPDK_LIBRARIES}
@@ -132,6 +135,8 @@ if(ENABLE_DPDK)
)
set_source_files_properties(
${CMAKE_SOURCE_DIR}/lib/transport/uhd-dpdk/dpdk_common.cpp
+ ${CMAKE_SOURCE_DIR}/lib/transport/uhd-dpdk/dpdk_io_service.cpp
+ ${CMAKE_SOURCE_DIR}/lib/transport/udp_dpdk_link.cpp
PROPERTIES COMPILE_FLAGS "-march=native -D_GNU_SOURCE"
)
ENDIF(ENABLE_DPDK)
diff --git a/host/tests/dpdk_port_test.cpp b/host/tests/dpdk_port_test.cpp
index 7ef386c52..8f6b20c34 100644
--- a/host/tests/dpdk_port_test.cpp
+++ b/host/tests/dpdk_port_test.cpp
@@ -6,6 +6,8 @@
#include <uhdlib/transport/dpdk/common.hpp>
#include <uhdlib/transport/dpdk/service_queue.hpp>
+#include <uhdlib/transport/dpdk_io_service.hpp>
+#include <uhdlib/transport/udp_dpdk_link.hpp>
#include <boost/program_options.hpp>
#include <iostream>
#include <memory>
@@ -101,6 +103,46 @@ int main(int argc, char **argv)
service_thread.join();
std::cout << "PASS: Service thread terminated" << std::endl;
delete queue;
+
+ std::cout << "Starting up ARP thread..." << std::endl;
+ std::vector<uhd::transport::dpdk::dpdk_port*> ports;
+ ports.push_back(port);
+ //auto io_srv = uhd::transport::dpdk_io_service::make(1, ports, 16);
+ auto io_srv = ctx->get_io_service(1);
+
+ // Create link
+ std::cout << "Creating UDP link..." << std::endl;
+ uhd::transport::link_params_t params;
+ params.recv_frame_size = 8000;
+ params.send_frame_size = 8000;
+ params.num_recv_frames = 511;
+ params.num_send_frames = 511;
+ params.recv_buff_size = params.recv_frame_size*params.num_recv_frames;
+ params.send_buff_size = params.send_frame_size*params.num_send_frames;
+ auto link = uhd::transport::udp_dpdk_link::make("192.168.10.2", "49600", params);
+
+ // Attach link
+ std::cout << "Attaching UDP send link..." << std::endl;
+ io_srv->attach_send_link(link);
+ struct ether_addr dest_mac;
+ link->get_remote_mac(dest_mac);
+ char mac_str[20];
+ ether_format_addr(mac_str, 20, &dest_mac);
+ std::cout << "Remote MAC address is " << mac_str << std::endl;
+ std::cout << std::endl;
+ std::cout << "Attaching UDP recv link..." << std::endl;
+ io_srv->attach_recv_link(link);
+ std::cout << "Press any key to quit..." << std::endl;
+ std::cin.get();
+
+ // Shut down
+ std::cout << "Detaching UDP send link..." << std::endl;
+ io_srv->detach_send_link(link);
+ std::cout << "Detaching UDP recv link..." << std::endl;
+ io_srv->detach_recv_link(link);
+ std::cout << "Shutting down I/O service..." << std::endl;
+ io_srv.reset();
+ std::cout << "Shutting down context..." << std::endl;
ctx.reset();
return 0;
}