diff options
author | Alex Williams <alex.williams@ni.com> | 2019-11-20 17:44:40 -0800 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 12:21:34 -0800 |
commit | 911ca16adc9222be11d1cbf134bf547daa16806f (patch) | |
tree | 00041fd25acc0258a3690f8ed48194d63f518382 | |
parent | c730d7ba202286bfce3ee180df413486411e0b72 (diff) | |
download | uhd-911ca16adc9222be11d1cbf134bf547daa16806f.tar.gz uhd-911ca16adc9222be11d1cbf134bf547daa16806f.tar.bz2 uhd-911ca16adc9222be11d1cbf134bf547daa16806f.zip |
lib: Add DPDK service queue
This is a data structure intended for use by the DPDK I/O service.
It uses DPDK's lockless ring in multi-producer, single-consumer mode
to allow clients to submit requests to the DPDK I/O service's worker
thread. Clients can specify a timeout for the requests to be fulfilled.
-rw-r--r-- | host/lib/include/uhdlib/transport/dpdk/service_queue.hpp | 237 | ||||
-rw-r--r-- | host/tests/dpdk_port_test.cpp | 65 |
2 files changed, 302 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp b/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp new file mode 100644 index 000000000..7c9917079 --- /dev/null +++ b/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp @@ -0,0 +1,237 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// +#ifndef _INCLUDED_UHDLIB_TRANSPORT_DPDK_SERVICE_QUEUE_HPP_ +#define _INCLUDED_UHDLIB_TRANSPORT_DPDK_SERVICE_QUEUE_HPP_ + +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <condition_variable> +#include <rte_malloc.h> +#include <rte_ring.h> +#include <chrono> +#include <memory> +#include <mutex> + +namespace uhd { namespace transport { namespace dpdk { + +enum wait_type { + //! Wake immediately + WAIT_SIMPLE, + //! Wake upon receiving an RX packet + WAIT_RX, + //! Wake when a TX buffer becomes available + WAIT_TX_BUF, + //! Wake once the new flow/socket is created + WAIT_FLOW_OPEN, + //! Wake once the flow/socket is destroyed + WAIT_FLOW_CLOSE, + //! Number of possible reasons for waiting + WAIT_TYPE_COUNT +}; + +/*! + * Aggregate representing a request for the DPDK I/O service and a corresponding + * entry in the wait queue + * + * This class is managed with explicit reference counting. + */ +struct wait_req +{ + //! The reason we're waiting (and service request associated with it) + enum wait_type reason; + //! The data associated with the service request (unmanaged by wait_req) + void* data; + //! A condition variable for waiting on the request + std::condition_variable cond; + //! The mutex associated with the condition variable + std::mutex mutex; + //! Whether the request was completed + bool complete; + //! An atomic reference counter for managing this request object's memory + rte_atomic32_t refcnt; +}; + +/*! + * Create a wait request + * + * \param t The reason for the wait or the service to be performed + * \param priv_data The data associated with this wait type + */ +inline wait_req* wait_req_alloc(wait_type t, void* priv_data) +{ + wait_req* req = (wait_req*)rte_zmalloc(NULL, sizeof(*req), 0); + if (!req) { + return NULL; + } + req = new (req) wait_req(); + req->reason = t; + req->data = priv_data; + rte_atomic32_set(&req->refcnt, 1); + return req; +} + +/*! + * Release a reference on the wait request. + * Frees resources if the refcnt drops to 0. + * + * \param req The wait request upon which to decrement refcnt + */ +inline void wait_req_put(wait_req* req) +{ + if (rte_atomic32_dec_and_test(&req->refcnt)) { + rte_free(req); + } +} + +/*! + * Get a reference to the wait request. + * Increments the refcnt of the wait request + * + * \param req The wait request upon which to increment refcnt + */ +inline void wait_req_get(wait_req* req) +{ + rte_atomic32_inc(&req->refcnt); +} + + +/*! + * A service queue to funnel requests from requesters to a single servicer. + * + * The DPDK I/O service uses this queue to process threads waiting for various + * reasons, such as creating/destroying a packet flow, receiving a packet, or + * getting a buffer for TX. + */ +class service_queue +{ +public: + /*! + * Create a service queue + * \param depth Must be a power of 2 + * \param lcore_id The DPDK lcore_id associated with this service queue + */ + service_queue(size_t depth, unsigned int lcore_id) + { + std::string name = std::string("servq") + std::to_string(lcore_id); + _waiter_ring = rte_ring_create( + name.c_str(), depth, rte_lcore_to_socket_id(lcore_id), RING_F_SC_DEQ); + if (!_waiter_ring) { + throw uhd::runtime_error("DPDK: Failed to create the service queue"); + } + } + + ~service_queue() + { + rte_ring_free(_waiter_ring); + } + + /*! + * Submit a wait/service request to the service queue + * Negative timeouts indicate to block indefinitely + * + * This is the only function intended for the requester + * + * \param req The wait request to wait on + * \param timeout How long to wait, with negative values indicating indefinitely + * \return 0 for no timeout, -ETIMEDOUT if there was a timeout + */ + int submit(wait_req* req, std::chrono::microseconds timeout) + { + auto timeout_point = std::chrono::steady_clock::now() + timeout; + std::unique_lock<std::mutex> lock(req->mutex); + /* Get a reference here, to be consumed by other thread (handshake) */ + wait_req_get(req); + req->complete = false; + if (rte_ring_enqueue(_waiter_ring, req)) { + wait_req_put(req); + return -ENOBUFS; + } + auto is_complete = [req] { return req->complete; }; + if (timeout < std::chrono::microseconds(0)) { + req->cond.wait(lock, is_complete); + } else { + auto status = req->cond.wait_until(lock, timeout_point, is_complete); + if (!status) { + return -ETIMEDOUT; + } + } + return 0; + } + + /*! + * Pop off the next request from the queue + * + * This should only be called by the servicer + * \return A pointer to the next wait request, else NULL + */ + wait_req* pop() + { + wait_req* req; + if (rte_ring_dequeue(_waiter_ring, (void**)&req)) { + return NULL; + } + return req; + } + + /*! + * Attempt to requeue a request to the service queue + * + * This should only be called by the servicer + * \param A pointer to the wait request to requeue + * \return 0 for success, -ENOBUFS if there was no space in the queue + */ + int requeue(wait_req* req) + { + if (rte_ring_enqueue(_waiter_ring, req)) { + return -ENOBUFS; + } + return 0; + } + + /*! + * Attempt to wake the waiter for this request + * If successful, decrement the reference count on the wait request. + * If the waiter could not be woken up, attempt to requeue the request and + * change its type to WAIT_SIMPLE. + * + * This should only be called by the servicer. + * + * \param req The wait request with the waiter to wake + * \return 0 if successful, -EAGAIN if the waiter was requeued, -ENOBUFS + * if the waiter could not be requeued + */ + int complete(wait_req* req) + { + // Grabbing the mutex only to avoid this sequence: + // A: Enqueue wait request + // B: Pull wait request and satisfy + // B: notify() on the condition variable + // A: wait_until() on the condition variable, possibly indefinitely... + bool stat = req->mutex.try_lock(); + if (!stat) { + if (rte_ring_enqueue(_waiter_ring, req)) { + UHD_LOG_WARNING("DPDK", "Could not lock wait_req mutex or requeue"); + return -ENOBUFS; + } else { + req->reason = WAIT_SIMPLE; + return -EAGAIN; + } + } + req->complete = true; + req->cond.notify_one(); + req->mutex.unlock(); + wait_req_put(req); + return stat; + } + +private: + //! Multi-producer, single-consumer ring for requests + struct rte_ring* _waiter_ring; +}; + +}}} // namespace uhd::transport::dpdk + +#endif /*_INCLUDED_UHDLIB_TRANSPORT_DPDK_SERVICE_QUEUE_HPP_ */ diff --git a/host/tests/dpdk_port_test.cpp b/host/tests/dpdk_port_test.cpp index 65cf045eb..7ef386c52 100644 --- a/host/tests/dpdk_port_test.cpp +++ b/host/tests/dpdk_port_test.cpp @@ -5,10 +5,65 @@ // #include <uhdlib/transport/dpdk/common.hpp> +#include <uhdlib/transport/dpdk/service_queue.hpp> #include <boost/program_options.hpp> #include <iostream> +#include <memory> +#include <thread> namespace po = boost::program_options; +namespace dpdk = uhd::transport::dpdk; + +void requester(dpdk::service_queue* queue) +{ + uint32_t count = 0; + std::chrono::seconds block(-1); + auto req = dpdk::wait_req_alloc( + dpdk::wait_type::WAIT_TYPE_COUNT, &count + ); + std::cout << "Requesting count increment" << std::endl; + queue->submit(req, block); + if (count == 1) { + std::cout << "PASS: Count=1" << std::endl; + } + wait_req_put(req); + + std::cout << "Requesting termination" << std::endl; + req = dpdk::wait_req_alloc( + dpdk::wait_type::WAIT_FLOW_CLOSE, NULL + ); + queue->submit(req, block); + wait_req_put(req); +} + +void servicer(uhd::transport::dpdk::service_queue* queue) +{ + bool running = true; + while (running) { + auto req = queue->pop(); + if (!req) { + std::this_thread::yield(); + continue; + } + switch (req->reason) { + case dpdk::wait_type::WAIT_TYPE_COUNT: + (*(uint32_t *) req->data)++; + break; + case dpdk::wait_type::WAIT_FLOW_CLOSE: + running = false; + break; + case dpdk::wait_type::WAIT_SIMPLE: + break; + default: + std::cout << "ERROR: Received unexpected service request type" << std::endl; + throw uhd::runtime_error("Unexpected service request type"); + } + if (queue->complete(req) == -ENOBUFS) { + req->reason = dpdk::wait_type::WAIT_SIMPLE; + while (queue->requeue(req) == -ENOBUFS); + } + } +} int main(int argc, char **argv) { @@ -36,6 +91,16 @@ int main(int argc, char **argv) std::cout << "Port 0 MTU: " << port->get_mtu() << std::endl; status = ctx->get_port_link_status(0); std::cout << "Port 0 Link up: " << status << std::endl; + + std::cout << std::endl << "Now testing the service queue..." << std::endl; + // Technically this isn't correct, since the lcore ID would need to be the + // service thread's, but we didn't use a DPDK lcore for this... + auto queue = new uhd::transport::dpdk::service_queue(8, rte_lcore_id()); + std::thread service_thread(servicer, queue); + requester(queue); + service_thread.join(); + std::cout << "PASS: Service thread terminated" << std::endl; + delete queue; ctx.reset(); return 0; } |