aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/libusb1_zero_copy.cpp
blob: 8cf76989bbf5d29a47d338b315c08ead0fb42de5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
//
// Copyright 2010-2013 Ettus Research LLC
// Copyright 2018 Ettus Research, a National Instruments Company
// Copyright 2019 Ettus Research, a National Instruments Brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//

#include "libusb1_base.hpp"
#include <uhd/exception.hpp>
#include <uhd/transport/bounded_buffer.hpp>
#include <uhd/transport/buffer_pool.hpp>
#include <uhd/transport/usb_zero_copy.hpp>
#include <uhd/utils/log.hpp>
#include <boost/circular_buffer.hpp>
#include <boost/format.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include <functional>
#include <list>
#include <memory>

using namespace uhd;
using namespace uhd::transport;

static const size_t DEFAULT_NUM_XFERS = 16; // num xfers
static const size_t DEFAULT_XFER_SIZE = 32 * 512; // bytes

//! type for sharing the release queue with managed buffers
class libusb_zero_copy_mb;
typedef std::shared_ptr<bounded_buffer<libusb_zero_copy_mb*>> mb_queue_sptr;

/*!
 * The libusb docs state that status and actual length can only be read in the callback.
 * Therefore, this struct is intended to store data seen from the callback function.
 */
struct lut_result_t
{
    lut_result_t(void)
    {
        completed     = 0;
        status        = LIBUSB_TRANSFER_COMPLETED;
        actual_length = 0;
#ifdef UHD_TXRX_DEBUG_PRINTS
        start_time = 0;
        buff_num   = -1;
#endif
    }
    int completed;
    libusb_transfer_status status;
    int actual_length;
    boost::mutex mut;
    boost::condition_variable usb_transfer_complete;

#ifdef UHD_TXRX_DEBUG_PRINTS
    // These are fore debugging
    long start_time;
    int buff_num;
    bool is_recv;
#endif
};

// Created to be used as an argument to boost::condition_variable::timed_wait() function
struct lut_result_completed
{
    const lut_result_t& _result;
    lut_result_completed(const lut_result_t& result) : _result(result) {}
    bool operator()() const
    {
        return (_result.completed ? true : false);
    }
};

#ifdef UHD_TXRX_DEBUG_PRINTS
static std::string dbg_prefix("libusb1_zero_copy,");
static void libusb1_zerocopy_dbg_print_err(std::string msg)
{
    msg = dbg_prefix + msg;
    fprintf(stderr, "%s\n", msg.c_str());
}
#endif

/*!
 * All libusb callback functions should be marked with the LIBUSB_CALL macro
 * to ensure that they are compiled with the same calling convention as libusb.
 */

//! helper function: handles all async callbacks
static void LIBUSB_CALL libusb_async_cb(libusb_transfer* lut)
{
    lut_result_t* r = (lut_result_t*)lut->user_data;
    boost::lock_guard<boost::mutex> lock(r->mut);
    r->status        = lut->status;
    r->actual_length = lut->actual_length;
    r->completed     = 1;
    r->usb_transfer_complete.notify_one(); // wake up thread waiting in
                                           // wait_for_completion() member function below
#ifdef UHD_TXRX_DEBUG_PRINTS
    long end_time = boost::get_system_time().time_of_day().total_microseconds();
    libusb1_zerocopy_dbg_print_err(
        (boost::format("libusb_async_cb,%s,%i,%i,%i,%ld,%ld") % (r->is_recv ? "rx" : "tx")
            % r->buff_num % r->actual_length % r->status % end_time % r->start_time)
            .str());
#endif
}

/***********************************************************************
 * Reusable managed buffer:
 *  - Associated with a particular libusb transfer struct.
 *  - Submits the transfer to libusb in the release method.
 **********************************************************************/
class libusb_zero_copy_mb : public managed_buffer
{
public:
    libusb_zero_copy_mb(libusb_transfer* lut,
        const size_t frame_size,
        std::function<void(libusb_zero_copy_mb*)> release_cb,
        const bool is_recv,
        const std::string& name)
        : _release_cb(release_cb)
        , _is_recv(is_recv)
        , _name(name)
        , _ctx(libusb::session::get_global_session()->get_context())
        , _lut(lut)
        , _frame_size(frame_size)
    { /* NOP */
    }

    ~libusb_zero_copy_mb(void) override;

    void release(void) override
    {
        _release_cb(this);
    }

    UHD_INLINE void submit(void)
    {
        _lut->length = int((_is_recv) ? _frame_size : size()); // always set length
#ifdef UHD_TXRX_DEBUG_PRINTS
        result.start_time = boost::get_system_time().time_of_day().total_microseconds();
        result.buff_num   = num();
        result.is_recv    = _is_recv;
#endif
        int ret = libusb_submit_transfer(_lut);
        if (ret != LIBUSB_SUCCESS)
            throw uhd::usb_error(ret,
                str(boost::format("usb %s submit failed: %s") % _name
                    % libusb_error_name(ret)));
    }

