diff options
-rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 160 |
1 files changed, 75 insertions, 85 deletions
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 0fa856d34..4259c42ed 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -17,13 +17,9 @@ #include "libusb1_base.hpp" #include <uhd/transport/usb_zero_copy.hpp> -#include <uhd/transport/bounded_buffer.hpp> #include <uhd/transport/buffer_pool.hpp> -#include <uhd/utils/thread_priority.hpp> #include <uhd/utils/msg.hpp> -#include <uhd/utils/tasks.hpp> #include <uhd/exception.hpp> -#include <boost/function.hpp> #include <boost/foreach.hpp> #include <boost/thread/thread.hpp> #include <list> @@ -44,18 +40,9 @@ static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes * to ensure that they are compiled with the same calling convention as libusb. */ -//! helper function: handles all rx async callbacks -static void LIBUSB_CALL libusb_async_rx_cb(libusb_transfer *lut){ - if(lut->actual_length == 0) { - UHD_ASSERT_THROW(libusb_submit_transfer(lut) == 0); //get out until you find some real data - return; - } - (*static_cast<boost::function<void()> *>(lut->user_data))(); -} - -//! helper function: handles all tx async callbacks -static void LIBUSB_CALL libusb_async_tx_cb(libusb_transfer *lut) { - (*static_cast<boost::function<void()> *>(lut->user_data))(); +//! helper function: handles all async callbacks +static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut){ + *(static_cast<bool *>(lut->user_data)) = true; } //! callback to free transfer upon cancellation @@ -64,6 +51,34 @@ static void LIBUSB_CALL cancel_transfer_cb(libusb_transfer *lut){ else UHD_MSG(error) << "libusb cancel_transfer unexpected status " << lut->status << std::endl; } +/*! + * Wait for a managed buffer to become complete. + * + * This routine processes async events until the transaction completes. + * We must call the libusb handle events in a loop because the handler + * may complete managed buffers other than the one we are waiting on. + * + * We cannot determine if handle events timed out or processed an event. + * Therefore, the timeout condition is handled by using boost system time. + * + * \param ctx the libusb context structure + * \param timeout the wait timeout in seconds + * \param completed a reference to the completed flag + * \return true for completion, false for timeout + */ +UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, bool &completed){ + const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000)); + + while (not completed and (boost::get_system_time() < timeout_time)){ + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 10000; /*10ms*/ + libusb_handle_events_timeout(ctx, &tv); + } + + return completed; +} + /*********************************************************************** * Reusable managed receiver buffer: * - Associated with a particular libusb transfer struct. @@ -72,23 +87,32 @@ static void LIBUSB_CALL cancel_transfer_cb(libusb_transfer *lut){ class libusb_zero_copy_mrb : public managed_recv_buffer{ public: libusb_zero_copy_mrb(libusb_transfer *lut): - _lut(lut), _expired(true) { /* NOP */ } + _ctx(libusb::session::get_global_session()->get_context()), + _lut(lut), _expired(false) { /* NOP */ } void release(void){ if (_expired) return; + completed = false; UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0); _expired = true; } - sptr get_new(void){ - _expired = false; - return make_managed_buffer(this); + sptr get_new(const double timeout, size_t &index){ + if (wait_for_completion(_ctx, timeout, completed)){ + index++; + _expired = false; + return make_managed_buffer(this); + } + return managed_recv_buffer::sptr(); } + bool completed; + private: const void *get_buff(void) const{return _lut->buffer;} size_t get_size(void) const{return _lut->actual_length;} + libusb_context *_ctx; libusb_transfer *_lut; bool _expired; }; @@ -101,25 +125,34 @@ private: class libusb_zero_copy_msb : public managed_send_buffer{ public: libusb_zero_copy_msb(libusb_transfer *lut): - _lut(lut), _expired(true) { /* NOP */ } + _ctx(libusb::session::get_global_session()->get_context()), + _lut(lut), _expired(false) { /* NOP */ } void commit(size_t len){ if (_expired) return; + completed = false; _lut->length = len; - if(len == 0) libusb_async_tx_cb(_lut); + if (len == 0) libusb_async_cb(_lut); else UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0); _expired = true; } - sptr get_new(void){ - _expired = false; - return make_managed_buffer(this); + sptr get_new(const double timeout, size_t &index){ + if (wait_for_completion(_ctx, timeout, completed)){ + index++; + _expired = false; + return make_managed_buffer(this); + } + return managed_send_buffer::sptr(); } + bool completed; + private: void *get_buff(void) const{return _lut->buffer;} size_t get_size(void) const{return _lut->length;} + libusb_context *_ctx; libusb_transfer *_lut; bool _expired; }; @@ -143,8 +176,8 @@ public: _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS))), _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)), _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), - _pending_recv_buffs(_num_recv_frames), - _pending_send_buffs(_num_send_frames) + _next_recv_buff_index(0), + _next_send_buff_index(0) { _handle->claim_interface(2 /*in interface*/); _handle->claim_interface(1 /*out interface*/); @@ -155,10 +188,7 @@ public: libusb_transfer *lut = libusb_alloc_transfer(0); UHD_ASSERT_THROW(lut != NULL); - _mrb_pool.push_back(libusb_zero_copy_mrb(lut)); - _callbacks.push_back(boost::bind( - &libusb_zero_copy_impl::handle_recv, this, &_mrb_pool.back() - )); + _mrb_pool.push_back(boost::shared_ptr<libusb_zero_copy_mrb>(new libusb_zero_copy_mrb(lut))); libusb_fill_bulk_transfer( lut, // transfer @@ -166,13 +196,13 @@ public: (recv_endpoint & 0x7f) | 0x80, // endpoint static_cast<unsigned char *>(_recv_buffer_pool->at(i)), // buffer this->get_recv_frame_size(), // length - libusb_transfer_cb_fn(&libusb_async_rx_cb), // callback - static_cast<void *>(&_callbacks.back()), // user_data + libusb_transfer_cb_fn(&libusb_async_cb), // callback + static_cast<void *>(&_mrb_pool.back()->completed), // user_data 0 // timeout (ms) ); _all_luts.push_back(lut); - _mrb_pool.back().get_new(); + _mrb_pool.back()->release(); } //allocate libusb transfer structs and managed send buffers @@ -181,10 +211,7 @@ public: libusb_transfer *lut = libusb_alloc_transfer(0); UHD_ASSERT_THROW(lut != NULL); - _msb_pool.push_back(libusb_zero_copy_msb(lut)); - _callbacks.push_back(boost::bind( - &libusb_zero_copy_impl::handle_send, this, &_msb_pool.back() - )); + _msb_pool.push_back(boost::shared_ptr<libusb_zero_copy_msb>(new libusb_zero_copy_msb(lut))); libusb_fill_bulk_transfer( lut, // transfer @@ -192,20 +219,14 @@ public: (send_endpoint & 0x7f) | 0x00, // endpoint static_cast<unsigned char *>(_send_buffer_pool->at(i)), // buffer this->get_send_frame_size(), // length - libusb_transfer_cb_fn(&libusb_async_tx_cb), // callback - static_cast<void *>(&_callbacks.back()), // user_data + libusb_transfer_cb_fn(&libusb_async_cb), // callback + static_cast<void *>(&_msb_pool.back()->completed), // user_data 0 // timeout ); _all_luts.push_back(lut); - libusb_async_tx_cb(lut); + _msb_pool.back()->commit(0); } - - //spawn the event handler threads - const size_t concurrency = hints.cast<size_t>("concurrency_hint", 1); - for (size_t i = 0; i < concurrency; i++) _event_loop_tasks.push_back(task::make( - boost::bind(&libusb_zero_copy_impl::run_event_loop, this) - )); } ~libusb_zero_copy_impl(void){ @@ -222,19 +243,13 @@ public: } managed_recv_buffer::sptr get_recv_buff(double timeout){ - libusb_zero_copy_mrb *mrb = NULL; - if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){ - return mrb->get_new(); - } - return managed_recv_buffer::sptr(); + if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0; + return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index); } managed_send_buffer::sptr get_send_buff(double timeout){ - libusb_zero_copy_msb *msb = NULL; - if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){ - return msb->get_new(); - } - return managed_send_buffer::sptr(); + if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0; + return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index); } size_t get_num_recv_frames(void) const { return _num_recv_frames; } @@ -244,44 +259,19 @@ public: size_t get_send_frame_size(void) const { return _send_frame_size; } private: - //! Handle a bound async callback for recv - void handle_recv(libusb_zero_copy_mrb *mrb){ - _pending_recv_buffs.push_with_haste(mrb); - } - - //! Handle a bound async callback for send - void handle_send(libusb_zero_copy_msb *msb){ - _pending_send_buffs.push_with_haste(msb); - } - libusb::device_handle::sptr _handle; const size_t _recv_frame_size, _num_recv_frames; const size_t _send_frame_size, _num_send_frames; //! Storage for transfer related objects buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; - bounded_buffer<libusb_zero_copy_mrb *> _pending_recv_buffs; - bounded_buffer<libusb_zero_copy_msb *> _pending_send_buffs; - std::list<libusb_zero_copy_mrb> _mrb_pool; - std::list<libusb_zero_copy_msb> _msb_pool; - std::list<boost::function<void()> > _callbacks; + std::vector<boost::shared_ptr<libusb_zero_copy_mrb> > _mrb_pool; + std::vector<boost::shared_ptr<libusb_zero_copy_msb> > _msb_pool; + size_t _next_recv_buff_index, _next_send_buff_index; //! a list of all transfer structs we allocated std::list<libusb_transfer *> _all_luts; - //! event handler threads - std::list<task::sptr> _event_loop_tasks; - - void run_event_loop(void){ - set_thread_priority_safe(); - libusb_context *context = libusb::session::get_global_session()->get_context(); - while (not boost::this_thread::interruption_requested()){ - timeval tv; - tv.tv_sec = 0; - tv.tv_usec = 100000; //100ms - libusb_handle_events_timeout(context, &tv); - } - } }; |