aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/libusb1_zero_copy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport/libusb1_zero_copy.cpp')
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp338
1 files changed, 189 insertions, 149 deletions
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index 9e91ae2fc..5b03aa86f 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -6,35 +6,35 @@
//
#include "libusb1_base.hpp"
-#include <uhd/transport/usb_zero_copy.hpp>
-#include <uhd/transport/buffer_pool.hpp>
+#include <uhd/exception.hpp>
#include <uhd/transport/bounded_buffer.hpp>
+#include <uhd/transport/buffer_pool.hpp>
+#include <uhd/transport/usb_zero_copy.hpp>
#include <uhd/utils/log.hpp>
-#include <uhd/exception.hpp>
+#include <boost/bind.hpp>
+#include <boost/circular_buffer.hpp>
#include <boost/format.hpp>
#include <boost/function.hpp>
-#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
-#include <boost/circular_buffer.hpp>
-#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
+#include <boost/thread/mutex.hpp>
#include <list>
#ifdef UHD_TXRX_DEBUG_PRINTS
-#include <vector>
-#include <fstream>
-#include <boost/format.hpp>
+# include <boost/format.hpp>
+# include <fstream>
+# include <vector>
#endif
using namespace uhd;
using namespace uhd::transport;
-static const size_t DEFAULT_NUM_XFERS = 16; //num xfers
-static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes
+static const size_t DEFAULT_NUM_XFERS = 16; // num xfers
+static const size_t DEFAULT_XFER_SIZE = 32 * 512; // bytes
//! type for sharing the release queue with managed buffers
class libusb_zero_copy_mb;
-typedef boost::shared_ptr<bounded_buffer<libusb_zero_copy_mb *> > mb_queue_sptr;
+typedef boost::shared_ptr<bounded_buffer<libusb_zero_copy_mb*>> mb_queue_sptr;
/*!
* The libusb docs state that status and actual length can only be read in the callback.
@@ -44,12 +44,12 @@ struct lut_result_t
{
lut_result_t(void)
{
- completed = 0;
- status = LIBUSB_TRANSFER_COMPLETED;
+ completed = 0;
+ status = LIBUSB_TRANSFER_COMPLETED;
actual_length = 0;
#ifdef UHD_TXRX_DEBUG_PRINTS
start_time = 0;
- buff_num = -1;
+ buff_num = -1;
#endif
}
int completed;
@@ -67,17 +67,22 @@ struct lut_result_t
};
// Created to be used as an argument to boost::condition_variable::timed_wait() function
-struct lut_result_completed {
+struct lut_result_completed
+{
const lut_result_t& _result;
- lut_result_completed(const lut_result_t& result):_result(result) {}
- bool operator()() const {return (_result.completed ? true : false);}
+ lut_result_completed(const lut_result_t& result) : _result(result) {}
+ bool operator()() const
+ {
+ return (_result.completed ? true : false);
+ }
};
#ifdef UHD_TXRX_DEBUG_PRINTS
static std::string dbg_prefix("libusb1_zero_copy,");
-static void libusb1_zerocopy_dbg_print_err(std::string msg){
- msg = dbg_prefix + msg;
- fprintf(stderr, "%s\n", msg.c_str());
+static void libusb1_zerocopy_dbg_print_err(std::string msg)
+{
+ msg = dbg_prefix + msg;
+ fprintf(stderr, "%s\n", msg.c_str());
}
#endif
@@ -87,17 +92,21 @@ static void libusb1_zerocopy_dbg_print_err(std::string msg){
*/
//! helper function: handles all async callbacks
-static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut)
+static void LIBUSB_CALL libusb_async_cb(libusb_transfer* lut)
{
- lut_result_t *r = (lut_result_t *)lut->user_data;
+ lut_result_t* r = (lut_result_t*)lut->user_data;
boost::lock_guard<boost::mutex> lock(r->mut);
- r->status = lut->status;
+ r->status = lut->status;
r->actual_length = lut->actual_length;
- r->completed = 1;
- r->usb_transfer_complete.notify_one(); // wake up thread waiting in wait_for_completion() member function below
+ r->completed = 1;
+ r->usb_transfer_complete.notify_one(); // wake up thread waiting in
+ // wait_for_completion() member function below
#ifdef UHD_TXRX_DEBUG_PRINTS
long end_time = boost::get_system_time().time_of_day().total_microseconds();
- libusb1_zerocopy_dbg_print_err( (boost::format("libusb_async_cb,%s,%i,%i,%i,%ld,%ld") % (r->is_recv ? "rx":"tx") % r->buff_num % r->actual_length % r->status % end_time % r->start_time).str() );
+ libusb1_zerocopy_dbg_print_err(
+ (boost::format("libusb_async_cb,%s,%i,%i,%i,%ld,%ld") % (r->is_recv ? "rx" : "tx")
+ % r->buff_num % r->actual_length % r->status % end_time % r->start_time)
+ .str());
#endif
}
@@ -109,41 +118,53 @@ static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut)
class libusb_zero_copy_mb : public managed_buffer
{
public:
- libusb_zero_copy_mb(libusb_transfer *lut, const size_t frame_size, boost::function<void(libusb_zero_copy_mb *)> release_cb, const bool is_recv, const std::string &name):
- _release_cb(release_cb), _is_recv(is_recv), _name(name),
- _ctx(libusb::session::get_global_session()->get_context()),
- _lut(lut), _frame_size(frame_size) { /* NOP */ }
+ libusb_zero_copy_mb(libusb_transfer* lut,
+ const size_t frame_size,
+ boost::function<void(libusb_zero_copy_mb*)> release_cb,
+ const bool is_recv,
+ const std::string& name)
+ : _release_cb(release_cb)
+ , _is_recv(is_recv)
+ , _name(name)
+ , _ctx(libusb::session::get_global_session()->get_context())
+ , _lut(lut)
+ , _frame_size(frame_size)
+ { /* NOP */
+ }
virtual ~libusb_zero_copy_mb(void);
- void release(void){
- _release_cb(this);
+ void release(void)
+ {
+ _release_cb(this);
}
UHD_INLINE void submit(void)
{
- _lut->length = int((_is_recv)? _frame_size : size()); //always set length
+ _lut->length = int((_is_recv) ? _frame_size : size()); // always set length
#ifdef UHD_TXRX_DEBUG_PRINTS
result.start_time = boost::get_system_time().time_of_day().total_microseconds();
- result.buff_num = num();
- result.is_recv = _is_recv;
+ result.buff_num = num();
+ result.is_recv = _is_recv;
#endif
- int ret = libusb_submit_transfer(_lut);
+ int ret = libusb_submit_transfer(_lut);
if (ret != LIBUSB_SUCCESS)
- throw uhd::usb_error(ret, str(boost::format(
- "usb %s submit failed: %s") % _name % libusb_error_name(ret)));
+ throw uhd::usb_error(ret,
+ str(boost::format("usb %s submit failed: %s") % _name
+ % libusb_error_name(ret)));
}
template <typename buffer_type>
UHD_INLINE typename buffer_type::sptr get_new(const double timeout)
{
- if (wait_for_completion(timeout))
- {
+ if (wait_for_completion(timeout)) {
if (result.status != LIBUSB_TRANSFER_COMPLETED)
throw uhd::io_error(str(boost::format("usb %s transfer status: %d")
% _name % libusb_error_name(result.status)));
result.completed = 0;
- return make(reinterpret_cast<buffer_type *>(this), _lut->buffer, (_is_recv)? size_t(result.actual_length) : _frame_size);
+ return make(reinterpret_cast<buffer_type*>(this),
+ _lut->buffer,
+ (_is_recv) ? size_t(result.actual_length) : _frame_size);
}
return typename buffer_type::sptr();
}
@@ -164,23 +185,27 @@ public:
if (timeout < 0.0) {
result.usb_transfer_complete.wait(lock);
} else {
- const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000));
- result.usb_transfer_complete.timed_wait(lock, timeout_time, lut_result_completed(result));
+ const boost::system_time timeout_time =
+ boost::get_system_time()
+ + boost::posix_time::microseconds(long(timeout * 1000000));
+ result.usb_transfer_complete.timed_wait(
+ lock, timeout_time, lut_result_completed(result));
}
}
return (result.completed > 0);
}
private:
- boost::function<void(libusb_zero_copy_mb *)> _release_cb;
+ boost::function<void(libusb_zero_copy_mb*)> _release_cb;
const bool _is_recv;
const std::string _name;
- libusb_context *_ctx;
- libusb_transfer *_lut;
+ libusb_context* _ctx;
+ libusb_transfer* _lut;
const size_t _frame_size;
};
-libusb_zero_copy_mb::~libusb_zero_copy_mb(void) {
+libusb_zero_copy_mb::~libusb_zero_copy_mb(void)
+{
/* NOP */
}
@@ -190,70 +215,71 @@ libusb_zero_copy_mb::~libusb_zero_copy_mb(void) {
class libusb_zero_copy_single
{
public:
- libusb_zero_copy_single(
- libusb::device_handle::sptr handle,
- const int interface, const unsigned char endpoint,
- const size_t num_frames, const size_t frame_size
- ):
- _handle(handle),
- _num_frames(num_frames),
- _frame_size(frame_size),
- _buffer_pool(buffer_pool::make(_num_frames, _frame_size)),
- _enqueued(_num_frames), _released(_num_frames),
- _status(STATUS_RUNNING)
+ libusb_zero_copy_single(libusb::device_handle::sptr handle,
+ const int interface,
+ const unsigned char endpoint,
+ const size_t num_frames,
+ const size_t frame_size)
+ : _handle(handle)
+ , _num_frames(num_frames)
+ , _frame_size(frame_size)
+ , _buffer_pool(buffer_pool::make(_num_frames, _frame_size))
+ , _enqueued(_num_frames)
+ , _released(_num_frames)
+ , _status(STATUS_RUNNING)
{
const bool is_recv = (endpoint & 0x80) != 0;
- const std::string name = str(boost::format("%s%d") % ((is_recv)? "rx" : "tx") % int(endpoint & 0x7f));
+ const std::string name =
+ str(boost::format("%s%d") % ((is_recv) ? "rx" : "tx") % int(endpoint & 0x7f));
_handle->claim_interface(interface);
- //flush the buffers out of the recv endpoint
- //limit the flushing to at most one second
- if (is_recv) for (size_t i = 0; i < 100; i++)
- {
- unsigned char buff[512];
- int transfered = 0;
- const int status = libusb_bulk_transfer(
- _handle->get(), // dev_handle
- endpoint, // endpoint
- static_cast<unsigned char *>(buff),
- int(sizeof(buff)),
- &transfered, //bytes xfered
- 10 //timeout ms
- );
- if (status == LIBUSB_ERROR_TIMEOUT) break;
- }
+ // flush the buffers out of the recv endpoint
+ // limit the flushing to at most one second
+ if (is_recv)
+ for (size_t i = 0; i < 100; i++) {
+ unsigned char buff[512];
+ int transfered = 0;
+ const int status = libusb_bulk_transfer(_handle->get(), // dev_handle
+ endpoint, // endpoint
+ static_cast<unsigned char*>(buff),
+ int(sizeof(buff)),
+ &transfered, // bytes xfered
+ 10 // timeout ms
+ );
+ if (status == LIBUSB_ERROR_TIMEOUT)
+ break;
+ }
- //allocate libusb transfer structs and managed buffers
- for (size_t i = 0; i < get_num_frames(); i++)
- {
- libusb_transfer *lut = libusb_alloc_transfer(0);
+ // allocate libusb transfer structs and managed buffers
+ for (size_t i = 0; i < get_num_frames(); i++) {
+ libusb_transfer* lut = libusb_alloc_transfer(0);
UHD_ASSERT_THROW(lut != NULL);
- _mb_pool.push_back(boost::make_shared<libusb_zero_copy_mb>(
- lut, this->get_frame_size(), boost::bind(&libusb_zero_copy_single::enqueue_buffer, this, _1), is_recv, name
- ));
-
- libusb_fill_bulk_transfer(
- lut, // transfer
- _handle->get(), // dev_handle
- endpoint, // endpoint
- static_cast<unsigned char *>(_buffer_pool->at(i)), // buffer
- int(this->get_frame_size()), // length
- libusb_transfer_cb_fn(&libusb_async_cb), // callback
- static_cast<void *>(&_mb_pool.back()->result), // user_data
- 0 // timeout (ms)
+ _mb_pool.push_back(boost::make_shared<libusb_zero_copy_mb>(lut,
+ this->get_frame_size(),
+ boost::bind(&libusb_zero_copy_single::enqueue_buffer, this, _1),
+ is_recv,
+ name));
+
+ libusb_fill_bulk_transfer(lut, // transfer
+ _handle->get(), // dev_handle
+ endpoint, // endpoint
+ static_cast<unsigned char*>(_buffer_pool->at(i)), // buffer
+ int(this->get_frame_size()), // length
+ libusb_transfer_cb_fn(&libusb_async_cb), // callback
+ static_cast<void*>(&_mb_pool.back()->result), // user_data
+ 0 // timeout (ms)
);
_all_luts.push_back(lut);
}
- //initial release for all buffers
- for (size_t i = 0; i < get_num_frames(); i++)
- {
- libusb_zero_copy_mb &mb = *(_mb_pool[i]);
- if (is_recv) mb.release();
- else
- {
+ // initial release for all buffers
+ for (size_t i = 0; i < get_num_frames(); i++) {
+ libusb_zero_copy_mb& mb = *(_mb_pool[i]);
+ if (is_recv)
+ mb.release();
+ else {
mb.result.completed = 1;
_enqueued.push_back(&mb);
}
@@ -262,21 +288,18 @@ public:
~libusb_zero_copy_single(void)
{
- //cancel all transfers
- for(libusb_transfer *lut: _all_luts)
- {
+ // cancel all transfers
+ for (libusb_transfer* lut : _all_luts) {
libusb_cancel_transfer(lut);
}
- //process all transfers until timeout occurs
- for(libusb_zero_copy_mb *mb: _enqueued)
- {
+ // process all transfers until timeout occurs
+ for (libusb_zero_copy_mb* mb : _enqueued) {
mb->wait_for_completion(0.01);
}
- //free all transfers
- for(libusb_transfer *lut: _all_luts)
- {
+ // free all transfers
+ for (libusb_transfer* lut : _all_luts) {
libusb_free_transfer(lut);
}
}
@@ -293,24 +316,32 @@ public:
boost::mutex::scoped_lock get_buff_lock(_get_buff_mutex);
boost::mutex::scoped_lock queue_lock(_queue_mutex);
- if (_enqueued.empty())
- {
- _buff_ready_cond.timed_wait(queue_lock, boost::posix_time::microseconds(long(timeout*1e6)));
+ if (_enqueued.empty()) {
+ _buff_ready_cond.timed_wait(
+ queue_lock, boost::posix_time::microseconds(long(timeout * 1e6)));
}
- if (_enqueued.empty()) return buff;
- libusb_zero_copy_mb *front = _enqueued.front();
+ if (_enqueued.empty())
+ return buff;
+ libusb_zero_copy_mb* front = _enqueued.front();
queue_lock.unlock();
buff = front->get_new<buffer_type>(timeout);
queue_lock.lock();
- if (buff) _enqueued.pop_front();
+ if (buff)
+ _enqueued.pop_front();
this->submit_what_we_can();
return buff;
}
- UHD_INLINE size_t get_num_frames(void) const { return _num_frames; }
- UHD_INLINE size_t get_frame_size(void) const { return _frame_size; }
+ UHD_INLINE size_t get_num_frames(void) const
+ {
+ return _num_frames;
+ }
+ UHD_INLINE size_t get_frame_size(void) const
+ {
+ return _frame_size;
+ }
private:
libusb::device_handle::sptr _handle;
@@ -318,18 +349,18 @@ private:
//! Storage for transfer related objects
buffer_pool::sptr _buffer_pool;
- std::vector<boost::shared_ptr<libusb_zero_copy_mb> > _mb_pool;
+ std::vector<boost::shared_ptr<libusb_zero_copy_mb>> _mb_pool;
boost::mutex _queue_mutex;
boost::condition_variable _buff_ready_cond;
boost::mutex _get_buff_mutex;
//! why 2 queues? there is room in the future to have > N buffers but only N in flight
- boost::circular_buffer<libusb_zero_copy_mb *> _enqueued, _released;
+ boost::circular_buffer<libusb_zero_copy_mb*> _enqueued, _released;
- enum {STATUS_RUNNING, STATUS_ERROR} _status;
+ enum { STATUS_RUNNING, STATUS_ERROR } _status;
- void enqueue_buffer(libusb_zero_copy_mb *mb)
+ void enqueue_buffer(libusb_zero_copy_mb* mb)
{
boost::mutex::scoped_lock l(_queue_mutex);
_released.push_back(mb);
@@ -341,15 +372,12 @@ private:
{
if (_status == STATUS_ERROR)
return;
- while (not _released.empty() and not _enqueued.full())
- {
+ while (not _released.empty() and not _enqueued.full()) {
try {
_released.front()->submit();
_enqueued.push_back(_released.front());
_released.pop_front();
- }
- catch (uhd::usb_error& e)
- {
+ } catch (uhd::usb_error& e) {
_status = STATUS_ERROR;
throw e;
}
@@ -357,7 +385,7 @@ private:
}
//! a list of all transfer structs we allocated
- std::list<libusb_transfer *> _all_luts;
+ std::list<libusb_transfer*> _all_luts;
};
/***********************************************************************
@@ -365,20 +393,21 @@ private:
**********************************************************************/
struct libusb_zero_copy_impl : usb_zero_copy
{
- libusb_zero_copy_impl(
- libusb::device_handle::sptr handle,
+ libusb_zero_copy_impl(libusb::device_handle::sptr handle,
const int recv_interface,
const unsigned char recv_endpoint,
const int send_interface,
const unsigned char send_endpoint,
- const device_addr_t &hints
- ){
- _recv_impl.reset(new libusb_zero_copy_single(
- handle, recv_interface, (recv_endpoint & 0x7f) | 0x80,
+ const device_addr_t& hints)
+ {
+ _recv_impl.reset(new libusb_zero_copy_single(handle,
+ recv_interface,
+ (recv_endpoint & 0x7f) | 0x80,
size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS)),
size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))));
- _send_impl.reset(new libusb_zero_copy_single(
- handle, send_interface, (send_endpoint & 0x7f) | 0x00,
+ _send_impl.reset(new libusb_zero_copy_single(handle,
+ send_interface,
+ (send_endpoint & 0x7f) | 0x00,
size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS)),
size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))));
}
@@ -397,42 +426,53 @@ struct libusb_zero_copy_impl : usb_zero_copy
return _send_impl->get_buff<managed_send_buffer>(timeout);
}
- size_t get_num_recv_frames(void) const { return _recv_impl->get_num_frames(); }
- size_t get_num_send_frames(void) const { return _send_impl->get_num_frames(); }
+ size_t get_num_recv_frames(void) const
+ {
+ return _recv_impl->get_num_frames();
+ }
+ size_t get_num_send_frames(void) const
+ {
+ return _send_impl->get_num_frames();
+ }
- size_t get_recv_frame_size(void) const { return _recv_impl->get_frame_size(); }
- size_t get_send_frame_size(void) const { return _send_impl->get_frame_size(); }
+ size_t get_recv_frame_size(void) const
+ {
+ return _recv_impl->get_frame_size();
+ }
+ size_t get_send_frame_size(void) const
+ {
+ return _send_impl->get_frame_size();
+ }
boost::shared_ptr<libusb_zero_copy_single> _recv_impl, _send_impl;
boost::mutex _recv_mutex, _send_mutex;
};
-libusb_zero_copy_impl::~libusb_zero_copy_impl(void) {
+libusb_zero_copy_impl::~libusb_zero_copy_impl(void)
+{
/* NOP */
}
/***********************************************************************
* USB zero_copy destructor
**********************************************************************/
-usb_zero_copy::~usb_zero_copy(void) {
+usb_zero_copy::~usb_zero_copy(void)
+{
/* NOP */
}
/***********************************************************************
* USB zero_copy make functions
**********************************************************************/
-usb_zero_copy::sptr usb_zero_copy::make(
- usb_device_handle::sptr handle,
+usb_zero_copy::sptr usb_zero_copy::make(usb_device_handle::sptr handle,
const int recv_interface,
const unsigned char recv_endpoint,
const int send_interface,
const unsigned char send_endpoint,
- const device_addr_t &hints
-){
+ const device_addr_t& hints)
+{
libusb::device_handle::sptr dev_handle(libusb::device_handle::get_cached_handle(
- boost::static_pointer_cast<libusb::special_handle>(handle)->get_device()
- ));
+ boost::static_pointer_cast<libusb::special_handle>(handle)->get_device()));
return sptr(new libusb_zero_copy_impl(
- dev_handle, recv_interface, recv_endpoint, send_interface, send_endpoint, hints
- ));
+ dev_handle, recv_interface, recv_endpoint, send_interface, send_endpoint, hints));
}