diff options
Diffstat (limited to 'host/lib/include/uhdlib')
-rw-r--r-- | host/lib/include/uhdlib/transport/dpdk/service_queue.hpp | 237 |
1 files changed, 237 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp b/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp new file mode 100644 index 000000000..7c9917079 --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp @@ -0,0 +1,237 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_SERVICE_QUEUE_HPP_ +#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_SERVICE_QUEUE_HPP_ + +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <condition_variable> +#include <rte_malloc.h> +#include <rte_ring.h> +#include <chrono> +#include <memory> +#include <mutex> + +namespace uhd { namespace transport { namespace dpdk { + +enum wait_type { + //! Wake immediately + WAIT_SIMPLE, + //! Wake upon receiving an RX packet + WAIT_RX, + //! Wake when a TX buffer becomes available + WAIT_TX_BUF, + //! Wake once the new flow/socket is created + WAIT_FLOW_OPEN, + //! Wake once the flow/socket is destroyed + WAIT_FLOW_CLOSE, + //! Number of possible reasons for waiting + WAIT_TYPE_COUNT +}; + +/*! + * Aggregate representing a request for the DPDK I/O service and a corresponding + * entry in the wait queue + * + * This class is managed with explicit reference counting. + */ +struct wait_req +{ + //! The reason we're waiting (and service request associated with it) + enum wait_type reason; + //! The data associated with the service request (unmanaged by wait_req) + void* data; + //! A condition variable for waiting on the request + std::condition_variable cond; + //! The mutex associated with the condition variable + std::mutex mutex; + //! Whether the request was completed + bool complete; + //! An atomic reference counter for managing this request object's memory + rte_atomic32_t refcnt; +}; + +/*! + * Create a wait request + * + * \param t The reason for the wait or the service to be performed + * \param priv_data The data associated with this wait type + */ +inline wait_req* wait_req_alloc(wait_type t, void* priv_data) +{ + wait_req* req = (wait_req*)rte_zmalloc(NULL, sizeof(*req), 0); + if (!req) { + return NULL; + } + req = new (req) wait_req(); + req->reason = t; + req->data = priv_data; + rte_atomic32_set(&req->refcnt, 1); + return req; +} + +/*! + * Release a reference on the wait request. + * Frees resources if the refcnt drops to 0. + * + * \param req The wait request upon which to decrement refcnt + */ +inline void wait_req_put(wait_req* req) +{ + if (rte_atomic32_dec_and_test(&req->refcnt)) { + rte_free(req); + } +} + +/*! + * Get a reference to the wait request. + * Increments the refcnt of the wait request + * + * \param req The wait request upon which to increment refcnt + */ +inline void wait_req_get(wait_req* req) +{ + rte_atomic32_inc(&req->refcnt); +} + + +/*! + * A service queue to funnel requests from requesters to a single servicer. + * + * The DPDK I/O service uses this queue to process threads waiting for various + * reasons, such as creating/destroying a packet flow, receiving a packet, or + * getting a buffer for TX. + */ +class service_queue +{ +public: + /*! + * Create a service queue + * \param depth Must be a power of 2 + * \param lcore_id The DPDK lcore_id associated with this service queue + */ + service_queue(size_t depth, unsigned int lcore_id) + { + std::string name = std::string("servq") + std::to_string(lcore_id); + _waiter_ring = rte_ring_create( + name.c_str(), depth, rte_lcore_to_socket_id(lcore_id), RING_F_SC_DEQ); + if (!_waiter_ring) { + throw uhd::runtime_error("DPDK: Failed to create the service queue"); + } + } + + ~service_queue() + { + rte_ring_free(_waiter_ring); + } + + /*! + * Submit a wait/service request to the service queue + * Negative timeouts indicate to block indefinitely + * + * This is the only function intended for the requester + * + * \param req The wait request to wait on + * \param timeout How long to wait, with negative values indicating indefinitely + * \return 0 for no timeout, -ETIMEDOUT if there was a timeout + */ + int submit(wait_req* req, std::chrono::microseconds timeout) + { + auto timeout_point = std::chrono::steady_clock::now() + timeout; + std::unique_lock<std::mutex> lock(req->mutex); + /* Get a reference here, to be consumed by other thread (handshake) */ + wait_req_get(req); + req->complete = false; + if (rte_ring_enqueue(_waiter_ring, req)) { + wait_req_put(req); + return -ENOBUFS; + } + auto is_complete = [req] { return req->complete; }; + if (timeout < std::chrono::microseconds(0)) { + req->cond.wait(lock, is_complete); + } else { + auto status = req->cond.wait_until(lock, timeout_point, is_complete); + if (!status) { + return -ETIMEDOUT; + } + } + return 0; + } + + /*! + * Pop off the next request from the queue + * + * This should only be called by the servicer + * \return A pointer to the next wait request, else NULL + */ + wait_req* pop() + { + wait_req* req; + if (rte_ring_dequeue(_waiter_ring, (void**)&req)) { + return NULL; + } + return req; + } + + /*! + * Attempt to requeue a request to the service queue + * + * This should only be called by the servicer + * \param A pointer to the wait request to requeue + * \return 0 for success, -ENOBUFS if there was no space in the queue + */ + int requeue(wait_req* req) + { + if (rte_ring_enqueue(_waiter_ring, req)) { + return -ENOBUFS; + } + return 0; + } + + /*! + * Attempt to wake the waiter for this request + * If successful, decrement the reference count on the wait request. + * If the waiter could not be woken up, attempt to requeue the request and + * change its type to WAIT_SIMPLE. + * + * This should only be called by the servicer. + * + * \param req The wait request with the waiter to wake + * \return 0 if successful, -EAGAIN if the waiter was requeued, -ENOBUFS + * if the waiter could not be requeued + */ + int complete(wait_req* req) + { + // Grabbing the mutex only to avoid this sequence: + // A: Enqueue wait request + // B: Pull wait request and satisfy + // B: notify() on the condition variable + // A: wait_until() on the condition variable, possibly indefinitely... + bool stat = req->mutex.try_lock(); + if (!stat) { + if (rte_ring_enqueue(_waiter_ring, req)) { + UHD_LOG_WARNING("DPDK", "Could not lock wait_req mutex or requeue"); + return -ENOBUFS; + } else { + req->reason = WAIT_SIMPLE; + return -EAGAIN; + } + } + req->complete = true; + req->cond.notify_one(); + req->mutex.unlock(); + wait_req_put(req); + return stat; + } + +private: + //! Multi-producer, single-consumer ring for requests + struct rte_ring* _waiter_ring; +}; + +}}} // namespace uhd::transport::dpdk + +#endif /*_INCLUDED_UHDLIB_TRANSPORT_DPDK_SERVICE_QUEUE_HPP_ */ |