diff options
| -rw-r--r-- | host/lib/transport/libusb1_base.cpp | 16 | ||||
| -rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 474 | 
2 files changed, 134 insertions, 356 deletions
diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp index 5ff996642..26e864459 100644 --- a/host/lib/transport/libusb1_base.cpp +++ b/host/lib/transport/libusb1_base.cpp @@ -20,6 +20,7 @@  #include <uhd/types/dict.hpp>  #include <boost/weak_ptr.hpp>  #include <boost/foreach.hpp> +#include <boost/thread.hpp>  #include <iostream>  using namespace uhd::transport; @@ -32,9 +33,12 @@ public:      libusb_session_impl(void){          UHD_ASSERT_THROW(libusb_init(&_context) == 0);          libusb_set_debug(_context, debug_level); +        _thread_group.create_thread(boost::bind(&libusb_session_impl::run_event_loop, this));      }      ~libusb_session_impl(void){ +        _running = false; +        _thread_group.join_all();          libusb_exit(_context);      } @@ -44,6 +48,18 @@ public:  private:      libusb_context *_context; +    boost::thread_group _thread_group; +    bool _running; + +    void run_event_loop(void){ +        _running = true; +        timeval tv; +        while(_running){ +            tv.tv_sec = 0; +            tv.tv_usec = 100000; //100ms +            libusb_handle_events_timeout(this->get_context(), &tv); +        } +    }  };  libusb::session::sptr libusb::session::get_global_session(void){ diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 41cd4b3fc..e84793e88 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -17,8 +17,11 @@  #include "libusb1_base.hpp"  #include <uhd/transport/usb_zero_copy.hpp> +#include <uhd/transport/bounded_buffer.hpp>  #include <uhd/utils/assert.hpp> -#include <boost/format.hpp> +#include <boost/shared_array.hpp> +#include <boost/foreach.hpp> +#include <vector>  #include <iostream>  #include <iomanip> @@ -55,52 +58,57 @@ void pp_transfer(libusb_transfer *lut)   *   interface provided by the kernel.    **********************************************************************/  class usb_endpoint { +public: +    typedef boost::shared_ptr<usb_endpoint> sptr; + +    usb_endpoint( +        libusb::device_handle::sptr handle, +        int endpoint, +        bool input, +        size_t transfer_size, +        size_t num_transfers +    ); + +    ~usb_endpoint(void); + +    // Exposed interface for submitting / retrieving transfer buffers + +    //! Submit a new transfer that was presumably just filled or emptied. +    void submit(libusb_transfer *lut); + +    /*! +     * Get an available transfer: +     * For inputs, this is a just filled transfer. +     * For outputs, this is a just emptied transfer. +     * \param timeout_ms the timeout to wait for a lut +     * \return the transfer pointer or NULL if timeout +     */ +    libusb_transfer *get_lut_with_wait(size_t timeout_ms = 100); + +    //Callback use only +    void callback_handle_transfer(libusb_transfer *lut); +  private:      libusb::device_handle::sptr _handle; -    libusb::session::sptr _session;      int  _endpoint;      bool _input;      size_t _transfer_size;      size_t _num_transfers; -    // Transfer state lists (transfers are free, pending, or completed) -    std::list<libusb_transfer *>  _free_list; -    std::list<libusb_transfer *>  _pending_list; -    std::list<libusb_transfer *>  _completed_list; +    //! hold a bounded buffer of completed transfers +    typedef bounded_buffer<libusb_transfer *> lut_buff_type; +    lut_buff_type::sptr _completed_list; + +    //! a list of all transfer structs we allocated +    std::vector<libusb_transfer *>  _all_luts; + +    //! a list of shared arrays for the transfer buffers +    std::vector<boost::shared_array<boost::uint8_t> > _buffers;      // Calls for processing asynchronous I/O       libusb_transfer *allocate_transfer(int buff_len); -    bool cancel(libusb_transfer *lut); -    bool cancel_all(); -    bool reap_pending_list(); -    bool reap_pending_list_timeout(); -    bool reap_completed_list(); - -    // Transfer state manipulators  -    void free_list_add(libusb_transfer *lut); -    void pending_list_add(libusb_transfer *lut); -    void completed_list_add(libusb_transfer *lut); -    libusb_transfer *free_list_get(); -    libusb_transfer *completed_list_get(); -    bool pending_list_remove(libusb_transfer *lut); - -    // Debug use      void print_transfer_status(libusb_transfer *lut); - -public: -    usb_endpoint(libusb::device_handle::sptr handle, libusb::session::sptr sess, -                 int endpoint, bool input, size_t transfer_size, size_t num_transfers); - -    ~usb_endpoint(); - -    // Exposed interface for submitting / retrieving transfer buffers -    bool submit(libusb_transfer *lut); -    libusb_transfer *get_completed_transfer(); -    libusb_transfer *get_free_transfer(); - -    //Callback use only -    void callback_handle_transfer(libusb_transfer *lut);  }; @@ -113,9 +121,8 @@ public:   * it from the pending to completed status list.   * \param lut pointer to libusb_transfer   */ -static void callback(libusb_transfer *lut) -{ -    usb_endpoint *endpoint = (usb_endpoint *) lut->user_data;  +static void callback(libusb_transfer *lut){ +    usb_endpoint *endpoint = (usb_endpoint *) lut->user_data;      endpoint->callback_handle_transfer(lut);  } @@ -124,14 +131,8 @@ static void callback(libusb_transfer *lut)   * Accessor call to allow list access from callback space   * \param pointer to libusb_transfer   */ -void usb_endpoint::callback_handle_transfer(libusb_transfer *lut) -{ -    if (!pending_list_remove(lut)) { -        std::cerr << "USB: pending remove failed" << std::endl; -        return; -    } - -    completed_list_add(lut);     +void usb_endpoint::callback_handle_transfer(libusb_transfer *lut){ +    _completed_list->push_with_wait(lut);  } @@ -142,18 +143,27 @@ void usb_endpoint::callback_handle_transfer(libusb_transfer *lut)   * data is available.    */  usb_endpoint::usb_endpoint( -    libusb::device_handle::sptr handle, libusb::session::sptr sess, -    int endpoint, bool input, size_t transfer_size, size_t num_transfers) -    : _handle(handle), _session(sess), -      _endpoint(endpoint), _input(input), -      _transfer_size(transfer_size), _num_transfers(num_transfers) +    libusb::device_handle::sptr handle, +    int endpoint, +    bool input, +    size_t transfer_size, +    size_t num_transfers +): +    _handle(handle), +    _endpoint(endpoint), +    _input(input), +    _transfer_size(transfer_size), +    _num_transfers(num_transfers)  { -    unsigned int i; -    for (i = 0; i < _num_transfers; i++) { -        free_list_add(allocate_transfer(_transfer_size)); +    _completed_list = lut_buff_type::make(num_transfers); -        if (_input) -            submit(free_list_get()); +    for (size_t i = 0; i < _num_transfers; i++){ +        _all_luts.push_back(allocate_transfer(_transfer_size)); + +        //input luts are immediately submitted to be filled +        //output luts go into the completed list as free buffers +        if (_input) this->submit(_all_luts.back()); +        else _completed_list->push_with_wait(_all_luts.back());      }  } @@ -165,22 +175,21 @@ usb_endpoint::usb_endpoint(   * the transfers. Libusb will deallocate the data buffer held by   * each transfer.   */ -usb_endpoint::~usb_endpoint() -{ -    cancel_all(); - -    while (!_pending_list.empty()) { -        if (!reap_pending_list()) -            std::cerr << "error: destructor failed to reap" << std::endl; +usb_endpoint::~usb_endpoint(void){ +    //cancel all transfers +    BOOST_FOREACH(libusb_transfer *lut, _all_luts){ +        libusb_cancel_transfer(lut);      } -    while (!_completed_list.empty()) { -        if (!reap_completed_list()) -            std::cerr << "error: destructor failed to reap" << std::endl; -    } +    //collect canceled transfers (drain the queue) +    libusb_transfer *lut; +    while(_completed_list->pop_with_timed_wait( +        lut, boost::posix_time::milliseconds(100) +    )); -    while (!_free_list.empty()) { -        libusb_free_transfer(free_list_get()); +    //free all transfers +    BOOST_FOREACH(libusb_transfer *lut, _all_luts){ +        libusb_free_transfer(lut);      }  } @@ -192,20 +201,21 @@ usb_endpoint::~usb_endpoint()   * \param buff_len size of the individual buffer held by each transfer   * \return pointer to an allocated libusb_transfer   */ -libusb_transfer *usb_endpoint::allocate_transfer(int buff_len) -{ +libusb_transfer *usb_endpoint::allocate_transfer(int buff_len){      libusb_transfer *lut = libusb_alloc_transfer(0); -    unsigned char *buff = new unsigned char[buff_len]; +    boost::shared_array<boost::uint8_t> buff(new boost::uint8_t[buff_len]); +    _buffers.push_back(buff); //store a reference to this shared array      unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0)); +    libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback);      libusb_fill_bulk_transfer(lut,                // transfer                                _handle->get(),     // dev_handle                                endpoint,           // endpoint -                              buff,               // buffer +                              buff.get(),         // buffer                                buff_len,           // length -       libusb_transfer_cb_fn(callback),           // callback +                              lut_callback,       // callback                                this,               // user_data                                0);                 // timeout      return lut; @@ -218,95 +228,15 @@ libusb_transfer *usb_endpoint::allocate_transfer(int buff_len)   * \param lut pointer to libusb_transfer   * \return true on success or false on error    */ -bool usb_endpoint::submit(libusb_transfer *lut) -{ -    int retval; -    if ((retval = libusb_submit_transfer(lut)) < 0) { -        std::cerr << "error: libusb_submit_transfer: " << retval << std::endl; -        return false; -    } - -    pending_list_add(lut); -    return true; -} - - -/* - * Cancel a pending transfer  - * Search the pending list for the transfer and cancel if found. - * \param lut pointer to libusb_transfer to cancel - * \return true on success or false if transfer is not found - * - * Note: success only indicates submission of cancelation request. - * Sucessful cancelation is not known until the callback occurs. - */ -bool usb_endpoint::cancel(libusb_transfer *lut) -{ -    std::list<libusb_transfer*>::iterator iter; -    for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) { -        if (*iter == lut) {  -            libusb_cancel_transfer(lut);  -            return true; -        } -    } -    return false; -} - - -/* - * Cancel all pending transfers  - * \return bool true if cancelation request is submitted - * - * Note: success only indicates submission of cancelation request. - * Sucessful cancelation is not known until the callback occurs. - */ -bool usb_endpoint::cancel_all() -{ -    std::list<libusb_transfer*>::iterator iter; - -    for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) { -        if (libusb_cancel_transfer(*iter) < 0) { -            std::cerr << "error: libusb_cancal_transfer() failed" << std::endl; -            return false; -        } -    } - -    return true; -} - - -/* - * Reap completed transfers - * return true if at least one transfer was reaped, false otherwise.  - * Check completed transfers for errors and mark as free. This is a - * blocking call.  - * \return bool true if a libusb transfer is reaped, false otherwise - */ -bool usb_endpoint::reap_completed_list() -{ -    libusb_transfer *lut; - -    if (_completed_list.empty()) { -        if (!reap_pending_list_timeout()) -            return false; -    } - -    while (!_completed_list.empty()) { -        lut = completed_list_get(); -        print_transfer_status(lut); -        free_list_add(lut); -    } - -    return true; +void usb_endpoint::submit(libusb_transfer *lut){ +    UHD_ASSERT_THROW(libusb_submit_transfer(lut) == 0);  } -  /*   * Print status errors of a completed transfer   * \param lut pointer to an libusb_transfer   */ -void usb_endpoint::print_transfer_status(libusb_transfer *lut) -{ +void usb_endpoint::print_transfer_status(libusb_transfer *lut){      switch (lut->status) {      case LIBUSB_TRANSFER_COMPLETED:          if (lut->actual_length < lut->length) { @@ -342,166 +272,14 @@ void usb_endpoint::print_transfer_status(libusb_transfer *lut)      }  } - -/* - * Reap pending transfers without timeout  - * This is a blocking call. Reaping submitted transfers is - * handled by libusb and the assigned callback function. - * Block until at least one transfer is reaped. - * \return true true if a transfer was reaped or false otherwise - */ -bool usb_endpoint::reap_pending_list() -{ -    int retval; - -    if ((retval = libusb_handle_events(_session->get_context())) < 0) { -        std::cerr << "error: libusb_handle_events: " << retval << std::endl; -        return false; -    } - -    return true; -} - - -/* - * Reap pending transfers with timeout  - * This call blocks until a transfer is reaped or timeout. - * Reaping submitted transfers is handled by libusb and the - * assigned callback function. Block until at least one - * transfer is reaped or timeout occurs. - * \return true if a transfer was reaped or false otherwise - */ -bool usb_endpoint::reap_pending_list_timeout() -{ -    int retval; -    timeval tv; - -    tv.tv_sec = 0; -    tv.tv_usec = 100000; //100ms - -    size_t pending_list_size = _pending_list.size(); - -    if ((retval = libusb_handle_events_timeout(_session->get_context(), &tv)) < 0) { -        std::cerr << "error: libusb_handle_events: " << retval << std::endl; -        return false; -    } - -    if (_pending_list.size() < pending_list_size) { -        return true; -    } -    else { -        return false; -    } -} - - -/* - * Get a free transfer - * The transfer has an empty data bufer for OUT requests  - * \return pointer to a libusb_transfer - */ -libusb_transfer *usb_endpoint::get_free_transfer() -{ -    if (_free_list.empty()) { -        if (!reap_completed_list()) -            return NULL;  -    } - -    return free_list_get(); -} - - -/* - * Get a completed transfer  - * The transfer has a full data buffer for IN requests - * \return pointer to libusb_transfer - */ -libusb_transfer *usb_endpoint::get_completed_transfer() -{ -    if (_completed_list.empty()) { -        if (!reap_pending_list_timeout()) -            return NULL;  -    } - -    return completed_list_get(); -} - -/* - * List operations  - */ -void usb_endpoint::free_list_add(libusb_transfer *lut) -{ -    _free_list.push_back(lut); -} - -void usb_endpoint::pending_list_add(libusb_transfer *lut) -{ -    _pending_list.push_back(lut); -} - -void usb_endpoint::completed_list_add(libusb_transfer *lut) -{ -    _completed_list.push_back(lut); -} - - -/* - * Free and completed lists don't have ordered content  - * Pop transfers from the front as needed - */ -libusb_transfer *usb_endpoint::free_list_get() -{ -    libusb_transfer *lut; - -    if (_free_list.size() == 0) { -        return NULL;  -    } -    else {  -        lut = _free_list.front(); -        _free_list.pop_front(); -        return lut; -    } -} - - -/* - * Free and completed lists don't have ordered content  - * Pop transfers from the front as needed - */ -libusb_transfer *usb_endpoint::completed_list_get() -{ +libusb_transfer *usb_endpoint::get_lut_with_wait(size_t timeout_ms){      libusb_transfer *lut; - -    if (_completed_list.empty()) { -        return NULL; -    } -    else {  -        lut = _completed_list.front(); -        _completed_list.pop_front(); -        return lut; -    } +    if (_completed_list->pop_with_timed_wait( +        lut, boost::posix_time::milliseconds(timeout_ms) +    )) return lut; +    return NULL;  } - -/* - * Search and remove transfer from pending list - * Assuming that the callbacks occur in order, the front element - * should yield the correct transfer. If not, then something else - * is going on. If no transfers match, then something went wrong. - */ -bool usb_endpoint::pending_list_remove(libusb_transfer *lut) -{ -    std::list<libusb_transfer*>::iterator iter; -    for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) { -        if (*iter == lut) {  -            _pending_list.erase(iter); -            return true; -        } -    } -    return false; -} - -  /***********************************************************************   * Managed buffers    **********************************************************************/ @@ -515,17 +293,15 @@ bool usb_endpoint::pending_list_remove(libusb_transfer *lut)  class libusb_managed_recv_buffer_impl : public managed_recv_buffer {  public:      libusb_managed_recv_buffer_impl(libusb_transfer *lut, -                                    usb_endpoint *endpoint) +                                    usb_endpoint::sptr endpoint)          : _buff(lut->buffer, lut->length)      {          _lut = lut;          _endpoint = endpoint;      } -    ~libusb_managed_recv_buffer_impl() -    { -       if (!_endpoint->submit(_lut)) -           std::cerr << "USB: failed to submit IN transfer" << std::endl; +    ~libusb_managed_recv_buffer_impl(void){ +        _endpoint->submit(_lut);      }  private: @@ -535,7 +311,7 @@ private:      }      libusb_transfer *_lut; -    usb_endpoint *_endpoint; +    usb_endpoint::sptr _endpoint;      const boost::asio::const_buffer _buff;  }; @@ -551,7 +327,7 @@ private:  class libusb_managed_send_buffer_impl : public managed_send_buffer {  public:      libusb_managed_send_buffer_impl(libusb_transfer *lut, -                                    usb_endpoint *endpoint, +                                    usb_endpoint::sptr endpoint,                                      size_t buff_size)          : _buff(lut->buffer, buff_size), _committed(false)      { @@ -559,8 +335,7 @@ public:          _endpoint = endpoint;      } -    ~libusb_managed_send_buffer_impl() -    { +    ~libusb_managed_send_buffer_impl(void){          if (!_committed) {              _lut->length = 0;              _lut->actual_length = 0; @@ -580,12 +355,14 @@ public:          _lut->length = num_bytes;          _lut->actual_length = 0; -        if (_endpoint->submit(_lut)) { +        try{ +            _endpoint->submit(_lut);              _committed = true;              return num_bytes;          } -        else { -            return 0; +        catch(const std::exception &e){ +            std::cerr << "Error in commit: " << e.what() << std::endl; +            return -1;          }      } @@ -596,7 +373,7 @@ private:      }      libusb_transfer *_lut; -    usb_endpoint *_endpoint; +    usb_endpoint::sptr _endpoint;      const boost::asio::mutable_buffer _buff;      bool _committed;  }; @@ -608,11 +385,9 @@ private:  class libusb_zero_copy_impl : public usb_zero_copy  {  private: -    usb_endpoint          *_rx_ep; -    usb_endpoint          *_tx_ep; +    usb_endpoint::sptr _rx_ep, _tx_ep;      libusb::device_handle::sptr _handle; -    libusb::session::sptr _session;      size_t _recv_buff_size;      size_t _send_buff_size; @@ -626,8 +401,6 @@ public:                            unsigned int tx_endpoint,                            size_t recv_buff_size,                            size_t send_buff_size); -     -    ~libusb_zero_copy_impl();      managed_recv_buffer::sptr get_recv_buff(void);      managed_send_buffer::sptr get_send_buff(void); @@ -646,45 +419,38 @@ libusb_zero_copy_impl::libusb_zero_copy_impl(libusb::device_handle::sptr handle,                                               unsigned int tx_endpoint,                                               size_t buff_size,                                               size_t block_size) - : _handle(handle), _session(libusb::session::get_global_session()), + : _handle(handle),     _recv_buff_size(block_size), _send_buff_size(block_size),     _num_frames(buff_size / block_size)  {      _handle->claim_interface(2 /*in interface*/);      _handle->claim_interface(1 /*out interface*/); -    _rx_ep = new usb_endpoint(_handle,         // libusb device_handle -                              _session,        // libusb session w/ context +    _rx_ep = usb_endpoint::sptr(new usb_endpoint( +                              _handle,         // libusb device_handle                                rx_endpoint,     // USB endpoint number                                true,            // IN endpoint                                _recv_buff_size, // buffer size per transfer  -                              _num_frames);    // number of libusb transfers +                              _num_frames      // number of libusb transfers +    )); -    _tx_ep = new usb_endpoint(_handle,         // libusb device_handle -                              _session,        // libusb session w/ context +    _tx_ep = usb_endpoint::sptr(new usb_endpoint( +                              _handle,         // libusb device_handle                                tx_endpoint,     // USB endpoint number                                false,           // OUT endpoint                                _send_buff_size, // buffer size per transfer -                              _num_frames);    // number of libusb transfers -} - - -libusb_zero_copy_impl::~libusb_zero_copy_impl() -{ -    delete _rx_ep; -    delete _tx_ep; +                              _num_frames      // number of libusb transfers +    ));  } -  /*   * Construct a managed receive buffer from a completed libusb transfer   * (happy with buffer full of data) obtained from the receive endpoint.   * Return empty pointer if no transfer is available (timeout or error).   * \return pointer to a managed receive buffer    */ -managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff() -{ -    libusb_transfer *lut = _rx_ep->get_completed_transfer(); +managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(void){ +    libusb_transfer *lut = _rx_ep->get_lut_with_wait(/* TODO timeout API */);      if (lut == NULL) {          return managed_recv_buffer::sptr();      } @@ -702,9 +468,8 @@ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff()   * (timeout or error).   * \return pointer to a managed send buffer    */ -managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff() -{ -    libusb_transfer *lut = _tx_ep->get_free_transfer(); +managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(void){ +    libusb_transfer *lut = _tx_ep->get_lut_with_wait(/* TODO timeout API */);      if (lut == NULL) {          return managed_send_buffer::sptr();      } @@ -736,6 +501,3 @@ usb_zero_copy::sptr usb_zero_copy::make(usb_device_handle::sptr handle,                                            buff_size,                                             block_size));  } - - -  | 
