aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/include/uhdlib/transport/dpdk/service_queue.hpp
blob: 85b29fe0a1f3e1139f4da7872e83ef92e604fab7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
//
// Copyright 2019 Ettus Research, a National Instruments brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//
#pragma once

#include <uhd/exception.hpp>
#include <uhd/utils/log.hpp>
#include <condition_variable>
#include <rte_malloc.h>
#include <rte_ring.h>
#include <chrono>
#include <memory>
#include <mutex>

namespace uhd { namespace transport { namespace dpdk {

enum wait_type {
    //! Wake immediately
    WAIT_SIMPLE,
    //! Wake upon receiving an RX packet
    WAIT_RX,
    //! Wake when a TX buffer becomes available
    WAIT_TX_BUF,
    //! Wake once the new flow/socket is created
    WAIT_FLOW_OPEN,
    //! Wake once the flow/socket is destroyed
    WAIT_FLOW_CLOSE,
    //! Wake once the new transport is connected
    WAIT_XPORT_CONNECT,
    //! Wake once the transport is disconnected
    WAIT_XPORT_DISCONNECT,
    //! Wake when MAC address found for IP address
    WAIT_ARP,
    //! Wake once the I/O worker terminates
    WAIT_LCORE_TERM,
    //! Number of possible reasons for waiting
    WAIT_TYPE_COUNT
};

/*!
 * Aggregate representing a request for the DPDK I/O service and a corresponding
 * entry in the wait queue
 *
 * This class is managed with explicit reference counting.
 */
struct wait_req
{
    //! The reason we're waiting (and service request associated with it)
    enum wait_type reason;
    //! The data associated with the service request (unmanaged by wait_req)
    void* data;
    //! A condition variable for waiting on the request
    std::condition_variable cond;
    //! The mutex associated with the condition variable
    std::mutex mutex;
    //! Whether the request was completed
    bool complete;
    //! The status or error code associated with the request
    int retval;
    //! An atomic reference counter for managing this request object's memory
    rte_atomic32_t refcnt;
};

/*!
 * Create a wait request
 *
 * \param t The reason for the wait or the service to be performed
 * \param priv_data The data associated with this wait type
 */
inline wait_req* wait_req_alloc(wait_type t, void* priv_data)
{
    wait_req* req = (wait_req*)rte_zmalloc(NULL, sizeof(*req), 0);
    if (!req) {
        return NULL;
    }
    req         = new (req) wait_req();
    req->reason = t;
    req->data   = priv_data;
    rte_atomic32_set(&req->refcnt, 1);
    return req;
}

/*!
 * Release a reference on the wait request.
 * Frees resources if the refcnt drops to 0.
 *
 * \param req The wait request upon which to decrement refcnt
 */
inline void wait_req_put(wait_req* req)
{
    if (rte_atomic32_dec_and_test(&req->refcnt)) {
        rte_free(req);
    }
}

/*!
 * Get a reference to the wait request.
 * Increments the refcnt of the wait request
 *
 * \param req The wait request upon which to increment refcnt
 */
inline void wait_req_get(wait_req* req)
{
    rte_atomic32_inc(&req->refcnt);
}


/*!
 * A service queue to funnel requests from requesters to a single servicer.
 *
 * The DPDK I/O service uses this queue to process threads waiting for various
 * reasons, such as creating/destroying a packet flow, receiving a packet, or
 * getting a buffer for TX.
 */
class service_queue
{
public:
    /*!
     * Create a service queue
     * \param depth Must be a power of 2. Actual size is less.
     * \param lcore_id The DPDK lcore_id associated with this service queue
     */
    service_queue(size_t depth, unsigned int lcore_id)
    {
        std::string name = std::string("servq") + std::to_string(lcore_id);
        _waiter_ring     = rte_ring_create(
            name.c_str(), depth, rte_lcore_to_socket_id(lcore_id), RING_F_SC_DEQ);
        if (!_waiter_ring) {
            throw uhd::runtime_error("DPDK: Failed to create the service queue");
        }
    }

    ~service_queue()
    {
        rte_ring_free(_waiter_ring);
    }

    /*!
     * Submit a wait/service request to the service queue
     * Negative timeouts indicate to block indefinitely
     *
     * This is the only function intended for the requester
     *
     * \param req The wait request to wait on
     * \param timeout How long to wait, with negative values indicating indefinitely
     * \return 0 for no timeout, -ETIMEDOUT if there was a timeout
     */
    int submit(wait_req* req, std::chrono::microseconds timeout)
    {
        auto timeout_point = std::chrono::steady_clock::now() + timeout;
        std::unique_lock<std::mutex> lock(req->mutex);
        /* Get a reference here, to be consumed by other thread (handshake) */
        wait_req_get(req);
        req->complete = false;
        if (rte_ring_enqueue(_waiter_ring, req)) {
            wait_req_put(req);
            return -ENOBUFS;
        }
        auto is_complete = [req] { return req->complete; };
        if (timeout < std::chrono::microseconds(0)) {
            req->cond.wait(lock, is_complete);
        } else {
            auto status = req->cond.wait_until(lock, timeout_point, is_complete);
            if (!status) {
                return -ETIMEDOUT;
            }
        }
        return 0;
    }

    /*!
     * Pop off the next request from the queue
     *
     * This should only be called by the servicer
     * \return A pointer to the next wait request, else NULL
     */
    wait_req* pop()
    {
        wait_req* req;
        if (rte_ring_dequeue(_waiter_ring, (void**)&req)) {
            return NULL;
        }
        return req;
    }

    /*!
     * Attempt to requeue a request to the service queue
     *
     * This should only be called by the servicer
     * \param A pointer to the wait request to requeue
     * \return 0 for success, -ENOBUFS if there was no space in the queue
     */
    int requeue(wait_req* req)
    {
        if (rte_ring_enqueue(_waiter_ring, req)) {
            return -ENOBUFS;
        }
        return 0;
    }

    /*!
     * Attempt to wake the waiter for this request
     * If successful, decrement the reference count on the wait request.
     * If the waiter could not be woken up, attempt to requeue the request and
     * change its type to WAIT_SIMPLE.
     *
     * This should only be called by the servicer.
     *
     * \param req The wait request with the waiter to wake
     * \return 0 if successful, -EAGAIN if the waiter was requeued, -ENOBUFS
     *         if the waiter could not be requeued
     */
    int complete(wait_req* req)
    {
        // Grabbing the mutex only to avoid this sequence:
        // A: Enqueue wait request
        // B: Pull wait request and satisfy
        // B: notify() on the condition variable
        // A: wait_until() on the condition variable, possibly indefinitely...
        bool stat = req->mutex.try_lock();
        if (!stat) {
            if (rte_ring_enqueue(_waiter_ring, req)) {
                UHD_LOG_WARNING("DPDK", "Could not lock wait_req mutex or requeue");
                return -ENOBUFS;
            } else {
                req->reason = WAIT_SIMPLE;
                return -EAGAIN;
            }
        }
        req->complete = true;
        req->cond.notify_one();
        req->mutex.unlock();
        wait_req_put(req);
        return stat;
    }

    /*!
     * Get the size of the service queue
     * \return size of the service queue
     */
    inline size_t get_size()
    {
        return rte_ring_get_size(_waiter_ring);
    }

    /*!
     * Get the capacity of the service queue
     * \return capacity of the service queue
     */
    inline size_t get_capacity()
    {
        return rte_ring_get_capacity(_waiter_ring);
    }

private:
    //! Multi-producer, single-consumer ring for requests
    struct rte_ring* _waiter_ring;
};

}}} // namespace uhd::transport::dpdk