// // Copyright 2010-2011 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program. If not, see . // #include "libusb1_base.hpp" #include #include #include #include #include #include #include #include #include #include #include using namespace uhd; using namespace uhd::transport; static const size_t DEFAULT_NUM_XFERS = 16; //num xfers static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes //! Define LIBUSB_CALL when its missing (non-windows) #ifndef LIBUSB_CALL #define LIBUSB_CALL #endif /*LIBUSB_CALL*/ /*! * All libusb callback functions should be marked with the LIBUSB_CALL macro * to ensure that they are compiled with the same calling convention as libusb. */ //! helper function: handles all async callbacks static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut){ (*static_cast *>(lut->user_data))(); } //! callback to free transfer upon cancellation static void LIBUSB_CALL cancel_transfer_cb(libusb_transfer *lut){ if (lut->status == LIBUSB_TRANSFER_CANCELLED) libusb_free_transfer(lut); else UHD_LOGV(rarely) << "libusb cancel_transfer unexpected status " << lut->status << std::endl; } /*********************************************************************** * Reusable managed receiver buffer: * - Associated with a particular libusb transfer struct. * - Submits the transfer to libusb in the release method. **********************************************************************/ class libusb_zero_copy_mrb : public managed_recv_buffer{ public: libusb_zero_copy_mrb(libusb_transfer *lut): _lut(lut), _expired(true) { /* NOP */ } void release(void){ if (_expired) return; UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0); _expired = true; } sptr get_new(void){ _expired = false; return make_managed_buffer(this); } private: const void *get_buff(void) const{return _lut->buffer;} size_t get_size(void) const{return _lut->actual_length;} libusb_transfer *_lut; bool _expired; }; /*********************************************************************** * Reusable managed send buffer: * - Associated with a particular libusb transfer struct. * - Submits the transfer to libusb in the commit method. **********************************************************************/ class libusb_zero_copy_msb : public managed_send_buffer{ public: libusb_zero_copy_msb(libusb_transfer *lut): _lut(lut), _expired(true) { /* NOP */ } void commit(size_t len){ if (_expired) return; _lut->length = len; if(len == 0) libusb_async_cb(_lut); else UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0); _expired = true; } sptr get_new(void){ _expired = false; return make_managed_buffer(this); } private: void *get_buff(void) const{return _lut->buffer;} size_t get_size(void) const{return _lut->length;} libusb_transfer *_lut; bool _expired; }; /*********************************************************************** * USB zero_copy device class **********************************************************************/ class libusb_zero_copy_impl : public usb_zero_copy{ public: libusb_zero_copy_impl( libusb::device_handle::sptr handle, size_t recv_endpoint, size_t send_endpoint, const device_addr_t &hints ): _handle(handle), _recv_frame_size(size_t(hints.cast("recv_frame_size", DEFAULT_XFER_SIZE))), _num_recv_frames(size_t(hints.cast("num_recv_frames", DEFAULT_NUM_XFERS))), _send_frame_size(size_t(hints.cast("send_frame_size", DEFAULT_XFER_SIZE))), _num_send_frames(size_t(hints.cast("num_send_frames", DEFAULT_NUM_XFERS))), _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)), _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), _pending_recv_buffs(_num_recv_frames), _pending_send_buffs(_num_send_frames) { _handle->claim_interface(2 /*in interface*/); _handle->claim_interface(1 /*out interface*/); //allocate libusb transfer structs and managed receive buffers for (size_t i = 0; i < get_num_recv_frames(); i++){ libusb_transfer *lut = libusb_alloc_transfer(0); UHD_ASSERT_THROW(lut != NULL); _mrb_pool.push_back(libusb_zero_copy_mrb(lut)); _callbacks.push_back(boost::bind( &libusb_zero_copy_impl::handle_recv, this, &_mrb_pool.back() )); libusb_fill_bulk_transfer( lut, // transfer _handle->get(), // dev_handle (recv_endpoint & 0x7f) | 0x80, // endpoint static_cast(_recv_buffer_pool->at(i)), // buffer this->get_recv_frame_size(), // length libusb_transfer_cb_fn(&libusb_async_cb), // callback static_cast(&_callbacks.back()), // user_data 0 // timeout ); _all_luts.push_back(lut); _mrb_pool.back().get_new(); } //allocate libusb transfer structs and managed send buffers for (size_t i = 0; i < get_num_send_frames(); i++){ libusb_transfer *lut = libusb_alloc_transfer(0); UHD_ASSERT_THROW(lut != NULL); _msb_pool.push_back(libusb_zero_copy_msb(lut)); _callbacks.push_back(boost::bind( &libusb_zero_copy_impl::handle_send, this, &_msb_pool.back() )); libusb_fill_bulk_transfer( lut, // transfer _handle->get(), // dev_handle (send_endpoint & 0x7f) | 0x00, // endpoint static_cast(_send_buffer_pool->at(i)), // buffer this->get_send_frame_size(), // length libusb_transfer_cb_fn(&libusb_async_cb), // callback static_cast(&_callbacks.back()), // user_data 0 // timeout ); _all_luts.push_back(lut); libusb_async_cb(lut); } //spawn the event handler threads size_t concurrency = hints.cast("concurrency_hint", 1); boost::barrier spawn_barrier(concurrency+1); for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread( boost::bind(&libusb_zero_copy_impl::run_event_loop, this, boost::ref(spawn_barrier)) ); spawn_barrier.wait(); } ~libusb_zero_copy_impl(void){ //cancel and free all transfers BOOST_FOREACH(libusb_transfer *lut, _all_luts){ lut->callback = libusb_transfer_cb_fn(&cancel_transfer_cb); libusb_cancel_transfer(lut); while(lut->status != LIBUSB_TRANSFER_CANCELLED && lut->status != LIBUSB_TRANSFER_COMPLETED) { boost::this_thread::sleep(boost::posix_time::milliseconds(10)); } } //shutdown the threads _threads_running = false; _thread_group.interrupt_all(); _thread_group.join_all(); } managed_recv_buffer::sptr get_recv_buff(double timeout){ libusb_zero_copy_mrb *mrb = NULL; if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){ return mrb->get_new(); } return managed_recv_buffer::sptr(); } managed_send_buffer::sptr get_send_buff(double timeout){ libusb_zero_copy_msb *msb = NULL; if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){ return msb->get_new(); } return managed_send_buffer::sptr(); } size_t get_num_recv_frames(void) const { return _num_recv_frames; } size_t get_num_send_frames(void) const { return _num_send_frames; } size_t get_recv_frame_size(void) const { return _recv_frame_size; } size_t get_send_frame_size(void) const { return _send_frame_size; } private: //! Handle a bound async callback for recv void handle_recv(libusb_zero_copy_mrb *mrb){ _pending_recv_buffs.push_with_haste(mrb); } //! Handle a bound async callback for send void handle_send(libusb_zero_copy_msb *msb){ _pending_send_buffs.push_with_haste(msb); } libusb::device_handle::sptr _handle; const size_t _recv_frame_size, _num_recv_frames; const size_t _send_frame_size, _num_send_frames; //! Storage for transfer related objects buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; bounded_buffer _pending_recv_buffs; bounded_buffer _pending_send_buffs; std::list _mrb_pool; std::list _msb_pool; std::list > _callbacks; //! a list of all transfer structs we allocated std::list _all_luts; //! event handler threads boost::thread_group _thread_group; bool _threads_running; void run_event_loop(boost::barrier &spawn_barrier){ _threads_running = true; spawn_barrier.wait(); set_thread_priority_safe(); libusb_context *context = libusb::session::get_global_session()->get_context(); try{ while(_threads_running){ timeval tv; tv.tv_sec = 0; tv.tv_usec = 100000; //100ms libusb_handle_events_timeout(context, &tv); } } catch(const boost::thread_interrupted &){} } }; /*********************************************************************** * USB zero_copy make functions **********************************************************************/ usb_zero_copy::sptr usb_zero_copy::make( usb_device_handle::sptr handle, size_t recv_endpoint, size_t send_endpoint, const device_addr_t &hints ){ libusb::device_handle::sptr dev_handle(libusb::device_handle::get_cached_handle( boost::static_pointer_cast(handle)->get_device() )); return sptr(new libusb_zero_copy_impl( dev_handle, recv_endpoint, send_endpoint, hints )); }