aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/libusb1_zero_copy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport/libusb1_zero_copy.cpp')
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp398
1 files changed, 239 insertions, 159 deletions
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index 28bff9709..2d18e1623 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -1,5 +1,5 @@
//
-// Copyright 2010-2012 Ettus Research LLC
+// Copyright 2010-2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -18,11 +18,17 @@
#include "libusb1_base.hpp"
#include <uhd/transport/usb_zero_copy.hpp>
#include <uhd/transport/buffer_pool.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
#include <uhd/utils/msg.hpp>
#include <uhd/exception.hpp>
#include <boost/foreach.hpp>
+#include <boost/format.hpp>
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
-#include <boost/thread/thread.hpp>
+#include <boost/circular_buffer.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition_variable.hpp>
#include <list>
using namespace uhd;
@@ -36,115 +42,128 @@ static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes
#define LIBUSB_CALL
#endif /*LIBUSB_CALL*/
+//! libusb_handle_events_timeout_completed is only in newer API
+#ifndef HAVE_LIBUSB_HANDLE_EVENTS_TIMEOUT_COMPLETED
+ #define libusb_handle_events_timeout_completed(ctx, tx, completed) \
+ libusb_handle_events_timeout(ctx, tx)
+#endif
+
+//! libusb_error_name is only in newer API
+#ifndef HAVE_LIBUSB_ERROR_NAME
+ #define libusb_error_name(code) \
+ str(boost::format("LIBUSB_ERROR_CODE %d") % code)
+#endif
+
+//! type for sharing the release queue with managed buffers
+class libusb_zero_copy_mb;
+typedef boost::shared_ptr<bounded_buffer<libusb_zero_copy_mb *> > mb_queue_sptr;
+
/*!
- * All libusb callback functions should be marked with the LIBUSB_CALL macro
- * to ensure that they are compiled with the same calling convention as libusb.
+ * The libusb docs state that status and actual length can only be read in the callback.
+ * Therefore, this struct is intended to store data seen from the callback function.
*/
+struct lut_result_t
+{
+ lut_result_t(void)
+ {
+ completed = 0;
+ status = LIBUSB_TRANSFER_COMPLETED;
+ actual_length = 0;
+ }
+ int completed;
+ libusb_transfer_status status;
+ int actual_length;
+ boost::mutex mut;
+ boost::condition_variable usb_transfer_complete;
+};
-//! helper function: handles all async callbacks
-static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut){
- *(static_cast<bool *>(lut->user_data)) = true;
-}
+// Created to be used as an argument to boost::condition_variable::timed_wait() function
+struct lut_result_completed {
+ const lut_result_t& _result;
+ lut_result_completed(const lut_result_t& result):_result(result) {}
+ bool operator()() const {return (_result.completed ? true : false);}
+};
/*!
- * Wait for a managed buffer to become complete.
- *
- * This routine processes async events until the transaction completes.
- * We must call the libusb handle events in a loop because the handler
- * may complete managed buffers other than the one we are waiting on.
- *
- * We cannot determine if handle events timed out or processed an event.
- * Therefore, the timeout condition is handled by using boost system time.
- *
- * \param ctx the libusb context structure
- * \param timeout the wait timeout in seconds
- * \param completed a reference to the completed flag
- * \return true for completion, false for timeout
+ * All libusb callback functions should be marked with the LIBUSB_CALL macro
+ * to ensure that they are compiled with the same calling convention as libusb.
*/
-UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, bool &completed){
- //already completed by a previous call?
- if (completed) return true;
-
- //perform a non-blocking event handle
- timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = 0;
- libusb_handle_events_timeout(ctx, &tv);
- if (completed) return true;
-
- //finish the rest with a timeout loop
- const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000));
- while (not completed and (boost::get_system_time() < timeout_time)){
- timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = 10000; /*10ms*/
- libusb_handle_events_timeout(ctx, &tv);
- }
- return completed;
+//! helper function: handles all async callbacks
+static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut)
+{
+ lut_result_t *r = (lut_result_t *)lut->user_data;
+ boost::lock_guard<boost::mutex> lock(r->mut);
+ r->status = lut->status;
+ r->actual_length = lut->actual_length;
+ r->completed = 1;
+ r->usb_transfer_complete.notify_one(); // wake up thread waiting in wait_for_completion() member function below
}
/***********************************************************************
- * Reusable managed receiver buffer:
+ * Reusable managed buffer:
* - Associated with a particular libusb transfer struct.
* - Submits the transfer to libusb in the release method.
**********************************************************************/
-class libusb_zero_copy_mrb : public managed_recv_buffer{
+class libusb_zero_copy_mb : public managed_buffer
+{
public:
- libusb_zero_copy_mrb(libusb_transfer *lut, const size_t frame_size):
+ libusb_zero_copy_mb(libusb_transfer *lut, const size_t frame_size, boost::function<void(libusb_zero_copy_mb *)> release_cb, const bool is_recv, const std::string &name):
+ _release_cb(release_cb), _is_recv(is_recv), _name(name),
_ctx(libusb::session::get_global_session()->get_context()),
_lut(lut), _frame_size(frame_size) { /* NOP */ }
- void release(void){
- completed = false;
- _lut->length = _frame_size; //always reset length
- UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0);
+ void release(void){_release_cb(this);}
+
+ UHD_INLINE void submit(void)
+ {
+ _lut->length = (_is_recv)? _frame_size : size(); //always set length
+ const int ret = libusb_submit_transfer(_lut);
+ if (ret != 0) throw uhd::runtime_error(str(boost::format(
+ "usb %s submit failed: %s") % _name % libusb_error_name(ret)));
}
- sptr get_new(const double timeout, size_t &index){
- if (wait_for_completion(_ctx, timeout, completed)){
- index++;
- return make(this, _lut->buffer, _lut->actual_length);
+ template <typename buffer_type>
+ UHD_INLINE typename buffer_type::sptr get_new(const double timeout)
+ {
+ if (wait_for_completion(timeout))
+ {
+ if (result.status != LIBUSB_TRANSFER_COMPLETED) throw uhd::runtime_error(str(boost::format(
+ "usb %s transfer status: %d") % _name % int(result.status)));
+ result.completed = 0;
+ return make(reinterpret_cast<buffer_type *>(this), _lut->buffer, (_is_recv)? result.actual_length : _frame_size);
}
- return managed_recv_buffer::sptr();
+ return typename buffer_type::sptr();
}
- bool completed;
+ // This is public because it is accessed from the libusb_zero_copy_single constructor
+ lut_result_t result;
-private:
- libusb_context *_ctx;
- libusb_transfer *_lut;
- const size_t _frame_size;
-};
-
-/***********************************************************************
- * Reusable managed send buffer:
- * - Associated with a particular libusb transfer struct.
- * - Submits the transfer to libusb in the commit method.
- **********************************************************************/
-class libusb_zero_copy_msb : public managed_send_buffer{
-public:
- libusb_zero_copy_msb(libusb_transfer *lut, const size_t frame_size):
- _ctx(libusb::session::get_global_session()->get_context()),
- _lut(lut), _frame_size(frame_size) { completed = true; }
-
- void release(void){
- completed = false;
- _lut->length = size();
- UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0);
- }
-
- sptr get_new(const double timeout, size_t &index){
- if (wait_for_completion(_ctx, timeout, completed)){
- index++;
- return make(this, _lut->buffer, _frame_size);
+ /*!
+ * Wait for a managed buffer to become complete.
+ *
+ * \param timeout the wait timeout in seconds. A negative value will wait forever.
+ * \return true for completion, false for timeout
+ */
+ UHD_INLINE bool wait_for_completion(const double timeout)
+ {
+ boost::unique_lock<boost::mutex> lock(result.mut);
+ if (!result.completed) {
+ if (timeout < 0.0) {
+ result.usb_transfer_complete.wait(lock);
+ } else {
+ const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000));
+ result.usb_transfer_complete.timed_wait(lock, timeout_time, lut_result_completed(result));
+ }
}
- return managed_send_buffer::sptr();
+ return result.completed;
}
- bool completed;
-
private:
+
+ boost::function<void(libusb_zero_copy_mb *)> _release_cb;
+ const bool _is_recv;
+ const std::string _name;
libusb_context *_ctx;
libusb_transfer *_lut;
const size_t _frame_size;
@@ -153,39 +172,33 @@ private:
/***********************************************************************
* USB zero_copy device class
**********************************************************************/
-class libusb_zero_copy_impl : public usb_zero_copy{
+class libusb_zero_copy_single
+{
public:
-
- libusb_zero_copy_impl(
+ libusb_zero_copy_single(
libusb::device_handle::sptr handle,
- const size_t recv_interface,
- const size_t recv_endpoint,
- const size_t send_interface,
- const size_t send_endpoint,
- const device_addr_t &hints
+ const size_t interface, const size_t endpoint,
+ const size_t num_frames, const size_t frame_size
):
_handle(handle),
- _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))),
- _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS))),
- _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))),
- _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS))),
- _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
- _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
- _next_recv_buff_index(0),
- _next_send_buff_index(0)
+ _num_frames(num_frames),
+ _frame_size(frame_size),
+ _buffer_pool(buffer_pool::make(_num_frames, _frame_size)),
+ _enqueued(_num_frames), _released(_num_frames)
{
- _handle->claim_interface(recv_interface);
- _handle->claim_interface(send_interface);
+ const bool is_recv = (endpoint & 0x80) != 0;
+ const std::string name = str(boost::format("%s%d") % ((is_recv)? "rx" : "tx") % int(endpoint & 0x7f));
+ _handle->claim_interface(interface);
//flush the buffers out of the recv endpoint
//limit the flushing to at most one second
- for (size_t i = 0; i < 100; i++)
+ if (is_recv) for (size_t i = 0; i < 100; i++)
{
unsigned char buff[512];
int transfered = 0;
const int status = libusb_bulk_transfer(
_handle->get(), // dev_handle
- (recv_endpoint & 0x7f) | 0x80, // endpoint
+ endpoint, // endpoint
static_cast<unsigned char *>(buff),
sizeof(buff),
&transfered, //bytes xfered
@@ -194,102 +207,169 @@ public:
if (status == LIBUSB_ERROR_TIMEOUT) break;
}
- //allocate libusb transfer structs and managed receive buffers
- for (size_t i = 0; i < get_num_recv_frames(); i++){
-
+ //allocate libusb transfer structs and managed buffers
+ for (size_t i = 0; i < get_num_frames(); i++)
+ {
libusb_transfer *lut = libusb_alloc_transfer(0);
UHD_ASSERT_THROW(lut != NULL);
- _mrb_pool.push_back(boost::make_shared<libusb_zero_copy_mrb>(lut, this->get_recv_frame_size()));
+ _mb_pool.push_back(boost::make_shared<libusb_zero_copy_mb>(
+ lut, this->get_frame_size(), boost::bind(&libusb_zero_copy_single::enqueue_damn_buffer, this, _1), is_recv, name
+ ));
libusb_fill_bulk_transfer(
lut, // transfer
_handle->get(), // dev_handle
- (recv_endpoint & 0x7f) | 0x80, // endpoint
- static_cast<unsigned char *>(_recv_buffer_pool->at(i)), // buffer
- this->get_recv_frame_size(), // length
+ endpoint, // endpoint
+ static_cast<unsigned char *>(_buffer_pool->at(i)), // buffer
+ this->get_frame_size(), // length
libusb_transfer_cb_fn(&libusb_async_cb), // callback
- static_cast<void *>(&_mrb_pool.back()->completed), // user_data
+ static_cast<void *>(&_mb_pool.back()->result), // user_data
0 // timeout (ms)
);
_all_luts.push_back(lut);
- _mrb_pool.back()->release();
}
- //allocate libusb transfer structs and managed send buffers
- for (size_t i = 0; i < get_num_send_frames(); i++){
-
- libusb_transfer *lut = libusb_alloc_transfer(0);
- UHD_ASSERT_THROW(lut != NULL);
-
- _msb_pool.push_back(boost::make_shared<libusb_zero_copy_msb>(lut, this->get_send_frame_size()));
-
- libusb_fill_bulk_transfer(
- lut, // transfer
- _handle->get(), // dev_handle
- (send_endpoint & 0x7f) | 0x00, // endpoint
- static_cast<unsigned char *>(_send_buffer_pool->at(i)), // buffer
- this->get_send_frame_size(), // length
- libusb_transfer_cb_fn(&libusb_async_cb), // callback
- static_cast<void *>(&_msb_pool.back()->completed), // user_data
- 0 // timeout
- );
-
- _all_luts.push_back(lut);
+ //initial release for all buffers
+ for (size_t i = 0; i < get_num_frames(); i++)
+ {
+ libusb_zero_copy_mb &mb = *(_mb_pool[i]);
+ if (is_recv) mb.release();
+ else
+ {
+ mb.result.completed = 1;
+ _enqueued.push_back(&mb);
+ }
}
}
- ~libusb_zero_copy_impl(void){
- libusb_context *ctx = libusb::session::get_global_session()->get_context();
-
+ ~libusb_zero_copy_single(void)
+ {
//cancel all transfers
- BOOST_FOREACH(libusb_transfer *lut, _all_luts){
+ BOOST_FOREACH(libusb_transfer *lut, _all_luts)
+ {
libusb_cancel_transfer(lut);
}
//process all transfers until timeout occurs
- bool completed = false;
- wait_for_completion(ctx, 0.01, completed);
+ BOOST_FOREACH(libusb_zero_copy_mb *mb, _enqueued)
+ {
+ mb->wait_for_completion(0.01);
+ }
//free all transfers
- BOOST_FOREACH(libusb_transfer *lut, _all_luts){
+ BOOST_FOREACH(libusb_transfer *lut, _all_luts)
+ {
libusb_free_transfer(lut);
}
-
}
- managed_recv_buffer::sptr get_recv_buff(double timeout){
- if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0;
- return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index);
- }
+ template <typename buffer_type>
+ UHD_INLINE typename buffer_type::sptr get_buff(double timeout)
+ {
+ typename buffer_type::sptr buff;
+ libusb_zero_copy_mb *front = NULL;
+ boost::mutex::scoped_lock lock(_mutex);
+ if (_enqueued.empty())
+ {
+ _cond.timed_wait(lock, boost::posix_time::microseconds(long(timeout*1e6)));
+ }
+ if (_enqueued.empty()) return buff;
+ front = _enqueued.front();
- managed_send_buffer::sptr get_send_buff(double timeout){
- if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0;
- return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index);
- }
+ lock.unlock();
+ buff = front->get_new<buffer_type>(timeout);
+ lock.lock();
- size_t get_num_recv_frames(void) const { return _num_recv_frames; }
- size_t get_num_send_frames(void) const { return _num_send_frames; }
+ if (buff) _enqueued.pop_front();
+ this->submit_what_we_can();
+ return buff;
+ }
- size_t get_recv_frame_size(void) const { return _recv_frame_size; }
- size_t get_send_frame_size(void) const { return _send_frame_size; }
+ UHD_INLINE size_t get_num_frames(void) const { return _num_frames; }
+ UHD_INLINE size_t get_frame_size(void) const { return _frame_size; }
private:
libusb::device_handle::sptr _handle;
- const size_t _recv_frame_size, _num_recv_frames;
- const size_t _send_frame_size, _num_send_frames;
+ const size_t _num_frames, _frame_size;
//! Storage for transfer related objects
- buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
- std::vector<boost::shared_ptr<libusb_zero_copy_mrb> > _mrb_pool;
- std::vector<boost::shared_ptr<libusb_zero_copy_msb> > _msb_pool;
- size_t _next_recv_buff_index, _next_send_buff_index;
+ buffer_pool::sptr _buffer_pool;
+ std::vector<boost::shared_ptr<libusb_zero_copy_mb> > _mb_pool;
+
+ boost::mutex _mutex;
+ boost::condition_variable _cond;
+
+ //! why 2 queues? there is room in the future to have > N buffers but only N in flight
+ boost::circular_buffer<libusb_zero_copy_mb *> _enqueued, _released;
+
+ void enqueue_damn_buffer(libusb_zero_copy_mb *mb)
+ {
+ boost::mutex::scoped_lock l(_mutex);
+ _released.push_back(mb);
+ this->submit_what_we_can();
+ l.unlock();
+ _cond.notify_one();
+ }
+
+ void submit_what_we_can(void)
+ {
+ while (not _released.empty() and not _enqueued.full())
+ {
+ _released.front()->submit();
+ _enqueued.push_back(_released.front());
+ _released.pop_front();
+ }
+ }
//! a list of all transfer structs we allocated
std::list<libusb_transfer *> _all_luts;
+};
+
+/***********************************************************************
+ * USB zero_copy device class
+ **********************************************************************/
+struct libusb_zero_copy_impl : usb_zero_copy
+{
+ libusb_zero_copy_impl(
+ libusb::device_handle::sptr handle,
+ const size_t recv_interface,
+ const size_t recv_endpoint,
+ const size_t send_interface,
+ const size_t send_endpoint,
+ const device_addr_t &hints
+ ){
+ _recv_impl.reset(new libusb_zero_copy_single(
+ handle, recv_interface, (recv_endpoint & 0x7f) | 0x80,
+ size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS)),
+ size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))));
+ _send_impl.reset(new libusb_zero_copy_single(
+ handle, send_interface, (send_endpoint & 0x7f) | 0x00,
+ size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS)),
+ size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))));
+ }
+
+ managed_recv_buffer::sptr get_recv_buff(double timeout)
+ {
+ boost::mutex::scoped_lock l(_recv_mutex);
+ return _recv_impl->get_buff<managed_recv_buffer>(timeout);
+ }
+
+ managed_send_buffer::sptr get_send_buff(double timeout)
+ {
+ boost::mutex::scoped_lock l(_send_mutex);
+ return _send_impl->get_buff<managed_send_buffer>(timeout);
+ }
+
+ size_t get_num_recv_frames(void) const { return _recv_impl->get_num_frames(); }
+ size_t get_num_send_frames(void) const { return _send_impl->get_num_frames(); }
+ size_t get_recv_frame_size(void) const { return _recv_impl->get_frame_size(); }
+ size_t get_send_frame_size(void) const { return _send_impl->get_frame_size(); }
+ boost::shared_ptr<libusb_zero_copy_single> _recv_impl, _send_impl;
+ boost::mutex _recv_mutex, _send_mutex;
};
/***********************************************************************