summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp160
1 files changed, 75 insertions, 85 deletions
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index 0fa856d34..4259c42ed 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -17,13 +17,9 @@
#include "libusb1_base.hpp"
#include <uhd/transport/usb_zero_copy.hpp>
-#include <uhd/transport/bounded_buffer.hpp>
#include <uhd/transport/buffer_pool.hpp>
-#include <uhd/utils/thread_priority.hpp>
#include <uhd/utils/msg.hpp>
-#include <uhd/utils/tasks.hpp>
#include <uhd/exception.hpp>
-#include <boost/function.hpp>
#include <boost/foreach.hpp>
#include <boost/thread/thread.hpp>
#include <list>
@@ -44,18 +40,9 @@ static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes
* to ensure that they are compiled with the same calling convention as libusb.
*/
-//! helper function: handles all rx async callbacks
-static void LIBUSB_CALL libusb_async_rx_cb(libusb_transfer *lut){
- if(lut->actual_length == 0) {
- UHD_ASSERT_THROW(libusb_submit_transfer(lut) == 0); //get out until you find some real data
- return;
- }
- (*static_cast<boost::function<void()> *>(lut->user_data))();
-}
-
-//! helper function: handles all tx async callbacks
-static void LIBUSB_CALL libusb_async_tx_cb(libusb_transfer *lut) {
- (*static_cast<boost::function<void()> *>(lut->user_data))();
+//! helper function: handles all async callbacks
+static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut){
+ *(static_cast<bool *>(lut->user_data)) = true;
}
//! callback to free transfer upon cancellation
@@ -64,6 +51,34 @@ static void LIBUSB_CALL cancel_transfer_cb(libusb_transfer *lut){
else UHD_MSG(error) << "libusb cancel_transfer unexpected status " << lut->status << std::endl;
}
+/*!
+ * Wait for a managed buffer to become complete.
+ *
+ * This routine processes async events until the transaction completes.
+ * We must call the libusb handle events in a loop because the handler
+ * may complete managed buffers other than the one we are waiting on.
+ *
+ * We cannot determine if handle events timed out or processed an event.
+ * Therefore, the timeout condition is handled by using boost system time.
+ *
+ * \param ctx the libusb context structure
+ * \param timeout the wait timeout in seconds
+ * \param completed a reference to the completed flag
+ * \return true for completion, false for timeout
+ */
+UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, bool &completed){
+ const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000));
+
+ while (not completed and (boost::get_system_time() < timeout_time)){
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 10000; /*10ms*/
+ libusb_handle_events_timeout(ctx, &tv);
+ }
+
+ return completed;
+}
+
/***********************************************************************
* Reusable managed receiver buffer:
* - Associated with a particular libusb transfer struct.
@@ -72,23 +87,32 @@ static void LIBUSB_CALL cancel_transfer_cb(libusb_transfer *lut){
class libusb_zero_copy_mrb : public managed_recv_buffer{
public:
libusb_zero_copy_mrb(libusb_transfer *lut):
- _lut(lut), _expired(true) { /* NOP */ }
+ _ctx(libusb::session::get_global_session()->get_context()),
+ _lut(lut), _expired(false) { /* NOP */ }
void release(void){
if (_expired) return;
+ completed = false;
UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0);
_expired = true;
}
- sptr get_new(void){
- _expired = false;
- return make_managed_buffer(this);
+ sptr get_new(const double timeout, size_t &index){
+ if (wait_for_completion(_ctx, timeout, completed)){
+ index++;
+ _expired = false;
+ return make_managed_buffer(this);
+ }
+ return managed_recv_buffer::sptr();
}
+ bool completed;
+
private:
const void *get_buff(void) const{return _lut->buffer;}
size_t get_size(void) const{return _lut->actual_length;}
+ libusb_context *_ctx;
libusb_transfer *_lut;
bool _expired;
};
@@ -101,25 +125,34 @@ private:
class libusb_zero_copy_msb : public managed_send_buffer{
public:
libusb_zero_copy_msb(libusb_transfer *lut):
- _lut(lut), _expired(true) { /* NOP */ }
+ _ctx(libusb::session::get_global_session()->get_context()),
+ _lut(lut), _expired(false) { /* NOP */ }
void commit(size_t len){
if (_expired) return;
+ completed = false;
_lut->length = len;
- if(len == 0) libusb_async_tx_cb(_lut);
+ if (len == 0) libusb_async_cb(_lut);
else UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0);
_expired = true;
}
- sptr get_new(void){
- _expired = false;
- return make_managed_buffer(this);
+ sptr get_new(const double timeout, size_t &index){
+ if (wait_for_completion(_ctx, timeout, completed)){
+ index++;
+ _expired = false;
+ return make_managed_buffer(this);
+ }
+ return managed_send_buffer::sptr();
}
+ bool completed;
+
private:
void *get_buff(void) const{return _lut->buffer;}
size_t get_size(void) const{return _lut->length;}
+ libusb_context *_ctx;
libusb_transfer *_lut;
bool _expired;
};
@@ -143,8 +176,8 @@ public:
_num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS))),
_recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
_send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
- _pending_recv_buffs(_num_recv_frames),
- _pending_send_buffs(_num_send_frames)
+ _next_recv_buff_index(0),
+ _next_send_buff_index(0)
{
_handle->claim_interface(2 /*in interface*/);
_handle->claim_interface(1 /*out interface*/);
@@ -155,10 +188,7 @@ public:
libusb_transfer *lut = libusb_alloc_transfer(0);
UHD_ASSERT_THROW(lut != NULL);
- _mrb_pool.push_back(libusb_zero_copy_mrb(lut));
- _callbacks.push_back(boost::bind(
- &libusb_zero_copy_impl::handle_recv, this, &_mrb_pool.back()
- ));
+ _mrb_pool.push_back(boost::shared_ptr<libusb_zero_copy_mrb>(new libusb_zero_copy_mrb(lut)));
libusb_fill_bulk_transfer(
lut, // transfer
@@ -166,13 +196,13 @@ public:
(recv_endpoint & 0x7f) | 0x80, // endpoint
static_cast<unsigned char *>(_recv_buffer_pool->at(i)), // buffer
this->get_recv_frame_size(), // length
- libusb_transfer_cb_fn(&libusb_async_rx_cb), // callback
- static_cast<void *>(&_callbacks.back()), // user_data
+ libusb_transfer_cb_fn(&libusb_async_cb), // callback
+ static_cast<void *>(&_mrb_pool.back()->completed), // user_data
0 // timeout (ms)
);
_all_luts.push_back(lut);
- _mrb_pool.back().get_new();
+ _mrb_pool.back()->release();
}
//allocate libusb transfer structs and managed send buffers
@@ -181,10 +211,7 @@ public:
libusb_transfer *lut = libusb_alloc_transfer(0);
UHD_ASSERT_THROW(lut != NULL);
- _msb_pool.push_back(libusb_zero_copy_msb(lut));
- _callbacks.push_back(boost::bind(
- &libusb_zero_copy_impl::handle_send, this, &_msb_pool.back()
- ));
+ _msb_pool.push_back(boost::shared_ptr<libusb_zero_copy_msb>(new libusb_zero_copy_msb(lut)));
libusb_fill_bulk_transfer(
lut, // transfer
@@ -192,20 +219,14 @@ public:
(send_endpoint & 0x7f) | 0x00, // endpoint
static_cast<unsigned char *>(_send_buffer_pool->at(i)), // buffer
this->get_send_frame_size(), // length
- libusb_transfer_cb_fn(&libusb_async_tx_cb), // callback
- static_cast<void *>(&_callbacks.back()), // user_data
+ libusb_transfer_cb_fn(&libusb_async_cb), // callback
+ static_cast<void *>(&_msb_pool.back()->completed), // user_data
0 // timeout
);
_all_luts.push_back(lut);
- libusb_async_tx_cb(lut);
+ _msb_pool.back()->commit(0);
}
-
- //spawn the event handler threads
- const size_t concurrency = hints.cast<size_t>("concurrency_hint", 1);
- for (size_t i = 0; i < concurrency; i++) _event_loop_tasks.push_back(task::make(
- boost::bind(&libusb_zero_copy_impl::run_event_loop, this)
- ));
}
~libusb_zero_copy_impl(void){
@@ -222,19 +243,13 @@ public:
}
managed_recv_buffer::sptr get_recv_buff(double timeout){
- libusb_zero_copy_mrb *mrb = NULL;
- if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){
- return mrb->get_new();
- }
- return managed_recv_buffer::sptr();
+ if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0;
+ return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index);
}
managed_send_buffer::sptr get_send_buff(double timeout){
- libusb_zero_copy_msb *msb = NULL;
- if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){
- return msb->get_new();
- }
- return managed_send_buffer::sptr();
+ if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0;
+ return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index);
}
size_t get_num_recv_frames(void) const { return _num_recv_frames; }
@@ -244,44 +259,19 @@ public:
size_t get_send_frame_size(void) const { return _send_frame_size; }
private:
- //! Handle a bound async callback for recv
- void handle_recv(libusb_zero_copy_mrb *mrb){
- _pending_recv_buffs.push_with_haste(mrb);
- }
-
- //! Handle a bound async callback for send
- void handle_send(libusb_zero_copy_msb *msb){
- _pending_send_buffs.push_with_haste(msb);
- }
-
libusb::device_handle::sptr _handle;
const size_t _recv_frame_size, _num_recv_frames;
const size_t _send_frame_size, _num_send_frames;
//! Storage for transfer related objects
buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
- bounded_buffer<libusb_zero_copy_mrb *> _pending_recv_buffs;
- bounded_buffer<libusb_zero_copy_msb *> _pending_send_buffs;
- std::list<libusb_zero_copy_mrb> _mrb_pool;
- std::list<libusb_zero_copy_msb> _msb_pool;
- std::list<boost::function<void()> > _callbacks;
+ std::vector<boost::shared_ptr<libusb_zero_copy_mrb> > _mrb_pool;
+ std::vector<boost::shared_ptr<libusb_zero_copy_msb> > _msb_pool;
+ size_t _next_recv_buff_index, _next_send_buff_index;
//! a list of all transfer structs we allocated
std::list<libusb_transfer *> _all_luts;
- //! event handler threads
- std::list<task::sptr> _event_loop_tasks;
-
- void run_event_loop(void){
- set_thread_priority_safe();
- libusb_context *context = libusb::session::get_global_session()->get_context();
- while (not boost::this_thread::interruption_requested()){
- timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = 100000; //100ms
- libusb_handle_events_timeout(context, &tv);
- }
- }
};