// // Copyright 2019 Ettus Research, a National Instruments brand // // SPDX-License-Identifier: GPL-3.0-or-later // #ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_SERVICE_QUEUE_HPP_ #define _INCLUDED_UHDLIB_TRANSPORT_DPDK_SERVICE_QUEUE_HPP_ #include #include #include #include #include #include #include #include namespace uhd { namespace transport { namespace dpdk { enum wait_type { //! Wake immediately WAIT_SIMPLE, //! Wake upon receiving an RX packet WAIT_RX, //! Wake when a TX buffer becomes available WAIT_TX_BUF, //! Wake once the new flow/socket is created WAIT_FLOW_OPEN, //! Wake once the flow/socket is destroyed WAIT_FLOW_CLOSE, //! Wake once the new transport is connected WAIT_XPORT_CONNECT, //! Wake once the transport is disconnected WAIT_XPORT_DISCONNECT, //! Wake when MAC address found for IP address WAIT_ARP, //! Wake once the I/O worker terminates WAIT_LCORE_TERM, //! Number of possible reasons for waiting WAIT_TYPE_COUNT }; /*! * Aggregate representing a request for the DPDK I/O service and a corresponding * entry in the wait queue * * This class is managed with explicit reference counting. */ struct wait_req { //! The reason we're waiting (and service request associated with it) enum wait_type reason; //! The data associated with the service request (unmanaged by wait_req) void* data; //! A condition variable for waiting on the request std::condition_variable cond; //! The mutex associated with the condition variable std::mutex mutex; //! Whether the request was completed bool complete; //! The status or error code associated with the request int retval; //! An atomic reference counter for managing this request object's memory rte_atomic32_t refcnt; }; /*! * Create a wait request * * \param t The reason for the wait or the service to be performed * \param priv_data The data associated with this wait type */ inline wait_req* wait_req_alloc(wait_type t, void* priv_data) { wait_req* req = (wait_req*)rte_zmalloc(NULL, sizeof(*req), 0); if (!req) { return NULL; } req = new (req) wait_req(); req->reason = t; req->data = priv_data; rte_atomic32_set(&req->refcnt, 1); return req; } /*! * Release a reference on the wait request. * Frees resources if the refcnt drops to 0. * * \param req The wait request upon which to decrement refcnt */ inline void wait_req_put(wait_req* req) { if (rte_atomic32_dec_and_test(&req->refcnt)) { rte_free(req); } } /*! * Get a reference to the wait request. * Increments the refcnt of the wait request * * \param req The wait request upon which to increment refcnt */ inline void wait_req_get(wait_req* req) { rte_atomic32_inc(&req->refcnt); } /*! * A service queue to funnel requests from requesters to a single servicer. * * The DPDK I/O service uses this queue to process threads waiting for various * reasons, such as creating/destroying a packet flow, receiving a packet, or * getting a buffer for TX. */ class service_queue { public: /*! * Create a service queue * \param depth Must be a power of 2. Actual size is less. * \param lcore_id The DPDK lcore_id associated with this service queue */ service_queue(size_t depth, unsigned int lcore_id) { std::string name = std::string("servq") + std::to_string(lcore_id); _waiter_ring = rte_ring_create( name.c_str(), depth, rte_lcore_to_socket_id(lcore_id), RING_F_SC_DEQ); if (!_waiter_ring) { throw uhd::runtime_error("DPDK: Failed to create the service queue"); } } ~service_queue() { rte_ring_free(_waiter_ring); } /*! * Submit a wait/service request to the service queue * Negative timeouts indicate to block indefinitely * * This is the only function intended for the requester * * \param req The wait request to wait on * \param timeout How long to wait, with negative values indicating indefinitely * \return 0 for no timeout, -ETIMEDOUT if there was a timeout */ int submit(wait_req* req, std::chrono::microseconds timeout) { auto timeout_point = std::chrono::steady_clock::now() + timeout; std::unique_lock lock(req->mutex); /* Get a reference here, to be consumed by other thread (handshake) */ wait_req_get(req); req->complete = false; if (rte_ring_enqueue(_waiter_ring, req)) { wait_req_put(req); return -ENOBUFS; } auto is_complete = [req] { return req->complete; }; if (timeout < std::chrono::microseconds(0)) { req->cond.wait(lock, is_complete); } else { auto status = req->cond.wait_until(lock, timeout_point, is_complete); if (!status) { return -ETIMEDOUT; } } return 0; } /*! * Pop off the next request from the queue * * This should only be called by the servicer * \return A pointer to the next wait request, else NULL */ wait_req* pop() { wait_req* req; if (rte_ring_dequeue(_waiter_ring, (void**)&req)) { return NULL; } return req; } /*! * Attempt to requeue a request to the service queue * * This should only be called by the servicer * \param A pointer to the wait request to requeue * \return 0 for success, -ENOBUFS if there was no space in the queue */ int requeue(wait_req* req) { if (rte_ring_enqueue(_waiter_ring, req)) { return -ENOBUFS; } return 0; } /*! * Attempt to wake the waiter for this request * If successful, decrement the reference count on the wait request. * If the waiter could not be woken up, attempt to requeue the request and * change its type to WAIT_SIMPLE. * * This should only be called by the servicer. * * \param req The wait request with the waiter to wake * \return 0 if successful, -EAGAIN if the waiter was requeued, -ENOBUFS * if the waiter could not be requeued */ int complete(wait_req* req) { // Grabbing the mutex only to avoid this sequence: // A: Enqueue wait request // B: Pull wait request and satisfy // B: notify() on the condition variable // A: wait_until() on the condition variable, possibly indefinitely... bool stat = req->mutex.try_lock(); if (!stat) { if (rte_ring_enqueue(_waiter_ring, req)) { UHD_LOG_WARNING("DPDK", "Could not lock wait_req mutex or requeue"); return -ENOBUFS; } else { req->reason = WAIT_SIMPLE; return -EAGAIN; } } req->complete = true; req->cond.notify_one(); req->mutex.unlock(); wait_req_put(req); return stat; } /*! * Get the size of the service queue * \return size of the service queue */ inline size_t get_size() { return rte_ring_get_size(_waiter_ring); } /*! * Get the capacity of the service queue * \return capacity of the service queue */ inline size_t get_capacity() { return rte_ring_get_capacity(_waiter_ring); } private: //! Multi-producer, single-consumer ring for requests struct rte_ring* _waiter_ring; }; }}} // namespace uhd::transport::dpdk #endif /*_INCLUDED_UHDLIB_TRANSPORT_DPDK_SERVICE_QUEUE_HPP_ */