diff options
-rw-r--r-- | host/lib/transport/libusb1_base.cpp | 16 | ||||
-rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 474 |
2 files changed, 134 insertions, 356 deletions
diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp index 5ff996642..26e864459 100644 --- a/host/lib/transport/libusb1_base.cpp +++ b/host/lib/transport/libusb1_base.cpp @@ -20,6 +20,7 @@ #include <uhd/types/dict.hpp> #include <boost/weak_ptr.hpp> #include <boost/foreach.hpp> +#include <boost/thread.hpp> #include <iostream> using namespace uhd::transport; @@ -32,9 +33,12 @@ public: libusb_session_impl(void){ UHD_ASSERT_THROW(libusb_init(&_context) == 0); libusb_set_debug(_context, debug_level); + _thread_group.create_thread(boost::bind(&libusb_session_impl::run_event_loop, this)); } ~libusb_session_impl(void){ + _running = false; + _thread_group.join_all(); libusb_exit(_context); } @@ -44,6 +48,18 @@ public: private: libusb_context *_context; + boost::thread_group _thread_group; + bool _running; + + void run_event_loop(void){ + _running = true; + timeval tv; + while(_running){ + tv.tv_sec = 0; + tv.tv_usec = 100000; //100ms + libusb_handle_events_timeout(this->get_context(), &tv); + } + } }; libusb::session::sptr libusb::session::get_global_session(void){ diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 41cd4b3fc..e84793e88 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -17,8 +17,11 @@ #include "libusb1_base.hpp" #include <uhd/transport/usb_zero_copy.hpp> +#include <uhd/transport/bounded_buffer.hpp> #include <uhd/utils/assert.hpp> -#include <boost/format.hpp> +#include <boost/shared_array.hpp> +#include <boost/foreach.hpp> +#include <vector> #include <iostream> #include <iomanip> @@ -55,52 +58,57 @@ void pp_transfer(libusb_transfer *lut) * interface provided by the kernel. **********************************************************************/ class usb_endpoint { +public: + typedef boost::shared_ptr<usb_endpoint> sptr; + + usb_endpoint( + libusb::device_handle::sptr handle, + int endpoint, + bool input, + size_t transfer_size, + size_t num_transfers + ); + + ~usb_endpoint(void); + + // Exposed interface for submitting / retrieving transfer buffers + + //! Submit a new transfer that was presumably just filled or emptied. + void submit(libusb_transfer *lut); + + /*! + * Get an available transfer: + * For inputs, this is a just filled transfer. + * For outputs, this is a just emptied transfer. + * \param timeout_ms the timeout to wait for a lut + * \return the transfer pointer or NULL if timeout + */ + libusb_transfer *get_lut_with_wait(size_t timeout_ms = 100); + + //Callback use only + void callback_handle_transfer(libusb_transfer *lut); + private: libusb::device_handle::sptr _handle; - libusb::session::sptr _session; int _endpoint; bool _input; size_t _transfer_size; size_t _num_transfers; - // Transfer state lists (transfers are free, pending, or completed) - std::list<libusb_transfer *> _free_list; - std::list<libusb_transfer *> _pending_list; - std::list<libusb_transfer *> _completed_list; + //! hold a bounded buffer of completed transfers + typedef bounded_buffer<libusb_transfer *> lut_buff_type; + lut_buff_type::sptr _completed_list; + + //! a list of all transfer structs we allocated + std::vector<libusb_transfer *> _all_luts; + + //! a list of shared arrays for the transfer buffers + std::vector<boost::shared_array<boost::uint8_t> > _buffers; // Calls for processing asynchronous I/O libusb_transfer *allocate_transfer(int buff_len); - bool cancel(libusb_transfer *lut); - bool cancel_all(); - bool reap_pending_list(); - bool reap_pending_list_timeout(); - bool reap_completed_list(); - - // Transfer state manipulators - void free_list_add(libusb_transfer *lut); - void pending_list_add(libusb_transfer *lut); - void completed_list_add(libusb_transfer *lut); - libusb_transfer *free_list_get(); - libusb_transfer *completed_list_get(); - bool pending_list_remove(libusb_transfer *lut); - - // Debug use void print_transfer_status(libusb_transfer *lut); - -public: - usb_endpoint(libusb::device_handle::sptr handle, libusb::session::sptr sess, - int endpoint, bool input, size_t transfer_size, size_t num_transfers); - - ~usb_endpoint(); - - // Exposed interface for submitting / retrieving transfer buffers - bool submit(libusb_transfer *lut); - libusb_transfer *get_completed_transfer(); - libusb_transfer *get_free_transfer(); - - //Callback use only - void callback_handle_transfer(libusb_transfer *lut); }; @@ -113,9 +121,8 @@ public: * it from the pending to completed status list. * \param lut pointer to libusb_transfer */ -static void callback(libusb_transfer *lut) -{ - usb_endpoint *endpoint = (usb_endpoint *) lut->user_data; +static void callback(libusb_transfer *lut){ + usb_endpoint *endpoint = (usb_endpoint *) lut->user_data; endpoint->callback_handle_transfer(lut); } @@ -124,14 +131,8 @@ static void callback(libusb_transfer *lut) * Accessor call to allow list access from callback space * \param pointer to libusb_transfer */ -void usb_endpoint::callback_handle_transfer(libusb_transfer *lut) -{ - if (!pending_list_remove(lut)) { - std::cerr << "USB: pending remove failed" << std::endl; - return; - } - - completed_list_add(lut); +void usb_endpoint::callback_handle_transfer(libusb_transfer *lut){ + _completed_list->push_with_wait(lut); } @@ -142,18 +143,27 @@ void usb_endpoint::callback_handle_transfer(libusb_transfer *lut) * data is available. */ usb_endpoint::usb_endpoint( - libusb::device_handle::sptr handle, libusb::session::sptr sess, - int endpoint, bool input, size_t transfer_size, size_t num_transfers) - : _handle(handle), _session(sess), - _endpoint(endpoint), _input(input), - _transfer_size(transfer_size), _num_transfers(num_transfers) + libusb::device_handle::sptr handle, + int endpoint, + bool input, + size_t transfer_size, + size_t num_transfers +): + _handle(handle), + _endpoint(endpoint), + _input(input), + _transfer_size(transfer_size), + _num_transfers(num_transfers) { - unsigned int i; - for (i = 0; i < _num_transfers; i++) { - free_list_add(allocate_transfer(_transfer_size)); + _completed_list = lut_buff_type::make(num_transfers); - if (_input) - submit(free_list_get()); + for (size_t i = 0; i < _num_transfers; i++){ + _all_luts.push_back(allocate_transfer(_transfer_size)); + + //input luts are immediately submitted to be filled + //output luts go into the completed list as free buffers + if (_input) this->submit(_all_luts.back()); + else _completed_list->push_with_wait(_all_luts.back()); } } @@ -165,22 +175,21 @@ usb_endpoint::usb_endpoint( * the transfers. Libusb will deallocate the data buffer held by * each transfer. */ -usb_endpoint::~usb_endpoint() -{ - cancel_all(); - - while (!_pending_list.empty()) { - if (!reap_pending_list()) - std::cerr << "error: destructor failed to reap" << std::endl; +usb_endpoint::~usb_endpoint(void){ + //cancel all transfers + BOOST_FOREACH(libusb_transfer *lut, _all_luts){ + libusb_cancel_transfer(lut); } - while (!_completed_list.empty()) { - if (!reap_completed_list()) - std::cerr << "error: destructor failed to reap" << std::endl; - } + //collect canceled transfers (drain the queue) + libusb_transfer *lut; + while(_completed_list->pop_with_timed_wait( + lut, boost::posix_time::milliseconds(100) + )); - while (!_free_list.empty()) { - libusb_free_transfer(free_list_get()); + //free all transfers + BOOST_FOREACH(libusb_transfer *lut, _all_luts){ + libusb_free_transfer(lut); } } @@ -192,20 +201,21 @@ usb_endpoint::~usb_endpoint() * \param buff_len size of the individual buffer held by each transfer * \return pointer to an allocated libusb_transfer */ -libusb_transfer *usb_endpoint::allocate_transfer(int buff_len) -{ +libusb_transfer *usb_endpoint::allocate_transfer(int buff_len){ libusb_transfer *lut = libusb_alloc_transfer(0); - unsigned char *buff = new unsigned char[buff_len]; + boost::shared_array<boost::uint8_t> buff(new boost::uint8_t[buff_len]); + _buffers.push_back(buff); //store a reference to this shared array unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0)); + libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback); libusb_fill_bulk_transfer(lut, // transfer _handle->get(), // dev_handle endpoint, // endpoint - buff, // buffer + buff.get(), // buffer buff_len, // length - libusb_transfer_cb_fn(callback), // callback + lut_callback, // callback this, // user_data 0); // timeout return lut; @@ -218,95 +228,15 @@ libusb_transfer *usb_endpoint::allocate_transfer(int buff_len) * \param lut pointer to libusb_transfer * \return true on success or false on error */ -bool usb_endpoint::submit(libusb_transfer *lut) -{ - int retval; - if ((retval = libusb_submit_transfer(lut)) < 0) { - std::cerr << "error: libusb_submit_transfer: " << retval << std::endl; - return false; - } - - pending_list_add(lut); - return true; -} - - -/* - * Cancel a pending transfer - * Search the pending list for the transfer and cancel if found. - * \param lut pointer to libusb_transfer to cancel - * \return true on success or false if transfer is not found - * - * Note: success only indicates submission of cancelation request. - * Sucessful cancelation is not known until the callback occurs. - */ -bool usb_endpoint::cancel(libusb_transfer *lut) -{ - std::list<libusb_transfer*>::iterator iter; - for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) { - if (*iter == lut) { - libusb_cancel_transfer(lut); - return true; - } - } - return false; -} - - -/* - * Cancel all pending transfers - * \return bool true if cancelation request is submitted - * - * Note: success only indicates submission of cancelation request. - * Sucessful cancelation is not known until the callback occurs. - */ -bool usb_endpoint::cancel_all() -{ - std::list<libusb_transfer*>::iterator iter; - - for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) { - if (libusb_cancel_transfer(*iter) < 0) { - std::cerr << "error: libusb_cancal_transfer() failed" << std::endl; - return false; - } - } - - return true; -} - - -/* - * Reap completed transfers - * return true if at least one transfer was reaped, false otherwise. - * Check completed transfers for errors and mark as free. This is a - * blocking call. - * \return bool true if a libusb transfer is reaped, false otherwise - */ -bool usb_endpoint::reap_completed_list() -{ - libusb_transfer *lut; - - if (_completed_list.empty()) { - if (!reap_pending_list_timeout()) - return false; - } - - while (!_completed_list.empty()) { - lut = completed_list_get(); - print_transfer_status(lut); - free_list_add(lut); - } - - return true; +void usb_endpoint::submit(libusb_transfer *lut){ + UHD_ASSERT_THROW(libusb_submit_transfer(lut) == 0); } - /* * Print status errors of a completed transfer * \param lut pointer to an libusb_transfer */ -void usb_endpoint::print_transfer_status(libusb_transfer *lut) -{ +void usb_endpoint::print_transfer_status(libusb_transfer *lut){ switch (lut->status) { case LIBUSB_TRANSFER_COMPLETED: if (lut->actual_length < lut->length) { @@ -342,166 +272,14 @@ void usb_endpoint::print_transfer_status(libusb_transfer *lut) } } - -/* - * Reap pending transfers without timeout - * This is a blocking call. Reaping submitted transfers is - * handled by libusb and the assigned callback function. - * Block until at least one transfer is reaped. - * \return true true if a transfer was reaped or false otherwise - */ -bool usb_endpoint::reap_pending_list() -{ - int retval; - - if ((retval = libusb_handle_events(_session->get_context())) < 0) { - std::cerr << "error: libusb_handle_events: " << retval << std::endl; - return false; - } - - return true; -} - - -/* - * Reap pending transfers with timeout - * This call blocks until a transfer is reaped or timeout. - * Reaping submitted transfers is handled by libusb and the - * assigned callback function. Block until at least one - * transfer is reaped or timeout occurs. - * \return true if a transfer was reaped or false otherwise - */ -bool usb_endpoint::reap_pending_list_timeout() -{ - int retval; - timeval tv; - - tv.tv_sec = 0; - tv.tv_usec = 100000; //100ms - - size_t pending_list_size = _pending_list.size(); - - if ((retval = libusb_handle_events_timeout(_session->get_context(), &tv)) < 0) { - std::cerr << "error: libusb_handle_events: " << retval << std::endl; - return false; - } - - if (_pending_list.size() < pending_list_size) { - return true; - } - else { - return false; - } -} - - -/* - * Get a free transfer - * The transfer has an empty data bufer for OUT requests - * \return pointer to a libusb_transfer - */ -libusb_transfer *usb_endpoint::get_free_transfer() -{ - if (_free_list.empty()) { - if (!reap_completed_list()) - return NULL; - } - - return free_list_get(); -} - - -/* - * Get a completed transfer - * The transfer has a full data buffer for IN requests - * \return pointer to libusb_transfer - */ -libusb_transfer *usb_endpoint::get_completed_transfer() -{ - if (_completed_list.empty()) { - if (!reap_pending_list_timeout()) - return NULL; - } - - return completed_list_get(); -} - -/* - * List operations - */ -void usb_endpoint::free_list_add(libusb_transfer *lut) -{ - _free_list.push_back(lut); -} - -void usb_endpoint::pending_list_add(libusb_transfer *lut) -{ - _pending_list.push_back(lut); -} - -void usb_endpoint::completed_list_add(libusb_transfer *lut) -{ - _completed_list.push_back(lut); -} - - -/* - * Free and completed lists don't have ordered content - * Pop transfers from the front as needed - */ -libusb_transfer *usb_endpoint::free_list_get() -{ - libusb_transfer *lut; - - if (_free_list.size() == 0) { - return NULL; - } - else { - lut = _free_list.front(); - _free_list.pop_front(); - return lut; - } -} - - -/* - * Free and completed lists don't have ordered content - * Pop transfers from the front as needed - */ -libusb_transfer *usb_endpoint::completed_list_get() -{ +libusb_transfer *usb_endpoint::get_lut_with_wait(size_t timeout_ms){ libusb_transfer *lut; - - if (_completed_list.empty()) { - return NULL; - } - else { - lut = _completed_list.front(); - _completed_list.pop_front(); - return lut; - } + if (_completed_list->pop_with_timed_wait( + lut, boost::posix_time::milliseconds(timeout_ms) + )) return lut; + return NULL; } - -/* - * Search and remove transfer from pending list - * Assuming that the callbacks occur in order, the front element - * should yield the correct transfer. If not, then something else - * is going on. If no transfers match, then something went wrong. - */ -bool usb_endpoint::pending_list_remove(libusb_transfer *lut) -{ - std::list<libusb_transfer*>::iterator iter; - for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) { - if (*iter == lut) { - _pending_list.erase(iter); - return true; - } - } - return false; -} - - /*********************************************************************** * Managed buffers **********************************************************************/ @@ -515,17 +293,15 @@ bool usb_endpoint::pending_list_remove(libusb_transfer *lut) class libusb_managed_recv_buffer_impl : public managed_recv_buffer { public: libusb_managed_recv_buffer_impl(libusb_transfer *lut, - usb_endpoint *endpoint) + usb_endpoint::sptr endpoint) : _buff(lut->buffer, lut->length) { _lut = lut; _endpoint = endpoint; } - ~libusb_managed_recv_buffer_impl() - { - if (!_endpoint->submit(_lut)) - std::cerr << "USB: failed to submit IN transfer" << std::endl; + ~libusb_managed_recv_buffer_impl(void){ + _endpoint->submit(_lut); } private: @@ -535,7 +311,7 @@ private: } libusb_transfer *_lut; - usb_endpoint *_endpoint; + usb_endpoint::sptr _endpoint; const boost::asio::const_buffer _buff; }; @@ -551,7 +327,7 @@ private: class libusb_managed_send_buffer_impl : public managed_send_buffer { public: libusb_managed_send_buffer_impl(libusb_transfer *lut, - usb_endpoint *endpoint, + usb_endpoint::sptr endpoint, size_t buff_size) : _buff(lut->buffer, buff_size), _committed(false) { @@ -559,8 +335,7 @@ public: _endpoint = endpoint; } - ~libusb_managed_send_buffer_impl() - { + ~libusb_managed_send_buffer_impl(void){ if (!_committed) { _lut->length = 0; _lut->actual_length = 0; @@ -580,12 +355,14 @@ public: _lut->length = num_bytes; _lut->actual_length = 0; - if (_endpoint->submit(_lut)) { + try{ + _endpoint->submit(_lut); _committed = true; return num_bytes; } - else { - return 0; + catch(const std::exception &e){ + std::cerr << "Error in commit: " << e.what() << std::endl; + return -1; } } @@ -596,7 +373,7 @@ private: } libusb_transfer *_lut; - usb_endpoint *_endpoint; + usb_endpoint::sptr _endpoint; const boost::asio::mutable_buffer _buff; bool _committed; }; @@ -608,11 +385,9 @@ private: class libusb_zero_copy_impl : public usb_zero_copy { private: - usb_endpoint *_rx_ep; - usb_endpoint *_tx_ep; + usb_endpoint::sptr _rx_ep, _tx_ep; libusb::device_handle::sptr _handle; - libusb::session::sptr _session; size_t _recv_buff_size; size_t _send_buff_size; @@ -626,8 +401,6 @@ public: unsigned int tx_endpoint, size_t recv_buff_size, size_t send_buff_size); - - ~libusb_zero_copy_impl(); managed_recv_buffer::sptr get_recv_buff(void); managed_send_buffer::sptr get_send_buff(void); @@ -646,45 +419,38 @@ libusb_zero_copy_impl::libusb_zero_copy_impl(libusb::device_handle::sptr handle, unsigned int tx_endpoint, size_t buff_size, size_t block_size) - : _handle(handle), _session(libusb::session::get_global_session()), + : _handle(handle), _recv_buff_size(block_size), _send_buff_size(block_size), _num_frames(buff_size / block_size) { _handle->claim_interface(2 /*in interface*/); _handle->claim_interface(1 /*out interface*/); - _rx_ep = new usb_endpoint(_handle, // libusb device_handle - _session, // libusb session w/ context + _rx_ep = usb_endpoint::sptr(new usb_endpoint( + _handle, // libusb device_handle rx_endpoint, // USB endpoint number true, // IN endpoint _recv_buff_size, // buffer size per transfer - _num_frames); // number of libusb transfers + _num_frames // number of libusb transfers + )); - _tx_ep = new usb_endpoint(_handle, // libusb device_handle - _session, // libusb session w/ context + _tx_ep = usb_endpoint::sptr(new usb_endpoint( + _handle, // libusb device_handle tx_endpoint, // USB endpoint number false, // OUT endpoint _send_buff_size, // buffer size per transfer - _num_frames); // number of libusb transfers -} - - -libusb_zero_copy_impl::~libusb_zero_copy_impl() -{ - delete _rx_ep; - delete _tx_ep; + _num_frames // number of libusb transfers + )); } - /* * Construct a managed receive buffer from a completed libusb transfer * (happy with buffer full of data) obtained from the receive endpoint. * Return empty pointer if no transfer is available (timeout or error). * \return pointer to a managed receive buffer */ -managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff() -{ - libusb_transfer *lut = _rx_ep->get_completed_transfer(); +managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(void){ + libusb_transfer *lut = _rx_ep->get_lut_with_wait(/* TODO timeout API */); if (lut == NULL) { return managed_recv_buffer::sptr(); } @@ -702,9 +468,8 @@ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff() * (timeout or error). * \return pointer to a managed send buffer */ -managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff() -{ - libusb_transfer *lut = _tx_ep->get_free_transfer(); +managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(void){ + libusb_transfer *lut = _tx_ep->get_lut_with_wait(/* TODO timeout API */); if (lut == NULL) { return managed_send_buffer::sptr(); } @@ -736,6 +501,3 @@ usb_zero_copy::sptr usb_zero_copy::make(usb_device_handle::sptr handle, buff_size, block_size)); } - - - |