    template <typename buffer_type>
    UHD_INLINE typename buffer_type::sptr get_new(const double timeout)
    {
        if (wait_for_completion(timeout)) {
            if (result.status != LIBUSB_TRANSFER_COMPLETED
                && result.status != LIBUSB_TRANSFER_CANCELLED)
                throw uhd::io_error(str(boost::format("usb %s transfer status: %d")
                                        % _name % libusb_error_name(result.status)));
            result.completed = 0;
            return make(reinterpret_cast<buffer_type*>(this),
                _lut->buffer,
                (_is_recv) ? size_t(result.actual_length) : _frame_size);
        }
        return typename buffer_type::sptr();
    }

    // This is public because it is accessed from the libusb_zero_copy_single constructor
    lut_result_t result;

    /*!
     * Wait for a managed buffer to become complete.
     *
     * \param timeout the wait timeout in seconds.  A negative value will wait forever.
     * \return true for completion, false for timeout
     */
    UHD_INLINE bool wait_for_completion(const double timeout)
    {
        boost::unique_lock<boost::mutex> lock(result.mut);
        if (!result.completed) {
            if (timeout < 0.0) {
                result.usb_transfer_complete.wait(lock);
            } else {
                const boost::system_time timeout_time =
                    boost::get_system_time()
                    + boost::posix_time::microseconds(long(timeout * 1000000));
                result.usb_transfer_complete.timed_wait(
                    lock, timeout_time, lut_result_completed(result));
            }
        }
        return (result.completed > 0);
    }

private:
    std::function<void(libusb_zero_copy_mb*)> _release_cb;
    const bool _is_recv;
    const std::string _name;
    libusb_context* _ctx;
    libusb_transfer* _lut;
    const size_t _frame_size;
};

libusb_zero_copy_mb::~libusb_zero_copy_mb(void)
{
    /* NOP */
}

/***********************************************************************
 * USB zero_copy device class
 **********************************************************************/
class libusb_zero_copy_single
{
public:
    libusb_zero_copy_single(libusb::device_handle::sptr handle,
        const int interface,
        const unsigned char endpoint,
        const size_t num_frames,
        const size_t frame_size)
        : _handle(handle)
        , _num_frames(num_frames)
        , _frame_size(frame_size)
        , _buffer_pool(buffer_pool::make(_num_frames, _frame_size))
        , _enqueued(_num_frames)
        , _released(_num_frames)
        , _status(STATUS_RUNNING)
    {
        const bool is_recv = (endpoint & 0x80) != 0;
        const std::string name =
            str(boost::format("%s%d") % ((is_recv) ? "rx" : "tx") % int(endpoint & 0x7f));
        _handle->claim_interface(interface);

        // flush the buffers out of the recv endpoint
        // limit the flushing to at most one second
        if (is_recv)
            for (size_t i = 0; i < 100; i++) {
                unsigned char buff[512];
                int transfered   = 0;
                const int status = libusb_bulk_transfer(_handle->get(), // dev_handle
                    endpoint, // endpoint
                    static_cast<unsigned char*>(buff),
                    int(sizeof(buff)),
                    &transfered, // bytes xfered
                    10 // timeout ms
                );
                if (status == LIBUSB_ERROR_TIMEOUT)
                    break;
            }

        // allocate libusb transfer structs and managed buffers
        for (size_t i = 0; i < get_num_frames(); i++) {
            libusb_transfer* lut = libusb_alloc_transfer(0);
            UHD_ASSERT_THROW(lut != NULL);

            _mb_pool.push_back(std::make_shared<libusb_zero_copy_mb>(lut,
                this->get_frame_size(),
                std::bind(&libusb_zero_copy_single::enqueue_buffer,
                    this,
                    std::placeholders::_1),
                is_recv,
                name));

            libusb_fill_bulk_transfer(lut, // transfer
                _handle->get(), // dev_handle
                endpoint, // endpoint
                static_cast<unsigned char*>(_buffer_pool->at(i)), // buffer
                int(this->get_frame_size()), // length
                libusb_transfer_cb_fn(&libusb_async_cb), // callback
                static_cast<void*>(&_mb_pool.back()->result), // user_data
                0 // timeout (ms)
            );

            _all_luts.push_back(lut);
        }

        // initial release for all buffers
        for (size_t i = 0; i < get_num_frames(); i++) {
            libusb_zero_copy_mb& mb = *(_mb_pool[i]);
            if (is_recv)
                mb.release();
            else {
                mb.result.completed = 1;
                _enqueued.push_back(&mb);
            }
        }
    }

    ~libusb_zero_copy_single(void)
    {
        // cancel all transfers
        for (libusb_transfer* lut : _all_luts) {
            libusb_cancel_transfer(lut);
        }

        // process all transfers until timeout occurs
        for (libusb_zero_copy_mb* mb : _enqueued) {
            mb->wait_for_completion(0.01);
        }

        // free all transfers
        for (libusb_transfer* lut : _all_luts) {
            libusb_free_transfer(lut);
        }
    }

