summaryrefslogtreecommitdiffstats
path: root/host
diff options
context:
space:
mode:
authorJosh Blum <josh@joshknows.com>2010-09-26 20:23:23 -0700
committerJosh Blum <josh@joshknows.com>2010-09-26 20:23:23 -0700
commit0816925695aa69a1ae344863fef47d239b3ec8af (patch)
tree22abcd826c9ded4e01e16265894d68be1dbc58b4 /host
parentdc8bcfde805228ed5d00334ce44c6c0684dcfe2c (diff)
downloaduhd-0816925695aa69a1ae344863fef47d239b3ec8af.tar.gz
uhd-0816925695aa69a1ae344863fef47d239b3ec8af.tar.bz2
uhd-0816925695aa69a1ae344863fef47d239b3ec8af.zip
usb: zero copy work, multiple endpoints with single context, async io
Heavy work on the zero copy interface and endpoint wrappers to properly use the async io. The global libusb session starts a thread to run the event handler, the async callbacks push completed transfers onto a thread-safe bounded buffer. The managed buffer creation routines use the bounded buffer to efficiently pop off completed transfers. works on linux, throws a weird exception on cleanup
Diffstat (limited to 'host')
-rw-r--r--host/lib/transport/libusb1_base.cpp16
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp474
2 files changed, 134 insertions, 356 deletions
diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp
index 5ff996642..26e864459 100644
--- a/host/lib/transport/libusb1_base.cpp
+++ b/host/lib/transport/libusb1_base.cpp
@@ -20,6 +20,7 @@
#include <uhd/types/dict.hpp>
#include <boost/weak_ptr.hpp>
#include <boost/foreach.hpp>
+#include <boost/thread.hpp>
#include <iostream>
using namespace uhd::transport;
@@ -32,9 +33,12 @@ public:
libusb_session_impl(void){
UHD_ASSERT_THROW(libusb_init(&_context) == 0);
libusb_set_debug(_context, debug_level);
+ _thread_group.create_thread(boost::bind(&libusb_session_impl::run_event_loop, this));
}
~libusb_session_impl(void){
+ _running = false;
+ _thread_group.join_all();
libusb_exit(_context);
}
@@ -44,6 +48,18 @@ public:
private:
libusb_context *_context;
+ boost::thread_group _thread_group;
+ bool _running;
+
+ void run_event_loop(void){
+ _running = true;
+ timeval tv;
+ while(_running){
+ tv.tv_sec = 0;
+ tv.tv_usec = 100000; //100ms
+ libusb_handle_events_timeout(this->get_context(), &tv);
+ }
+ }
};
libusb::session::sptr libusb::session::get_global_session(void){
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index 41cd4b3fc..e84793e88 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -17,8 +17,11 @@
#include "libusb1_base.hpp"
#include <uhd/transport/usb_zero_copy.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
#include <uhd/utils/assert.hpp>
-#include <boost/format.hpp>
+#include <boost/shared_array.hpp>
+#include <boost/foreach.hpp>
+#include <vector>
#include <iostream>
#include <iomanip>
@@ -55,52 +58,57 @@ void pp_transfer(libusb_transfer *lut)
* interface provided by the kernel.
**********************************************************************/
class usb_endpoint {
+public:
+ typedef boost::shared_ptr<usb_endpoint> sptr;
+
+ usb_endpoint(
+ libusb::device_handle::sptr handle,
+ int endpoint,
+ bool input,
+ size_t transfer_size,
+ size_t num_transfers
+ );
+
+ ~usb_endpoint(void);
+
+ // Exposed interface for submitting / retrieving transfer buffers
+
+ //! Submit a new transfer that was presumably just filled or emptied.
+ void submit(libusb_transfer *lut);
+
+ /*!
+ * Get an available transfer:
+ * For inputs, this is a just filled transfer.
+ * For outputs, this is a just emptied transfer.
+ * \param timeout_ms the timeout to wait for a lut
+ * \return the transfer pointer or NULL if timeout
+ */
+ libusb_transfer *get_lut_with_wait(size_t timeout_ms = 100);
+
+ //Callback use only
+ void callback_handle_transfer(libusb_transfer *lut);
+
private:
libusb::device_handle::sptr _handle;
- libusb::session::sptr _session;
int _endpoint;
bool _input;
size_t _transfer_size;
size_t _num_transfers;
- // Transfer state lists (transfers are free, pending, or completed)
- std::list<libusb_transfer *> _free_list;
- std::list<libusb_transfer *> _pending_list;
- std::list<libusb_transfer *> _completed_list;
+ //! hold a bounded buffer of completed transfers
+ typedef bounded_buffer<libusb_transfer *> lut_buff_type;
+ lut_buff_type::sptr _completed_list;
+
+ //! a list of all transfer structs we allocated
+ std::vector<libusb_transfer *> _all_luts;
+
+ //! a list of shared arrays for the transfer buffers
+ std::vector<boost::shared_array<boost::uint8_t> > _buffers;
// Calls for processing asynchronous I/O
libusb_transfer *allocate_transfer(int buff_len);
- bool cancel(libusb_transfer *lut);
- bool cancel_all();
- bool reap_pending_list();
- bool reap_pending_list_timeout();
- bool reap_completed_list();
-
- // Transfer state manipulators
- void free_list_add(libusb_transfer *lut);
- void pending_list_add(libusb_transfer *lut);
- void completed_list_add(libusb_transfer *lut);
- libusb_transfer *free_list_get();
- libusb_transfer *completed_list_get();
- bool pending_list_remove(libusb_transfer *lut);
-
- // Debug use
void print_transfer_status(libusb_transfer *lut);
-
-public:
- usb_endpoint(libusb::device_handle::sptr handle, libusb::session::sptr sess,
- int endpoint, bool input, size_t transfer_size, size_t num_transfers);
-
- ~usb_endpoint();
-
- // Exposed interface for submitting / retrieving transfer buffers
- bool submit(libusb_transfer *lut);
- libusb_transfer *get_completed_transfer();
- libusb_transfer *get_free_transfer();
-
- //Callback use only
- void callback_handle_transfer(libusb_transfer *lut);
};
@@ -113,9 +121,8 @@ public:
* it from the pending to completed status list.
* \param lut pointer to libusb_transfer
*/
-static void callback(libusb_transfer *lut)
-{
- usb_endpoint *endpoint = (usb_endpoint *) lut->user_data;
+static void callback(libusb_transfer *lut){
+ usb_endpoint *endpoint = (usb_endpoint *) lut->user_data;
endpoint->callback_handle_transfer(lut);
}
@@ -124,14 +131,8 @@ static void callback(libusb_transfer *lut)
* Accessor call to allow list access from callback space
* \param pointer to libusb_transfer
*/
-void usb_endpoint::callback_handle_transfer(libusb_transfer *lut)
-{
- if (!pending_list_remove(lut)) {
- std::cerr << "USB: pending remove failed" << std::endl;
- return;
- }
-
- completed_list_add(lut);
+void usb_endpoint::callback_handle_transfer(libusb_transfer *lut){
+ _completed_list->push_with_wait(lut);
}
@@ -142,18 +143,27 @@ void usb_endpoint::callback_handle_transfer(libusb_transfer *lut)
* data is available.
*/
usb_endpoint::usb_endpoint(
- libusb::device_handle::sptr handle, libusb::session::sptr sess,
- int endpoint, bool input, size_t transfer_size, size_t num_transfers)
- : _handle(handle), _session(sess),
- _endpoint(endpoint), _input(input),
- _transfer_size(transfer_size), _num_transfers(num_transfers)
+ libusb::device_handle::sptr handle,
+ int endpoint,
+ bool input,
+ size_t transfer_size,
+ size_t num_transfers
+):
+ _handle(handle),
+ _endpoint(endpoint),
+ _input(input),
+ _transfer_size(transfer_size),
+ _num_transfers(num_transfers)
{
- unsigned int i;
- for (i = 0; i < _num_transfers; i++) {
- free_list_add(allocate_transfer(_transfer_size));
+ _completed_list = lut_buff_type::make(num_transfers);
- if (_input)
- submit(free_list_get());
+ for (size_t i = 0; i < _num_transfers; i++){
+ _all_luts.push_back(allocate_transfer(_transfer_size));
+
+ //input luts are immediately submitted to be filled
+ //output luts go into the completed list as free buffers
+ if (_input) this->submit(_all_luts.back());
+ else _completed_list->push_with_wait(_all_luts.back());
}
}
@@ -165,22 +175,21 @@ usb_endpoint::usb_endpoint(
* the transfers. Libusb will deallocate the data buffer held by
* each transfer.
*/
-usb_endpoint::~usb_endpoint()
-{
- cancel_all();
-
- while (!_pending_list.empty()) {
- if (!reap_pending_list())
- std::cerr << "error: destructor failed to reap" << std::endl;
+usb_endpoint::~usb_endpoint(void){
+ //cancel all transfers
+ BOOST_FOREACH(libusb_transfer *lut, _all_luts){
+ libusb_cancel_transfer(lut);
}
- while (!_completed_list.empty()) {
- if (!reap_completed_list())
- std::cerr << "error: destructor failed to reap" << std::endl;
- }
+ //collect canceled transfers (drain the queue)
+ libusb_transfer *lut;
+ while(_completed_list->pop_with_timed_wait(
+ lut, boost::posix_time::milliseconds(100)
+ ));
- while (!_free_list.empty()) {
- libusb_free_transfer(free_list_get());
+ //free all transfers
+ BOOST_FOREACH(libusb_transfer *lut, _all_luts){
+ libusb_free_transfer(lut);
}
}
@@ -192,20 +201,21 @@ usb_endpoint::~usb_endpoint()
* \param buff_len size of the individual buffer held by each transfer
* \return pointer to an allocated libusb_transfer
*/
-libusb_transfer *usb_endpoint::allocate_transfer(int buff_len)
-{
+libusb_transfer *usb_endpoint::allocate_transfer(int buff_len){
libusb_transfer *lut = libusb_alloc_transfer(0);
- unsigned char *buff = new unsigned char[buff_len];
+ boost::shared_array<boost::uint8_t> buff(new boost::uint8_t[buff_len]);
+ _buffers.push_back(buff); //store a reference to this shared array
unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0));
+ libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback);
libusb_fill_bulk_transfer(lut, // transfer
_handle->get(), // dev_handle
endpoint, // endpoint
- buff, // buffer
+ buff.get(), // buffer
buff_len, // length
- libusb_transfer_cb_fn(callback), // callback
+ lut_callback, // callback
this, // user_data
0); // timeout
return lut;
@@ -218,95 +228,15 @@ libusb_transfer *usb_endpoint::allocate_transfer(int buff_len)
* \param lut pointer to libusb_transfer
* \return true on success or false on error
*/
-bool usb_endpoint::submit(libusb_transfer *lut)
-{
- int retval;
- if ((retval = libusb_submit_transfer(lut)) < 0) {
- std::cerr << "error: libusb_submit_transfer: " << retval << std::endl;
- return false;
- }
-
- pending_list_add(lut);
- return true;
-}
-
-
-/*
- * Cancel a pending transfer
- * Search the pending list for the transfer and cancel if found.
- * \param lut pointer to libusb_transfer to cancel
- * \return true on success or false if transfer is not found
- *
- * Note: success only indicates submission of cancelation request.
- * Sucessful cancelation is not known until the callback occurs.
- */
-bool usb_endpoint::cancel(libusb_transfer *lut)
-{
- std::list<libusb_transfer*>::iterator iter;
- for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) {
- if (*iter == lut) {
- libusb_cancel_transfer(lut);
- return true;
- }
- }
- return false;
-}
-
-
-/*
- * Cancel all pending transfers
- * \return bool true if cancelation request is submitted
- *
- * Note: success only indicates submission of cancelation request.
- * Sucessful cancelation is not known until the callback occurs.
- */
-bool usb_endpoint::cancel_all()
-{
- std::list<libusb_transfer*>::iterator iter;
-
- for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) {
- if (libusb_cancel_transfer(*iter) < 0) {
- std::cerr << "error: libusb_cancal_transfer() failed" << std::endl;
- return false;
- }
- }
-
- return true;
-}
-
-
-/*
- * Reap completed transfers
- * return true if at least one transfer was reaped, false otherwise.
- * Check completed transfers for errors and mark as free. This is a
- * blocking call.
- * \return bool true if a libusb transfer is reaped, false otherwise
- */
-bool usb_endpoint::reap_completed_list()
-{
- libusb_transfer *lut;
-
- if (_completed_list.empty()) {
- if (!reap_pending_list_timeout())
- return false;
- }
-
- while (!_completed_list.empty()) {
- lut = completed_list_get();
- print_transfer_status(lut);
- free_list_add(lut);
- }
-
- return true;
+void usb_endpoint::submit(libusb_transfer *lut){
+ UHD_ASSERT_THROW(libusb_submit_transfer(lut) == 0);
}
-
/*
* Print status errors of a completed transfer
* \param lut pointer to an libusb_transfer
*/
-void usb_endpoint::print_transfer_status(libusb_transfer *lut)
-{
+void usb_endpoint::print_transfer_status(libusb_transfer *lut){
switch (lut->status) {
case LIBUSB_TRANSFER_COMPLETED:
if (lut->actual_length < lut->length) {
@@ -342,166 +272,14 @@ void usb_endpoint::print_transfer_status(libusb_transfer *lut)
}
}
-
-/*
- * Reap pending transfers without timeout
- * This is a blocking call. Reaping submitted transfers is
- * handled by libusb and the assigned callback function.
- * Block until at least one transfer is reaped.
- * \return true true if a transfer was reaped or false otherwise
- */
-bool usb_endpoint::reap_pending_list()
-{
- int retval;
-
- if ((retval = libusb_handle_events(_session->get_context())) < 0) {
- std::cerr << "error: libusb_handle_events: " << retval << std::endl;
- return false;
- }
-
- return true;
-}
-
-
-/*
- * Reap pending transfers with timeout
- * This call blocks until a transfer is reaped or timeout.
- * Reaping submitted transfers is handled by libusb and the
- * assigned callback function. Block until at least one
- * transfer is reaped or timeout occurs.
- * \return true if a transfer was reaped or false otherwise
- */
-bool usb_endpoint::reap_pending_list_timeout()
-{
- int retval;
- timeval tv;
-
- tv.tv_sec = 0;
- tv.tv_usec = 100000; //100ms
-
- size_t pending_list_size = _pending_list.size();
-
- if ((retval = libusb_handle_events_timeout(_session->get_context(), &tv)) < 0) {
- std::cerr << "error: libusb_handle_events: " << retval << std::endl;
- return false;
- }
-
- if (_pending_list.size() < pending_list_size) {
- return true;
- }
- else {
- return false;
- }
-}
-
-
-/*
- * Get a free transfer
- * The transfer has an empty data bufer for OUT requests
- * \return pointer to a libusb_transfer
- */
-libusb_transfer *usb_endpoint::get_free_transfer()
-{
- if (_free_list.empty()) {
- if (!reap_completed_list())
- return NULL;
- }
-
- return free_list_get();
-}
-
-
-/*
- * Get a completed transfer
- * The transfer has a full data buffer for IN requests
- * \return pointer to libusb_transfer
- */
-libusb_transfer *usb_endpoint::get_completed_transfer()
-{
- if (_completed_list.empty()) {
- if (!reap_pending_list_timeout())
- return NULL;
- }
-
- return completed_list_get();
-}
-
-/*
- * List operations
- */
-void usb_endpoint::free_list_add(libusb_transfer *lut)
-{
- _free_list.push_back(lut);
-}
-
-void usb_endpoint::pending_list_add(libusb_transfer *lut)
-{
- _pending_list.push_back(lut);
-}
-
-void usb_endpoint::completed_list_add(libusb_transfer *lut)
-{
- _completed_list.push_back(lut);
-}
-
-
-/*
- * Free and completed lists don't have ordered content
- * Pop transfers from the front as needed
- */
-libusb_transfer *usb_endpoint::free_list_get()
-{
- libusb_transfer *lut;
-
- if (_free_list.size() == 0) {
- return NULL;
- }
- else {
- lut = _free_list.front();
- _free_list.pop_front();
- return lut;
- }
-}
-
-
-/*
- * Free and completed lists don't have ordered content
- * Pop transfers from the front as needed
- */
-libusb_transfer *usb_endpoint::completed_list_get()
-{
+libusb_transfer *usb_endpoint::get_lut_with_wait(size_t timeout_ms){
libusb_transfer *lut;
-
- if (_completed_list.empty()) {
- return NULL;
- }
- else {
- lut = _completed_list.front();
- _completed_list.pop_front();
- return lut;
- }
+ if (_completed_list->pop_with_timed_wait(
+ lut, boost::posix_time::milliseconds(timeout_ms)
+ )) return lut;
+ return NULL;
}
-
-/*
- * Search and remove transfer from pending list
- * Assuming that the callbacks occur in order, the front element
- * should yield the correct transfer. If not, then something else
- * is going on. If no transfers match, then something went wrong.
- */
-bool usb_endpoint::pending_list_remove(libusb_transfer *lut)
-{
- std::list<libusb_transfer*>::iterator iter;
- for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) {
- if (*iter == lut) {
- _pending_list.erase(iter);
- return true;
- }
- }
- return false;
-}
-
-
/***********************************************************************
* Managed buffers
**********************************************************************/
@@ -515,17 +293,15 @@ bool usb_endpoint::pending_list_remove(libusb_transfer *lut)
class libusb_managed_recv_buffer_impl : public managed_recv_buffer {
public:
libusb_managed_recv_buffer_impl(libusb_transfer *lut,
- usb_endpoint *endpoint)
+ usb_endpoint::sptr endpoint)
: _buff(lut->buffer, lut->length)
{
_lut = lut;
_endpoint = endpoint;
}
- ~libusb_managed_recv_buffer_impl()
- {
- if (!_endpoint->submit(_lut))
- std::cerr << "USB: failed to submit IN transfer" << std::endl;
+ ~libusb_managed_recv_buffer_impl(void){
+ _endpoint->submit(_lut);
}
private:
@@ -535,7 +311,7 @@ private:
}
libusb_transfer *_lut;
- usb_endpoint *_endpoint;
+ usb_endpoint::sptr _endpoint;
const boost::asio::const_buffer _buff;
};
@@ -551,7 +327,7 @@ private:
class libusb_managed_send_buffer_impl : public managed_send_buffer {
public:
libusb_managed_send_buffer_impl(libusb_transfer *lut,
- usb_endpoint *endpoint,
+ usb_endpoint::sptr endpoint,
size_t buff_size)
: _buff(lut->buffer, buff_size), _committed(false)
{
@@ -559,8 +335,7 @@ public:
_endpoint = endpoint;
}
- ~libusb_managed_send_buffer_impl()
- {
+ ~libusb_managed_send_buffer_impl(void){
if (!_committed) {
_lut->length = 0;
_lut->actual_length = 0;
@@ -580,12 +355,14 @@ public:
_lut->length = num_bytes;
_lut->actual_length = 0;
- if (_endpoint->submit(_lut)) {
+ try{
+ _endpoint->submit(_lut);
_committed = true;
return num_bytes;
}
- else {
- return 0;
+ catch(const std::exception &e){
+ std::cerr << "Error in commit: " << e.what() << std::endl;
+ return -1;
}
}
@@ -596,7 +373,7 @@ private:
}
libusb_transfer *_lut;
- usb_endpoint *_endpoint;
+ usb_endpoint::sptr _endpoint;
const boost::asio::mutable_buffer _buff;
bool _committed;
};
@@ -608,11 +385,9 @@ private:
class libusb_zero_copy_impl : public usb_zero_copy
{
private:
- usb_endpoint *_rx_ep;
- usb_endpoint *_tx_ep;
+ usb_endpoint::sptr _rx_ep, _tx_ep;
libusb::device_handle::sptr _handle;
- libusb::session::sptr _session;
size_t _recv_buff_size;
size_t _send_buff_size;
@@ -626,8 +401,6 @@ public:
unsigned int tx_endpoint,
size_t recv_buff_size,
size_t send_buff_size);
-
- ~libusb_zero_copy_impl();
managed_recv_buffer::sptr get_recv_buff(void);
managed_send_buffer::sptr get_send_buff(void);
@@ -646,45 +419,38 @@ libusb_zero_copy_impl::libusb_zero_copy_impl(libusb::device_handle::sptr handle,
unsigned int tx_endpoint,
size_t buff_size,
size_t block_size)
- : _handle(handle), _session(libusb::session::get_global_session()),
+ : _handle(handle),
_recv_buff_size(block_size), _send_buff_size(block_size),
_num_frames(buff_size / block_size)
{
_handle->claim_interface(2 /*in interface*/);
_handle->claim_interface(1 /*out interface*/);
- _rx_ep = new usb_endpoint(_handle, // libusb device_handle
- _session, // libusb session w/ context
+ _rx_ep = usb_endpoint::sptr(new usb_endpoint(
+ _handle, // libusb device_handle
rx_endpoint, // USB endpoint number
true, // IN endpoint
_recv_buff_size, // buffer size per transfer
- _num_frames); // number of libusb transfers
+ _num_frames // number of libusb transfers
+ ));
- _tx_ep = new usb_endpoint(_handle, // libusb device_handle
- _session, // libusb session w/ context
+ _tx_ep = usb_endpoint::sptr(new usb_endpoint(
+ _handle, // libusb device_handle
tx_endpoint, // USB endpoint number
false, // OUT endpoint
_send_buff_size, // buffer size per transfer
- _num_frames); // number of libusb transfers
-}
-
-
-libusb_zero_copy_impl::~libusb_zero_copy_impl()
-{
- delete _rx_ep;
- delete _tx_ep;
+ _num_frames // number of libusb transfers
+ ));
}
-
/*
* Construct a managed receive buffer from a completed libusb transfer
* (happy with buffer full of data) obtained from the receive endpoint.
* Return empty pointer if no transfer is available (timeout or error).
* \return pointer to a managed receive buffer
*/
-managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff()
-{
- libusb_transfer *lut = _rx_ep->get_completed_transfer();
+managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(void){
+ libusb_transfer *lut = _rx_ep->get_lut_with_wait(/* TODO timeout API */);
if (lut == NULL) {
return managed_recv_buffer::sptr();
}
@@ -702,9 +468,8 @@ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff()
* (timeout or error).
* \return pointer to a managed send buffer
*/
-managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff()
-{
- libusb_transfer *lut = _tx_ep->get_free_transfer();
+managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(void){
+ libusb_transfer *lut = _tx_ep->get_lut_with_wait(/* TODO timeout API */);
if (lut == NULL) {
return managed_send_buffer::sptr();
}
@@ -736,6 +501,3 @@ usb_zero_copy::sptr usb_zero_copy::make(usb_device_handle::sptr handle,
buff_size,
block_size));
}
-
-
-