diff options
Diffstat (limited to 'host/lib/transport/libusb1_zero_copy.cpp')
| -rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 745 | 
1 files changed, 210 insertions, 535 deletions
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index b890a87f9..f589d7c77 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -17,16 +17,22 @@  #include "libusb1_base.hpp"  #include <uhd/transport/usb_zero_copy.hpp> +#include <uhd/transport/bounded_buffer.hpp> +#include <uhd/utils/thread_priority.hpp>  #include <uhd/utils/assert.hpp> -#include <boost/asio.hpp> -#include <boost/format.hpp> +#include <boost/shared_array.hpp> +#include <boost/foreach.hpp> +#include <boost/thread.hpp> +#include <boost/enable_shared_from_this.hpp> +#include <vector>  #include <iostream> -#include <iomanip> +using namespace uhd;  using namespace uhd::transport; -const int libusb_debug_level = 0; -const int libusb_timeout = 0; +static const double CLEANUP_TIMEOUT   = 0.2;    //seconds +static const size_t DEFAULT_NUM_XFERS = 16;     //num xfers +static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes  /***********************************************************************   * Helper functions @@ -54,56 +60,57 @@ void pp_transfer(libusb_transfer *lut)   *   create a bidirectional interface. It is a zero copy implementation   *   with respect to libusb, however, each send and recv requires a copy   *   operation from kernel to userspace; this is due to the usbfs - *   interface provided by the kernel.  + *   interface provided by the kernel.   **********************************************************************/  class usb_endpoint { -private: -    libusb_device_handle *_dev_handle; -    libusb_context *_ctx; -    int  _endpoint; -    bool _input; - -    size_t _transfer_size; -    size_t _num_transfers; - -    // Transfer state lists (transfers are free, pending, or completed) -    std::list<libusb_transfer *>  _free_list; -    std::list<libusb_transfer *>  _pending_list; -    std::list<libusb_transfer *>  _completed_list; - -    // Calls for processing asynchronous I/O  -    libusb_transfer *allocate_transfer(int buff_len); -    bool cancel(libusb_transfer *lut); -    bool cancel_all(); -    bool reap_pending_list(); -    bool reap_pending_list_timeout(); -    bool reap_completed_list(); - -    // Transfer state manipulators  -    void free_list_add(libusb_transfer *lut); -    void pending_list_add(libusb_transfer *lut); -    void completed_list_add(libusb_transfer *lut); -    libusb_transfer *free_list_get(); -    libusb_transfer *completed_list_get(); -    bool pending_list_remove(libusb_transfer *lut); - -    // Debug use -    void print_transfer_status(libusb_transfer *lut); -  public: -    usb_endpoint(libusb_device_handle *dev_handle, -                 libusb_context *ctx, int endpoint, bool input, -                 size_t transfer_size, size_t num_transfers); +    typedef boost::shared_ptr<usb_endpoint> sptr; -    ~usb_endpoint(); +    usb_endpoint( +        libusb::device_handle::sptr handle, +        int endpoint, +        bool input, +        size_t transfer_size, +        size_t num_transfers +    ); + +    ~usb_endpoint(void);      // Exposed interface for submitting / retrieving transfer buffers -    bool submit(libusb_transfer *lut); -    libusb_transfer *get_completed_transfer(); -    libusb_transfer *get_free_transfer(); + +    //! Submit a new transfer that was presumably just filled or emptied. +    void submit(libusb_transfer *lut); + +    /*! +     * Get an available transfer: +     * For inputs, this is a just filled transfer. +     * For outputs, this is a just emptied transfer. +     * \param timeout the timeout to wait for a lut +     * \return the transfer pointer or NULL if timeout +     */ +    libusb_transfer *get_lut_with_wait(double timeout);      //Callback use only      void callback_handle_transfer(libusb_transfer *lut); + +private: +    libusb::device_handle::sptr _handle; +    int  _endpoint; +    bool _input; + +    //! hold a bounded buffer of completed transfers +    typedef bounded_buffer<libusb_transfer *> lut_buff_type; +    lut_buff_type::sptr _completed_list; + +    //! a list of all transfer structs we allocated +    std::vector<libusb_transfer *> _all_luts; + +    //! a block of memory for the transfer buffers +    boost::shared_array<char> _buffer; + +    // Calls for processing asynchronous I/O +    libusb_transfer *allocate_transfer(void *mem, size_t len); +    void print_transfer_status(libusb_transfer *lut);  }; @@ -116,9 +123,8 @@ public:   * it from the pending to completed status list.   * \param lut pointer to libusb_transfer   */ -static void callback(libusb_transfer *lut) -{ -    usb_endpoint *endpoint = (usb_endpoint *) lut->user_data;  +static void callback(libusb_transfer *lut){ +    usb_endpoint *endpoint = (usb_endpoint *) lut->user_data;      endpoint->callback_handle_transfer(lut);  } @@ -127,14 +133,9 @@ static void callback(libusb_transfer *lut)   * Accessor call to allow list access from callback space   * \param pointer to libusb_transfer   */ -void usb_endpoint::callback_handle_transfer(libusb_transfer *lut) -{ -    if (!pending_list_remove(lut)) { -        std::cerr << "USB: pending remove failed" << std::endl; -        return; -    } - -    completed_list_add(lut);     +void usb_endpoint::callback_handle_transfer(libusb_transfer *lut){ +    boost::this_thread::disable_interruption di; //disable because the wait can throw +    _completed_list->push_with_wait(lut);  } @@ -142,21 +143,28 @@ void usb_endpoint::callback_handle_transfer(libusb_transfer *lut)   * Constructor   * Allocate libusb transfers and mark as free.  For IN endpoints,   * submit the transfers so that they're ready to return when - * data is available.  + * data is available.   */ -usb_endpoint::usb_endpoint(libusb_device_handle *dev_handle, -                          libusb_context *ctx, int endpoint, bool input, -                          size_t transfer_size, size_t num_transfers) -    : _dev_handle(dev_handle), -      _ctx(ctx), _endpoint(endpoint), _input(input), -      _transfer_size(transfer_size), _num_transfers(num_transfers) +usb_endpoint::usb_endpoint( +    libusb::device_handle::sptr handle, +    int endpoint, +    bool input, +    size_t transfer_size, +    size_t num_transfers +): +    _handle(handle), +    _endpoint(endpoint), +    _input(input)  { -    unsigned int i; -    for (i = 0; i < _num_transfers; i++) { -        free_list_add(allocate_transfer(_transfer_size)); +    _completed_list = lut_buff_type::make(num_transfers); +    _buffer = boost::shared_array<char>(new char[num_transfers*transfer_size]); +    for (size_t i = 0; i < num_transfers; i++){ +        _all_luts.push_back(allocate_transfer(_buffer.get() + i*transfer_size, transfer_size)); -        if (_input) -            submit(free_list_get()); +        //input luts are immediately submitted to be filled +        //output luts go into the completed list as free buffers +        if (_input) this->submit(_all_luts.back()); +        else _completed_list->push_with_wait(_all_luts.back());      }  } @@ -168,47 +176,44 @@ usb_endpoint::usb_endpoint(libusb_device_handle *dev_handle,   * the transfers. Libusb will deallocate the data buffer held by   * each transfer.   */ -usb_endpoint::~usb_endpoint() -{ -    cancel_all(); - -    while (!_pending_list.empty()) { -        if (!reap_pending_list()) -            std::cerr << "error: destructor failed to reap" << std::endl; +usb_endpoint::~usb_endpoint(void){ +    //cancel all transfers +    BOOST_FOREACH(libusb_transfer *lut, _all_luts){ +        libusb_cancel_transfer(lut);      } -    while (!_completed_list.empty()) { -        if (!reap_completed_list()) -            std::cerr << "error: destructor failed to reap" << std::endl; -    } +    //collect canceled transfers (drain the queue) +    while (this->get_lut_with_wait(CLEANUP_TIMEOUT) != NULL){}; -    while (!_free_list.empty()) { -        libusb_free_transfer(free_list_get()); +    //free all transfers +    BOOST_FOREACH(libusb_transfer *lut, _all_luts){ +        libusb_free_transfer(lut);      }  }  /* - * Allocate a libusb transfer  + * Allocate a libusb transfer   * The allocated transfer - and buffer it contains - is repeatedly   * submitted, reaped, and reused and should not be freed until shutdown. - * \param buff_len size of the individual buffer held by each transfer + * \param mem a pointer to the buffer memory + * \param len size of the individual buffer   * \return pointer to an allocated libusb_transfer   */ -libusb_transfer *usb_endpoint::allocate_transfer(int buff_len) -{ +libusb_transfer *usb_endpoint::allocate_transfer(void *mem, size_t len){      libusb_transfer *lut = libusb_alloc_transfer(0); - -    unsigned char *buff = new unsigned char[buff_len]; +    UHD_ASSERT_THROW(lut != NULL);      unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0)); +    unsigned char *buff = reinterpret_cast<unsigned char *>(mem); +    libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback);      libusb_fill_bulk_transfer(lut,                // transfer -                              _dev_handle,        // dev_handle +                              _handle->get(),     // dev_handle                                endpoint,           // endpoint                                buff,               // buffer -                              buff_len,           // length -                              callback,           // callback +                              len,                // length +                              lut_callback,       // callback                                this,               // user_data                                0);                 // timeout      return lut; @@ -219,97 +224,18 @@ libusb_transfer *usb_endpoint::allocate_transfer(int buff_len)   * Asynchonous transfer submission   * Submit a libusb transfer to libusb add pending status   * \param lut pointer to libusb_transfer - * \return true on success or false on error  - */ -bool usb_endpoint::submit(libusb_transfer *lut) -{ -    int retval; -    if ((retval = libusb_submit_transfer(lut)) < 0) { -        std::cerr << "error: libusb_submit_transfer: " << retval << std::endl; -        return false; -    } - -    pending_list_add(lut); -    return true; -} - - -/* - * Cancel a pending transfer  - * Search the pending list for the transfer and cancel if found. - * \param lut pointer to libusb_transfer to cancel - * \return true on success or false if transfer is not found - * - * Note: success only indicates submission of cancelation request. - * Sucessful cancelation is not known until the callback occurs. + * \return true on success or false on error   */ -bool usb_endpoint::cancel(libusb_transfer *lut) -{ -    std::list<libusb_transfer*>::iterator iter; -    for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) { -        if (*iter == lut) {  -            libusb_cancel_transfer(lut);  -            return true; -        } -    } -    return false; +void usb_endpoint::submit(libusb_transfer *lut){ +    UHD_ASSERT_THROW(libusb_submit_transfer(lut) == 0);  } - -/* - * Cancel all pending transfers  - * \return bool true if cancelation request is submitted - * - * Note: success only indicates submission of cancelation request. - * Sucessful cancelation is not known until the callback occurs. - */ -bool usb_endpoint::cancel_all() -{ -    std::list<libusb_transfer*>::iterator iter; - -    for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) { -        if (libusb_cancel_transfer(*iter) < 0) { -            std::cerr << "error: libusb_cancal_transfer() failed" << std::endl; -            return false; -        } -    } - -    return true; -} - - -/* - * Reap completed transfers - * return true if at least one transfer was reaped, false otherwise.  - * Check completed transfers for errors and mark as free. This is a - * blocking call.  - * \return bool true if a libusb transfer is reaped, false otherwise - */ -bool usb_endpoint::reap_completed_list() -{ -    libusb_transfer *lut; - -    if (_completed_list.empty()) { -        if (!reap_pending_list_timeout()) -            return false; -    } - -    while (!_completed_list.empty()) { -        lut = completed_list_get(); -        print_transfer_status(lut); -        free_list_add(lut); -    } - -    return true; -} - -  /*   * Print status errors of a completed transfer   * \param lut pointer to an libusb_transfer   */ -void usb_endpoint::print_transfer_status(libusb_transfer *lut) -{ +void usb_endpoint::print_transfer_status(libusb_transfer *lut){ +    std::cout << "here " << lut->status << std::endl;      switch (lut->status) {      case LIBUSB_TRANSFER_COMPLETED:          if (lut->actual_length < lut->length) { @@ -345,382 +271,136 @@ void usb_endpoint::print_transfer_status(libusb_transfer *lut)      }  } - -/* - * Reap pending transfers without timeout  - * This is a blocking call. Reaping submitted transfers is - * handled by libusb and the assigned callback function. - * Block until at least one transfer is reaped. - * \return true true if a transfer was reaped or false otherwise - */ -bool usb_endpoint::reap_pending_list() -{ -    int retval; - -    if ((retval = libusb_handle_events(_ctx)) < 0) { -        std::cerr << "error: libusb_handle_events: " << retval << std::endl; -        return false; -    } - -    return true; -} - - -/* - * Reap pending transfers with timeout  - * This call blocks until a transfer is reaped or timeout. - * Reaping submitted transfers is handled by libusb and the - * assigned callback function. Block until at least one - * transfer is reaped or timeout occurs. - * \return true if a transfer was reaped or false otherwise - */ -bool usb_endpoint::reap_pending_list_timeout() -{ -    int retval; -    timeval tv; - -    tv.tv_sec = 0; -    tv.tv_usec = 100000; //100ms - -    size_t pending_list_size = _pending_list.size(); - -    if ((retval = libusb_handle_events_timeout(_ctx, &tv)) < 0) { -        std::cerr << "error: libusb_handle_events: " << retval << std::endl; -        return false; -    } - -    if (_pending_list.size() < pending_list_size) { -        return true; -    } -    else { -        return false; -    } -} - - -/* - * Get a free transfer - * The transfer has an empty data bufer for OUT requests  - * \return pointer to a libusb_transfer - */ -libusb_transfer *usb_endpoint::get_free_transfer() -{ -    if (_free_list.empty()) { -        if (!reap_completed_list()) -            return NULL;  -    } - -    return free_list_get(); -} - - -/* - * Get a completed transfer  - * The transfer has a full data buffer for IN requests - * \return pointer to libusb_transfer - */ -libusb_transfer *usb_endpoint::get_completed_transfer() -{ -    if (_completed_list.empty()) { -        if (!reap_pending_list_timeout()) -            return NULL;  -    } - -    return completed_list_get(); -} - -/* - * List operations  - */ -void usb_endpoint::free_list_add(libusb_transfer *lut) -{ -    _free_list.push_back(lut); -} - -void usb_endpoint::pending_list_add(libusb_transfer *lut) -{ -    _pending_list.push_back(lut); -} - -void usb_endpoint::completed_list_add(libusb_transfer *lut) -{ -    _completed_list.push_back(lut); -} - - -/* - * Free and completed lists don't have ordered content  - * Pop transfers from the front as needed - */ -libusb_transfer *usb_endpoint::free_list_get() -{ +libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){ +    boost::this_thread::disable_interruption di; //disable because the wait can throw      libusb_transfer *lut; - -    if (_free_list.size() == 0) { -        return NULL;  -    } -    else {  -        lut = _free_list.front(); -        _free_list.pop_front(); -        return lut; -    } +    if (_completed_list->pop_with_timed_wait(lut, timeout)) return lut; +    return NULL;  } - -/* - * Free and completed lists don't have ordered content  - * Pop transfers from the front as needed - */ -libusb_transfer *usb_endpoint::completed_list_get() -{ -    libusb_transfer *lut; - -    if (_completed_list.empty()) { -        return NULL; -    } -    else {  -        lut = _completed_list.front(); -        _completed_list.pop_front(); -        return lut; -    } -} - - -/* - * Search and remove transfer from pending list - * Assuming that the callbacks occur in order, the front element - * should yield the correct transfer. If not, then something else - * is going on. If no transfers match, then something went wrong. - */ -bool usb_endpoint::pending_list_remove(libusb_transfer *lut) -{ -    std::list<libusb_transfer*>::iterator iter; -    for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) { -        if (*iter == lut) {  -            _pending_list.erase(iter); -            return true; -        } -    } -    return false; -} - -  /*********************************************************************** - * Managed buffers  + * USB zero_copy device class   **********************************************************************/ -/* - * Libusb managed receive buffer - * Construct a recv buffer from a libusb transfer. The memory held by - * the libusb transfer is exposed through the managed buffer interface. - * Upon destruction, the transfer and buffer are resubmitted to the - * endpoint for further use.  - */ -class libusb_managed_recv_buffer_impl : public managed_recv_buffer { +class libusb_zero_copy_impl : public usb_zero_copy, public boost::enable_shared_from_this<libusb_zero_copy_impl> {  public: -    libusb_managed_recv_buffer_impl(libusb_transfer *lut, -                                    usb_endpoint *endpoint) -        : _buff(lut->buffer, lut->length) -    { -        _lut = lut; -        _endpoint = endpoint; -    } -    ~libusb_managed_recv_buffer_impl() -    { -       if (!_endpoint->submit(_lut)) -           std::cerr << "USB: failed to submit IN transfer" << std::endl; -    } +    libusb_zero_copy_impl( +        libusb::device_handle::sptr handle, +        size_t recv_endpoint, +        size_t send_endpoint, +        const device_addr_t &hints +    ); -private: -    const boost::asio::const_buffer &get() const -    { -        return _buff;  +    ~libusb_zero_copy_impl(void){ +        _threads_running = false; +        _thread_group.join_all();      } -    libusb_transfer *_lut; -    usb_endpoint *_endpoint; -    const boost::asio::const_buffer _buff; -}; - -/* - * Libusb managed send buffer - * Construct a send buffer from a libusb transfer. The memory held by - * the libusb transfer is exposed through the managed buffer interface. - * Committing the buffer will set the data length and submit the buffer - * to the endpoint. Submitting a buffer multiple times or destroying - * the buffer before committing is an error. For the latter, the transfer - * is returned to the endpoint with no data for reuse. - */ -class libusb_managed_send_buffer_impl : public managed_send_buffer { -public: -    libusb_managed_send_buffer_impl(libusb_transfer *lut, -                                    usb_endpoint *endpoint, -                                    size_t buff_size) -        : _buff(lut->buffer, buff_size), _committed(false) -    { -        _lut = lut; -        _endpoint = endpoint; -    } +    managed_recv_buffer::sptr get_recv_buff(double); +    managed_send_buffer::sptr get_send_buff(double); -    ~libusb_managed_send_buffer_impl() -    { -        if (!_committed) { -            _lut->length = 0; -            _lut->actual_length = 0; -            _endpoint->submit(_lut); -        } -    } +    size_t get_num_recv_frames(void) const { return _num_recv_frames; } +    size_t get_num_send_frames(void) const { return _num_send_frames; } -    ssize_t commit(size_t num_bytes) -    { -        if (_committed) { -            std::cerr << "UHD: send buffer already committed" << std::endl; -            return 0; -        } -         -        UHD_ASSERT_THROW(num_bytes <= boost::asio::buffer_size(_buff)); +    size_t get_recv_frame_size(void) const { return _recv_frame_size; } +    size_t get_send_frame_size(void) const { return _send_frame_size; } -        _lut->length = num_bytes; -        _lut->actual_length = 0; +private: +    void release(libusb_transfer *lut){ +        _recv_ep->submit(lut); +    } -        if (_endpoint->submit(_lut)) { -            _committed = true; -            return num_bytes; +    void commit(libusb_transfer *lut, size_t num_bytes){ +        lut->length = num_bytes; +        try{ +            _send_ep->submit(lut);          } -        else { -            return 0; +        catch(const std::exception &e){ +            std::cerr << "Error in commit: " << e.what() << std::endl;          }      } -private: -    const boost::asio::mutable_buffer &get() const -    { -        return _buff;  +    libusb::device_handle::sptr _handle; +    const size_t _recv_frame_size, _num_recv_frames; +    const size_t _send_frame_size, _num_send_frames; +    usb_endpoint::sptr _recv_ep, _send_ep; + +    //event handler threads +    boost::thread_group _thread_group; +    bool _threads_running; + +    void run_event_loop(void){ +        set_thread_priority_safe(); +        libusb::session::sptr session = libusb::session::get_global_session(); +        _threads_running = true; +        while(_threads_running){ +            timeval tv; +            tv.tv_sec = 0; +            tv.tv_usec = 100000; //100ms +            libusb_handle_events_timeout(session->get_context(), &tv); +        }      } - -    libusb_transfer *_lut; -    usb_endpoint *_endpoint; -    const boost::asio::mutable_buffer _buff; -    bool _committed; -}; - - -/*********************************************************************** - * USB zero_copy device class - **********************************************************************/ -class libusb_zero_copy_impl : public usb_zero_copy -{ -private: -    usb_endpoint          *_rx_ep; -    usb_endpoint          *_tx_ep; - -    // Maintain libusb values -    libusb_context       *_rx_ctx; -    libusb_context       *_tx_ctx; -    libusb_device_handle *_rx_dev_handle; -    libusb_device_handle *_tx_dev_handle; - -    size_t _recv_buff_size; -    size_t _send_buff_size; -    size_t _num_frames; - -public: -    typedef boost::shared_ptr<libusb_zero_copy_impl> sptr; - -    libusb_zero_copy_impl(usb_device_handle::sptr handle, -                          unsigned int rx_endpoint, -                          unsigned int tx_endpoint, -                          size_t recv_buff_size, -                          size_t send_buff_size); -     -    ~libusb_zero_copy_impl(); - -    managed_recv_buffer::sptr get_recv_buff(void); -    managed_send_buffer::sptr get_send_buff(void); - -    size_t get_num_recv_frames(void) const { return _num_frames; } -    size_t get_num_send_frames(void) const { return _num_frames; }  };  /*   * Constructor   * Initializes libusb, opens devices, and sets up interfaces for I/O. - * Finally, creates endpoints for asynchronous I/O.  + * Finally, creates endpoints for asynchronous I/O.   */ -libusb_zero_copy_impl::libusb_zero_copy_impl(usb_device_handle::sptr handle, -                                             unsigned int rx_endpoint, -                                             unsigned int tx_endpoint, -                                             size_t buff_size, -                                             size_t block_size) - : _rx_ctx(NULL), _tx_ctx(NULL), _rx_dev_handle(NULL), _tx_dev_handle(NULL), -   _recv_buff_size(block_size), _send_buff_size(block_size), -   _num_frames(buff_size / block_size) +libusb_zero_copy_impl::libusb_zero_copy_impl( +    libusb::device_handle::sptr handle, +    size_t recv_endpoint, +    size_t send_endpoint, +    const device_addr_t &hints +): +    _handle(handle), +    _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))), +    _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS))), +    _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))), +    _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS)))  { -    // Initialize libusb with separate contexts to allow -    // thread safe operation of transmit and receive  -    libusb::init(&_rx_ctx, libusb_debug_level); -    libusb::init(&_tx_ctx, libusb_debug_level); - -    UHD_ASSERT_THROW((_rx_ctx != NULL) && (_tx_ctx != NULL)); - -    // Find and open the libusb_device corresponding to the -    // given handle and return the libusb_device_handle -    // that can be used for I/O purposes. -    _rx_dev_handle = libusb::open_device(_rx_ctx, handle); -    _tx_dev_handle = libusb::open_device(_tx_ctx, handle); - -    // Open USB interfaces for tx/rx using magic values. -    // IN interface:      2 -    // OUT interface:     1 -    // Control interface: 0 -    libusb::open_interface(_rx_dev_handle, 2); -    libusb::open_interface(_tx_dev_handle, 1); - -    _rx_ep = new usb_endpoint(_rx_dev_handle,  // libusb device_handle -                              _rx_ctx,         // libusb context -                              rx_endpoint,     // USB endpoint number +    _handle->claim_interface(2 /*in interface*/); +    _handle->claim_interface(1 /*out interface*/); + +    _recv_ep = usb_endpoint::sptr(new usb_endpoint( +                              _handle,         // libusb device_handle +                              recv_endpoint,   // USB endpoint number                                true,            // IN endpoint -                              _recv_buff_size, // buffer size per transfer  -                              _num_frames);    // number of libusb transfers +                              this->get_recv_frame_size(),  // buffer size per transfer +                              this->get_num_recv_frames()   // number of libusb transfers +    )); -    _tx_ep = new usb_endpoint(_tx_dev_handle,  // libusb device_handle -                              _tx_ctx,         // libusb context -                              tx_endpoint,     // USB endpoint number +    _send_ep = usb_endpoint::sptr(new usb_endpoint( +                              _handle,         // libusb device_handle +                              send_endpoint,   // USB endpoint number                                false,           // OUT endpoint -                              _send_buff_size, // buffer size per transfer -                              _num_frames);    // number of libusb transfers +                              this->get_send_frame_size(),  // buffer size per transfer +                              this->get_num_send_frames()   // number of libusb transfers +    )); + +    //spawn the event handler threads +    size_t concurrency = hints.cast<size_t>("concurrency_hint", 1); +    for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread( +        boost::bind(&libusb_zero_copy_impl::run_event_loop, this) +    );  } - -libusb_zero_copy_impl::~libusb_zero_copy_impl() -{ -    delete _rx_ep; -    delete _tx_ep;  - -    libusb_close(_rx_dev_handle); -    libusb_close(_tx_dev_handle); - -    libusb_exit(_rx_ctx); -    libusb_exit(_tx_ctx); -} - -  /*   * Construct a managed receive buffer from a completed libusb transfer   * (happy with buffer full of data) obtained from the receive endpoint.   * Return empty pointer if no transfer is available (timeout or error). - * \return pointer to a managed receive buffer  + * \return pointer to a managed receive buffer   */ -managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff() -{ -    libusb_transfer *lut = _rx_ep->get_completed_transfer(); +managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(double timeout){ +    libusb_transfer *lut = _recv_ep->get_lut_with_wait(timeout);      if (lut == NULL) {          return managed_recv_buffer::sptr();      }      else { -        return managed_recv_buffer::sptr( -            new libusb_managed_recv_buffer_impl(lut, -                                                _rx_ep)); +        return managed_recv_buffer::make_safe( +            boost::asio::const_buffer(lut->buffer, lut->actual_length), +            boost::bind(&libusb_zero_copy_impl::release, shared_from_this(), lut) +        );      }  } @@ -729,39 +409,34 @@ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff()   * Construct a managed send buffer from a free libusb transfer (with   * empty buffer). Return empty pointer of no transfer is available   * (timeout or error). - * \return pointer to a managed send buffer  + * \return pointer to a managed send buffer   */ -managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff() -{ -    libusb_transfer *lut = _tx_ep->get_free_transfer(); +managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){ +    libusb_transfer *lut = _send_ep->get_lut_with_wait(timeout);      if (lut == NULL) {          return managed_send_buffer::sptr();      }      else { -        return managed_send_buffer::sptr( -            new libusb_managed_send_buffer_impl(lut, -                                                _tx_ep, -                                                _send_buff_size)); +        return managed_send_buffer::make_safe( +            boost::asio::mutable_buffer(lut->buffer, this->get_send_frame_size()), +            boost::bind(&libusb_zero_copy_impl::commit, shared_from_this(), lut, _1) +        );      }  } -  /***********************************************************************   * USB zero_copy make functions   **********************************************************************/ -usb_zero_copy::sptr usb_zero_copy::make(usb_device_handle::sptr handle, -                                        unsigned int rx_endpoint, -                                        unsigned int tx_endpoint, -                                        size_t buff_size, -                                        size_t block_size) - -{ -    return sptr(new libusb_zero_copy_impl(handle, -                                          rx_endpoint, -                                          tx_endpoint, -                                          buff_size,  -                                          block_size)); +usb_zero_copy::sptr usb_zero_copy::make( +    usb_device_handle::sptr handle, +    size_t recv_endpoint, +    size_t send_endpoint, +    const device_addr_t &hints +){ +    libusb::device_handle::sptr dev_handle(libusb::device_handle::get_cached_handle( +        boost::static_pointer_cast<libusb::special_handle>(handle)->get_device() +    )); +    return sptr(new libusb_zero_copy_impl( +        dev_handle, recv_endpoint, send_endpoint, hints +    ));  } - - -  | 