    template <typename buffer_type>
    UHD_INLINE typename buffer_type::sptr get_buff(double timeout)
    {
        typename buffer_type::sptr buff;

        if (_status == STATUS_ERROR)
            return buff;

        // Serialize access to buffers
        boost::mutex::scoped_lock get_buff_lock(_get_buff_mutex);

        boost::mutex::scoped_lock queue_lock(_queue_mutex);
        if (_enqueued.empty()) {
            _buff_ready_cond.timed_wait(
                queue_lock, boost::posix_time::microseconds(long(timeout * 1e6)));
        }
        if (_enqueued.empty())
            return buff;
        libusb_zero_copy_mb* front = _enqueued.front();

        queue_lock.unlock();
        buff = front->get_new<buffer_type>(timeout);
        queue_lock.lock();

        if (buff)
            _enqueued.pop_front();
        this->submit_what_we_can();
        return buff;
    }

    UHD_INLINE size_t get_num_frames(void) const
    {
        return _num_frames;
    }
    UHD_INLINE size_t get_frame_size(void) const
    {
        return _frame_size;
    }

private:
    libusb::device_handle::sptr _handle;
    const size_t _num_frames, _frame_size;

    //! Storage for transfer related objects
    buffer_pool::sptr _buffer_pool;
    std::vector<std::shared_ptr<libusb_zero_copy_mb>> _mb_pool;

    boost::mutex _queue_mutex;
    boost::condition_variable _buff_ready_cond;
    boost::mutex _get_buff_mutex;

    //! why 2 queues? there is room in the future to have > N buffers but only N in flight
    boost::circular_buffer<libusb_zero_copy_mb*> _enqueued, _released;

    enum { STATUS_RUNNING, STATUS_ERROR } _status;

    void enqueue_buffer(libusb_zero_copy_mb* mb)
    {
        boost::mutex::scoped_lock l(_queue_mutex);
        _released.push_back(mb);
        this->submit_what_we_can();
        _buff_ready_cond.notify_one();
    }

    void submit_what_we_can(void)
    {
        if (_status == STATUS_ERROR)
            return;
        while (not _released.empty() and not _enqueued.full()) {
            try {
                _released.front()->submit();
                _enqueued.push_back(_released.front());
                _released.pop_front();
            } catch (uhd::usb_error& e) {
                _status = STATUS_ERROR;
                throw e;
            }
        }
    }

    //! a list of all transfer structs we allocated
    std::list<libusb_transfer*> _all_luts;
};

/***********************************************************************
 * USB zero_copy device class
 **********************************************************************/
struct libusb_zero_copy_impl : usb_zero_copy
{
    libusb_zero_copy_impl(libusb::device_handle::sptr handle,
        const int recv_interface,
        const unsigned char recv_endpoint,
        const int send_interface,
        const unsigned char send_endpoint,
        const device_addr_t& hints)
    {
        _recv_impl.reset(new libusb_zero_copy_single(handle,
            recv_interface,
            (recv_endpoint & 0x7f) | 0x80,
            size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS)),
            size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))));
        _send_impl.reset(new libusb_zero_copy_single(handle,
            send_interface,
            (send_endpoint & 0x7f) | 0x00,
            size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS)),
            size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))));
    }

    ~libusb_zero_copy_impl(void) override;

    managed_recv_buffer::sptr get_recv_buff(double timeout) override
    {
        boost::mutex::scoped_lock l(_recv_mutex);
        return _recv_impl->get_buff<managed_recv_buffer>(timeout);
    }

    managed_send_buffer::sptr get_send_buff(double timeout) override
    {
        boost::mutex::scoped_lock l(_send_mutex);
        return _send_impl->get_buff<managed_send_buffer>(timeout);
    }

    size_t get_num_recv_frames(void) const override
    {
        return _recv_impl->get_num_frames();
    }
    size_t get_num_send_frames(void) const override
    {
        return _send_impl->get_num_frames();
    }

    size_t get_recv_frame_size(void) const override
    {
        return _recv_impl->get_frame_size();
    }
    size_t get_send_frame_size(void) const override
    {
        return _send_impl->get_frame_size();
    }

    std::shared_ptr<libusb_zero_copy_single> _recv_impl, _send_impl;
    boost::mutex _recv_mutex, _send_mutex;
};

libusb_zero_copy_impl::~libusb_zero_copy_impl(void)
{
    /* NOP */
}

/***********************************************************************
 * USB zero_copy destructor
 **********************************************************************/
usb_zero_copy::~usb_zero_copy(void)
{
    /* NOP */
}

/***********************************************************************
 * USB zero_copy make functions
 **********************************************************************/
usb_zero_copy::sptr usb_zero_copy::make(usb_device_handle::sptr handle,
    const int recv_interface,
    const unsigned char recv_endpoint,
    const int send_interface,
    const unsigned char send_endpoint,
    const device_addr_t& hints)
{
    libusb::device_handle::sptr dev_handle(libusb::device_handle::get_cached_handle(
        std::static_pointer_cast<libusb::special_handle>(handle)->get_device()));
    return sptr(new libusb_zero_copy_impl(
        dev_handle, recv_interface, recv_endpoint, send_interface, send_endpoint, hints));
}