diff options
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/CMakeLists.txt | 7 | ||||
-rw-r--r-- | host/lib/transport/gen_vrt_if_packet.py | 143 | ||||
-rw-r--r-- | host/lib/transport/libusb1_base.cpp | 33 | ||||
-rw-r--r-- | host/lib/transport/libusb1_base.hpp | 4 | ||||
-rw-r--r-- | host/lib/transport/libusb1_control.cpp | 5 | ||||
-rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 335 | ||||
-rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 53 | ||||
-rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 41 | ||||
-rw-r--r-- | host/lib/transport/usb_zero_copy_wrapper.cpp | 231 |
9 files changed, 465 insertions, 387 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 6524a8412..963edcf85 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2010-2011 Ettus Research LLC +# Copyright 2010-2013 Ettus Research LLC # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -37,6 +37,10 @@ IF(ENABLE_USB) ${CMAKE_CURRENT_SOURCE_DIR}/libusb1_base.cpp ${CMAKE_CURRENT_SOURCE_DIR}/libusb1_base.hpp ) + SET_SOURCE_FILES_PROPERTIES( + ${CMAKE_CURRENT_SOURCE_DIR}/libusb1_zero_copy.cpp + PROPERTIES COMPILE_DEFINITIONS "${LIBUSB_DEFINITIONS}" + ) ELSE(ENABLE_USB) LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/usb_dummy_impl.cpp @@ -118,5 +122,4 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp ${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp ${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/usb_zero_copy_wrapper.cpp ) diff --git a/host/lib/transport/gen_vrt_if_packet.py b/host/lib/transport/gen_vrt_if_packet.py index e28ce3aae..98f6804ae 100644 --- a/host/lib/transport/gen_vrt_if_packet.py +++ b/host/lib/transport/gen_vrt_if_packet.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2010-2011 Ettus Research LLC +# Copyright 2010-2013 Ettus Research LLC # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -48,12 +48,14 @@ TMPL_TEXT = """ using namespace uhd; using namespace uhd::transport; +using namespace uhd::transport::vrt; typedef size_t pred_type; typedef std::vector<pred_type> pred_table_type; #define pred_table_index(hdr) ((hdr >> 20) & 0x1ff) -static pred_table_type get_pred_unpack_table(void){ +static pred_table_type get_pred_unpack_table(void) +{ pred_table_type table(1 << 9, 0); //only 9 bits useful here (20-28) for (size_t i = 0; i < table.size(); i++){ boost::uint32_t vrt_hdr_word = i << 20; @@ -74,13 +76,45 @@ static const pred_table_type pred_unpack_table(get_pred_unpack_table()); //maps num empty bytes to trailer bits static const size_t occ_table[] = {0, 2, 1, 3}; +const boost::uint32_t VRLP = ('V' << 24) | ('R' << 16) | ('L' << 8) | ('P' << 0); +const boost::uint32_t VEND = ('V' << 24) | ('E' << 16) | ('N' << 8) | ('D' << 0); + +UHD_INLINE static boost::uint32_t chdr_to_vrt(const boost::uint32_t chdr, if_packet_info_t &info) +{ + const boost::uint32_t bytes = chdr & 0xffff; + boost::uint32_t vrt = (bytes + 3)/4; + info.packet_count = (chdr >> 16) & 0xfff; + vrt |= ((chdr >> 31) & 0x1) << 30; //context packet + vrt |= ((chdr >> 29) & 0x1) << 20; //has tsf + vrt |= ((chdr >> 28) & 0x1) << 24; //has eob + vrt |= (0x1) << 28; //has sid (always) + return vrt; +} + +UHD_INLINE static boost::uint32_t vrt_to_chdr(const boost::uint32_t vrt, const if_packet_info_t &info) +{ + const boost::uint32_t words32 = vrt & 0xffff; + int bytes_rem = info.num_payload_bytes % 4; + if (bytes_rem != 0) bytes_rem -= 4; //adjust for round up + boost::uint32_t chdr = (words32 * 4) + bytes_rem; + chdr |= (info.packet_count & 0xfff) << 16; + chdr |= ((vrt >> 30) & 0x1) << 31; //context packet + chdr |= ((vrt >> 20) & 0x1) << 29; //has tsf + chdr |= ((vrt >> 24) & 0x1) << 28; //has eob + return chdr; +} + ######################################################################## #def gen_code($XE_MACRO, $suffix) ######################################################################## -void vrt::if_hdr_pack_$(suffix)( +/*********************************************************************** + * interal impl of packing VRT IF header only + **********************************************************************/ +UHD_INLINE void __if_hdr_pack_$(suffix)( boost::uint32_t *packet_buff, - if_packet_info_t &if_packet_info + if_packet_info_t &if_packet_info, + boost::uint32_t &vrt_hdr_word32 ){ boost::uint32_t vrt_hdr_flags = 0; @@ -154,31 +188,33 @@ void vrt::if_hdr_pack_$(suffix)( } //fill in complete header word - packet_buff[0] = $(XE_MACRO)(boost::uint32_t(0 + vrt_hdr_word32 = boost::uint32_t(0 | (if_packet_info.packet_type << 29) | vrt_hdr_flags | ((if_packet_info.packet_count & 0xf) << 16) | (if_packet_info.num_packet_words32 & 0xffff) - )); + ); } -void vrt::if_hdr_unpack_$(suffix)( +/*********************************************************************** + * interal impl of unpacking VRT IF header only + **********************************************************************/ +UHD_INLINE void __if_hdr_unpack_$(suffix)( const boost::uint32_t *packet_buff, - if_packet_info_t &if_packet_info + if_packet_info_t &if_packet_info, + const boost::uint32_t vrt_hdr_word32 ){ - //extract vrt header - boost::uint32_t vrt_hdr_word = $(XE_MACRO)(packet_buff[0]); - size_t packet_words32 = vrt_hdr_word & 0xffff; + const size_t packet_words32 = vrt_hdr_word32 & 0xffff; //failure case if (if_packet_info.num_packet_words32 < packet_words32) throw uhd::value_error("bad vrt header or packet fragment"); //extract fields from the header - if_packet_info.packet_type = if_packet_info_t::packet_type_t(vrt_hdr_word >> 29); - if_packet_info.packet_count = (vrt_hdr_word >> 16) & 0xf; + if_packet_info.packet_type = if_packet_info_t::packet_type_t(vrt_hdr_word32 >> 29); + if_packet_info.packet_count = (vrt_hdr_word32 >> 16) & 0xf; - const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word)]; + const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word32)]; size_t empty_bytes = 0; @@ -259,6 +295,85 @@ void vrt::if_hdr_unpack_$(suffix)( } } +/*********************************************************************** + * link layer + VRT IF packing + **********************************************************************/ +void vrt::if_hdr_pack_$(suffix)( + boost::uint32_t *packet_buff, + if_packet_info_t &if_packet_info +){ + boost::uint32_t vrt_hdr_word32 = 0; + switch (if_packet_info.link_type) + { + case if_packet_info_t::LINK_TYPE_NONE: + __if_hdr_pack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32); + packet_buff[0] = $(XE_MACRO)(vrt_hdr_word32); + break; + + case if_packet_info_t::LINK_TYPE_CHDR: + { + __if_hdr_pack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32); + const boost::uint32_t chdr = vrt_to_chdr(vrt_hdr_word32, if_packet_info); + packet_buff[0] = $(XE_MACRO)(chdr); + break; + } + + case if_packet_info_t::LINK_TYPE_VRLP: + __if_hdr_pack_$(suffix)(packet_buff+2, if_packet_info, vrt_hdr_word32); + if_packet_info.num_header_words32 += 2; + if_packet_info.num_packet_words32 += 3; + packet_buff[0] = $(XE_MACRO)(VRLP); + packet_buff[1] = $(XE_MACRO)(boost::uint32_t( + (if_packet_info.num_packet_words32 & 0xfffff) | + ((if_packet_info.packet_count & 0xfff) << 20) + )); + packet_buff[2] = $(XE_MACRO)(vrt_hdr_word32); + packet_buff[if_packet_info.num_packet_words32-1] = $(XE_MACRO)(VEND); + break; + } +} + +/*********************************************************************** + * link layer + VRT IF unpacking + **********************************************************************/ +void vrt::if_hdr_unpack_$(suffix)( + const boost::uint32_t *packet_buff, + if_packet_info_t &if_packet_info +){ + boost::uint32_t vrt_hdr_word32 = 0; + switch (if_packet_info.link_type) + { + case if_packet_info_t::LINK_TYPE_NONE: + vrt_hdr_word32 = $(XE_MACRO)(packet_buff[0]); + __if_hdr_unpack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32); + break; + + case if_packet_info_t::LINK_TYPE_CHDR: + { + const boost::uint32_t chdr = $(XE_MACRO)(packet_buff[0]); + vrt_hdr_word32 = chdr_to_vrt(chdr, if_packet_info); + size_t packet_count = if_packet_info.packet_count; + __if_hdr_unpack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32); + if_packet_info.num_payload_bytes -= (~chdr + 1) & 0x3; + if_packet_info.packet_count = packet_count; + break; + } + + case if_packet_info_t::LINK_TYPE_VRLP: + { + if ($(XE_MACRO)(packet_buff[0]) != VRLP) throw uhd::value_error("bad vrl header VRLP"); + const boost::uint32_t vrl_hdr = $(XE_MACRO)(packet_buff[1]); + vrt_hdr_word32 = $(XE_MACRO)(packet_buff[2]); + if (if_packet_info.num_packet_words32 < (vrl_hdr & 0xfffff)) throw uhd::value_error("bad vrl header or packet fragment"); + if ($(XE_MACRO)(packet_buff[(vrl_hdr & 0xfffff)-1]) != VEND) throw uhd::value_error("bad vrl trailer VEND"); + __if_hdr_unpack_$(suffix)(packet_buff+2, if_packet_info, vrt_hdr_word32); + if_packet_info.num_header_words32 += 2; //add vrl header + if_packet_info.packet_count = (vrl_hdr >> 20) & 0xfff; + break; + } + } +} + ######################################################################## #end def ######################################################################## diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp index d4ec874f1..0ef53db0a 100644 --- a/host/lib/transport/libusb1_base.cpp +++ b/host/lib/transport/libusb1_base.cpp @@ -1,5 +1,5 @@ // -// Copyright 2010-2011 Ettus Research LLC +// Copyright 2010-2013 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -23,6 +23,7 @@ #include <boost/weak_ptr.hpp> #include <boost/thread/mutex.hpp> #include <boost/foreach.hpp> +#include <cstdlib> #include <iostream> using namespace uhd; @@ -59,6 +60,15 @@ libusb::session::sptr libusb::session::get_global_session(void){ //create a new global session sptr new_global_session(new libusb_session_impl()); global_session = new_global_session; + + //set logging if envvar is set + const char *level_string = getenv("LIBUSB_DEBUG_LEVEL"); + if (level_string != NULL) + { + const int level = int(level_string[0] - '0'); //easy conversion to integer + if (level >= 0 and level <= 3) libusb_set_debug(new_global_session->get_context(), level); + } + return new_global_session; } @@ -137,8 +147,13 @@ public: return _desc; } - std::string get_ascii_serial(void) const{ - if (this->get().iSerialNumber == 0) return ""; + std::string get_ascii_property(const std::string &what) const + { + boost::uint8_t off = 0; + if (what == "serial") off = this->get().iSerialNumber; + if (what == "product") off = this->get().iProduct; + if (what == "manufacturer") off = this->get().iManufacturer; + if (off == 0) return ""; libusb::device_handle::sptr handle( libusb::device_handle::get_cached_handle(_dev) @@ -146,7 +161,7 @@ public: unsigned char buff[512]; ssize_t ret = libusb_get_string_descriptor_ascii( - handle->get(), this->get().iSerialNumber, buff, sizeof(buff) + handle->get(), off, buff, sizeof(buff) ); if (ret < 0) return ""; //on error, just return empty string @@ -240,7 +255,15 @@ public: } std::string get_serial(void) const{ - return libusb::device_descriptor::make(this->get_device())->get_ascii_serial(); + return libusb::device_descriptor::make(this->get_device())->get_ascii_property("serial"); + } + + std::string get_manufacturer() const{ + return libusb::device_descriptor::make(this->get_device())->get_ascii_property("manufacturer"); + } + + std::string get_product() const{ + return libusb::device_descriptor::make(this->get_device())->get_ascii_property("product"); } boost::uint16_t get_vendor_id(void) const{ diff --git a/host/lib/transport/libusb1_base.hpp b/host/lib/transport/libusb1_base.hpp index 04c1d6574..7dab07fda 100644 --- a/host/lib/transport/libusb1_base.hpp +++ b/host/lib/transport/libusb1_base.hpp @@ -1,5 +1,5 @@ // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2013 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -102,7 +102,7 @@ namespace libusb { //! get the underlying device descriptor virtual const libusb_device_descriptor &get(void) const = 0; - virtual std::string get_ascii_serial(void) const = 0; + virtual std::string get_ascii_property(const std::string &what) const = 0; }; /*! diff --git a/host/lib/transport/libusb1_control.cpp b/host/lib/transport/libusb1_control.cpp index 3d9b38785..c1b8fe6df 100644 --- a/host/lib/transport/libusb1_control.cpp +++ b/host/lib/transport/libusb1_control.cpp @@ -21,8 +21,6 @@ using namespace uhd::transport; -const int libusb_timeout = 0; - /*********************************************************************** * libusb-1.0 implementation of USB control transport **********************************************************************/ @@ -39,7 +37,8 @@ public: boost::uint16_t value, boost::uint16_t index, unsigned char *buff, - boost::uint16_t length + boost::uint16_t length, + boost::int32_t libusb_timeout = 0 ){ boost::mutex::scoped_lock lock(_mutex); return libusb_control_transfer(_handle->get(), diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 28bff9709..197e257da 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -1,5 +1,5 @@ // -// Copyright 2010-2012 Ettus Research LLC +// Copyright 2010-2013 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -18,11 +18,17 @@ #include "libusb1_base.hpp" #include <uhd/transport/usb_zero_copy.hpp> #include <uhd/transport/buffer_pool.hpp> +#include <uhd/transport/bounded_buffer.hpp> #include <uhd/utils/msg.hpp> #include <uhd/exception.hpp> #include <boost/foreach.hpp> +#include <boost/format.hpp> +#include <boost/function.hpp> +#include <boost/bind.hpp> #include <boost/make_shared.hpp> -#include <boost/thread/thread.hpp> +#include <boost/circular_buffer.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/condition_variable.hpp> #include <list> using namespace uhd; @@ -36,14 +42,51 @@ static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes #define LIBUSB_CALL #endif /*LIBUSB_CALL*/ +//! libusb_handle_events_timeout_completed is only in newer API +#ifndef HAVE_LIBUSB_HANDLE_EVENTS_TIMEOUT_COMPLETED + #define libusb_handle_events_timeout_completed(ctx, tx, completed) \ + libusb_handle_events_timeout(ctx, tx) +#endif + +//! libusb_error_name is only in newer API +#ifndef HAVE_LIBUSB_ERROR_NAME + #define libusb_error_name(code) \ + str(boost::format("LIBUSB_ERROR_CODE %d") % code) +#endif + +//! type for sharing the release queue with managed buffers +class libusb_zero_copy_mb; +typedef boost::shared_ptr<bounded_buffer<libusb_zero_copy_mb *> > mb_queue_sptr; + +/*! + * The libusb docs state that status and actual length can only be read in the callback. + * Therefore, this struct is intended to store data seen from the callback function. + */ +struct lut_result_t +{ + lut_result_t(void) + { + completed = 0; + status = LIBUSB_TRANSFER_COMPLETED; + actual_length = 0; + } + int completed; + libusb_transfer_status status; + int actual_length; +}; + /*! * All libusb callback functions should be marked with the LIBUSB_CALL macro * to ensure that they are compiled with the same calling convention as libusb. */ //! helper function: handles all async callbacks -static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut){ - *(static_cast<bool *>(lut->user_data)) = true; +static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut) +{ + lut_result_t *r = (lut_result_t *)lut->user_data; + r->completed = 1; + r->status = lut->status; + r->actual_length = lut->actual_length; } /*! @@ -61,7 +104,8 @@ static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut){ * \param completed a reference to the completed flag * \return true for completion, false for timeout */ -UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, bool &completed){ +UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, int &completed) +{ //already completed by a previous call? if (completed) return true; @@ -69,7 +113,7 @@ UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, b timeval tv; tv.tv_sec = 0; tv.tv_usec = 0; - libusb_handle_events_timeout(ctx, &tv); + libusb_handle_events_timeout_completed(ctx, &tv, &completed); if (completed) return true; //finish the rest with a timeout loop @@ -78,73 +122,54 @@ UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, b timeval tv; tv.tv_sec = 0; tv.tv_usec = 10000; /*10ms*/ - libusb_handle_events_timeout(ctx, &tv); + libusb_handle_events_timeout_completed(ctx, &tv, &completed); } return completed; } /*********************************************************************** - * Reusable managed receiver buffer: + * Reusable managed buffer: * - Associated with a particular libusb transfer struct. * - Submits the transfer to libusb in the release method. **********************************************************************/ -class libusb_zero_copy_mrb : public managed_recv_buffer{ +class libusb_zero_copy_mb : public managed_buffer +{ public: - libusb_zero_copy_mrb(libusb_transfer *lut, const size_t frame_size): + libusb_zero_copy_mb(libusb_transfer *lut, const size_t frame_size, boost::function<void(libusb_zero_copy_mb *)> release_cb, const bool is_recv, const std::string &name): + _release_cb(release_cb), _is_recv(is_recv), _name(name), _ctx(libusb::session::get_global_session()->get_context()), _lut(lut), _frame_size(frame_size) { /* NOP */ } - void release(void){ - completed = false; - _lut->length = _frame_size; //always reset length - UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0); - } - - sptr get_new(const double timeout, size_t &index){ - if (wait_for_completion(_ctx, timeout, completed)){ - index++; - return make(this, _lut->buffer, _lut->actual_length); - } - return managed_recv_buffer::sptr(); - } - - bool completed; - -private: - libusb_context *_ctx; - libusb_transfer *_lut; - const size_t _frame_size; -}; - -/*********************************************************************** - * Reusable managed send buffer: - * - Associated with a particular libusb transfer struct. - * - Submits the transfer to libusb in the commit method. - **********************************************************************/ -class libusb_zero_copy_msb : public managed_send_buffer{ -public: - libusb_zero_copy_msb(libusb_transfer *lut, const size_t frame_size): - _ctx(libusb::session::get_global_session()->get_context()), - _lut(lut), _frame_size(frame_size) { completed = true; } + void release(void){_release_cb(this);} - void release(void){ - completed = false; - _lut->length = size(); - UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0); + UHD_INLINE void submit(void) + { + _lut->length = (_is_recv)? _frame_size : size(); //always set length + const int ret = libusb_submit_transfer(_lut); + if (ret != 0) throw uhd::runtime_error(str(boost::format( + "usb %s submit failed: %s") % _name % libusb_error_name(ret))); } - sptr get_new(const double timeout, size_t &index){ - if (wait_for_completion(_ctx, timeout, completed)){ - index++; - return make(this, _lut->buffer, _frame_size); + template <typename buffer_type> + UHD_INLINE typename buffer_type::sptr get_new(const double timeout) + { + if (wait_for_completion(_ctx, timeout, result.completed)) + { + if (result.status != LIBUSB_TRANSFER_COMPLETED) throw uhd::runtime_error(str(boost::format( + "usb %s transfer status: %d") % _name % int(result.status))); + result.completed = 0; + return make(reinterpret_cast<buffer_type *>(this), _lut->buffer, (_is_recv)? result.actual_length : _frame_size); } - return managed_send_buffer::sptr(); + return typename buffer_type::sptr(); } - bool completed; + lut_result_t result; private: + boost::function<void(libusb_zero_copy_mb *)> _release_cb; + const bool _is_recv; + const std::string _name; libusb_context *_ctx; libusb_transfer *_lut; const size_t _frame_size; @@ -153,39 +178,33 @@ private: /*********************************************************************** * USB zero_copy device class **********************************************************************/ -class libusb_zero_copy_impl : public usb_zero_copy{ +class libusb_zero_copy_single +{ public: - - libusb_zero_copy_impl( + libusb_zero_copy_single( libusb::device_handle::sptr handle, - const size_t recv_interface, - const size_t recv_endpoint, - const size_t send_interface, - const size_t send_endpoint, - const device_addr_t &hints + const size_t interface, const size_t endpoint, + const size_t num_frames, const size_t frame_size ): _handle(handle), - _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))), - _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS))), - _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))), - _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS))), - _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)), - _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), - _next_recv_buff_index(0), - _next_send_buff_index(0) + _num_frames(num_frames), + _frame_size(frame_size), + _buffer_pool(buffer_pool::make(_num_frames, _frame_size)), + _enqueued(_num_frames), _released(_num_frames) { - _handle->claim_interface(recv_interface); - _handle->claim_interface(send_interface); + const bool is_recv = (endpoint & 0x80) != 0; + const std::string name = str(boost::format("%s%d") % ((is_recv)? "rx" : "tx") % int(endpoint & 0x7f)); + _handle->claim_interface(interface); //flush the buffers out of the recv endpoint //limit the flushing to at most one second - for (size_t i = 0; i < 100; i++) + if (is_recv) for (size_t i = 0; i < 100; i++) { unsigned char buff[512]; int transfered = 0; const int status = libusb_bulk_transfer( _handle->get(), // dev_handle - (recv_endpoint & 0x7f) | 0x80, // endpoint + endpoint, // endpoint static_cast<unsigned char *>(buff), sizeof(buff), &transfered, //bytes xfered @@ -194,102 +213,170 @@ public: if (status == LIBUSB_ERROR_TIMEOUT) break; } - //allocate libusb transfer structs and managed receive buffers - for (size_t i = 0; i < get_num_recv_frames(); i++){ - + //allocate libusb transfer structs and managed buffers + for (size_t i = 0; i < get_num_frames(); i++) + { libusb_transfer *lut = libusb_alloc_transfer(0); UHD_ASSERT_THROW(lut != NULL); - _mrb_pool.push_back(boost::make_shared<libusb_zero_copy_mrb>(lut, this->get_recv_frame_size())); + _mb_pool.push_back(boost::make_shared<libusb_zero_copy_mb>( + lut, this->get_frame_size(), boost::bind(&libusb_zero_copy_single::enqueue_damn_buffer, this, _1), is_recv, name + )); libusb_fill_bulk_transfer( lut, // transfer _handle->get(), // dev_handle - (recv_endpoint & 0x7f) | 0x80, // endpoint - static_cast<unsigned char *>(_recv_buffer_pool->at(i)), // buffer - this->get_recv_frame_size(), // length + endpoint, // endpoint + static_cast<unsigned char *>(_buffer_pool->at(i)), // buffer + this->get_frame_size(), // length libusb_transfer_cb_fn(&libusb_async_cb), // callback - static_cast<void *>(&_mrb_pool.back()->completed), // user_data + static_cast<void *>(&_mb_pool.back()->result), // user_data 0 // timeout (ms) ); _all_luts.push_back(lut); - _mrb_pool.back()->release(); } - //allocate libusb transfer structs and managed send buffers - for (size_t i = 0; i < get_num_send_frames(); i++){ - - libusb_transfer *lut = libusb_alloc_transfer(0); - UHD_ASSERT_THROW(lut != NULL); - - _msb_pool.push_back(boost::make_shared<libusb_zero_copy_msb>(lut, this->get_send_frame_size())); - - libusb_fill_bulk_transfer( - lut, // transfer - _handle->get(), // dev_handle - (send_endpoint & 0x7f) | 0x00, // endpoint - static_cast<unsigned char *>(_send_buffer_pool->at(i)), // buffer - this->get_send_frame_size(), // length - libusb_transfer_cb_fn(&libusb_async_cb), // callback - static_cast<void *>(&_msb_pool.back()->completed), // user_data - 0 // timeout - ); - - _all_luts.push_back(lut); + //initial release for all buffers + for (size_t i = 0; i < get_num_frames(); i++) + { + libusb_zero_copy_mb &mb = *(_mb_pool[i]); + if (is_recv) mb.release(); + else + { + mb.result.completed = 1; + _enqueued.push_back(&mb); + } } } - ~libusb_zero_copy_impl(void){ + ~libusb_zero_copy_single(void) + { libusb_context *ctx = libusb::session::get_global_session()->get_context(); //cancel all transfers - BOOST_FOREACH(libusb_transfer *lut, _all_luts){ + BOOST_FOREACH(libusb_transfer *lut, _all_luts) + { libusb_cancel_transfer(lut); } //process all transfers until timeout occurs - bool completed = false; + int completed = 0; wait_for_completion(ctx, 0.01, completed); //free all transfers - BOOST_FOREACH(libusb_transfer *lut, _all_luts){ + BOOST_FOREACH(libusb_transfer *lut, _all_luts) + { libusb_free_transfer(lut); } - } - managed_recv_buffer::sptr get_recv_buff(double timeout){ - if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0; - return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index); - } + template <typename buffer_type> + UHD_INLINE typename buffer_type::sptr get_buff(double timeout) + { + typename buffer_type::sptr buff; + libusb_zero_copy_mb *front = NULL; + { + boost::mutex::scoped_lock l(_mutex); + if (_enqueued.empty()) + { + _cond.timed_wait(l, boost::posix_time::microseconds(long(timeout*1e6))); + } + if (_enqueued.empty()) return buff; + front = _enqueued.front(); + } - managed_send_buffer::sptr get_send_buff(double timeout){ - if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0; - return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index); - } + buff = front->get_new<buffer_type>(timeout); - size_t get_num_recv_frames(void) const { return _num_recv_frames; } - size_t get_num_send_frames(void) const { return _num_send_frames; } + boost::mutex::scoped_lock l(_mutex); + if (buff) _enqueued.pop_front(); + this->submit_what_we_can(); + return buff; + } - size_t get_recv_frame_size(void) const { return _recv_frame_size; } - size_t get_send_frame_size(void) const { return _send_frame_size; } + UHD_INLINE size_t get_num_frames(void) const { return _num_frames; } + UHD_INLINE size_t get_frame_size(void) const { return _frame_size; } private: libusb::device_handle::sptr _handle; - const size_t _recv_frame_size, _num_recv_frames; - const size_t _send_frame_size, _num_send_frames; + const size_t _num_frames, _frame_size; //! Storage for transfer related objects - buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; - std::vector<boost::shared_ptr<libusb_zero_copy_mrb> > _mrb_pool; - std::vector<boost::shared_ptr<libusb_zero_copy_msb> > _msb_pool; - size_t _next_recv_buff_index, _next_send_buff_index; + buffer_pool::sptr _buffer_pool; + std::vector<boost::shared_ptr<libusb_zero_copy_mb> > _mb_pool; + + boost::mutex _mutex; + boost::condition_variable _cond; + + //! why 2 queues? there is room in the future to have > N buffers but only N in flight + boost::circular_buffer<libusb_zero_copy_mb *> _enqueued, _released; + + void enqueue_damn_buffer(libusb_zero_copy_mb *mb) + { + boost::mutex::scoped_lock l(_mutex); + _released.push_back(mb); + this->submit_what_we_can(); + l.unlock(); + _cond.notify_one(); + } + + void submit_what_we_can(void) + { + while (not _released.empty() and not _enqueued.full()) + { + _released.front()->submit(); + _enqueued.push_back(_released.front()); + _released.pop_front(); + } + } //! a list of all transfer structs we allocated std::list<libusb_transfer *> _all_luts; +}; + +/*********************************************************************** + * USB zero_copy device class + **********************************************************************/ +struct libusb_zero_copy_impl : usb_zero_copy +{ + libusb_zero_copy_impl( + libusb::device_handle::sptr handle, + const size_t recv_interface, + const size_t recv_endpoint, + const size_t send_interface, + const size_t send_endpoint, + const device_addr_t &hints + ){ + _recv_impl.reset(new libusb_zero_copy_single( + handle, recv_interface, (recv_endpoint & 0x7f) | 0x80, + size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS)), + size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE)))); + _send_impl.reset(new libusb_zero_copy_single( + handle, send_interface, (send_endpoint & 0x7f) | 0x00, + size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS)), + size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE)))); + } + + managed_recv_buffer::sptr get_recv_buff(double timeout) + { + boost::mutex::scoped_lock l(_recv_mutex); + return _recv_impl->get_buff<managed_recv_buffer>(timeout); + } + + managed_send_buffer::sptr get_send_buff(double timeout) + { + boost::mutex::scoped_lock l(_send_mutex); + return _send_impl->get_buff<managed_send_buffer>(timeout); + } + + size_t get_num_recv_frames(void) const { return _recv_impl->get_num_frames(); } + size_t get_num_send_frames(void) const { return _send_impl->get_num_frames(); } + size_t get_recv_frame_size(void) const { return _recv_impl->get_frame_size(); } + size_t get_send_frame_size(void) const { return _send_impl->get_frame_size(); } + boost::shared_ptr<libusb_zero_copy_single> _recv_impl, _send_impl; + boost::mutex _recv_mutex, _send_mutex; }; /*********************************************************************** diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 5a75d5f0d..688228e49 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -63,6 +63,8 @@ static inline void handle_overflow_nop(void){} class recv_packet_handler{ public: typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type; + typedef boost::function<void(const size_t)> handle_flowctrl_type; + typedef boost::function<void(const stream_cmd_t&)> issue_stream_cmd_type; typedef void(*vrt_unpacker_type)(const boost::uint32_t *, vrt::if_packet_info_t &); //typedef boost::function<void(const boost::uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type; @@ -139,6 +141,19 @@ public: _props.at(xport_chan).get_buff = get_buff; } + /*! + * Set the function to handle flow control + * \param xport_chan which transport channel + * \param handle_flowctrl the callback function + */ + void set_xport_handle_flowctrl(const size_t xport_chan, const handle_flowctrl_type &handle_flowctrl, const size_t update_window, const bool do_init = false) + { + _props.at(xport_chan).handle_flowctrl = handle_flowctrl; + //we need the window size to be within the 0xfff (max 12 bit seq) + _props.at(xport_chan).fc_update_window = std::min<size_t>(update_window, 0xfff); + if (do_init) handle_flowctrl(0); + } + //! Set the conversion routine for all channels void set_converter(const uhd::convert::id_type &id){ _num_outputs = id.num_outputs; @@ -158,6 +173,21 @@ public: _converter->set_scalar(scale_factor); } + //! Set the callback to issue stream commands + void set_issue_stream_cmd(const size_t xport_chan, const issue_stream_cmd_type &issue_stream_cmd) + { + _props.at(xport_chan).issue_stream_cmd = issue_stream_cmd; + } + + //! Overload call to issue stream commands + void issue_stream_cmd(const stream_cmd_t &stream_cmd) + { + for (size_t i = 0; i < _props.size(); i++) + { + if (_props[i].issue_stream_cmd) _props[i].issue_stream_cmd(stream_cmd); + } + } + /******************************************************************* * Receive: * The entry point for the fast-path receive calls. @@ -219,8 +249,11 @@ private: handle_overflow(&handle_overflow_nop) {} get_buff_type get_buff; + issue_stream_cmd_type issue_stream_cmd; size_t packet_count; handle_overflow_type handle_overflow; + handle_flowctrl_type handle_flowctrl; + size_t fc_update_window; }; std::vector<xport_chan_props_type> _props; size_t _num_outputs; @@ -302,6 +335,15 @@ private: info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32); + //handle flow control + if (_props[index].handle_flowctrl) + { + if ((info.ifpi.packet_count % _props[index].fc_update_window/2) == 0) + { + _props[index].handle_flowctrl(info.ifpi.packet_count); + } + } + //-------------------------------------------------------------- //-- Determine return conditions: //-- The order of these checks is HOLY. @@ -314,8 +356,9 @@ private: //2) check for sequence errors #ifndef SRPH_DONT_CHECK_SEQUENCE + const size_t seq_mask = (info.ifpi.link_type == vrt::if_packet_info_t::LINK_TYPE_NONE)? 0xf : 0xfff; const size_t expected_packet_count = _props[index].packet_count; - _props[index].packet_count = (info.ifpi.packet_count + 1)%16; + _props[index].packet_count = (info.ifpi.packet_count + 1) & seq_mask; if (expected_packet_count != info.ifpi.packet_count){ return PACKET_SEQUENCE_ERROR; } @@ -459,7 +502,7 @@ private: curr_info.metadata.start_of_burst = false; curr_info.metadata.end_of_burst = false; curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; - UHD_MSG(fastpath) << "O"; + UHD_MSG(fastpath) << "D"; return; } @@ -479,6 +522,7 @@ private: curr_info.metadata.start_of_burst = false; curr_info.metadata.end_of_burst = false; curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; + _props[index].handle_overflow(); return; } @@ -622,6 +666,11 @@ public: return recv_packet_handler::recv(buffs, nsamps_per_buff, metadata, timeout, one_packet); } + void issue_stream_cmd(const stream_cmd_t &stream_cmd) + { + return recv_packet_handler::issue_stream_cmd(stream_cmd); + } + private: size_t _max_num_samps; }; diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index 726742327..41f030ea6 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -47,6 +47,7 @@ namespace uhd{ namespace transport{ namespace sph{ class send_packet_handler{ public: typedef boost::function<managed_send_buffer::sptr(double)> get_buff_type; + typedef boost::function<bool(uhd::async_metadata_t &, const double)> async_receiver_type; typedef void(*vrt_packer_type)(boost::uint32_t *, vrt::if_packet_info_t &); //typedef boost::function<void(boost::uint32_t *, vrt::if_packet_info_t &)> vrt_packer_type; @@ -57,6 +58,7 @@ public: send_packet_handler(const size_t size = 1): _next_packet_seq(0) { + this->set_enable_trailer(true); this->resize(size); } @@ -96,6 +98,11 @@ public: _props.at(xport_chan).sid = sid; } + void set_enable_trailer(const bool enable) + { + _has_tlr = enable; + } + //! Set the rate of ticks per second void set_tick_rate(const double rate){ _tick_rate = rate; @@ -138,6 +145,21 @@ public: _converter->set_scalar(scale_factor); } + //! Set the callback to get async messages + void set_async_receiver(const async_receiver_type &async_receiver) + { + _async_receiver = async_receiver; + } + + //! Overload call to get async metadata + bool recv_async_msg( + uhd::async_metadata_t &async_metadata, double timeout = 0.1 + ){ + if (_async_receiver) return _async_receiver(async_metadata, timeout); + boost::this_thread::sleep(boost::posix_time::microseconds(long(timeout*1e6))); + return false; + } + /******************************************************************* * Send: * The entry point for the fast-path send calls. @@ -154,7 +176,7 @@ public: if_packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA; //if_packet_info.has_sid = false; //set per channel if_packet_info.has_cid = false; - if_packet_info.has_tlr = true; + if_packet_info.has_tlr = _has_tlr; if_packet_info.has_tsi = false; if_packet_info.has_tsf = metadata.has_time_spec; if_packet_info.tsf = metadata.time_spec.to_ticks(_tick_rate); @@ -165,9 +187,12 @@ public: //TODO remove this code when sample counts of zero are supported by hardware #ifndef SSPH_DONT_PAD_TO_ONE - if (nsamps_per_buff == 0) return send_one_packet( - _zero_buffs, 1, if_packet_info, timeout - ) & 0x0; + static const boost::uint64_t zero = 0; + _zero_buffs.resize(buffs.size(), &zero); + + if (nsamps_per_buff == 0) return send_one_packet( + _zero_buffs, 1, if_packet_info, timeout + ) & 0x0; #endif return send_one_packet(buffs, nsamps_per_buff, if_packet_info, timeout); @@ -228,6 +253,8 @@ private: size_t _max_samples_per_packet; std::vector<const void *> _zero_buffs; size_t _next_packet_seq; + bool _has_tlr; + async_receiver_type _async_receiver; /******************************************************************* * Send a single packet: @@ -337,6 +364,12 @@ public: return send_packet_handler::send(buffs, nsamps_per_buff, metadata, timeout); } + bool recv_async_msg( + uhd::async_metadata_t &async_metadata, double timeout = 0.1 + ){ + return send_packet_handler::recv_async_msg(async_metadata, timeout); + } + private: size_t _max_num_samps; }; diff --git a/host/lib/transport/usb_zero_copy_wrapper.cpp b/host/lib/transport/usb_zero_copy_wrapper.cpp deleted file mode 100644 index d04244ca9..000000000 --- a/host/lib/transport/usb_zero_copy_wrapper.cpp +++ /dev/null @@ -1,231 +0,0 @@ -// -// Copyright 2011-2012 Ettus Research LLC -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. -// - -#include <uhd/transport/usb_zero_copy.hpp> -#include <uhd/transport/buffer_pool.hpp> -#include <uhd/utils/byteswap.hpp> -#include <uhd/utils/msg.hpp> -#include <uhd/utils/tasks.hpp> -#include <uhd/utils/atomic.hpp> -#include <boost/foreach.hpp> -#include <boost/make_shared.hpp> -#include <boost/thread/mutex.hpp> -#include <boost/thread/condition_variable.hpp> -#include <boost/bind.hpp> -#include <vector> -#include <iostream> - -using namespace uhd; -using namespace uhd::transport; - -static const boost::posix_time::time_duration AUTOFLUSH_TIMEOUT(boost::posix_time::milliseconds(1)); - -/*********************************************************************** - * USB zero copy wrapper - managed receive buffer - **********************************************************************/ -class usb_zero_copy_wrapper_mrb : public managed_recv_buffer{ -public: - usb_zero_copy_wrapper_mrb(void){/*NOP*/} - - void release(void){ - _mrb.reset(); //decrement ref count, other MRB's may hold a ref - _claimer.release(); - } - - UHD_INLINE sptr get_new( - managed_recv_buffer::sptr &mrb, size_t &offset_bytes, - const double timeout, size_t &index - ){ - if (not mrb or not _claimer.claim_with_wait(timeout)) return sptr(); - - index++; //advances the caller's buffer - - //hold a copy of the buffer shared pointer - _mrb = mrb; - - //extract this packet's memory address and length in bytes - char *mem = mrb->cast<char *>() + offset_bytes; - const boost::uint32_t *mem32 = reinterpret_cast<const boost::uint32_t *>(mem); - const size_t words32 = (uhd::wtohx(mem32[0]) & 0xffff); //length in words32 (from VRT header) - const size_t len = words32*sizeof(boost::uint32_t); //length in bytes - - //check if this receive buffer has been exhausted - offset_bytes += len; - if (offset_bytes >= mrb->size()) mrb.reset(); //drop caller's ref - else if (uhd::wtohx(mem32[words32]) == 0) mrb.reset(); - - return make(this, mem, len); - } - -private: - managed_recv_buffer::sptr _mrb; - simple_claimer _claimer; -}; - -/*********************************************************************** - * USB zero copy wrapper - managed send buffer - **********************************************************************/ -class usb_zero_copy_wrapper_msb : public managed_send_buffer{ -public: - usb_zero_copy_wrapper_msb(const usb_zero_copy::sptr internal, const size_t fragmentation_size): - _internal(internal), _fragmentation_size(fragmentation_size) - { - _ok_to_auto_flush = false; - _task = uhd::task::make(boost::bind(&usb_zero_copy_wrapper_msb::auto_flush, this)); - } - - ~usb_zero_copy_wrapper_msb(void) - { - //ensure the task has exited before anything auto deconstructs - _task.reset(); - } - - void release(void){ - boost::mutex::scoped_lock lock(_mutex); - _ok_to_auto_flush = true; - - //get a reference to the VITA header before incrementing - const boost::uint32_t vita_header = reinterpret_cast<const boost::uint32_t *>(_mem_buffer_tip)[0]; - - _bytes_in_buffer += size(); - _mem_buffer_tip += size(); - - //extract VITA end of packet flag, we must force flush under eof conditions - const bool eop = (uhd::wtohx(vita_header) & (0x1 << 24)) != 0; - const bool full = _bytes_in_buffer >= (_last_send_buff->size() - _fragmentation_size); - if (eop or full){ - _last_send_buff->commit(_bytes_in_buffer); - _last_send_buff.reset(); - - //notify the auto-flusher to restart its timed_wait - lock.unlock(); _cond.notify_one(); - } - } - - UHD_INLINE sptr get_new(const double timeout){ - boost::mutex::scoped_lock lock(_mutex); - _ok_to_auto_flush = false; - - if (not _last_send_buff){ - _last_send_buff = _internal->get_send_buff(timeout); - if (not _last_send_buff) return sptr(); - _mem_buffer_tip = _last_send_buff->cast<char *>(); - _bytes_in_buffer = 0; - } - - return make(this, _mem_buffer_tip, _fragmentation_size); - } - -private: - usb_zero_copy::sptr _internal; - const size_t _fragmentation_size; - managed_send_buffer::sptr _last_send_buff; - size_t _bytes_in_buffer; - char *_mem_buffer_tip; - - //private variables for auto flusher - boost::mutex _mutex; - boost::condition_variable _cond; - uhd::task::sptr _task; - bool _ok_to_auto_flush; - - /*! - * The auto flusher ensures that buffers are force committed when - * the user has not called get_new() within a certain time window. - */ - void auto_flush(void) - { - boost::mutex::scoped_lock lock(_mutex); - const bool timeout = not _cond.timed_wait(lock, AUTOFLUSH_TIMEOUT); - if (timeout and _ok_to_auto_flush and _last_send_buff and _bytes_in_buffer != 0) - { - _last_send_buff->commit(_bytes_in_buffer); - _last_send_buff.reset(); - } - } -}; - -/*********************************************************************** - * USB zero copy wrapper implementation - **********************************************************************/ -class usb_zero_copy_wrapper : public usb_zero_copy{ -public: - usb_zero_copy_wrapper(sptr usb_zc, const size_t frame_boundary): - _internal_zc(usb_zc), - _frame_boundary(frame_boundary), - _next_recv_buff_index(0) - { - for (size_t i = 0; i < this->get_num_recv_frames(); i++){ - _mrb_pool.push_back(boost::make_shared<usb_zero_copy_wrapper_mrb>()); - } - _the_only_msb = boost::make_shared<usb_zero_copy_wrapper_msb>(usb_zc, frame_boundary); - } - - managed_recv_buffer::sptr get_recv_buff(double timeout){ - //attempt to get a managed recv buffer - if (not _last_recv_buff){ - _last_recv_buff = _internal_zc->get_recv_buff(timeout); - _last_recv_offset = 0; //reset offset into buffer - } - - //get the buffer to be returned to the user - if (_next_recv_buff_index == _mrb_pool.size()) _next_recv_buff_index = 0; - return _mrb_pool[_next_recv_buff_index]->get_new( - _last_recv_buff, _last_recv_offset, timeout, _next_recv_buff_index - ); - } - - size_t get_num_recv_frames(void) const{ - return _internal_zc->get_num_recv_frames(); - } - - size_t get_recv_frame_size(void) const{ - return std::min(_frame_boundary, _internal_zc->get_recv_frame_size()); - } - - managed_send_buffer::sptr get_send_buff(double timeout){ - return _the_only_msb->get_new(timeout); - } - - size_t get_num_send_frames(void) const{ - return _internal_zc->get_num_send_frames(); - } - - size_t get_send_frame_size(void) const{ - return std::min(_frame_boundary, _internal_zc->get_send_frame_size()); - } - -private: - sptr _internal_zc; - size_t _frame_boundary; - std::vector<boost::shared_ptr<usb_zero_copy_wrapper_mrb> > _mrb_pool; - boost::shared_ptr<usb_zero_copy_wrapper_msb> _the_only_msb; - - //state for last recv buffer to create multiple managed buffers - managed_recv_buffer::sptr _last_recv_buff; - size_t _last_recv_offset; - size_t _next_recv_buff_index; -}; - -/*********************************************************************** - * USB zero copy wrapper factory function - **********************************************************************/ -usb_zero_copy::sptr usb_zero_copy::make_wrapper( - sptr usb_zc, size_t usb_frame_boundary -){ - return sptr(new usb_zero_copy_wrapper(usb_zc, usb_frame_boundary)); -} |