diff options
Diffstat (limited to 'host/lib/transport/libusb1_zero_copy.cpp')
-rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 398 |
1 files changed, 239 insertions, 159 deletions
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 28bff9709..2d18e1623 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -1,5 +1,5 @@ // -// Copyright 2010-2012 Ettus Research LLC +// Copyright 2010-2013 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -18,11 +18,17 @@ #include "libusb1_base.hpp" #include <uhd/transport/usb_zero_copy.hpp> #include <uhd/transport/buffer_pool.hpp> +#include <uhd/transport/bounded_buffer.hpp> #include <uhd/utils/msg.hpp> #include <uhd/exception.hpp> #include <boost/foreach.hpp> +#include <boost/format.hpp> +#include <boost/function.hpp> +#include <boost/bind.hpp> #include <boost/make_shared.hpp> -#include <boost/thread/thread.hpp> +#include <boost/circular_buffer.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/condition_variable.hpp> #include <list> using namespace uhd; @@ -36,115 +42,128 @@ static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes #define LIBUSB_CALL #endif /*LIBUSB_CALL*/ +//! libusb_handle_events_timeout_completed is only in newer API +#ifndef HAVE_LIBUSB_HANDLE_EVENTS_TIMEOUT_COMPLETED + #define libusb_handle_events_timeout_completed(ctx, tx, completed) \ + libusb_handle_events_timeout(ctx, tx) +#endif + +//! libusb_error_name is only in newer API +#ifndef HAVE_LIBUSB_ERROR_NAME + #define libusb_error_name(code) \ + str(boost::format("LIBUSB_ERROR_CODE %d") % code) +#endif + +//! type for sharing the release queue with managed buffers +class libusb_zero_copy_mb; +typedef boost::shared_ptr<bounded_buffer<libusb_zero_copy_mb *> > mb_queue_sptr; + /*! - * All libusb callback functions should be marked with the LIBUSB_CALL macro - * to ensure that they are compiled with the same calling convention as libusb. + * The libusb docs state that status and actual length can only be read in the callback. + * Therefore, this struct is intended to store data seen from the callback function. */ +struct lut_result_t +{ + lut_result_t(void) + { + completed = 0; + status = LIBUSB_TRANSFER_COMPLETED; + actual_length = 0; + } + int completed; + libusb_transfer_status status; + int actual_length; + boost::mutex mut; + boost::condition_variable usb_transfer_complete; +}; -//! helper function: handles all async callbacks -static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut){ - *(static_cast<bool *>(lut->user_data)) = true; -} +// Created to be used as an argument to boost::condition_variable::timed_wait() function +struct lut_result_completed { + const lut_result_t& _result; + lut_result_completed(const lut_result_t& result):_result(result) {} + bool operator()() const {return (_result.completed ? true : false);} +}; /*! - * Wait for a managed buffer to become complete. - * - * This routine processes async events until the transaction completes. - * We must call the libusb handle events in a loop because the handler - * may complete managed buffers other than the one we are waiting on. - * - * We cannot determine if handle events timed out or processed an event. - * Therefore, the timeout condition is handled by using boost system time. - * - * \param ctx the libusb context structure - * \param timeout the wait timeout in seconds - * \param completed a reference to the completed flag - * \return true for completion, false for timeout + * All libusb callback functions should be marked with the LIBUSB_CALL macro + * to ensure that they are compiled with the same calling convention as libusb. */ -UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, bool &completed){ - //already completed by a previous call? - if (completed) return true; - - //perform a non-blocking event handle - timeval tv; - tv.tv_sec = 0; - tv.tv_usec = 0; - libusb_handle_events_timeout(ctx, &tv); - if (completed) return true; - - //finish the rest with a timeout loop - const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000)); - while (not completed and (boost::get_system_time() < timeout_time)){ - timeval tv; - tv.tv_sec = 0; - tv.tv_usec = 10000; /*10ms*/ - libusb_handle_events_timeout(ctx, &tv); - } - return completed; +//! helper function: handles all async callbacks +static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut) +{ + lut_result_t *r = (lut_result_t *)lut->user_data; + boost::lock_guard<boost::mutex> lock(r->mut); + r->status = lut->status; + r->actual_length = lut->actual_length; + r->completed = 1; + r->usb_transfer_complete.notify_one(); // wake up thread waiting in wait_for_completion() member function below } /*********************************************************************** - * Reusable managed receiver buffer: + * Reusable managed buffer: * - Associated with a particular libusb transfer struct. * - Submits the transfer to libusb in the release method. **********************************************************************/ -class libusb_zero_copy_mrb : public managed_recv_buffer{ +class libusb_zero_copy_mb : public managed_buffer +{ public: - libusb_zero_copy_mrb(libusb_transfer *lut, const size_t frame_size): + libusb_zero_copy_mb(libusb_transfer *lut, const size_t frame_size, boost::function<void(libusb_zero_copy_mb *)> release_cb, const bool is_recv, const std::string &name): + _release_cb(release_cb), _is_recv(is_recv), _name(name), _ctx(libusb::session::get_global_session()->get_context()), _lut(lut), _frame_size(frame_size) { /* NOP */ } - void release(void){ - completed = false; - _lut->length = _frame_size; //always reset length - UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0); + void release(void){_release_cb(this);} + + UHD_INLINE void submit(void) + { + _lut->length = (_is_recv)? _frame_size : size(); //always set length + const int ret = libusb_submit_transfer(_lut); + if (ret != 0) throw uhd::runtime_error(str(boost::format( + "usb %s submit failed: %s") % _name % libusb_error_name(ret))); } - sptr get_new(const double timeout, size_t &index){ - if (wait_for_completion(_ctx, timeout, completed)){ - index++; - return make(this, _lut->buffer, _lut->actual_length); + template <typename buffer_type> + UHD_INLINE typename buffer_type::sptr get_new(const double timeout) + { + if (wait_for_completion(timeout)) + { + if (result.status != LIBUSB_TRANSFER_COMPLETED) throw uhd::runtime_error(str(boost::format( + "usb %s transfer status: %d") % _name % int(result.status))); + result.completed = 0; + return make(reinterpret_cast<buffer_type *>(this), _lut->buffer, (_is_recv)? result.actual_length : _frame_size); } - return managed_recv_buffer::sptr(); + return typename buffer_type::sptr(); } - bool completed; + // This is public because it is accessed from the libusb_zero_copy_single constructor + lut_result_t result; -private: - libusb_context *_ctx; - libusb_transfer *_lut; - const size_t _frame_size; -}; - -/*********************************************************************** - * Reusable managed send buffer: - * - Associated with a particular libusb transfer struct. - * - Submits the transfer to libusb in the commit method. - **********************************************************************/ -class libusb_zero_copy_msb : public managed_send_buffer{ -public: - libusb_zero_copy_msb(libusb_transfer *lut, const size_t frame_size): - _ctx(libusb::session::get_global_session()->get_context()), - _lut(lut), _frame_size(frame_size) { completed = true; } - - void release(void){ - completed = false; - _lut->length = size(); - UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0); - } - - sptr get_new(const double timeout, size_t &index){ - if (wait_for_completion(_ctx, timeout, completed)){ - index++; - return make(this, _lut->buffer, _frame_size); + /*! + * Wait for a managed buffer to become complete. + * + * \param timeout the wait timeout in seconds. A negative value will wait forever. + * \return true for completion, false for timeout + */ + UHD_INLINE bool wait_for_completion(const double timeout) + { + boost::unique_lock<boost::mutex> lock(result.mut); + if (!result.completed) { + if (timeout < 0.0) { + result.usb_transfer_complete.wait(lock); + } else { + const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000)); + result.usb_transfer_complete.timed_wait(lock, timeout_time, lut_result_completed(result)); + } } - return managed_send_buffer::sptr(); + return result.completed; } - bool completed; - private: + + boost::function<void(libusb_zero_copy_mb *)> _release_cb; + const bool _is_recv; + const std::string _name; libusb_context *_ctx; libusb_transfer *_lut; const size_t _frame_size; @@ -153,39 +172,33 @@ private: /*********************************************************************** * USB zero_copy device class **********************************************************************/ -class libusb_zero_copy_impl : public usb_zero_copy{ +class libusb_zero_copy_single +{ public: - - libusb_zero_copy_impl( + libusb_zero_copy_single( libusb::device_handle::sptr handle, - const size_t recv_interface, - const size_t recv_endpoint, - const size_t send_interface, - const size_t send_endpoint, - const device_addr_t &hints + const size_t interface, const size_t endpoint, + const size_t num_frames, const size_t frame_size ): _handle(handle), - _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))), - _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS))), - _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))), - _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS))), - _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)), - _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), - _next_recv_buff_index(0), - _next_send_buff_index(0) + _num_frames(num_frames), + _frame_size(frame_size), + _buffer_pool(buffer_pool::make(_num_frames, _frame_size)), + _enqueued(_num_frames), _released(_num_frames) { - _handle->claim_interface(recv_interface); - _handle->claim_interface(send_interface); + const bool is_recv = (endpoint & 0x80) != 0; + const std::string name = str(boost::format("%s%d") % ((is_recv)? "rx" : "tx") % int(endpoint & 0x7f)); + _handle->claim_interface(interface); //flush the buffers out of the recv endpoint //limit the flushing to at most one second - for (size_t i = 0; i < 100; i++) + if (is_recv) for (size_t i = 0; i < 100; i++) { unsigned char buff[512]; int transfered = 0; const int status = libusb_bulk_transfer( _handle->get(), // dev_handle - (recv_endpoint & 0x7f) | 0x80, // endpoint + endpoint, // endpoint static_cast<unsigned char *>(buff), sizeof(buff), &transfered, //bytes xfered @@ -194,102 +207,169 @@ public: if (status == LIBUSB_ERROR_TIMEOUT) break; } - //allocate libusb transfer structs and managed receive buffers - for (size_t i = 0; i < get_num_recv_frames(); i++){ - + //allocate libusb transfer structs and managed buffers + for (size_t i = 0; i < get_num_frames(); i++) + { libusb_transfer *lut = libusb_alloc_transfer(0); UHD_ASSERT_THROW(lut != NULL); - _mrb_pool.push_back(boost::make_shared<libusb_zero_copy_mrb>(lut, this->get_recv_frame_size())); + _mb_pool.push_back(boost::make_shared<libusb_zero_copy_mb>( + lut, this->get_frame_size(), boost::bind(&libusb_zero_copy_single::enqueue_damn_buffer, this, _1), is_recv, name + )); libusb_fill_bulk_transfer( lut, // transfer _handle->get(), // dev_handle - (recv_endpoint & 0x7f) | 0x80, // endpoint - static_cast<unsigned char *>(_recv_buffer_pool->at(i)), // buffer - this->get_recv_frame_size(), // length + endpoint, // endpoint + static_cast<unsigned char *>(_buffer_pool->at(i)), // buffer + this->get_frame_size(), // length libusb_transfer_cb_fn(&libusb_async_cb), // callback - static_cast<void *>(&_mrb_pool.back()->completed), // user_data + static_cast<void *>(&_mb_pool.back()->result), // user_data 0 // timeout (ms) ); _all_luts.push_back(lut); - _mrb_pool.back()->release(); } - //allocate libusb transfer structs and managed send buffers - for (size_t i = 0; i < get_num_send_frames(); i++){ - - libusb_transfer *lut = libusb_alloc_transfer(0); - UHD_ASSERT_THROW(lut != NULL); - - _msb_pool.push_back(boost::make_shared<libusb_zero_copy_msb>(lut, this->get_send_frame_size())); - - libusb_fill_bulk_transfer( - lut, // transfer - _handle->get(), // dev_handle - (send_endpoint & 0x7f) | 0x00, // endpoint - static_cast<unsigned char *>(_send_buffer_pool->at(i)), // buffer - this->get_send_frame_size(), // length - libusb_transfer_cb_fn(&libusb_async_cb), // callback - static_cast<void *>(&_msb_pool.back()->completed), // user_data - 0 // timeout - ); - - _all_luts.push_back(lut); + //initial release for all buffers + for (size_t i = 0; i < get_num_frames(); i++) + { + libusb_zero_copy_mb &mb = *(_mb_pool[i]); + if (is_recv) mb.release(); + else + { + mb.result.completed = 1; + _enqueued.push_back(&mb); + } } } - ~libusb_zero_copy_impl(void){ - libusb_context *ctx = libusb::session::get_global_session()->get_context(); - + ~libusb_zero_copy_single(void) + { //cancel all transfers - BOOST_FOREACH(libusb_transfer *lut, _all_luts){ + BOOST_FOREACH(libusb_transfer *lut, _all_luts) + { libusb_cancel_transfer(lut); } //process all transfers until timeout occurs - bool completed = false; - wait_for_completion(ctx, 0.01, completed); + BOOST_FOREACH(libusb_zero_copy_mb *mb, _enqueued) + { + mb->wait_for_completion(0.01); + } //free all transfers - BOOST_FOREACH(libusb_transfer *lut, _all_luts){ + BOOST_FOREACH(libusb_transfer *lut, _all_luts) + { libusb_free_transfer(lut); } - } - managed_recv_buffer::sptr get_recv_buff(double timeout){ - if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0; - return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index); - } + template <typename buffer_type> + UHD_INLINE typename buffer_type::sptr get_buff(double timeout) + { + typename buffer_type::sptr buff; + libusb_zero_copy_mb *front = NULL; + boost::mutex::scoped_lock lock(_mutex); + if (_enqueued.empty()) + { + _cond.timed_wait(lock, boost::posix_time::microseconds(long(timeout*1e6))); + } + if (_enqueued.empty()) return buff; + front = _enqueued.front(); - managed_send_buffer::sptr get_send_buff(double timeout){ - if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0; - return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index); - } + lock.unlock(); + buff = front->get_new<buffer_type>(timeout); + lock.lock(); - size_t get_num_recv_frames(void) const { return _num_recv_frames; } - size_t get_num_send_frames(void) const { return _num_send_frames; } + if (buff) _enqueued.pop_front(); + this->submit_what_we_can(); + return buff; + } - size_t get_recv_frame_size(void) const { return _recv_frame_size; } - size_t get_send_frame_size(void) const { return _send_frame_size; } + UHD_INLINE size_t get_num_frames(void) const { return _num_frames; } + UHD_INLINE size_t get_frame_size(void) const { return _frame_size; } private: libusb::device_handle::sptr _handle; - const size_t _recv_frame_size, _num_recv_frames; - const size_t _send_frame_size, _num_send_frames; + const size_t _num_frames, _frame_size; //! Storage for transfer related objects - buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; - std::vector<boost::shared_ptr<libusb_zero_copy_mrb> > _mrb_pool; - std::vector<boost::shared_ptr<libusb_zero_copy_msb> > _msb_pool; - size_t _next_recv_buff_index, _next_send_buff_index; + buffer_pool::sptr _buffer_pool; + std::vector<boost::shared_ptr<libusb_zero_copy_mb> > _mb_pool; + + boost::mutex _mutex; + boost::condition_variable _cond; + + //! why 2 queues? there is room in the future to have > N buffers but only N in flight + boost::circular_buffer<libusb_zero_copy_mb *> _enqueued, _released; + + void enqueue_damn_buffer(libusb_zero_copy_mb *mb) + { + boost::mutex::scoped_lock l(_mutex); + _released.push_back(mb); + this->submit_what_we_can(); + l.unlock(); + _cond.notify_one(); + } + + void submit_what_we_can(void) + { + while (not _released.empty() and not _enqueued.full()) + { + _released.front()->submit(); + _enqueued.push_back(_released.front()); + _released.pop_front(); + } + } //! a list of all transfer structs we allocated std::list<libusb_transfer *> _all_luts; +}; + +/*********************************************************************** + * USB zero_copy device class + **********************************************************************/ +struct libusb_zero_copy_impl : usb_zero_copy +{ + libusb_zero_copy_impl( + libusb::device_handle::sptr handle, + const size_t recv_interface, + const size_t recv_endpoint, + const size_t send_interface, + const size_t send_endpoint, + const device_addr_t &hints + ){ + _recv_impl.reset(new libusb_zero_copy_single( + handle, recv_interface, (recv_endpoint & 0x7f) | 0x80, + size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS)), + size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE)))); + _send_impl.reset(new libusb_zero_copy_single( + handle, send_interface, (send_endpoint & 0x7f) | 0x00, + size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS)), + size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE)))); + } + + managed_recv_buffer::sptr get_recv_buff(double timeout) + { + boost::mutex::scoped_lock l(_recv_mutex); + return _recv_impl->get_buff<managed_recv_buffer>(timeout); + } + + managed_send_buffer::sptr get_send_buff(double timeout) + { + boost::mutex::scoped_lock l(_send_mutex); + return _send_impl->get_buff<managed_send_buffer>(timeout); + } + + size_t get_num_recv_frames(void) const { return _recv_impl->get_num_frames(); } + size_t get_num_send_frames(void) const { return _send_impl->get_num_frames(); } + size_t get_recv_frame_size(void) const { return _recv_impl->get_frame_size(); } + size_t get_send_frame_size(void) const { return _send_impl->get_frame_size(); } + boost::shared_ptr<libusb_zero_copy_single> _recv_impl, _send_impl; + boost::mutex _recv_mutex, _send_mutex; }; /*********************************************************************** |