diff options
| author | Alex Williams <alex.williams@ni.com> | 2019-11-20 17:44:40 -0800 | 
|---|---|---|
| committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 12:21:34 -0800 | 
| commit | 911ca16adc9222be11d1cbf134bf547daa16806f (patch) | |
| tree | 00041fd25acc0258a3690f8ed48194d63f518382 /host/lib/include/uhdlib/transport/dpdk | |
| parent | c730d7ba202286bfce3ee180df413486411e0b72 (diff) | |
| download | uhd-911ca16adc9222be11d1cbf134bf547daa16806f.tar.gz uhd-911ca16adc9222be11d1cbf134bf547daa16806f.tar.bz2 uhd-911ca16adc9222be11d1cbf134bf547daa16806f.zip  | |
lib: Add DPDK service queue
This is a data structure intended for use by the DPDK I/O service.
It uses DPDK's lockless ring in multi-producer, single-consumer mode
to allow clients to submit requests to the DPDK I/O service's worker
thread. Clients can specify a timeout for the requests to be fulfilled.
Diffstat (limited to 'host/lib/include/uhdlib/transport/dpdk')
| -rw-r--r-- | host/lib/include/uhdlib/transport/dpdk/service_queue.hpp | 237 | 
1 files changed, 237 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp b/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp new file mode 100644 index 000000000..7c9917079 --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp @@ -0,0 +1,237 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_SERVICE_QUEUE_HPP_ +#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_SERVICE_QUEUE_HPP_ + +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <condition_variable> +#include <rte_malloc.h> +#include <rte_ring.h> +#include <chrono> +#include <memory> +#include <mutex> + +namespace uhd { namespace transport { namespace dpdk { + +enum wait_type { +    //! Wake immediately +    WAIT_SIMPLE, +    //! Wake upon receiving an RX packet +    WAIT_RX, +    //! Wake when a TX buffer becomes available +    WAIT_TX_BUF, +    //! Wake once the new flow/socket is created +    WAIT_FLOW_OPEN, +    //! Wake once the flow/socket is destroyed +    WAIT_FLOW_CLOSE, +    //! Number of possible reasons for waiting +    WAIT_TYPE_COUNT +}; + +/*! + * Aggregate representing a request for the DPDK I/O service and a corresponding + * entry in the wait queue + * + * This class is managed with explicit reference counting. + */ +struct wait_req +{ +    //! The reason we're waiting (and service request associated with it) +    enum wait_type reason; +    //! The data associated with the service request (unmanaged by wait_req) +    void* data; +    //! A condition variable for waiting on the request +    std::condition_variable cond; +    //! The mutex associated with the condition variable +    std::mutex mutex; +    //! Whether the request was completed +    bool complete; +    //! An atomic reference counter for managing this request object's memory +    rte_atomic32_t refcnt; +}; + +/*! + * Create a wait request + * + * \param t The reason for the wait or the service to be performed + * \param priv_data The data associated with this wait type + */ +inline wait_req* wait_req_alloc(wait_type t, void* priv_data) +{ +    wait_req* req = (wait_req*)rte_zmalloc(NULL, sizeof(*req), 0); +    if (!req) { +        return NULL; +    } +    req         = new (req) wait_req(); +    req->reason = t; +    req->data   = priv_data; +    rte_atomic32_set(&req->refcnt, 1); +    return req; +} + +/*! + * Release a reference on the wait request. + * Frees resources if the refcnt drops to 0. + * + * \param req The wait request upon which to decrement refcnt + */ +inline void wait_req_put(wait_req* req) +{ +    if (rte_atomic32_dec_and_test(&req->refcnt)) { +        rte_free(req); +    } +} + +/*! + * Get a reference to the wait request. + * Increments the refcnt of the wait request + * + * \param req The wait request upon which to increment refcnt + */ +inline void wait_req_get(wait_req* req) +{ +    rte_atomic32_inc(&req->refcnt); +} + + +/*! + * A service queue to funnel requests from requesters to a single servicer. + * + * The DPDK I/O service uses this queue to process threads waiting for various + * reasons, such as creating/destroying a packet flow, receiving a packet, or + * getting a buffer for TX. + */ +class service_queue +{ +public: +    /*! +     * Create a service queue +     * \param depth Must be a power of 2 +     * \param lcore_id The DPDK lcore_id associated with this service queue +     */ +    service_queue(size_t depth, unsigned int lcore_id) +    { +        std::string name = std::string("servq") + std::to_string(lcore_id); +        _waiter_ring     = rte_ring_create( +            name.c_str(), depth, rte_lcore_to_socket_id(lcore_id), RING_F_SC_DEQ); +        if (!_waiter_ring) { +            throw uhd::runtime_error("DPDK: Failed to create the service queue"); +        } +    } + +    ~service_queue() +    { +        rte_ring_free(_waiter_ring); +    } + +    /*! +     * Submit a wait/service request to the service queue +     * Negative timeouts indicate to block indefinitely +     * +     * This is the only function intended for the requester +     * +     * \param req The wait request to wait on +     * \param timeout How long to wait, with negative values indicating indefinitely +     * \return 0 for no timeout, -ETIMEDOUT if there was a timeout +     */ +    int submit(wait_req* req, std::chrono::microseconds timeout) +    { +        auto timeout_point = std::chrono::steady_clock::now() + timeout; +        std::unique_lock<std::mutex> lock(req->mutex); +        /* Get a reference here, to be consumed by other thread (handshake) */ +        wait_req_get(req); +        req->complete = false; +        if (rte_ring_enqueue(_waiter_ring, req)) { +            wait_req_put(req); +            return -ENOBUFS; +        } +        auto is_complete = [req] { return req->complete; }; +        if (timeout < std::chrono::microseconds(0)) { +            req->cond.wait(lock, is_complete); +        } else { +            auto status = req->cond.wait_until(lock, timeout_point, is_complete); +            if (!status) { +                return -ETIMEDOUT; +            } +        } +        return 0; +    } + +    /*! +     * Pop off the next request from the queue +     * +     * This should only be called by the servicer +     * \return A pointer to the next wait request, else NULL +     */ +    wait_req* pop() +    { +        wait_req* req; +        if (rte_ring_dequeue(_waiter_ring, (void**)&req)) { +            return NULL; +        } +        return req; +    } + +    /*! +     * Attempt to requeue a request to the service queue +     * +     * This should only be called by the servicer +     * \param A pointer to the wait request to requeue +     * \return 0 for success, -ENOBUFS if there was no space in the queue +     */ +    int requeue(wait_req* req) +    { +        if (rte_ring_enqueue(_waiter_ring, req)) { +            return -ENOBUFS; +        } +        return 0; +    } + +    /*! +     * Attempt to wake the waiter for this request +     * If successful, decrement the reference count on the wait request. +     * If the waiter could not be woken up, attempt to requeue the request and +     * change its type to WAIT_SIMPLE. +     * +     * This should only be called by the servicer. +     * +     * \param req The wait request with the waiter to wake +     * \return 0 if successful, -EAGAIN if the waiter was requeued, -ENOBUFS +     *         if the waiter could not be requeued +     */ +    int complete(wait_req* req) +    { +        // Grabbing the mutex only to avoid this sequence: +        // A: Enqueue wait request +        // B: Pull wait request and satisfy +        // B: notify() on the condition variable +        // A: wait_until() on the condition variable, possibly indefinitely... +        bool stat = req->mutex.try_lock(); +        if (!stat) { +            if (rte_ring_enqueue(_waiter_ring, req)) { +                UHD_LOG_WARNING("DPDK", "Could not lock wait_req mutex or requeue"); +                return -ENOBUFS; +            } else { +                req->reason = WAIT_SIMPLE; +                return -EAGAIN; +            } +        } +        req->complete = true; +        req->cond.notify_one(); +        req->mutex.unlock(); +        wait_req_put(req); +        return stat; +    } + +private: +    //! Multi-producer, single-consumer ring for requests +    struct rte_ring* _waiter_ring; +}; + +}}} // namespace uhd::transport::dpdk + +#endif /*_INCLUDED_UHDLIB_TRANSPORT_DPDK_SERVICE_QUEUE_HPP_ */  | 
