diff options
| -rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 160 | 
1 files changed, 75 insertions, 85 deletions
| diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 0fa856d34..4259c42ed 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -17,13 +17,9 @@  #include "libusb1_base.hpp"  #include <uhd/transport/usb_zero_copy.hpp> -#include <uhd/transport/bounded_buffer.hpp>  #include <uhd/transport/buffer_pool.hpp> -#include <uhd/utils/thread_priority.hpp>  #include <uhd/utils/msg.hpp> -#include <uhd/utils/tasks.hpp>  #include <uhd/exception.hpp> -#include <boost/function.hpp>  #include <boost/foreach.hpp>  #include <boost/thread/thread.hpp>  #include <list> @@ -44,18 +40,9 @@ static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes   * to ensure that they are compiled with the same calling convention as libusb.   */ -//! helper function: handles all rx async callbacks -static void LIBUSB_CALL libusb_async_rx_cb(libusb_transfer *lut){ -    if(lut->actual_length == 0) { -        UHD_ASSERT_THROW(libusb_submit_transfer(lut) == 0); //get out until you find some real data -        return; -    } -    (*static_cast<boost::function<void()> *>(lut->user_data))(); -} - -//! helper function: handles all tx async callbacks -static void LIBUSB_CALL libusb_async_tx_cb(libusb_transfer *lut) { -    (*static_cast<boost::function<void()> *>(lut->user_data))(); +//! helper function: handles all async callbacks +static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut){ +    *(static_cast<bool *>(lut->user_data)) = true;  }  //! callback to free transfer upon cancellation @@ -64,6 +51,34 @@ static void LIBUSB_CALL cancel_transfer_cb(libusb_transfer *lut){      else UHD_MSG(error) << "libusb cancel_transfer unexpected status " << lut->status << std::endl;  } +/*! + * Wait for a managed buffer to become complete. + * + * This routine processes async events until the transaction completes. + * We must call the libusb handle events in a loop because the handler + * may complete managed buffers other than the one we are waiting on. + * + * We cannot determine if handle events timed out or processed an event. + * Therefore, the timeout condition is handled by using boost system time. + * + * \param ctx the libusb context structure + * \param timeout the wait timeout in seconds + * \param completed a reference to the completed flag + * \return true for completion, false for timeout + */ +UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, bool &completed){ +    const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000)); + +    while (not completed and (boost::get_system_time() < timeout_time)){ +        timeval tv; +        tv.tv_sec = 0; +        tv.tv_usec = 10000; /*10ms*/ +        libusb_handle_events_timeout(ctx, &tv); +    } + +    return completed; +} +  /***********************************************************************   * Reusable managed receiver buffer:   *  - Associated with a particular libusb transfer struct. @@ -72,23 +87,32 @@ static void LIBUSB_CALL cancel_transfer_cb(libusb_transfer *lut){  class libusb_zero_copy_mrb : public managed_recv_buffer{  public:      libusb_zero_copy_mrb(libusb_transfer *lut): -        _lut(lut), _expired(true) { /* NOP */ } +        _ctx(libusb::session::get_global_session()->get_context()), +        _lut(lut), _expired(false) { /* NOP */ }      void release(void){          if (_expired) return; +        completed = false;          UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0);          _expired = true;      } -    sptr get_new(void){ -        _expired = false; -        return make_managed_buffer(this); +    sptr get_new(const double timeout, size_t &index){ +        if (wait_for_completion(_ctx, timeout, completed)){ +            index++; +            _expired = false; +            return make_managed_buffer(this); +        } +        return managed_recv_buffer::sptr();      } +    bool completed; +  private:      const void *get_buff(void) const{return _lut->buffer;}      size_t get_size(void) const{return _lut->actual_length;} +    libusb_context *_ctx;      libusb_transfer *_lut;      bool _expired;  }; @@ -101,25 +125,34 @@ private:  class libusb_zero_copy_msb : public managed_send_buffer{  public:      libusb_zero_copy_msb(libusb_transfer *lut): -        _lut(lut), _expired(true) { /* NOP */ } +        _ctx(libusb::session::get_global_session()->get_context()), +        _lut(lut), _expired(false) { /* NOP */ }      void commit(size_t len){          if (_expired) return; +        completed = false;          _lut->length = len; -        if(len == 0) libusb_async_tx_cb(_lut); +        if (len == 0) libusb_async_cb(_lut);          else UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0);          _expired = true;      } -    sptr get_new(void){ -        _expired = false; -        return make_managed_buffer(this); +    sptr get_new(const double timeout, size_t &index){ +        if (wait_for_completion(_ctx, timeout, completed)){ +            index++; +            _expired = false; +            return make_managed_buffer(this); +        } +        return managed_send_buffer::sptr();      } +    bool completed; +  private:      void *get_buff(void) const{return _lut->buffer;}      size_t get_size(void) const{return _lut->length;} +    libusb_context *_ctx;      libusb_transfer *_lut;      bool _expired;  }; @@ -143,8 +176,8 @@ public:          _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS))),          _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),          _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), -        _pending_recv_buffs(_num_recv_frames), -        _pending_send_buffs(_num_send_frames) +        _next_recv_buff_index(0), +        _next_send_buff_index(0)      {          _handle->claim_interface(2 /*in interface*/);          _handle->claim_interface(1 /*out interface*/); @@ -155,10 +188,7 @@ public:              libusb_transfer *lut = libusb_alloc_transfer(0);              UHD_ASSERT_THROW(lut != NULL); -            _mrb_pool.push_back(libusb_zero_copy_mrb(lut)); -            _callbacks.push_back(boost::bind( -                &libusb_zero_copy_impl::handle_recv, this, &_mrb_pool.back() -            )); +            _mrb_pool.push_back(boost::shared_ptr<libusb_zero_copy_mrb>(new libusb_zero_copy_mrb(lut)));              libusb_fill_bulk_transfer(                  lut,                                                    // transfer @@ -166,13 +196,13 @@ public:                  (recv_endpoint & 0x7f) | 0x80,                          // endpoint                  static_cast<unsigned char *>(_recv_buffer_pool->at(i)), // buffer                  this->get_recv_frame_size(),                            // length -                libusb_transfer_cb_fn(&libusb_async_rx_cb),             // callback -                static_cast<void *>(&_callbacks.back()),                // user_data +                libusb_transfer_cb_fn(&libusb_async_cb),                // callback +                static_cast<void *>(&_mrb_pool.back()->completed),      // user_data                  0                                                       // timeout (ms)              );              _all_luts.push_back(lut); -            _mrb_pool.back().get_new(); +            _mrb_pool.back()->release();          }          //allocate libusb transfer structs and managed send buffers @@ -181,10 +211,7 @@ public:              libusb_transfer *lut = libusb_alloc_transfer(0);              UHD_ASSERT_THROW(lut != NULL); -            _msb_pool.push_back(libusb_zero_copy_msb(lut)); -            _callbacks.push_back(boost::bind( -                &libusb_zero_copy_impl::handle_send, this, &_msb_pool.back() -            )); +            _msb_pool.push_back(boost::shared_ptr<libusb_zero_copy_msb>(new libusb_zero_copy_msb(lut)));              libusb_fill_bulk_transfer(                  lut,                                                    // transfer @@ -192,20 +219,14 @@ public:                  (send_endpoint & 0x7f) | 0x00,                          // endpoint                  static_cast<unsigned char *>(_send_buffer_pool->at(i)), // buffer                  this->get_send_frame_size(),                            // length -                libusb_transfer_cb_fn(&libusb_async_tx_cb),             // callback -                static_cast<void *>(&_callbacks.back()),                // user_data +                libusb_transfer_cb_fn(&libusb_async_cb),                // callback +                static_cast<void *>(&_msb_pool.back()->completed),      // user_data                  0                                                       // timeout              );              _all_luts.push_back(lut); -            libusb_async_tx_cb(lut); +            _msb_pool.back()->commit(0);          } - -        //spawn the event handler threads -        const size_t concurrency = hints.cast<size_t>("concurrency_hint", 1); -        for (size_t i = 0; i < concurrency; i++) _event_loop_tasks.push_back(task::make( -            boost::bind(&libusb_zero_copy_impl::run_event_loop, this) -        ));      }      ~libusb_zero_copy_impl(void){ @@ -222,19 +243,13 @@ public:      }      managed_recv_buffer::sptr get_recv_buff(double timeout){ -        libusb_zero_copy_mrb *mrb = NULL; -        if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){ -            return mrb->get_new(); -        } -        return managed_recv_buffer::sptr(); +        if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0; +        return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index);      }      managed_send_buffer::sptr get_send_buff(double timeout){ -        libusb_zero_copy_msb *msb = NULL; -        if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){ -            return msb->get_new(); -        } -        return managed_send_buffer::sptr(); +        if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0; +        return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index);      }      size_t get_num_recv_frames(void) const { return _num_recv_frames; } @@ -244,44 +259,19 @@ public:      size_t get_send_frame_size(void) const { return _send_frame_size; }  private: -    //! Handle a bound async callback for recv -    void handle_recv(libusb_zero_copy_mrb *mrb){ -        _pending_recv_buffs.push_with_haste(mrb); -    } - -    //! Handle a bound async callback for send -    void handle_send(libusb_zero_copy_msb *msb){ -        _pending_send_buffs.push_with_haste(msb); -    } -      libusb::device_handle::sptr _handle;      const size_t _recv_frame_size, _num_recv_frames;      const size_t _send_frame_size, _num_send_frames;      //! Storage for transfer related objects      buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; -    bounded_buffer<libusb_zero_copy_mrb *> _pending_recv_buffs; -    bounded_buffer<libusb_zero_copy_msb *> _pending_send_buffs; -    std::list<libusb_zero_copy_mrb> _mrb_pool; -    std::list<libusb_zero_copy_msb> _msb_pool; -    std::list<boost::function<void()> > _callbacks; +    std::vector<boost::shared_ptr<libusb_zero_copy_mrb> > _mrb_pool; +    std::vector<boost::shared_ptr<libusb_zero_copy_msb> > _msb_pool; +    size_t _next_recv_buff_index, _next_send_buff_index;      //! a list of all transfer structs we allocated      std::list<libusb_transfer *> _all_luts; -    //! event handler threads -    std::list<task::sptr> _event_loop_tasks; - -    void run_event_loop(void){ -        set_thread_priority_safe(); -        libusb_context *context = libusb::session::get_global_session()->get_context(); -        while (not boost::this_thread::interruption_requested()){ -            timeval tv; -            tv.tv_sec = 0; -            tv.tv_usec = 100000; //100ms -            libusb_handle_events_timeout(context, &tv); -        } -    }  }; | 
