aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/CMakeLists.txt9
-rw-r--r--host/lib/transport/convert_types_impl.hpp20
-rwxr-xr-xhost/lib/transport/gen_convert_types.py10
-rw-r--r--host/lib/transport/libusb1_base.cpp299
-rw-r--r--host/lib/transport/libusb1_base.hpp177
-rw-r--r--host/lib/transport/libusb1_control.cpp75
-rw-r--r--host/lib/transport/libusb1_device_handle.cpp117
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp745
-rw-r--r--host/lib/transport/msvc/stdint.h35
-rw-r--r--host/lib/transport/udp_zero_copy_asio.cpp293
-rw-r--r--host/lib/transport/usb_dummy_impl.cpp39
-rw-r--r--host/lib/transport/vrt_packet_handler.hpp107
-rw-r--r--host/lib/transport/zero_copy.cpp121
13 files changed, 1016 insertions, 1031 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index 96ef8e505..b95d46381 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -30,9 +30,16 @@ IF(LIBUSB_FOUND)
${CMAKE_SOURCE_DIR}/lib/transport/libusb1_control.cpp
${CMAKE_SOURCE_DIR}/lib/transport/libusb1_zero_copy.cpp
${CMAKE_SOURCE_DIR}/lib/transport/libusb1_base.cpp
- ${CMAKE_SOURCE_DIR}/lib/transport/libusb1_device_handle.cpp
+ ${CMAKE_SOURCE_DIR}/lib/transport/libusb1_base.hpp
)
+ IF(MSVC) #include our custom stdint for libusb
+ INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/lib/transport/msvc)
+ ENDIF(MSVC)
SET(HAVE_USB_SUPPORT TRUE)
+ELSE(LIBUSB_FOUND)
+ LIBUHD_APPEND_SOURCES(
+ ${CMAKE_SOURCE_DIR}/lib/transport/usb_dummy_impl.cpp
+ )
ENDIF(LIBUSB_FOUND)
IF(HAVE_USB_SUPPORT)
diff --git a/host/lib/transport/convert_types_impl.hpp b/host/lib/transport/convert_types_impl.hpp
index b9a47eb0e..48ff99725 100644
--- a/host/lib/transport/convert_types_impl.hpp
+++ b/host/lib/transport/convert_types_impl.hpp
@@ -266,6 +266,26 @@ static UHD_INLINE void item32_to_fc32_nswap(
}
}
+#elif defined(USE_ARM_NEON_H)
+static UHD_INLINE void item32_to_fc32_nswap(
+ const item32_t *input, fc32_t *output, size_t nsamps)
+{
+ size_t i;
+
+ float32x4_t Q1 = vdupq_n_f32(floats_per_short);
+ for (i=0; i < (nsamps & ~0x03); i+=2) {
+ int16x4_t D0 = vld1_s16(reinterpret_cast<const int16_t *>(&input[i]));
+ int16x4_t D1 = vrev32_s16(D0);
+ int32x4_t Q2 = vmovl_s16(D1);
+ float32x4_t Q3 = vcvtq_f32_s32(Q2);
+ float32x4_t Q4 = vmulq_f32(Q3, Q1);
+ vst1q_f32((reinterpret_cast<float *>(&output[i])), Q4);
+ }
+
+ for (; i < nsamps; i++)
+ output[i] = item32_to_fc32(input[i]);
+}
+
#else
static UHD_INLINE void item32_to_fc32_nswap(
const item32_t *input, fc32_t *output, size_t nsamps
diff --git a/host/lib/transport/gen_convert_types.py b/host/lib/transport/gen_convert_types.py
index adbd22868..f9509c81d 100755
--- a/host/lib/transport/gen_convert_types.py
+++ b/host/lib/transport/gen_convert_types.py
@@ -99,9 +99,9 @@ void transport::convert_io_type_to_otw_type(
nsamps_per_io_buff
);
#else
- for (size_t i = 0; i < nsamps_per_io_buff; i++){
+ for (size_t i = 0, j = 0; i < nsamps_per_io_buff; i++){
#for $j in range($num_chans)
- reinterpret_cast<$(out_type)_t *>(otw_buff)[i*$num_chans + $j] =
+ reinterpret_cast<$(out_type)_t *>(otw_buff)[j++] =
#if $ph.get_swap_type($pred) == 'bswap'
uhd::byteswap($(converter)(reinterpret_cast<const $(in_type)_t *>(io_buffs[$j])[i]));
#else
@@ -139,13 +139,13 @@ void transport::convert_otw_type_to_io_type(
nsamps_per_io_buff
);
#else
- for (size_t i = 0; i < nsamps_per_io_buff; i++){
+ for (size_t i = 0, j = 0; i < nsamps_per_io_buff; i++){
#for $j in range($num_chans)
reinterpret_cast<$(out_type)_t *>(io_buffs[$j])[i] =
#if $ph.get_swap_type($pred) == 'bswap'
- $(converter)(uhd::byteswap(reinterpret_cast<const $(in_type)_t *>(otw_buff)[i*$num_chans + $j]));
+ $(converter)(uhd::byteswap(reinterpret_cast<const $(in_type)_t *>(otw_buff)[j++]));
#else
- $(converter)(reinterpret_cast<const $(in_type)_t *>(otw_buff)[i*$num_chans + $j]);
+ $(converter)(reinterpret_cast<const $(in_type)_t *>(otw_buff)[j++]);
#end if
#end for
}
diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp
index e21c39aa3..cfa77d9ca 100644
--- a/host/lib/transport/libusb1_base.cpp
+++ b/host/lib/transport/libusb1_base.cpp
@@ -17,110 +17,249 @@
#include "libusb1_base.hpp"
#include <uhd/utils/assert.hpp>
+#include <uhd/types/dict.hpp>
+#include <boost/weak_ptr.hpp>
+#include <boost/foreach.hpp>
#include <iostream>
+using namespace uhd;
using namespace uhd::transport;
-/**********************************************************
- * Helper Methods
- **********************************************************/
+/***********************************************************************
+ * libusb session
+ **********************************************************************/
+class libusb_session_impl : public libusb::session{
+public:
+ libusb_session_impl(void){
+ UHD_ASSERT_THROW(libusb_init(&_context) == 0);
+ libusb_set_debug(_context, debug_level);
+ }
+
+ ~libusb_session_impl(void){
+ libusb_exit(_context);
+ }
+
+ libusb_context *get_context(void) const{
+ return _context;
+ }
+
+private:
+ libusb_context *_context;
+};
+
+libusb::session::sptr libusb::session::get_global_session(void){
+ static boost::weak_ptr<session> global_session;
-/**********************************************************
- * libusb namespace
- **********************************************************/
-void libusb::init(libusb_context **ctx, int debug_level)
-{
- if (libusb_init(ctx) < 0)
- std::cerr << "error: libusb_init" << std::endl;
+ //not expired -> get existing session
+ if (not global_session.expired()) return global_session.lock();
- libusb_set_debug(*ctx, debug_level);
+ //create a new global session
+ sptr new_global_session(new libusb_session_impl());
+ global_session = new_global_session;
+ return new_global_session;
}
-libusb_device_handle *libusb::open_device(libusb_context *ctx,
- usb_device_handle::sptr handle)
-{
- libusb_device_handle *dev_handle = NULL;
- libusb_device **libusb_dev_list;
- size_t dev_cnt = libusb_get_device_list(ctx, &libusb_dev_list);
-
- //find and open the USB device
- for (size_t i = 0; i < dev_cnt; i++) {
- libusb_device *dev = libusb_dev_list[i];
-
- if (compare_device(dev, handle)) {
- libusb_open(dev, &dev_handle);
- libusb_unref_device(dev);
- break;
- }
-
- libusb_unref_device(dev);
+/***********************************************************************
+ * libusb device
+ **********************************************************************/
+class libusb_device_impl : public libusb::device{
+public:
+ libusb_device_impl(libusb_device *dev){
+ _session = libusb::session::get_global_session();
+ _dev = dev;
}
- return dev_handle;
-}
+ ~libusb_device_impl(void){
+ libusb_unref_device(this->get());
+ }
+
+ libusb_device *get(void) const{
+ return _dev;
+ }
+
+private:
+ libusb::session::sptr _session; //always keep a reference to session
+ libusb_device *_dev;
+};
+
+/***********************************************************************
+ * libusb device list
+ **********************************************************************/
+class libusb_device_list_impl : public libusb::device_list{
+public:
+ libusb_device_list_impl(void){
+ libusb::session::sptr sess = libusb::session::get_global_session();
-//note: changed order of checks so it only tries to get_serial and get_device_address if vid and pid match
-//doing this so it doesn't try to open the device if it's not ours
-bool libusb::compare_device(libusb_device *dev,
- usb_device_handle::sptr handle)
-{
- std::string serial = handle->get_serial();
- boost::uint16_t vendor_id = handle->get_vendor_id();
- boost::uint16_t product_id = handle->get_product_id();
- boost::uint8_t device_addr = handle->get_device_addr();
-
- libusb_device_descriptor libusb_desc;
- if (libusb_get_device_descriptor(dev, &libusb_desc) < 0)
- return false;
- if (vendor_id != libusb_desc.idVendor)
- return false;
- if (product_id != libusb_desc.idProduct)
- return false;
- if (serial != get_serial(dev))
- return false;
- if (device_addr != libusb_get_device_address(dev))
- return false;
-
- return true;
+ //allocate a new list of devices
+ libusb_device** dev_list;
+ ssize_t ret = libusb_get_device_list(sess->get_context(), &dev_list);
+ if (ret < 0) throw std::runtime_error("cannot enumerate usb devices");
+
+ //fill the vector of device references
+ for (size_t i = 0; i < size_t(ret); i++) _devs.push_back(
+ libusb::device::sptr(new libusb_device_impl(dev_list[i]))
+ );
+
+ //free the device list but dont unref (done in ~device)
+ libusb_free_device_list(dev_list, false/*dont unref*/);
+ }
+
+ size_t size(void) const{
+ return _devs.size();
+ }
+
+ libusb::device::sptr at(size_t i) const{
+ return _devs.at(i);
+ }
+
+private:
+ std::vector<libusb::device::sptr> _devs;
+};
+
+libusb::device_list::sptr libusb::device_list::make(void){
+ return sptr(new libusb_device_list_impl());
}
+/***********************************************************************
+ * libusb device descriptor
+ **********************************************************************/
+class libusb_device_descriptor_impl : public libusb::device_descriptor{
+public:
+ libusb_device_descriptor_impl(libusb::device::sptr dev){
+ _dev = dev;
+ UHD_ASSERT_THROW(libusb_get_device_descriptor(_dev->get(), &_desc) == 0);
+ }
-bool libusb::open_interface(libusb_device_handle *dev_handle,
- int interface)
-{
- int ret = libusb_claim_interface(dev_handle, interface);
- if (ret < 0) {
- std::cerr << "error: libusb_claim_interface() " << ret << std::endl;
- return false;
+ const libusb_device_descriptor &get(void) const{
+ return _desc;
}
- else {
- return true;
+
+ std::string get_ascii_serial(void) const{
+ if (this->get().iSerialNumber == 0) return "";
+
+ libusb::device_handle::sptr handle(
+ libusb::device_handle::get_cached_handle(_dev)
+ );
+
+ unsigned char buff[512];
+ ssize_t ret = libusb_get_string_descriptor_ascii(
+ handle->get(), this->get().iSerialNumber, buff, sizeof(buff)
+ );
+ if (ret < 0) return ""; //on error, just return empty string
+
+ return std::string((char *)buff, ret);
}
+
+private:
+ libusb::device::sptr _dev; //always keep a reference to device
+ libusb_device_descriptor _desc;
+};
+
+libusb::device_descriptor::sptr libusb::device_descriptor::make(device::sptr dev){
+ return sptr(new libusb_device_descriptor_impl(dev));
}
+/***********************************************************************
+ * libusb device handle
+ **********************************************************************/
+class libusb_device_handle_impl : public libusb::device_handle{
+public:
+ libusb_device_handle_impl(libusb::device::sptr dev){
+ _dev = dev;
+ UHD_ASSERT_THROW(libusb_open(_dev->get(), &_handle) == 0);
+ }
+
+ ~libusb_device_handle_impl(void){
+ //release all claimed interfaces
+ for (size_t i = 0; i < _claimed.size(); i++){
+ libusb_release_interface(this->get(), _claimed[i]);
+ }
+ libusb_close(_handle);
+ }
+
+ libusb_device_handle *get(void) const{
+ return _handle;
+ }
+
+ void claim_interface(int interface){
+ UHD_ASSERT_THROW(libusb_claim_interface(this->get(), interface) == 0);
+ _claimed.push_back(interface);
+ }
+
+private:
+ libusb::device::sptr _dev; //always keep a reference to device
+ libusb_device_handle *_handle;
+ std::vector<int> _claimed;
+};
+
+libusb::device_handle::sptr libusb::device_handle::get_cached_handle(device::sptr dev){
+ static uhd::dict<libusb_device *, boost::weak_ptr<device_handle> > handles;
+
+ //not expired -> get existing handle
+ if (handles.has_key(dev->get()) and not handles[dev->get()].expired()){
+ return handles[dev->get()].lock();
+ }
+
+ //create a new cached handle
+ try{
+ sptr new_handle(new libusb_device_handle_impl(dev));
+ handles[dev->get()] = new_handle;
+ return new_handle;
+ }
+ catch(const std::exception &e){
+ std::cerr << "USB open failed: see the application notes for your device." << std::endl;
+ throw std::runtime_error(e.what());
+ }
+}
-std::string libusb::get_serial(libusb_device *dev)
-{
- unsigned char buff[32];
+/***********************************************************************
+ * libusb special handle
+ **********************************************************************/
+class libusb_special_handle_impl : public libusb::special_handle{
+public:
+ libusb_special_handle_impl(libusb::device::sptr dev){
+ _dev = dev;
+ }
- libusb_device_descriptor desc;
- if (libusb_get_device_descriptor(dev, &desc) < 0)
- return "";
+ libusb::device::sptr get_device(void) const{
+ return _dev;
+ }
- if (desc.iSerialNumber == 0)
- return "";
+ std::string get_serial(void) const{
+ return libusb::device_descriptor::make(this->get_device())->get_ascii_serial();
+ }
- //open the device because we have to
- libusb_device_handle *dev_handle;
- if (libusb_open(dev, &dev_handle) < 0)
- return "";
+ boost::uint16_t get_vendor_id(void) const{
+ return libusb::device_descriptor::make(this->get_device())->get().idVendor;
+ }
- if (libusb_get_string_descriptor_ascii(dev_handle, desc.iSerialNumber,
- buff, sizeof(buff)) < 0) {
- return "";
+ boost::uint16_t get_product_id(void) const{
+ return libusb::device_descriptor::make(this->get_device())->get().idProduct;
}
- libusb_close(dev_handle);
+private:
+ libusb::device::sptr _dev; //always keep a reference to device
+};
+
+libusb::special_handle::sptr libusb::special_handle::make(device::sptr dev){
+ return sptr(new libusb_special_handle_impl(dev));
+}
+
+/***********************************************************************
+ * list device handles implementations
+ **********************************************************************/
+std::vector<usb_device_handle::sptr> usb_device_handle::get_device_list(
+ boost::uint16_t vid, boost::uint16_t pid
+){
+ std::vector<usb_device_handle::sptr> handles;
+
+ libusb::device_list::sptr dev_list = libusb::device_list::make();
+ for (size_t i = 0; i < dev_list->size(); i++){
+ usb_device_handle::sptr handle = libusb::special_handle::make(dev_list->at(i));
+ if (handle->get_vendor_id() == vid and handle->get_product_id() == pid){
+ handles.push_back(handle);
+ }
+ }
- return (char*) buff;
+ return handles;
}
diff --git a/host/lib/transport/libusb1_base.hpp b/host/lib/transport/libusb1_base.hpp
index abe5e22a2..04c1d6574 100644
--- a/host/lib/transport/libusb1_base.hpp
+++ b/host/lib/transport/libusb1_base.hpp
@@ -15,80 +15,135 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
-#ifndef INCLUDED_TRANSPORT_LIBUSB_HPP
-#define INCLUDED_TRANSPORT_LIBUSB_HPP
+#ifndef INCLUDED_LIBUHD_TRANSPORT_LIBUSB_HPP
+#define INCLUDED_LIBUHD_TRANSPORT_LIBUSB_HPP
#include <uhd/config.hpp>
+#include <boost/utility.hpp>
+#include <boost/shared_ptr.hpp>
#include <uhd/transport/usb_device_handle.hpp>
-#include <libusb-1.0/libusb.h>
+#include <libusb.h>
+/***********************************************************************
+ * Libusb object oriented smart pointer wrappers:
+ * The following wrappers provide allocation and automatic deallocation
+ * for various libusb data types and handles. The construction routines
+ * also store tables of already allocated structures to avoid multiple
+ * occurrences of opened handles (for example).
+ **********************************************************************/
namespace uhd { namespace transport {
namespace libusb {
- /*
- * Initialize libusb and set debug level
- * Takes a pointer to context pointer because that's
- * how libusb rolls. Debug levels.
- *
- * Level 0: no messages ever printed by the library (default)
- * Level 1: error messages are printed to stderr
- * Level 2: warning and error messages are printed to stderr
- * Level 3: informational messages are printed to stdout, warning
- * and error messages are printed to stderr
- *
- * \param ctx pointer to context pointer
- * \param debug_level
+
+ /*!
+ * This session class holds a global libusb context for this process.
+ * The get global session call will create a new context if none exists.
+ * When all references to session are destroyed, the context will be freed.
*/
- void init(libusb_context **ctx, int debug_level);
-
- /*
- * Open the device specified by a generic handle
- * Find the libusb_device cooresponding to the generic handle
- * and open it for I/O, which returns a libusb_device_handle
- * ready for an interface
- * \param ctx the libusb context used for init
- * \return a libusb_device_handle ready for action
+ class session : boost::noncopyable {
+ public:
+ typedef boost::shared_ptr<session> sptr;
+
+ /*!
+ * Level 0: no messages ever printed by the library (default)
+ * Level 1: error messages are printed to stderr
+ * Level 2: warning and error messages are printed to stderr
+ * Level 3: informational messages are printed to stdout, warning
+ * and error messages are printed to stderr
+ */
+ static const int debug_level = 0;
+
+ //! get a shared pointer to the global session
+ static sptr get_global_session(void);
+
+ //! get the underlying libusb context pointer
+ virtual libusb_context *get_context(void) const = 0;
+ };
+
+ /*!
+ * Holds a device pointer with a reference to the session.
*/
- libusb_device_handle *open_device(libusb_context *ctx,
- usb_device_handle::sptr handle);
-
- /*
- * Compare a libusb device with a generic handle
- * Check the descriptors and open the device to check the
- * serial number string. Compare values against the given
- * handle. The libusb context is already implied in the
- * libusb_device.
- * \param dev a libusb_device pointer
- * \param handle a generic handle specifier
- * \return true if handle and device match, false otherwise
+ class device : boost::noncopyable {
+ public:
+ typedef boost::shared_ptr<device> sptr;
+
+ //! get the underlying device pointer
+ virtual libusb_device *get(void) const = 0;
+ };
+
+ /*!
+ * This device list class holds a device list that will be
+ * automatically freed when the last reference is destroyed.
*/
- bool compare_device(libusb_device *dev, usb_device_handle::sptr handle);
-
- /*
- * Open an interface to the device
- * This is a logical operation for operating system housekeeping as
- * nothing is sent over the bus. The interface much correspond
- * to the USB device descriptors.
- * \param dev_handle libusb handle to an opened device
- * \param interface integer of the interface to use
- * \return true on success, false on error
+ class device_list : boost::noncopyable {
+ public:
+ typedef boost::shared_ptr<device_list> sptr;
+
+ //! make a new device list
+ static sptr make(void);
+
+ //! the number of devices in this list
+ virtual size_t size() const = 0;
+
+ //! get the device pointer at a particular index
+ virtual device::sptr at(size_t index) const = 0;
+ };
+
+ /*!
+ * Holds a device descriptor and a reference to the device.
*/
- bool open_interface(libusb_device_handle *dev_handle, int interface);
-
- /*
- * Get serial number
- * The standard USB device descriptor contains an index to an
- * actual serial number string descriptor. The index is readily
- * readble, but the string descriptor requires probing the device.
- * Because this call attempts to open the device, it may not
- * succeed because not all USB devices are readily opened.
- * The default language is used for the request (English).
- * \param dev a libusb_device pointer
- * \return string serial number or 0 on error or unavailablity
+ class device_descriptor : boost::noncopyable {
+ public:
+ typedef boost::shared_ptr<device_descriptor> sptr;
+
+ //! make a new descriptor from a device reference
+ static sptr make(device::sptr);
+
+ //! get the underlying device descriptor
+ virtual const libusb_device_descriptor &get(void) const = 0;
+
+ virtual std::string get_ascii_serial(void) const = 0;
+ };
+
+ /*!
+ * Holds a device handle and a reference to the device.
*/
- std::string get_serial(libusb_device *dev);
+ class device_handle : boost::noncopyable {
+ public:
+ typedef boost::shared_ptr<device_handle> sptr;
+
+ //! get a cached handle or make a new one given the device
+ static sptr get_cached_handle(device::sptr);
+
+ //! get the underlying device handle
+ virtual libusb_device_handle *get(void) const = 0;
+
+ /*!
+ * Open USB interfaces for control using magic value
+ * IN interface: 2
+ * OUT interface: 1
+ * Control interface: 0
+ */
+ virtual void claim_interface(int) = 0;
+ };
+
+ /*!
+ * The special handle is our internal implementation of the
+ * usb device handle which is used publicly to identify a device.
+ */
+ class special_handle : public usb_device_handle {
+ public:
+ typedef boost::shared_ptr<special_handle> sptr;
+
+ //! make a new special handle from device
+ static sptr make(device::sptr);
+
+ //! get the underlying device reference
+ virtual device::sptr get_device(void) const = 0;
+ };
+
}
}} //namespace
-#endif /* INCLUDED_TRANSPORT_LIBUSB_HPP */
+#endif /* INCLUDED_LIBUHD_TRANSPORT_LIBUSB_HPP */
diff --git a/host/lib/transport/libusb1_control.cpp b/host/lib/transport/libusb1_control.cpp
index 3531128b2..f903907d0 100644
--- a/host/lib/transport/libusb1_control.cpp
+++ b/host/lib/transport/libusb1_control.cpp
@@ -20,7 +20,6 @@
using namespace uhd::transport;
-const int libusb_debug_level = 0;
const int libusb_timeout = 0;
/***********************************************************************
@@ -28,68 +27,38 @@ const int libusb_timeout = 0;
**********************************************************************/
class libusb_control_impl : public usb_control {
public:
- libusb_control_impl(usb_device_handle::sptr handle);
- ~libusb_control_impl();
+ libusb_control_impl(libusb::device_handle::sptr handle):
+ _handle(handle)
+ {
+ _handle->claim_interface(0 /* control interface */);
+ }
- size_t submit(boost::uint8_t request_type,
+ ssize_t submit(boost::uint8_t request_type,
boost::uint8_t request,
boost::uint16_t value,
boost::uint16_t index,
unsigned char *buff,
- boost::uint16_t length);
+ boost::uint16_t length
+ ){
+ return libusb_control_transfer(_handle->get(),
+ request_type,
+ request,
+ value,
+ index,
+ buff,
+ length,
+ libusb_timeout);
+ }
private:
- libusb_context *_ctx;
- libusb_device_handle *_dev_handle;
+ libusb::device_handle::sptr _handle;
};
-
-libusb_control_impl::libusb_control_impl(usb_device_handle::sptr handle)
-{
- libusb::init(&_ctx, libusb_debug_level);
-
- // Find and open the libusb_device corresponding to the
- // given handle and return the libusb_device_handle
- // that can be used for I/O purposes.
- _dev_handle = libusb::open_device(_ctx, handle);
-
- // Open USB interfaces for control using magic value
- // IN interface: 2
- // OUT interface: 1
- // Control interface: 0
- libusb::open_interface(_dev_handle, 0);
-}
-
-
-libusb_control_impl::~libusb_control_impl()
-{
- libusb_close(_dev_handle);
- libusb_exit(_ctx);
-}
-
-
-size_t libusb_control_impl::submit(boost::uint8_t request_type,
- boost::uint8_t request,
- boost::uint16_t value,
- boost::uint16_t index,
- unsigned char *buff,
- boost::uint16_t length)
-{
- return libusb_control_transfer(_dev_handle,
- request_type,
- request,
- value,
- index,
- buff,
- length,
- libusb_timeout);
-}
-
-
/***********************************************************************
* USB control public make functions
**********************************************************************/
-usb_control::sptr usb_control::make(usb_device_handle::sptr handle)
-{
- return sptr(new libusb_control_impl(handle));
+usb_control::sptr usb_control::make(usb_device_handle::sptr handle){
+ return sptr(new libusb_control_impl(libusb::device_handle::get_cached_handle(
+ boost::static_pointer_cast<libusb::special_handle>(handle)->get_device()
+ )));
}
diff --git a/host/lib/transport/libusb1_device_handle.cpp b/host/lib/transport/libusb1_device_handle.cpp
deleted file mode 100644
index 43d0f0e26..000000000
--- a/host/lib/transport/libusb1_device_handle.cpp
+++ /dev/null
@@ -1,117 +0,0 @@
-//
-// Copyright 2010 Ettus Research LLC
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-//
-
-#include "libusb1_base.hpp"
-#include <uhd/utils/assert.hpp>
-#include <iostream>
-
-using namespace uhd::transport;
-
-const int libusb_debug_level = 0;
-
-/****************************************************************
- * libusb USB device handle implementation class
- ***************************************************************/
-class libusb1_device_handle_impl : public usb_device_handle {
-public:
- libusb1_device_handle_impl(std::string serial,
- boost::uint32_t product_id,
- boost::uint32_t vendor_id,
- boost::uint32_t device_addr)
- : _serial(serial), _product_id(product_id),
- _vendor_id(vendor_id), _device_addr(device_addr)
- {
- /* NOP */
- }
-
- ~libusb1_device_handle_impl()
- {
- /* NOP */
- }
-
- std::string get_serial() const
- {
- return _serial;
- }
-
- boost::uint16_t get_vendor_id() const
- {
- return _vendor_id;
- }
-
-
- boost::uint16_t get_product_id() const
- {
- return _product_id;
- }
-
- boost::uint16_t get_device_addr() const
- {
- return _device_addr;
- }
-
-private:
- std::string _serial;
- boost::uint32_t _product_id;
- boost::uint32_t _vendor_id;
- boost::uint32_t _device_addr;
-};
-
-
-usb_device_handle::sptr make_usb_device_handle(libusb_device *dev)
-{
- libusb_device_descriptor desc;
-
- if (libusb_get_device_descriptor(dev, &desc) < 0) {
- UHD_ASSERT_THROW("USB: failed to get device descriptor");
- }
-
- std::string serial = libusb::get_serial(dev);
- boost::uint32_t product_id = desc.idProduct;
- boost::uint32_t vendor_id = desc.idVendor;
- boost::uint32_t device_addr = libusb_get_device_address(dev);
-
- return usb_device_handle::sptr(new libusb1_device_handle_impl(
- serial,
- product_id,
- vendor_id,
- device_addr));
-}
-
-std::vector<usb_device_handle::sptr> usb_device_handle::get_device_list(boost::uint16_t vid, boost::uint16_t pid)
-{
- libusb_context *ctx = NULL;
- libusb_device** libusb_device_list;
- std::vector<usb_device_handle::sptr> device_handle_list;
- libusb_device_descriptor desc;
-
- libusb::init(&ctx, libusb_debug_level);
-
- size_t dev_size = libusb_get_device_list(ctx, &libusb_device_list);
- for (size_t i = 0; i < dev_size; i++) {
- libusb_device *dev = libusb_device_list[i];
- if(libusb_get_device_descriptor(dev, &desc) < 0) {
- UHD_ASSERT_THROW("USB: failed to get device descriptor");
- }
- if(desc.idVendor == vid && desc.idProduct == pid) {
- device_handle_list.push_back(make_usb_device_handle(dev));
- }
- }
-
- libusb_exit(ctx);
- return device_handle_list;
-}
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index b890a87f9..f589d7c77 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -17,16 +17,22 @@
#include "libusb1_base.hpp"
#include <uhd/transport/usb_zero_copy.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
+#include <uhd/utils/thread_priority.hpp>
#include <uhd/utils/assert.hpp>
-#include <boost/asio.hpp>
-#include <boost/format.hpp>
+#include <boost/shared_array.hpp>
+#include <boost/foreach.hpp>
+#include <boost/thread.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <vector>
#include <iostream>
-#include <iomanip>
+using namespace uhd;
using namespace uhd::transport;
-const int libusb_debug_level = 0;
-const int libusb_timeout = 0;
+static const double CLEANUP_TIMEOUT = 0.2; //seconds
+static const size_t DEFAULT_NUM_XFERS = 16; //num xfers
+static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes
/***********************************************************************
* Helper functions
@@ -54,56 +60,57 @@ void pp_transfer(libusb_transfer *lut)
* create a bidirectional interface. It is a zero copy implementation
* with respect to libusb, however, each send and recv requires a copy
* operation from kernel to userspace; this is due to the usbfs
- * interface provided by the kernel.
+ * interface provided by the kernel.
**********************************************************************/
class usb_endpoint {
-private:
- libusb_device_handle *_dev_handle;
- libusb_context *_ctx;
- int _endpoint;
- bool _input;
-
- size_t _transfer_size;
- size_t _num_transfers;
-
- // Transfer state lists (transfers are free, pending, or completed)
- std::list<libusb_transfer *> _free_list;
- std::list<libusb_transfer *> _pending_list;
- std::list<libusb_transfer *> _completed_list;
-
- // Calls for processing asynchronous I/O
- libusb_transfer *allocate_transfer(int buff_len);
- bool cancel(libusb_transfer *lut);
- bool cancel_all();
- bool reap_pending_list();
- bool reap_pending_list_timeout();
- bool reap_completed_list();
-
- // Transfer state manipulators
- void free_list_add(libusb_transfer *lut);
- void pending_list_add(libusb_transfer *lut);
- void completed_list_add(libusb_transfer *lut);
- libusb_transfer *free_list_get();
- libusb_transfer *completed_list_get();
- bool pending_list_remove(libusb_transfer *lut);
-
- // Debug use
- void print_transfer_status(libusb_transfer *lut);
-
public:
- usb_endpoint(libusb_device_handle *dev_handle,
- libusb_context *ctx, int endpoint, bool input,
- size_t transfer_size, size_t num_transfers);
+ typedef boost::shared_ptr<usb_endpoint> sptr;
- ~usb_endpoint();
+ usb_endpoint(
+ libusb::device_handle::sptr handle,
+ int endpoint,
+ bool input,
+ size_t transfer_size,
+ size_t num_transfers
+ );
+
+ ~usb_endpoint(void);
// Exposed interface for submitting / retrieving transfer buffers
- bool submit(libusb_transfer *lut);
- libusb_transfer *get_completed_transfer();
- libusb_transfer *get_free_transfer();
+
+ //! Submit a new transfer that was presumably just filled or emptied.
+ void submit(libusb_transfer *lut);
+
+ /*!
+ * Get an available transfer:
+ * For inputs, this is a just filled transfer.
+ * For outputs, this is a just emptied transfer.
+ * \param timeout the timeout to wait for a lut
+ * \return the transfer pointer or NULL if timeout
+ */
+ libusb_transfer *get_lut_with_wait(double timeout);
//Callback use only
void callback_handle_transfer(libusb_transfer *lut);
+
+private:
+ libusb::device_handle::sptr _handle;
+ int _endpoint;
+ bool _input;
+
+ //! hold a bounded buffer of completed transfers
+ typedef bounded_buffer<libusb_transfer *> lut_buff_type;
+ lut_buff_type::sptr _completed_list;
+
+ //! a list of all transfer structs we allocated
+ std::vector<libusb_transfer *> _all_luts;
+
+ //! a block of memory for the transfer buffers
+ boost::shared_array<char> _buffer;
+
+ // Calls for processing asynchronous I/O
+ libusb_transfer *allocate_transfer(void *mem, size_t len);
+ void print_transfer_status(libusb_transfer *lut);
};
@@ -116,9 +123,8 @@ public:
* it from the pending to completed status list.
* \param lut pointer to libusb_transfer
*/
-static void callback(libusb_transfer *lut)
-{
- usb_endpoint *endpoint = (usb_endpoint *) lut->user_data;
+static void callback(libusb_transfer *lut){
+ usb_endpoint *endpoint = (usb_endpoint *) lut->user_data;
endpoint->callback_handle_transfer(lut);
}
@@ -127,14 +133,9 @@ static void callback(libusb_transfer *lut)
* Accessor call to allow list access from callback space
* \param pointer to libusb_transfer
*/
-void usb_endpoint::callback_handle_transfer(libusb_transfer *lut)
-{
- if (!pending_list_remove(lut)) {
- std::cerr << "USB: pending remove failed" << std::endl;
- return;
- }
-
- completed_list_add(lut);
+void usb_endpoint::callback_handle_transfer(libusb_transfer *lut){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ _completed_list->push_with_wait(lut);
}
@@ -142,21 +143,28 @@ void usb_endpoint::callback_handle_transfer(libusb_transfer *lut)
* Constructor
* Allocate libusb transfers and mark as free. For IN endpoints,
* submit the transfers so that they're ready to return when
- * data is available.
+ * data is available.
*/
-usb_endpoint::usb_endpoint(libusb_device_handle *dev_handle,
- libusb_context *ctx, int endpoint, bool input,
- size_t transfer_size, size_t num_transfers)
- : _dev_handle(dev_handle),
- _ctx(ctx), _endpoint(endpoint), _input(input),
- _transfer_size(transfer_size), _num_transfers(num_transfers)
+usb_endpoint::usb_endpoint(
+ libusb::device_handle::sptr handle,
+ int endpoint,
+ bool input,
+ size_t transfer_size,
+ size_t num_transfers
+):
+ _handle(handle),
+ _endpoint(endpoint),
+ _input(input)
{
- unsigned int i;
- for (i = 0; i < _num_transfers; i++) {
- free_list_add(allocate_transfer(_transfer_size));
+ _completed_list = lut_buff_type::make(num_transfers);
+ _buffer = boost::shared_array<char>(new char[num_transfers*transfer_size]);
+ for (size_t i = 0; i < num_transfers; i++){
+ _all_luts.push_back(allocate_transfer(_buffer.get() + i*transfer_size, transfer_size));
- if (_input)
- submit(free_list_get());
+ //input luts are immediately submitted to be filled
+ //output luts go into the completed list as free buffers
+ if (_input) this->submit(_all_luts.back());
+ else _completed_list->push_with_wait(_all_luts.back());
}
}
@@ -168,47 +176,44 @@ usb_endpoint::usb_endpoint(libusb_device_handle *dev_handle,
* the transfers. Libusb will deallocate the data buffer held by
* each transfer.
*/
-usb_endpoint::~usb_endpoint()
-{
- cancel_all();
-
- while (!_pending_list.empty()) {
- if (!reap_pending_list())
- std::cerr << "error: destructor failed to reap" << std::endl;
+usb_endpoint::~usb_endpoint(void){
+ //cancel all transfers
+ BOOST_FOREACH(libusb_transfer *lut, _all_luts){
+ libusb_cancel_transfer(lut);
}
- while (!_completed_list.empty()) {
- if (!reap_completed_list())
- std::cerr << "error: destructor failed to reap" << std::endl;
- }
+ //collect canceled transfers (drain the queue)
+ while (this->get_lut_with_wait(CLEANUP_TIMEOUT) != NULL){};
- while (!_free_list.empty()) {
- libusb_free_transfer(free_list_get());
+ //free all transfers
+ BOOST_FOREACH(libusb_transfer *lut, _all_luts){
+ libusb_free_transfer(lut);
}
}
/*
- * Allocate a libusb transfer
+ * Allocate a libusb transfer
* The allocated transfer - and buffer it contains - is repeatedly
* submitted, reaped, and reused and should not be freed until shutdown.
- * \param buff_len size of the individual buffer held by each transfer
+ * \param mem a pointer to the buffer memory
+ * \param len size of the individual buffer
* \return pointer to an allocated libusb_transfer
*/
-libusb_transfer *usb_endpoint::allocate_transfer(int buff_len)
-{
+libusb_transfer *usb_endpoint::allocate_transfer(void *mem, size_t len){
libusb_transfer *lut = libusb_alloc_transfer(0);
-
- unsigned char *buff = new unsigned char[buff_len];
+ UHD_ASSERT_THROW(lut != NULL);
unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0));
+ unsigned char *buff = reinterpret_cast<unsigned char *>(mem);
+ libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback);
libusb_fill_bulk_transfer(lut, // transfer
- _dev_handle, // dev_handle
+ _handle->get(), // dev_handle
endpoint, // endpoint
buff, // buffer
- buff_len, // length
- callback, // callback
+ len, // length
+ lut_callback, // callback
this, // user_data
0); // timeout
return lut;
@@ -219,97 +224,18 @@ libusb_transfer *usb_endpoint::allocate_transfer(int buff_len)
* Asynchonous transfer submission
* Submit a libusb transfer to libusb add pending status
* \param lut pointer to libusb_transfer
- * \return true on success or false on error
- */
-bool usb_endpoint::submit(libusb_transfer *lut)
-{
- int retval;
- if ((retval = libusb_submit_transfer(lut)) < 0) {
- std::cerr << "error: libusb_submit_transfer: " << retval << std::endl;
- return false;
- }
-
- pending_list_add(lut);
- return true;
-}
-
-
-/*
- * Cancel a pending transfer
- * Search the pending list for the transfer and cancel if found.
- * \param lut pointer to libusb_transfer to cancel
- * \return true on success or false if transfer is not found
- *
- * Note: success only indicates submission of cancelation request.
- * Sucessful cancelation is not known until the callback occurs.
+ * \return true on success or false on error
*/
-bool usb_endpoint::cancel(libusb_transfer *lut)
-{
- std::list<libusb_transfer*>::iterator iter;
- for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) {
- if (*iter == lut) {
- libusb_cancel_transfer(lut);
- return true;
- }
- }
- return false;
+void usb_endpoint::submit(libusb_transfer *lut){
+ UHD_ASSERT_THROW(libusb_submit_transfer(lut) == 0);
}
-
-/*
- * Cancel all pending transfers
- * \return bool true if cancelation request is submitted
- *
- * Note: success only indicates submission of cancelation request.
- * Sucessful cancelation is not known until the callback occurs.
- */
-bool usb_endpoint::cancel_all()
-{
- std::list<libusb_transfer*>::iterator iter;
-
- for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) {
- if (libusb_cancel_transfer(*iter) < 0) {
- std::cerr << "error: libusb_cancal_transfer() failed" << std::endl;
- return false;
- }
- }
-
- return true;
-}
-
-
-/*
- * Reap completed transfers
- * return true if at least one transfer was reaped, false otherwise.
- * Check completed transfers for errors and mark as free. This is a
- * blocking call.
- * \return bool true if a libusb transfer is reaped, false otherwise
- */
-bool usb_endpoint::reap_completed_list()
-{
- libusb_transfer *lut;
-
- if (_completed_list.empty()) {
- if (!reap_pending_list_timeout())
- return false;
- }
-
- while (!_completed_list.empty()) {
- lut = completed_list_get();
- print_transfer_status(lut);
- free_list_add(lut);
- }
-
- return true;
-}
-
-
/*
* Print status errors of a completed transfer
* \param lut pointer to an libusb_transfer
*/
-void usb_endpoint::print_transfer_status(libusb_transfer *lut)
-{
+void usb_endpoint::print_transfer_status(libusb_transfer *lut){
+ std::cout << "here " << lut->status << std::endl;
switch (lut->status) {
case LIBUSB_TRANSFER_COMPLETED:
if (lut->actual_length < lut->length) {
@@ -345,382 +271,136 @@ void usb_endpoint::print_transfer_status(libusb_transfer *lut)
}
}
-
-/*
- * Reap pending transfers without timeout
- * This is a blocking call. Reaping submitted transfers is
- * handled by libusb and the assigned callback function.
- * Block until at least one transfer is reaped.
- * \return true true if a transfer was reaped or false otherwise
- */
-bool usb_endpoint::reap_pending_list()
-{
- int retval;
-
- if ((retval = libusb_handle_events(_ctx)) < 0) {
- std::cerr << "error: libusb_handle_events: " << retval << std::endl;
- return false;
- }
-
- return true;
-}
-
-
-/*
- * Reap pending transfers with timeout
- * This call blocks until a transfer is reaped or timeout.
- * Reaping submitted transfers is handled by libusb and the
- * assigned callback function. Block until at least one
- * transfer is reaped or timeout occurs.
- * \return true if a transfer was reaped or false otherwise
- */
-bool usb_endpoint::reap_pending_list_timeout()
-{
- int retval;
- timeval tv;
-
- tv.tv_sec = 0;
- tv.tv_usec = 100000; //100ms
-
- size_t pending_list_size = _pending_list.size();
-
- if ((retval = libusb_handle_events_timeout(_ctx, &tv)) < 0) {
- std::cerr << "error: libusb_handle_events: " << retval << std::endl;
- return false;
- }
-
- if (_pending_list.size() < pending_list_size) {
- return true;
- }
- else {
- return false;
- }
-}
-
-
-/*
- * Get a free transfer
- * The transfer has an empty data bufer for OUT requests
- * \return pointer to a libusb_transfer
- */
-libusb_transfer *usb_endpoint::get_free_transfer()
-{
- if (_free_list.empty()) {
- if (!reap_completed_list())
- return NULL;
- }
-
- return free_list_get();
-}
-
-
-/*
- * Get a completed transfer
- * The transfer has a full data buffer for IN requests
- * \return pointer to libusb_transfer
- */
-libusb_transfer *usb_endpoint::get_completed_transfer()
-{
- if (_completed_list.empty()) {
- if (!reap_pending_list_timeout())
- return NULL;
- }
-
- return completed_list_get();
-}
-
-/*
- * List operations
- */
-void usb_endpoint::free_list_add(libusb_transfer *lut)
-{
- _free_list.push_back(lut);
-}
-
-void usb_endpoint::pending_list_add(libusb_transfer *lut)
-{
- _pending_list.push_back(lut);
-}
-
-void usb_endpoint::completed_list_add(libusb_transfer *lut)
-{
- _completed_list.push_back(lut);
-}
-
-
-/*
- * Free and completed lists don't have ordered content
- * Pop transfers from the front as needed
- */
-libusb_transfer *usb_endpoint::free_list_get()
-{
+libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
libusb_transfer *lut;
-
- if (_free_list.size() == 0) {
- return NULL;
- }
- else {
- lut = _free_list.front();
- _free_list.pop_front();
- return lut;
- }
+ if (_completed_list->pop_with_timed_wait(lut, timeout)) return lut;
+ return NULL;
}
-
-/*
- * Free and completed lists don't have ordered content
- * Pop transfers from the front as needed
- */
-libusb_transfer *usb_endpoint::completed_list_get()
-{
- libusb_transfer *lut;
-
- if (_completed_list.empty()) {
- return NULL;
- }
- else {
- lut = _completed_list.front();
- _completed_list.pop_front();
- return lut;
- }
-}
-
-
-/*
- * Search and remove transfer from pending list
- * Assuming that the callbacks occur in order, the front element
- * should yield the correct transfer. If not, then something else
- * is going on. If no transfers match, then something went wrong.
- */
-bool usb_endpoint::pending_list_remove(libusb_transfer *lut)
-{
- std::list<libusb_transfer*>::iterator iter;
- for (iter = _pending_list.begin(); iter != _pending_list.end(); iter++) {
- if (*iter == lut) {
- _pending_list.erase(iter);
- return true;
- }
- }
- return false;
-}
-
-
/***********************************************************************
- * Managed buffers
+ * USB zero_copy device class
**********************************************************************/
-/*
- * Libusb managed receive buffer
- * Construct a recv buffer from a libusb transfer. The memory held by
- * the libusb transfer is exposed through the managed buffer interface.
- * Upon destruction, the transfer and buffer are resubmitted to the
- * endpoint for further use.
- */
-class libusb_managed_recv_buffer_impl : public managed_recv_buffer {
+class libusb_zero_copy_impl : public usb_zero_copy, public boost::enable_shared_from_this<libusb_zero_copy_impl> {
public:
- libusb_managed_recv_buffer_impl(libusb_transfer *lut,
- usb_endpoint *endpoint)
- : _buff(lut->buffer, lut->length)
- {
- _lut = lut;
- _endpoint = endpoint;
- }
- ~libusb_managed_recv_buffer_impl()
- {
- if (!_endpoint->submit(_lut))
- std::cerr << "USB: failed to submit IN transfer" << std::endl;
- }
+ libusb_zero_copy_impl(
+ libusb::device_handle::sptr handle,
+ size_t recv_endpoint,
+ size_t send_endpoint,
+ const device_addr_t &hints
+ );
-private:
- const boost::asio::const_buffer &get() const
- {
- return _buff;
+ ~libusb_zero_copy_impl(void){
+ _threads_running = false;
+ _thread_group.join_all();
}
- libusb_transfer *_lut;
- usb_endpoint *_endpoint;
- const boost::asio::const_buffer _buff;
-};
-
-/*
- * Libusb managed send buffer
- * Construct a send buffer from a libusb transfer. The memory held by
- * the libusb transfer is exposed through the managed buffer interface.
- * Committing the buffer will set the data length and submit the buffer
- * to the endpoint. Submitting a buffer multiple times or destroying
- * the buffer before committing is an error. For the latter, the transfer
- * is returned to the endpoint with no data for reuse.
- */
-class libusb_managed_send_buffer_impl : public managed_send_buffer {
-public:
- libusb_managed_send_buffer_impl(libusb_transfer *lut,
- usb_endpoint *endpoint,
- size_t buff_size)
- : _buff(lut->buffer, buff_size), _committed(false)
- {
- _lut = lut;
- _endpoint = endpoint;
- }
+ managed_recv_buffer::sptr get_recv_buff(double);
+ managed_send_buffer::sptr get_send_buff(double);
- ~libusb_managed_send_buffer_impl()
- {
- if (!_committed) {
- _lut->length = 0;
- _lut->actual_length = 0;
- _endpoint->submit(_lut);
- }
- }
+ size_t get_num_recv_frames(void) const { return _num_recv_frames; }
+ size_t get_num_send_frames(void) const { return _num_send_frames; }
- ssize_t commit(size_t num_bytes)
- {
- if (_committed) {
- std::cerr << "UHD: send buffer already committed" << std::endl;
- return 0;
- }
-
- UHD_ASSERT_THROW(num_bytes <= boost::asio::buffer_size(_buff));
+ size_t get_recv_frame_size(void) const { return _recv_frame_size; }
+ size_t get_send_frame_size(void) const { return _send_frame_size; }
- _lut->length = num_bytes;
- _lut->actual_length = 0;
+private:
+ void release(libusb_transfer *lut){
+ _recv_ep->submit(lut);
+ }
- if (_endpoint->submit(_lut)) {
- _committed = true;
- return num_bytes;
+ void commit(libusb_transfer *lut, size_t num_bytes){
+ lut->length = num_bytes;
+ try{
+ _send_ep->submit(lut);
}
- else {
- return 0;
+ catch(const std::exception &e){
+ std::cerr << "Error in commit: " << e.what() << std::endl;
}
}
-private:
- const boost::asio::mutable_buffer &get() const
- {
- return _buff;
+ libusb::device_handle::sptr _handle;
+ const size_t _recv_frame_size, _num_recv_frames;
+ const size_t _send_frame_size, _num_send_frames;
+ usb_endpoint::sptr _recv_ep, _send_ep;
+
+ //event handler threads
+ boost::thread_group _thread_group;
+ bool _threads_running;
+
+ void run_event_loop(void){
+ set_thread_priority_safe();
+ libusb::session::sptr session = libusb::session::get_global_session();
+ _threads_running = true;
+ while(_threads_running){
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 100000; //100ms
+ libusb_handle_events_timeout(session->get_context(), &tv);
+ }
}
-
- libusb_transfer *_lut;
- usb_endpoint *_endpoint;
- const boost::asio::mutable_buffer _buff;
- bool _committed;
-};
-
-
-/***********************************************************************
- * USB zero_copy device class
- **********************************************************************/
-class libusb_zero_copy_impl : public usb_zero_copy
-{
-private:
- usb_endpoint *_rx_ep;
- usb_endpoint *_tx_ep;
-
- // Maintain libusb values
- libusb_context *_rx_ctx;
- libusb_context *_tx_ctx;
- libusb_device_handle *_rx_dev_handle;
- libusb_device_handle *_tx_dev_handle;
-
- size_t _recv_buff_size;
- size_t _send_buff_size;
- size_t _num_frames;
-
-public:
- typedef boost::shared_ptr<libusb_zero_copy_impl> sptr;
-
- libusb_zero_copy_impl(usb_device_handle::sptr handle,
- unsigned int rx_endpoint,
- unsigned int tx_endpoint,
- size_t recv_buff_size,
- size_t send_buff_size);
-
- ~libusb_zero_copy_impl();
-
- managed_recv_buffer::sptr get_recv_buff(void);
- managed_send_buffer::sptr get_send_buff(void);
-
- size_t get_num_recv_frames(void) const { return _num_frames; }
- size_t get_num_send_frames(void) const { return _num_frames; }
};
/*
* Constructor
* Initializes libusb, opens devices, and sets up interfaces for I/O.
- * Finally, creates endpoints for asynchronous I/O.
+ * Finally, creates endpoints for asynchronous I/O.
*/
-libusb_zero_copy_impl::libusb_zero_copy_impl(usb_device_handle::sptr handle,
- unsigned int rx_endpoint,
- unsigned int tx_endpoint,
- size_t buff_size,
- size_t block_size)
- : _rx_ctx(NULL), _tx_ctx(NULL), _rx_dev_handle(NULL), _tx_dev_handle(NULL),
- _recv_buff_size(block_size), _send_buff_size(block_size),
- _num_frames(buff_size / block_size)
+libusb_zero_copy_impl::libusb_zero_copy_impl(
+ libusb::device_handle::sptr handle,
+ size_t recv_endpoint,
+ size_t send_endpoint,
+ const device_addr_t &hints
+):
+ _handle(handle),
+ _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))),
+ _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS))),
+ _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))),
+ _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS)))
{
- // Initialize libusb with separate contexts to allow
- // thread safe operation of transmit and receive
- libusb::init(&_rx_ctx, libusb_debug_level);
- libusb::init(&_tx_ctx, libusb_debug_level);
-
- UHD_ASSERT_THROW((_rx_ctx != NULL) && (_tx_ctx != NULL));
-
- // Find and open the libusb_device corresponding to the
- // given handle and return the libusb_device_handle
- // that can be used for I/O purposes.
- _rx_dev_handle = libusb::open_device(_rx_ctx, handle);
- _tx_dev_handle = libusb::open_device(_tx_ctx, handle);
-
- // Open USB interfaces for tx/rx using magic values.
- // IN interface: 2
- // OUT interface: 1
- // Control interface: 0
- libusb::open_interface(_rx_dev_handle, 2);
- libusb::open_interface(_tx_dev_handle, 1);
-
- _rx_ep = new usb_endpoint(_rx_dev_handle, // libusb device_handle
- _rx_ctx, // libusb context
- rx_endpoint, // USB endpoint number
+ _handle->claim_interface(2 /*in interface*/);
+ _handle->claim_interface(1 /*out interface*/);
+
+ _recv_ep = usb_endpoint::sptr(new usb_endpoint(
+ _handle, // libusb device_handle
+ recv_endpoint, // USB endpoint number
true, // IN endpoint
- _recv_buff_size, // buffer size per transfer
- _num_frames); // number of libusb transfers
+ this->get_recv_frame_size(), // buffer size per transfer
+ this->get_num_recv_frames() // number of libusb transfers
+ ));
- _tx_ep = new usb_endpoint(_tx_dev_handle, // libusb device_handle
- _tx_ctx, // libusb context
- tx_endpoint, // USB endpoint number
+ _send_ep = usb_endpoint::sptr(new usb_endpoint(
+ _handle, // libusb device_handle
+ send_endpoint, // USB endpoint number
false, // OUT endpoint
- _send_buff_size, // buffer size per transfer
- _num_frames); // number of libusb transfers
+ this->get_send_frame_size(), // buffer size per transfer
+ this->get_num_send_frames() // number of libusb transfers
+ ));
+
+ //spawn the event handler threads
+ size_t concurrency = hints.cast<size_t>("concurrency_hint", 1);
+ for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread(
+ boost::bind(&libusb_zero_copy_impl::run_event_loop, this)
+ );
}
-
-libusb_zero_copy_impl::~libusb_zero_copy_impl()
-{
- delete _rx_ep;
- delete _tx_ep;
-
- libusb_close(_rx_dev_handle);
- libusb_close(_tx_dev_handle);
-
- libusb_exit(_rx_ctx);
- libusb_exit(_tx_ctx);
-}
-
-
/*
* Construct a managed receive buffer from a completed libusb transfer
* (happy with buffer full of data) obtained from the receive endpoint.
* Return empty pointer if no transfer is available (timeout or error).
- * \return pointer to a managed receive buffer
+ * \return pointer to a managed receive buffer
*/
-managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff()
-{
- libusb_transfer *lut = _rx_ep->get_completed_transfer();
+managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(double timeout){
+ libusb_transfer *lut = _recv_ep->get_lut_with_wait(timeout);
if (lut == NULL) {
return managed_recv_buffer::sptr();
}
else {
- return managed_recv_buffer::sptr(
- new libusb_managed_recv_buffer_impl(lut,
- _rx_ep));
+ return managed_recv_buffer::make_safe(
+ boost::asio::const_buffer(lut->buffer, lut->actual_length),
+ boost::bind(&libusb_zero_copy_impl::release, shared_from_this(), lut)
+ );
}
}
@@ -729,39 +409,34 @@ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff()
* Construct a managed send buffer from a free libusb transfer (with
* empty buffer). Return empty pointer of no transfer is available
* (timeout or error).
- * \return pointer to a managed send buffer
+ * \return pointer to a managed send buffer
*/
-managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff()
-{
- libusb_transfer *lut = _tx_ep->get_free_transfer();
+managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){
+ libusb_transfer *lut = _send_ep->get_lut_with_wait(timeout);
if (lut == NULL) {
return managed_send_buffer::sptr();
}
else {
- return managed_send_buffer::sptr(
- new libusb_managed_send_buffer_impl(lut,
- _tx_ep,
- _send_buff_size));
+ return managed_send_buffer::make_safe(
+ boost::asio::mutable_buffer(lut->buffer, this->get_send_frame_size()),
+ boost::bind(&libusb_zero_copy_impl::commit, shared_from_this(), lut, _1)
+ );
}
}
-
/***********************************************************************
* USB zero_copy make functions
**********************************************************************/
-usb_zero_copy::sptr usb_zero_copy::make(usb_device_handle::sptr handle,
- unsigned int rx_endpoint,
- unsigned int tx_endpoint,
- size_t buff_size,
- size_t block_size)
-
-{
- return sptr(new libusb_zero_copy_impl(handle,
- rx_endpoint,
- tx_endpoint,
- buff_size,
- block_size));
+usb_zero_copy::sptr usb_zero_copy::make(
+ usb_device_handle::sptr handle,
+ size_t recv_endpoint,
+ size_t send_endpoint,
+ const device_addr_t &hints
+){
+ libusb::device_handle::sptr dev_handle(libusb::device_handle::get_cached_handle(
+ boost::static_pointer_cast<libusb::special_handle>(handle)->get_device()
+ ));
+ return sptr(new libusb_zero_copy_impl(
+ dev_handle, recv_endpoint, send_endpoint, hints
+ ));
}
-
-
-
diff --git a/host/lib/transport/msvc/stdint.h b/host/lib/transport/msvc/stdint.h
new file mode 100644
index 000000000..b3eb61aae
--- /dev/null
+++ b/host/lib/transport/msvc/stdint.h
@@ -0,0 +1,35 @@
+//
+// Copyright 2010 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#ifndef INCLUDED_LIBUHD_TRANSPORT_STDINT_H
+#define INCLUDED_LIBUHD_TRANSPORT_STDINT_H
+
+#include <boost/cstdint.hpp>
+
+//provide a stdint implementation for libusb
+
+typedef boost::uint64_t uint64_t;
+typedef boost::uint32_t uint32_t;
+typedef boost::uint16_t uint16_t;
+typedef boost::uint8_t uint8_t;
+
+typedef boost::int64_t int64_t;
+typedef boost::int32_t int32_t;
+typedef boost::int16_t int16_t;
+typedef boost::int8_t int8_t;
+
+#endif /* INCLUDED_LIBUHD_TRANSPORT_STDINT_H */
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index ee989ee2b..d84aeefdd 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -17,25 +17,58 @@
#include <uhd/transport/udp_zero_copy.hpp>
#include <uhd/transport/udp_simple.hpp> //mtu
+#include <uhd/transport/bounded_buffer.hpp>
+#include <uhd/utils/thread_priority.hpp>
#include <uhd/utils/assert.hpp>
#include <uhd/utils/warning.hpp>
-#include <boost/cstdint.hpp>
+#include <boost/shared_array.hpp>
#include <boost/asio.hpp>
#include <boost/format.hpp>
+#include <boost/thread.hpp>
+#include <boost/enable_shared_from_this.hpp>
#include <iostream>
+using namespace uhd;
using namespace uhd::transport;
+namespace asio = boost::asio;
/***********************************************************************
* Constants
**********************************************************************/
+//Define this to the the boost async io calls to perform receive.
+//Otherwise, get_recv_buff uses a blocking receive with timeout.
+//#define USE_ASIO_ASYNC_RECV
+
+//Define this to the the boost async io calls to perform send.
+//Otherwise, the commit callback uses a blocking send.
+//#define USE_ASIO_ASYNC_SEND
+
//enough buffering for half a second of samples at full rate on usrp2
-static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(sizeof(boost::uint32_t) * 25e6 * 0.5);
+static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5);
+
//Large buffers cause more underflow at high rates.
//Perhaps this is due to the kernel scheduling,
//but may change with host-based flow control.
static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3);
-static const double RECV_TIMEOUT = 0.1; //100 ms
+
+//The number of async frames to allocate for each send and recv:
+//The non-async recv can have a very large number of recv frames
+//because the CPU overhead is independent of the number of frames.
+#ifdef USE_ASIO_ASYNC_RECV
+static const size_t DEFAULT_NUM_RECV_FRAMES = 32;
+#else
+static const size_t DEFAULT_NUM_RECV_FRAMES = MIN_RECV_SOCK_BUFF_SIZE/udp_simple::mtu;
+#endif
+//The non-async send only ever requires a single frame
+//because the buffer will be committed before a new get.
+#ifdef USE_ASIO_ASYNC_SEND
+static const size_t DEFAULT_NUM_SEND_FRAMES = 32;
+#else
+static const size_t DEFAULT_NUM_SEND_FRAMES = MIN_SEND_SOCK_BUFF_SIZE/udp_simple::mtu;;
+#endif
+
+//a single concurrent thread for io_service seems to be the fastest
+static const size_t CONCURRENCY_HINT = 1;
/***********************************************************************
* Zero Copy UDP implementation with ASIO:
@@ -44,39 +77,68 @@ static const double RECV_TIMEOUT = 0.1; //100 ms
* However, it is not a true zero copy implementation as each
* send and recv requires a copy operation to/from userspace.
**********************************************************************/
-class udp_zero_copy_impl:
- public phony_zero_copy_recv_if,
- public phony_zero_copy_send_if,
- public udp_zero_copy
-{
+class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_shared_from_this<udp_zero_copy_asio_impl> {
public:
- typedef boost::shared_ptr<udp_zero_copy_impl> sptr;
+ typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr;
- udp_zero_copy_impl(
+ udp_zero_copy_asio_impl(
const std::string &addr,
- const std::string &port
+ const std::string &port,
+ const device_addr_t &hints
):
- phony_zero_copy_recv_if(udp_simple::mtu),
- phony_zero_copy_send_if(udp_simple::mtu)
+ _io_service(hints.cast<size_t>("concurrency_hint", CONCURRENCY_HINT)),
+ _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", udp_simple::mtu))),
+ _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_RECV_FRAMES))),
+ _send_frame_size(size_t(hints.cast<double>("send_frame_size", udp_simple::mtu))),
+ _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_SEND_FRAMES)))
{
//std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
- // resolve the address
- boost::asio::ip::udp::resolver resolver(_io_service);
- boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port);
- boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
+ //resolve the address
+ asio::ip::udp::resolver resolver(_io_service);
+ asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port);
+ asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
- // create, open, and connect the socket
- _socket = new boost::asio::ip::udp::socket(_io_service);
- _socket->open(boost::asio::ip::udp::v4());
+ //create, open, and connect the socket
+ _socket = new asio::ip::udp::socket(_io_service);
+ _socket->open(asio::ip::udp::v4());
_socket->connect(receiver_endpoint);
_sock_fd = _socket->native();
}
- ~udp_zero_copy_impl(void){
+ ~udp_zero_copy_asio_impl(void){
+ delete _work; //allow io_service run to complete
+ _thread_group.join_all(); //wait for service threads to exit
delete _socket;
}
+ void init(void){
+ //allocate all recv frames and release them to begin xfers
+ _pending_recv_buffs = pending_buffs_type::make(_num_recv_frames);
+ _recv_buffer = boost::shared_array<char>(new char[_num_recv_frames*_recv_frame_size]);
+ for (size_t i = 0; i < _num_recv_frames; i++){
+ release(_recv_buffer.get() + i*_recv_frame_size);
+ }
+
+ //allocate all send frames and push them into the fifo
+ _pending_send_buffs = pending_buffs_type::make(_num_send_frames);
+ _send_buffer = boost::shared_array<char>(new char[_num_send_frames*_send_frame_size]);
+ for (size_t i = 0; i < _num_send_frames; i++){
+ handle_send(_send_buffer.get() + i*_send_frame_size);
+ }
+
+ //spawn the service threads that will run the io service
+ _work = new asio::io_service::work(_io_service); //new work to delete later
+ for (size_t i = 0; i < CONCURRENCY_HINT; i++) _thread_group.create_thread(
+ boost::bind(&udp_zero_copy_asio_impl::service, this)
+ );
+ }
+
+ void service(void){
+ set_thread_priority_safe();
+ _io_service.run();
+ }
+
//get size for internal socket buffer
template <typename Opt> size_t get_buff_size(void) const{
Opt option;
@@ -91,60 +153,165 @@ public:
return get_buff_size<Opt>();
}
+ //! handle a recv callback -> push the filled memory into the fifo
+ UHD_INLINE void handle_recv(void *mem, size_t len){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len));
+ }
- //The number of frames is approximately the buffer size divided by the max datagram size.
- //In reality, this is a phony zero-copy interface and the number of frames is infinite.
- //However, its sensible to advertise a frame count that is approximate to buffer size.
- //This way, the transport caller will have an idea about how much buffering to create.
-
- size_t get_num_recv_frames(void) const{
- return this->get_buff_size<boost::asio::socket_base::receive_buffer_size>()/udp_simple::mtu;
+ ////////////////////////////////////////////////////////////////////
+ #ifdef USE_ASIO_ASYNC_RECV
+ ////////////////////////////////////////////////////////////////////
+ //! pop a filled recv buffer off of the fifo and bind with the release callback
+ managed_recv_buffer::sptr get_recv_buff(double timeout){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ asio::mutable_buffer buff;
+ if (_pending_recv_buffs->pop_with_timed_wait(buff, timeout)){
+ return managed_recv_buffer::make_safe(
+ buff, boost::bind(
+ &udp_zero_copy_asio_impl::release,
+ shared_from_this(),
+ asio::buffer_cast<void*>(buff)
+ )
+ );
+ }
+ return managed_recv_buffer::sptr();
}
- size_t get_num_send_frames(void) const{
- return this->get_buff_size<boost::asio::socket_base::send_buffer_size>()/udp_simple::mtu;
+ //! release a recv buffer -> start an async recv on the buffer
+ void release(void *mem){
+ _socket->async_receive(
+ boost::asio::buffer(mem, this->get_recv_frame_size()),
+ boost::bind(
+ &udp_zero_copy_asio_impl::handle_recv,
+ shared_from_this(), mem,
+ asio::placeholders::bytes_transferred
+ )
+ );
}
-private:
- boost::asio::ip::udp::socket *_socket;
- boost::asio::io_service _io_service;
- int _sock_fd;
+ ////////////////////////////////////////////////////////////////////
+ #else /*USE_ASIO_ASYNC_RECV*/
+ ////////////////////////////////////////////////////////////////////
+ managed_recv_buffer::sptr get_recv_buff(double timeout){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ asio::mutable_buffer buff;
- ssize_t recv(const boost::asio::mutable_buffer &buff){
//setup timeval for timeout
timeval tv;
tv.tv_sec = 0;
- tv.tv_usec = int(RECV_TIMEOUT*1e6);
+ tv.tv_usec = long(timeout*1e6);
//setup rset for timeout
fd_set rset;
FD_ZERO(&rset);
FD_SET(_sock_fd, &rset);
- //call select to perform timed wait
- if (::select(_sock_fd+1, &rset, NULL, NULL, &tv) <= 0) return 0;
+ //call select to perform timed wait and grab an available buffer with wait
+ //if the condition is true, call receive and return the managed buffer
+ if (
+ ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 and
+ _pending_recv_buffs->pop_with_timed_wait(buff, timeout)
+ ){
+ return managed_recv_buffer::make_safe(
+ asio::buffer(
+ boost::asio::buffer_cast<void *>(buff),
+ _socket->receive(asio::buffer(buff))
+ ),
+ boost::bind(
+ &udp_zero_copy_asio_impl::release,
+ shared_from_this(),
+ asio::buffer_cast<void*>(buff)
+ )
+ );
+ }
+ return managed_recv_buffer::sptr();
+ }
+
+ void release(void *mem){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ handle_recv(mem, this->get_recv_frame_size());
+ }
- return ::recv(
- _sock_fd,
- boost::asio::buffer_cast<char *>(buff),
- boost::asio::buffer_size(buff), 0
- );
+ ////////////////////////////////////////////////////////////////////
+ #endif /*USE_ASIO_ASYNC_RECV*/
+ ////////////////////////////////////////////////////////////////////
+
+ size_t get_num_recv_frames(void) const {return _num_recv_frames;}
+ size_t get_recv_frame_size(void) const {return _recv_frame_size;}
+
+ //! handle a send callback -> push the emptied memory into the fifo
+ UHD_INLINE void handle_send(void *mem){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, this->get_send_frame_size()));
}
- ssize_t send(const boost::asio::const_buffer &buff){
- return ::send(
- _sock_fd,
- boost::asio::buffer_cast<const char *>(buff),
- boost::asio::buffer_size(buff), 0
+ //! pop an empty send buffer off of the fifo and bind with the commit callback
+ managed_send_buffer::sptr get_send_buff(double timeout){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ asio::mutable_buffer buff;
+ if (_pending_send_buffs->pop_with_timed_wait(buff, timeout)){
+ return managed_send_buffer::make_safe(
+ buff, boost::bind(
+ &udp_zero_copy_asio_impl::commit,
+ shared_from_this(),
+ asio::buffer_cast<void*>(buff), _1
+ )
+ );
+ }
+ return managed_send_buffer::sptr();
+ }
+
+ ////////////////////////////////////////////////////////////////////
+ #ifdef USE_ASIO_ASYNC_SEND
+ ////////////////////////////////////////////////////////////////////
+ //! commit a send buffer -> start an async send on the buffer
+ void commit(void *mem, size_t len){
+ _socket->async_send(
+ boost::asio::buffer(mem, len),
+ boost::bind(
+ &udp_zero_copy_asio_impl::handle_send,
+ shared_from_this(), mem
+ )
);
}
+
+ ////////////////////////////////////////////////////////////////////
+ #else /*USE_ASIO_ASYNC_SEND*/
+ ////////////////////////////////////////////////////////////////////
+ void commit(void *mem, size_t len){
+ _socket->send(asio::buffer(mem, len));
+ handle_send(mem);
+ }
+
+ ////////////////////////////////////////////////////////////////////
+ #endif /*USE_ASIO_ASYNC_SEND*/
+ ////////////////////////////////////////////////////////////////////
+
+ size_t get_num_send_frames(void) const {return _num_send_frames;}
+ size_t get_send_frame_size(void) const {return _send_frame_size;}
+
+private:
+ //asio guts -> socket and service
+ asio::ip::udp::socket *_socket;
+ asio::io_service _io_service;
+ asio::io_service::work *_work;
+ int _sock_fd;
+
+ //memory management -> buffers and fifos
+ boost::thread_group _thread_group;
+ boost::shared_array<char> _send_buffer, _recv_buffer;
+ typedef bounded_buffer<asio::mutable_buffer> pending_buffs_type;
+ pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs;
+ const size_t _recv_frame_size, _num_recv_frames;
+ const size_t _send_frame_size, _num_send_frames;
};
/***********************************************************************
* UDP zero copy make function
**********************************************************************/
template<typename Opt> static void resize_buff_helper(
- udp_zero_copy_impl::sptr udp_trans,
+ udp_zero_copy_asio_impl::sptr udp_trans,
size_t target_size,
const std::string &name
){
@@ -152,6 +319,13 @@ template<typename Opt> static void resize_buff_helper(
if (name == "recv") min_sock_buff_size = MIN_RECV_SOCK_BUFF_SIZE;
if (name == "send") min_sock_buff_size = MIN_SEND_SOCK_BUFF_SIZE;
+ std::string help_message;
+ #if defined(UHD_PLATFORM_LINUX)
+ help_message = str(boost::format(
+ "Please run: sudo sysctl -w net.core.%smem_max=%d\n"
+ ) % ((name == "recv")?"r":"w") % min_sock_buff_size);
+ #endif /*defined(UHD_PLATFORM_LINUX)*/
+
//resize the buffer if size was provided
if (target_size > 0){
size_t actual_size = udp_trans->resize_buff<Opt>(target_size);
@@ -165,8 +339,8 @@ template<typename Opt> static void resize_buff_helper(
if (actual_size < target_size) uhd::print_warning(str(boost::format(
"The %s buffer is smaller than the requested size.\n"
"The minimum recommended buffer size is %d bytes.\n"
- "See the USRP2 application notes on buffer resizing.\n"
- ) % name % min_sock_buff_size));
+ "See the transport application notes on buffer resizing.\n%s"
+ ) % name % min_sock_buff_size % help_message));
}
//only enable on platforms that are happy with the large buffer resize
@@ -181,14 +355,21 @@ template<typename Opt> static void resize_buff_helper(
udp_zero_copy::sptr udp_zero_copy::make(
const std::string &addr,
const std::string &port,
- size_t recv_buff_size,
- size_t send_buff_size
+ const device_addr_t &hints
){
- udp_zero_copy_impl::sptr udp_trans(new udp_zero_copy_impl(addr, port));
+ udp_zero_copy_asio_impl::sptr udp_trans(
+ new udp_zero_copy_asio_impl(addr, port, hints)
+ );
+
+ //extract buffer size hints from the device addr
+ size_t recv_buff_size = size_t(hints.cast<double>("recv_buff_size", 0.0));
+ size_t send_buff_size = size_t(hints.cast<double>("send_buff_size", 0.0));
//call the helper to resize send and recv buffers
- resize_buff_helper<boost::asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv");
- resize_buff_helper<boost::asio::socket_base::send_buffer_size> (udp_trans, send_buff_size, "send");
+ resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv");
+ resize_buff_helper<asio::socket_base::send_buffer_size> (udp_trans, send_buff_size, "send");
+
+ udp_trans->init(); //buffers resized -> call init() to use
return udp_trans;
}
diff --git a/host/lib/transport/usb_dummy_impl.cpp b/host/lib/transport/usb_dummy_impl.cpp
new file mode 100644
index 000000000..8a9772e7f
--- /dev/null
+++ b/host/lib/transport/usb_dummy_impl.cpp
@@ -0,0 +1,39 @@
+//
+// Copyright 2010 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include <uhd/transport/usb_device_handle.hpp>
+#include <uhd/transport/usb_control.hpp>
+#include <uhd/transport/usb_zero_copy.hpp>
+#include <uhd/utils/exception.hpp>
+
+using namespace uhd;
+using namespace uhd::transport;
+
+std::vector<usb_device_handle::sptr> usb_device_handle::get_device_list(boost::uint16_t, boost::uint16_t){
+ return std::vector<usb_device_handle::sptr>(); //empty list
+}
+
+usb_control::sptr usb_control::make(usb_device_handle::sptr){
+ throw std::runtime_error("no usb support -> usb_control::make not implemented");
+}
+
+usb_zero_copy::sptr usb_zero_copy::make(
+ usb_device_handle::sptr,
+ size_t, size_t, const device_addr_t &
+){
+ throw std::runtime_error("no usb support -> usb_zero_copy::make not implemented");
+}
diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp
index 7e0588f03..939517411 100644
--- a/host/lib/transport/vrt_packet_handler.hpp
+++ b/host/lib/transport/vrt_packet_handler.hpp
@@ -150,7 +150,8 @@ template <typename T> UHD_INLINE T get_context_code(
const vrt_unpacker_t &vrt_unpacker,
const get_recv_buffs_t &get_recv_buffs,
const handle_overflow_t &handle_overflow,
- size_t vrt_header_offset_words32
+ size_t vrt_header_offset_words32,
+ size_t chans_per_otw_buff
){
metadata.error_code = uhd::rx_metadata_t::ERROR_CODE_NONE;
@@ -184,15 +185,21 @@ template <typename T> UHD_INLINE T get_context_code(
//extract the number of samples available to copy
size_t bytes_per_item = otw_type.get_sample_size();
size_t nsamps_available = state.size_of_copy_buffs/bytes_per_item;
- size_t nsamps_to_copy = std::min(total_samps, nsamps_available);
+ size_t nsamps_to_copy = std::min(total_samps*chans_per_otw_buff, nsamps_available);
size_t bytes_to_copy = nsamps_to_copy*bytes_per_item;
+ size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/chans_per_otw_buff;
+
+ std::vector<void *> io_buffs(chans_per_otw_buff);
+ for (size_t i = 0; i < state.width; i+=chans_per_otw_buff){
+
+ //fill a vector with pointers to the io buffers
+ for (size_t j = 0; j < chans_per_otw_buff; j++){
+ io_buffs[j] = reinterpret_cast<boost::uint8_t *>(buffs[i+j]) + offset_bytes;
+ }
- for (size_t i = 0; i < state.width; i++){
//copy-convert the samples from the recv buffer
uhd::transport::convert_otw_type_to_io_type(
- state.copy_buffs[i], otw_type,
- reinterpret_cast<boost::uint8_t *>(buffs[i]) + offset_bytes,
- io_type, nsamps_to_copy
+ state.copy_buffs[i], otw_type, io_buffs, io_type, nsamps_to_copy_per_io_buff
);
//update the rx copy buffer to reflect the bytes copied
@@ -206,7 +213,7 @@ template <typename T> UHD_INLINE T get_context_code(
metadata.fragment_offset = state.fragment_offset_in_samps;
state.fragment_offset_in_samps += nsamps_to_copy; //set for next call
- return nsamps_to_copy;
+ return nsamps_to_copy_per_io_buff;
}
/*******************************************************************
@@ -224,7 +231,8 @@ template <typename T> UHD_INLINE T get_context_code(
const vrt_unpacker_t &vrt_unpacker,
const get_recv_buffs_t &get_recv_buffs,
const handle_overflow_t &handle_overflow = &handle_overflow_nop,
- size_t vrt_header_offset_words32 = 0
+ size_t vrt_header_offset_words32 = 0,
+ size_t chans_per_otw_buff = 1
){
switch(recv_mode){
@@ -241,7 +249,8 @@ template <typename T> UHD_INLINE T get_context_code(
vrt_unpacker,
get_recv_buffs,
handle_overflow,
- vrt_header_offset_words32
+ vrt_header_offset_words32,
+ chans_per_otw_buff
);
}
@@ -261,7 +270,8 @@ template <typename T> UHD_INLINE T get_context_code(
vrt_unpacker,
get_recv_buffs,
handle_overflow,
- vrt_header_offset_words32
+ vrt_header_offset_words32,
+ chans_per_otw_buff
);
if (num_samps == 0) break; //had a recv timeout or error, break loop
accum_num_samps += num_samps;
@@ -293,47 +303,49 @@ template <typename T> UHD_INLINE T get_context_code(
* Pack a vrt header, copy-convert the data, and send it.
* - helper function for vrt_packet_handler::send
******************************************************************/
- static UHD_INLINE void _send1(
+ static UHD_INLINE size_t _send1(
send_state &state,
const std::vector<const void *> &buffs,
- size_t offset_bytes,
- size_t num_samps,
+ const size_t offset_bytes,
+ const size_t num_samps,
uhd::transport::vrt::if_packet_info_t &if_packet_info,
const uhd::io_type_t &io_type,
const uhd::otw_type_t &otw_type,
const vrt_packer_t &vrt_packer,
const get_send_buffs_t &get_send_buffs,
- size_t vrt_header_offset_words32
+ const size_t vrt_header_offset_words32,
+ const size_t chans_per_otw_buff
){
//load the rest of the if_packet_info in here
- if_packet_info.num_payload_words32 = (num_samps*otw_type.get_sample_size())/sizeof(boost::uint32_t);
+ if_packet_info.num_payload_words32 = (num_samps*chans_per_otw_buff*otw_type.get_sample_size())/sizeof(boost::uint32_t);
if_packet_info.packet_count = state.next_packet_seq++;
//get send buffers for each channel
- managed_send_buffs_t send_buffs(buffs.size());
- UHD_ASSERT_THROW(get_send_buffs(send_buffs));
+ managed_send_buffs_t send_buffs(buffs.size()/chans_per_otw_buff);
+ if (not get_send_buffs(send_buffs)) return 0;
- for (size_t i = 0; i < buffs.size(); i++){
+ std::vector<const void *> io_buffs(chans_per_otw_buff);
+ for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){
//calculate pointers with offsets to io and otw memory
- const boost::uint8_t *io_mem = reinterpret_cast<const boost::uint8_t *>(buffs[i]) + offset_bytes;
+ for (size_t j = 0; j < chans_per_otw_buff; j++){
+ io_buffs[j] = reinterpret_cast<const boost::uint8_t *>(buffs[i+j]) + offset_bytes;
+ }
boost::uint32_t *otw_mem = send_buffs[i]->cast<boost::uint32_t *>() + vrt_header_offset_words32;
//pack metadata into a vrt header
vrt_packer(otw_mem, if_packet_info);
+ otw_mem += if_packet_info.num_header_words32;
//copy-convert the samples into the send buffer
uhd::transport::convert_io_type_to_otw_type(
- io_mem, io_type,
- otw_mem + if_packet_info.num_header_words32, otw_type,
- num_samps
+ io_buffs, io_type, otw_mem, otw_type, num_samps
);
//commit the samples to the zero-copy interface
size_t num_bytes_total = (vrt_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t);
- if (send_buffs[i]->commit(num_bytes_total) < ssize_t(num_bytes_total)){
- std::cerr << "commit to send buffer returned less than commit size" << std::endl;
- }
+ send_buffs[i]->commit(num_bytes_total);
}
+ return num_samps;
}
/*******************************************************************
@@ -351,7 +363,8 @@ template <typename T> UHD_INLINE T get_context_code(
const vrt_packer_t &vrt_packer,
const get_send_buffs_t &get_send_buffs,
size_t max_samples_per_packet,
- size_t vrt_header_offset_words32 = 0
+ size_t vrt_header_offset_words32 = 0,
+ size_t chans_per_otw_buff = 1
){
//translate the metadata to vrt if packet info
uhd::transport::vrt::if_packet_info_t if_packet_info;
@@ -367,7 +380,6 @@ template <typename T> UHD_INLINE T get_context_code(
////////////////////////////////////////////////////////////////
case uhd::device::SEND_MODE_ONE_PACKET:{
////////////////////////////////////////////////////////////////
- size_t num_samps = std::min(total_num_samps, max_samples_per_packet);
//fill in parts of the packet info overwrote in full buff mode
if_packet_info.has_tsi = metadata.has_time_spec;
@@ -375,49 +387,54 @@ template <typename T> UHD_INLINE T get_context_code(
if_packet_info.sob = metadata.start_of_burst;
if_packet_info.eob = metadata.end_of_burst;
- _send1(
+ return _send1(
state,
buffs, 0,
- num_samps,
+ std::min(total_num_samps, max_samples_per_packet),
if_packet_info,
io_type, otw_type,
vrt_packer,
get_send_buffs,
- vrt_header_offset_words32
+ vrt_header_offset_words32,
+ chans_per_otw_buff
);
- return num_samps;
}
////////////////////////////////////////////////////////////////
case uhd::device::SEND_MODE_FULL_BUFF:{
////////////////////////////////////////////////////////////////
- //calculate constants for fragmentation
- const size_t num_fragments = (total_num_samps+max_samples_per_packet-1)/max_samples_per_packet;
- static const size_t first_fragment_index = 0;
- const size_t final_fragment_index = num_fragments-1;
+ size_t total_num_samps_sent = 0;
//loop through the following fragment indexes
- for (size_t n = first_fragment_index; n <= final_fragment_index; n++){
+ while(total_num_samps_sent < total_num_samps){
+
+ //calculate per-loop-iteration variables
+ const size_t total_num_samps_unsent = total_num_samps - total_num_samps_sent;
+ const bool first_fragment = (total_num_samps_sent == 0);
+ const bool final_fragment = (total_num_samps_unsent <= max_samples_per_packet);
//calculate new flags for the fragments
- if_packet_info.has_tsi = metadata.has_time_spec and (n == first_fragment_index);
- if_packet_info.has_tsf = metadata.has_time_spec and (n == first_fragment_index);
- if_packet_info.sob = metadata.start_of_burst and (n == first_fragment_index);
- if_packet_info.eob = metadata.end_of_burst and (n == final_fragment_index);
+ if_packet_info.has_tsi = metadata.has_time_spec and first_fragment;
+ if_packet_info.has_tsf = if_packet_info.has_tsi;
+ if_packet_info.sob = metadata.start_of_burst and first_fragment;
+ if_packet_info.eob = metadata.end_of_burst and final_fragment;
//send the fragment with the helper function
- _send1(
+ const size_t num_samps_sent = _send1(
state,
- buffs, n*max_samples_per_packet*io_type.size,
- (n == final_fragment_index)?(total_num_samps%max_samples_per_packet):max_samples_per_packet,
+ buffs, total_num_samps_sent*io_type.size,
+ std::min(total_num_samps_unsent, max_samples_per_packet),
if_packet_info,
io_type, otw_type,
vrt_packer,
get_send_buffs,
- vrt_header_offset_words32
+ vrt_header_offset_words32,
+ chans_per_otw_buff
);
+ total_num_samps_sent += num_samps_sent;
+ if (num_samps_sent == 0) return total_num_samps_sent;
}
- return total_num_samps;
+ return total_num_samps_sent;
}
default: throw std::runtime_error("unknown send mode");
diff --git a/host/lib/transport/zero_copy.cpp b/host/lib/transport/zero_copy.cpp
index 8a1cde694..a5a864a04 100644
--- a/host/lib/transport/zero_copy.cpp
+++ b/host/lib/transport/zero_copy.cpp
@@ -16,32 +16,35 @@
//
#include <uhd/transport/zero_copy.hpp>
-#include <boost/cstdint.hpp>
-#include <boost/function.hpp>
-#include <boost/bind.hpp>
using namespace uhd::transport;
/***********************************************************************
- * The pure-virtual deconstructor needs an implementation to be happy
+ * Safe managed receive buffer
**********************************************************************/
-managed_recv_buffer::~managed_recv_buffer(void){
+static void release_nop(void){
/* NOP */
}
-/***********************************************************************
- * Phony zero-copy recv interface implementation
- **********************************************************************/
-
-//! phony zero-copy recv buffer implementation
-class managed_recv_buffer_impl : public managed_recv_buffer{
+class safe_managed_receive_buffer : public managed_recv_buffer{
public:
- managed_recv_buffer_impl(const boost::asio::const_buffer &buff) : _buff(buff){
+ safe_managed_receive_buffer(
+ const boost::asio::const_buffer &buff,
+ const release_fcn_t &release_fcn
+ ):
+ _buff(buff), _release_fcn(release_fcn)
+ {
/* NOP */
}
- ~managed_recv_buffer_impl(void){
- delete [] this->cast<const boost::uint8_t *>();
+ ~safe_managed_receive_buffer(void){
+ _release_fcn();
+ }
+
+ void release(void){
+ release_fcn_t release_fcn = _release_fcn;
+ _release_fcn = &release_nop;
+ return release_fcn();
}
private:
@@ -50,64 +53,42 @@ private:
}
const boost::asio::const_buffer _buff;
+ release_fcn_t _release_fcn;
};
-//! phony zero-copy recv interface implementation
-struct phony_zero_copy_recv_if::impl{
- impl(size_t max_buff_size) : max_buff_size(max_buff_size){
- /* NOP */
- }
- size_t max_buff_size;
-};
-
-phony_zero_copy_recv_if::phony_zero_copy_recv_if(size_t max_buff_size){
- _impl = UHD_PIMPL_MAKE(impl, (max_buff_size));
-}
-
-phony_zero_copy_recv_if::~phony_zero_copy_recv_if(void){
- /* NOP */
-}
-
-managed_recv_buffer::sptr phony_zero_copy_recv_if::get_recv_buff(void){
- //allocate memory
- boost::uint8_t *recv_mem = new boost::uint8_t[_impl->max_buff_size];
-
- //call recv() with timeout option
- ssize_t num_bytes = this->recv(boost::asio::buffer(recv_mem, _impl->max_buff_size));
-
- if (num_bytes <= 0) return managed_recv_buffer::sptr(); //NULL sptr
-
- //create a new managed buffer to house the data
- return managed_recv_buffer::sptr(
- new managed_recv_buffer_impl(boost::asio::buffer(recv_mem, num_bytes))
- );
+managed_recv_buffer::sptr managed_recv_buffer::make_safe(
+ const boost::asio::const_buffer &buff,
+ const release_fcn_t &release_fcn
+){
+ return sptr(new safe_managed_receive_buffer(buff, release_fcn));
}
/***********************************************************************
- * Phony zero-copy send interface implementation
+ * Safe managed send buffer
**********************************************************************/
+static void commit_nop(size_t){
+ /* NOP */
+}
-//! phony zero-copy send buffer implementation
-class managed_send_buffer_impl : public managed_send_buffer{
+class safe_managed_send_buffer : public managed_send_buffer{
public:
- typedef boost::function<ssize_t(const boost::asio::const_buffer &)> send_fcn_t;
-
- managed_send_buffer_impl(
+ safe_managed_send_buffer(
const boost::asio::mutable_buffer &buff,
- const send_fcn_t &send_fcn
+ const commit_fcn_t &commit_fcn
):
- _buff(buff),
- _send_fcn(send_fcn)
+ _buff(buff), _commit_fcn(commit_fcn)
{
/* NOP */
}
- ~managed_send_buffer_impl(void){
- /* NOP */
+ ~safe_managed_send_buffer(void){
+ _commit_fcn(0);
}
- ssize_t commit(size_t num_bytes){
- return _send_fcn(boost::asio::buffer(_buff, num_bytes));
+ void commit(size_t num_bytes){
+ commit_fcn_t commit_fcn = _commit_fcn;
+ _commit_fcn = &commit_nop;
+ return commit_fcn(num_bytes);
}
private:
@@ -116,28 +97,12 @@ private:
}
const boost::asio::mutable_buffer _buff;
- const send_fcn_t _send_fcn;
-};
-
-//! phony zero-copy send interface implementation
-struct phony_zero_copy_send_if::impl{
- boost::uint8_t *send_mem;
- managed_send_buffer::sptr send_buff;
+ commit_fcn_t _commit_fcn;
};
-phony_zero_copy_send_if::phony_zero_copy_send_if(size_t max_buff_size){
- _impl = UHD_PIMPL_MAKE(impl, ());
- _impl->send_mem = new boost::uint8_t[max_buff_size];
- _impl->send_buff = managed_send_buffer::sptr(new managed_send_buffer_impl(
- boost::asio::buffer(_impl->send_mem, max_buff_size),
- boost::bind(&phony_zero_copy_send_if::send, this, _1)
- ));
-}
-
-phony_zero_copy_send_if::~phony_zero_copy_send_if(void){
- delete [] _impl->send_mem;
-}
-
-managed_send_buffer::sptr phony_zero_copy_send_if::get_send_buff(void){
- return _impl->send_buff; //FIXME there is only ever one send buff, we assume that the caller doesnt hang onto these
+safe_managed_send_buffer::sptr managed_send_buffer::make_safe(
+ const boost::asio::mutable_buffer &buff,
+ const commit_fcn_t &commit_fcn
+){
+ return sptr(new safe_managed_send_buffer(buff, commit_fcn));
}