aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/include
diff options
context:
space:
mode:
authorAlex Williams <alex.williams@ni.com>2019-11-20 17:44:40 -0800
committerMartin Braun <martin.braun@ettus.com>2019-11-26 12:21:34 -0800
commit911ca16adc9222be11d1cbf134bf547daa16806f (patch)
tree00041fd25acc0258a3690f8ed48194d63f518382 /host/lib/include
parentc730d7ba202286bfce3ee180df413486411e0b72 (diff)
downloaduhd-911ca16adc9222be11d1cbf134bf547daa16806f.tar.gz
uhd-911ca16adc9222be11d1cbf134bf547daa16806f.tar.bz2
uhd-911ca16adc9222be11d1cbf134bf547daa16806f.zip
lib: Add DPDK service queue
This is a data structure intended for use by the DPDK I/O service. It uses DPDK's lockless ring in multi-producer, single-consumer mode to allow clients to submit requests to the DPDK I/O service's worker thread. Clients can specify a timeout for the requests to be fulfilled.
Diffstat (limited to 'host/lib/include')
-rw-r--r--host/lib/include/uhdlib/transport/dpdk/service_queue.hpp237
1 files changed, 237 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp b/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp
new file mode 100644
index 000000000..7c9917079
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp
@@ -0,0 +1,237 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_SERVICE_QUEUE_HPP_
+#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_SERVICE_QUEUE_HPP_
+
+#include <uhd/exception.hpp>
+#include <uhd/utils/log.hpp>
+#include <rte_atomic.h>
+#include <rte_malloc.h>
+#include <rte_ring.h>
+#include <chrono>
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <new>
+#include <string>
+
+namespace uhd { namespace transport { namespace dpdk {
+
+//! Reasons a requester may block on the DPDK I/O service queue (doubles as
+//! the type of service being requested)
+enum wait_type {
+    //! Wake immediately
+    WAIT_SIMPLE,
+    //! Wake upon receiving an RX packet
+    WAIT_RX,
+    //! Wake when a TX buffer becomes available
+    WAIT_TX_BUF,
+    //! Wake once the new flow/socket is created
+    WAIT_FLOW_OPEN,
+    //! Wake once the flow/socket is destroyed
+    WAIT_FLOW_CLOSE,
+    //! Number of possible reasons for waiting (sentinel; must remain last)
+    WAIT_TYPE_COUNT
+};
+
+/*!
+ * Aggregate representing a request for the DPDK I/O service and a corresponding
+ * entry in the wait queue
+ *
+ * This class is managed with explicit reference counting: allocate with
+ * wait_req_alloc(), and balance every wait_req_get() with a wait_req_put().
+ */
+struct wait_req
+{
+    //! The reason we're waiting (and service request associated with it)
+    enum wait_type reason;
+    //! The data associated with the service request (unmanaged by wait_req)
+    void* data;
+    //! A condition variable the requester blocks on until the servicer
+    //! signals completion (see service_queue::submit/complete)
+    std::condition_variable cond;
+    //! The mutex associated with the condition variable; guards \p complete
+    std::mutex mutex;
+    //! Whether the request was completed (written under \p mutex)
+    bool complete;
+    //! An atomic reference counter for managing this request object's memory
+    rte_atomic32_t refcnt;
+};
+
+/*!
+ * Create a wait request
+ *
+ * The request is allocated from the DPDK heap and constructed in place,
+ * starting with a refcount of 1 (owned by the caller). Release with
+ * wait_req_put().
+ *
+ * \param t The reason for the wait or the service to be performed
+ * \param priv_data The data associated with this wait type (not managed
+ *                  by the wait_req)
+ * \return Pointer to the new request, or nullptr on allocation failure
+ */
+inline wait_req* wait_req_alloc(wait_type t, void* priv_data)
+{
+    wait_req* req = (wait_req*)rte_zmalloc(NULL, sizeof(*req), 0);
+    if (!req) {
+        return nullptr;
+    }
+    // Placement-construct: wait_req has non-trivial members (mutex and
+    // condition variable) that require proper construction
+    req = new (req) wait_req();
+    req->reason = t;
+    req->data   = priv_data;
+    rte_atomic32_set(&req->refcnt, 1);
+    return req;
+}
+
+/*!
+ * Release a reference on the wait request.
+ * Frees resources if the refcnt drops to 0.
+ *
+ * \param req The wait request upon which to decrement refcnt
+ */
+inline void wait_req_put(wait_req* req)
+{
+    if (rte_atomic32_dec_and_test(&req->refcnt)) {
+        // The object was placement-constructed into rte_zmalloc'd memory
+        // (see wait_req_alloc), so run the destructor explicitly: rte_free()
+        // alone would skip destruction of the mutex and condition variable.
+        req->~wait_req();
+        rte_free(req);
+    }
+}
+
+/*!
+ * Take an additional reference on a wait request.
+ * Increments the refcnt of the wait request; each call must be balanced
+ * by a matching wait_req_put().
+ *
+ * \param req The wait request upon which to increment refcnt
+ */
+inline void wait_req_get(wait_req* req)
+{
+    rte_atomic32_add(&req->refcnt, 1);
+}
+
+
+/*!
+ * A service queue to funnel requests from requesters to a single servicer.
+ *
+ * The DPDK I/O service uses this queue to process threads waiting for various
+ * reasons, such as creating/destroying a packet flow, receiving a packet, or
+ * getting a buffer for TX. Multiple producers may submit() concurrently; only
+ * one servicer thread may pop()/requeue()/complete().
+ */
+class service_queue
+{
+public:
+    /*!
+     * Create a service queue
+     * \param depth Capacity of the queue; must be a power of 2
+     * \param lcore_id The DPDK lcore_id associated with this service queue
+     * \throws uhd::runtime_error if the underlying DPDK ring cannot be created
+     */
+    service_queue(size_t depth, unsigned int lcore_id)
+    {
+        std::string name = std::string("servq") + std::to_string(lcore_id);
+        // Place the ring on the servicing lcore's NUMA node; RING_F_SC_DEQ
+        // selects multi-producer/single-consumer mode
+        _waiter_ring = rte_ring_create(
+            name.c_str(), depth, rte_lcore_to_socket_id(lcore_id), RING_F_SC_DEQ);
+        if (!_waiter_ring) {
+            throw uhd::runtime_error("DPDK: Failed to create the service queue");
+        }
+    }
+
+    ~service_queue()
+    {
+        rte_ring_free(_waiter_ring);
+    }
+
+    /*!
+     * Submit a wait/service request to the service queue
+     * Negative timeouts indicate to block indefinitely
+     *
+     * This is the only function intended for the requester
+     *
+     * \param req The wait request to wait on
+     * \param timeout How long to wait, with negative values indicating indefinitely
+     * \return 0 for no timeout, -ETIMEDOUT if there was a timeout, or
+     *         -ENOBUFS if the queue was full and the request was not enqueued
+     */
+    int submit(wait_req* req, std::chrono::microseconds timeout)
+    {
+        auto timeout_point = std::chrono::steady_clock::now() + timeout;
+        std::unique_lock<std::mutex> lock(req->mutex);
+        /* Get a reference here, to be consumed by other thread (handshake) */
+        wait_req_get(req);
+        req->complete = false;
+        if (rte_ring_enqueue(_waiter_ring, req)) {
+            // The request never reached the servicer: take back the
+            // handshake reference ourselves
+            wait_req_put(req);
+            return -ENOBUFS;
+        }
+        auto is_complete = [req] { return req->complete; };
+        if (timeout < std::chrono::microseconds(0)) {
+            req->cond.wait(lock, is_complete);
+        } else {
+            auto status = req->cond.wait_until(lock, timeout_point, is_complete);
+            if (!status) {
+                return -ETIMEDOUT;
+            }
+        }
+        return 0;
+    }
+
+    /*!
+     * Pop off the next request from the queue
+     *
+     * This should only be called by the servicer
+     * \return A pointer to the next wait request, else NULL
+     */
+    wait_req* pop()
+    {
+        wait_req* req;
+        if (rte_ring_dequeue(_waiter_ring, (void**)&req)) {
+            return NULL;
+        }
+        return req;
+    }
+
+    /*!
+     * Attempt to requeue a request to the service queue
+     *
+     * This should only be called by the servicer
+     * \param req A pointer to the wait request to requeue
+     * \return 0 for success, -ENOBUFS if there was no space in the queue
+     */
+    int requeue(wait_req* req)
+    {
+        if (rte_ring_enqueue(_waiter_ring, req)) {
+            return -ENOBUFS;
+        }
+        return 0;
+    }
+
+    /*!
+     * Attempt to wake the waiter for this request
+     * If successful, decrement the reference count on the wait request.
+     * If the waiter could not be woken up, attempt to requeue the request and
+     * change its type to WAIT_SIMPLE.
+     *
+     * This should only be called by the servicer.
+     *
+     * \param req The wait request with the waiter to wake
+     * \return 0 if successful, -EAGAIN if the waiter was requeued, -ENOBUFS
+     *         if the waiter could not be requeued
+     */
+    int complete(wait_req* req)
+    {
+        // Grabbing the mutex only to avoid this sequence:
+        // A: Enqueue wait request
+        // B: Pull wait request and satisfy
+        // B: notify() on the condition variable
+        // A: wait_until() on the condition variable, possibly indefinitely...
+        bool stat = req->mutex.try_lock();
+        if (!stat) {
+            // The waiter isn't blocked on the condvar yet; push the request
+            // back so we can retry waking it on a later pass
+            if (rte_ring_enqueue(_waiter_ring, req)) {
+                UHD_LOG_WARNING("DPDK", "Could not lock wait_req mutex or requeue");
+                return -ENOBUFS;
+            } else {
+                // Only the servicer dequeues, so mutating the reason after
+                // the enqueue is safe (no concurrent consumer)
+                req->reason = WAIT_SIMPLE;
+                return -EAGAIN;
+            }
+        }
+        req->complete = true;
+        req->cond.notify_one();
+        req->mutex.unlock();
+        // Consume the handshake reference taken in submit()
+        wait_req_put(req);
+        // Bug fix: previously returned `stat` (the boolean try_lock() result,
+        // i.e. 1), contradicting the documented "0 if successful" contract
+        return 0;
+    }
+
+private:
+    //! Multi-producer, single-consumer ring for requests
+    struct rte_ring* _waiter_ring;
+};
+
+}}} // namespace uhd::transport::dpdk
+
+#endif /*_INCLUDED_UHDLIB_TRANSPORT_DPDK_SERVICE_QUEUE_HPP_ */