aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/CMakeLists.txt7
-rw-r--r--host/lib/transport/gen_vrt_if_packet.py143
-rw-r--r--host/lib/transport/libusb1_base.cpp33
-rw-r--r--host/lib/transport/libusb1_base.hpp4
-rw-r--r--host/lib/transport/libusb1_control.cpp5
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp335
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp53
-rw-r--r--host/lib/transport/super_send_packet_handler.hpp41
-rw-r--r--host/lib/transport/usb_zero_copy_wrapper.cpp231
9 files changed, 465 insertions, 387 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index 6524a8412..963edcf85 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2010-2011 Ettus Research LLC
+# Copyright 2010-2013 Ettus Research LLC
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -37,6 +37,10 @@ IF(ENABLE_USB)
${CMAKE_CURRENT_SOURCE_DIR}/libusb1_base.cpp
${CMAKE_CURRENT_SOURCE_DIR}/libusb1_base.hpp
)
+ SET_SOURCE_FILES_PROPERTIES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/libusb1_zero_copy.cpp
+ PROPERTIES COMPILE_DEFINITIONS "${LIBUSB_DEFINITIONS}"
+ )
ELSE(ENABLE_USB)
LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/usb_dummy_impl.cpp
@@ -118,5 +122,4 @@ LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp
${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp
${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp
- ${CMAKE_CURRENT_SOURCE_DIR}/usb_zero_copy_wrapper.cpp
)
diff --git a/host/lib/transport/gen_vrt_if_packet.py b/host/lib/transport/gen_vrt_if_packet.py
index e28ce3aae..98f6804ae 100644
--- a/host/lib/transport/gen_vrt_if_packet.py
+++ b/host/lib/transport/gen_vrt_if_packet.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2010-2011 Ettus Research LLC
+# Copyright 2010-2013 Ettus Research LLC
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -48,12 +48,14 @@ TMPL_TEXT = """
using namespace uhd;
using namespace uhd::transport;
+using namespace uhd::transport::vrt;
typedef size_t pred_type;
typedef std::vector<pred_type> pred_table_type;
#define pred_table_index(hdr) ((hdr >> 20) & 0x1ff)
-static pred_table_type get_pred_unpack_table(void){
+static pred_table_type get_pred_unpack_table(void)
+{
pred_table_type table(1 << 9, 0); //only 9 bits useful here (20-28)
for (size_t i = 0; i < table.size(); i++){
boost::uint32_t vrt_hdr_word = i << 20;
@@ -74,13 +76,45 @@ static const pred_table_type pred_unpack_table(get_pred_unpack_table());
//maps num empty bytes to trailer bits
static const size_t occ_table[] = {0, 2, 1, 3};
+const boost::uint32_t VRLP = ('V' << 24) | ('R' << 16) | ('L' << 8) | ('P' << 0);
+const boost::uint32_t VEND = ('V' << 24) | ('E' << 16) | ('N' << 8) | ('D' << 0);
+
+UHD_INLINE static boost::uint32_t chdr_to_vrt(const boost::uint32_t chdr, if_packet_info_t &info)
+{
+ const boost::uint32_t bytes = chdr & 0xffff;
+ boost::uint32_t vrt = (bytes + 3)/4;
+ info.packet_count = (chdr >> 16) & 0xfff;
+ vrt |= ((chdr >> 31) & 0x1) << 30; //context packet
+ vrt |= ((chdr >> 29) & 0x1) << 20; //has tsf
+ vrt |= ((chdr >> 28) & 0x1) << 24; //has eob
+ vrt |= (0x1) << 28; //has sid (always)
+ return vrt;
+}
+
+UHD_INLINE static boost::uint32_t vrt_to_chdr(const boost::uint32_t vrt, const if_packet_info_t &info)
+{
+ const boost::uint32_t words32 = vrt & 0xffff;
+ int bytes_rem = info.num_payload_bytes % 4;
+ if (bytes_rem != 0) bytes_rem -= 4; //adjust for round up
+ boost::uint32_t chdr = (words32 * 4) + bytes_rem;
+ chdr |= (info.packet_count & 0xfff) << 16;
+ chdr |= ((vrt >> 30) & 0x1) << 31; //context packet
+ chdr |= ((vrt >> 20) & 0x1) << 29; //has tsf
+ chdr |= ((vrt >> 24) & 0x1) << 28; //has eob
+ return chdr;
+}
+
########################################################################
#def gen_code($XE_MACRO, $suffix)
########################################################################
-void vrt::if_hdr_pack_$(suffix)(
+/***********************************************************************
+ * interal impl of packing VRT IF header only
+ **********************************************************************/
+UHD_INLINE void __if_hdr_pack_$(suffix)(
boost::uint32_t *packet_buff,
- if_packet_info_t &if_packet_info
+ if_packet_info_t &if_packet_info,
+ boost::uint32_t &vrt_hdr_word32
){
boost::uint32_t vrt_hdr_flags = 0;
@@ -154,31 +188,33 @@ void vrt::if_hdr_pack_$(suffix)(
}
//fill in complete header word
- packet_buff[0] = $(XE_MACRO)(boost::uint32_t(0
+ vrt_hdr_word32 = boost::uint32_t(0
| (if_packet_info.packet_type << 29)
| vrt_hdr_flags
| ((if_packet_info.packet_count & 0xf) << 16)
| (if_packet_info.num_packet_words32 & 0xffff)
- ));
+ );
}
-void vrt::if_hdr_unpack_$(suffix)(
+/***********************************************************************
+ * interal impl of unpacking VRT IF header only
+ **********************************************************************/
+UHD_INLINE void __if_hdr_unpack_$(suffix)(
const boost::uint32_t *packet_buff,
- if_packet_info_t &if_packet_info
+ if_packet_info_t &if_packet_info,
+ const boost::uint32_t vrt_hdr_word32
){
- //extract vrt header
- boost::uint32_t vrt_hdr_word = $(XE_MACRO)(packet_buff[0]);
- size_t packet_words32 = vrt_hdr_word & 0xffff;
+ const size_t packet_words32 = vrt_hdr_word32 & 0xffff;
//failure case
if (if_packet_info.num_packet_words32 < packet_words32)
throw uhd::value_error("bad vrt header or packet fragment");
//extract fields from the header
- if_packet_info.packet_type = if_packet_info_t::packet_type_t(vrt_hdr_word >> 29);
- if_packet_info.packet_count = (vrt_hdr_word >> 16) & 0xf;
+ if_packet_info.packet_type = if_packet_info_t::packet_type_t(vrt_hdr_word32 >> 29);
+ if_packet_info.packet_count = (vrt_hdr_word32 >> 16) & 0xf;
- const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word)];
+ const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word32)];
size_t empty_bytes = 0;
@@ -259,6 +295,85 @@ void vrt::if_hdr_unpack_$(suffix)(
}
}
+/***********************************************************************
+ * link layer + VRT IF packing
+ **********************************************************************/
+void vrt::if_hdr_pack_$(suffix)(
+ boost::uint32_t *packet_buff,
+ if_packet_info_t &if_packet_info
+){
+ boost::uint32_t vrt_hdr_word32 = 0;
+ switch (if_packet_info.link_type)
+ {
+ case if_packet_info_t::LINK_TYPE_NONE:
+ __if_hdr_pack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32);
+ packet_buff[0] = $(XE_MACRO)(vrt_hdr_word32);
+ break;
+
+ case if_packet_info_t::LINK_TYPE_CHDR:
+ {
+ __if_hdr_pack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32);
+ const boost::uint32_t chdr = vrt_to_chdr(vrt_hdr_word32, if_packet_info);
+ packet_buff[0] = $(XE_MACRO)(chdr);
+ break;
+ }
+
+ case if_packet_info_t::LINK_TYPE_VRLP:
+ __if_hdr_pack_$(suffix)(packet_buff+2, if_packet_info, vrt_hdr_word32);
+ if_packet_info.num_header_words32 += 2;
+ if_packet_info.num_packet_words32 += 3;
+ packet_buff[0] = $(XE_MACRO)(VRLP);
+ packet_buff[1] = $(XE_MACRO)(boost::uint32_t(
+ (if_packet_info.num_packet_words32 & 0xfffff) |
+ ((if_packet_info.packet_count & 0xfff) << 20)
+ ));
+ packet_buff[2] = $(XE_MACRO)(vrt_hdr_word32);
+ packet_buff[if_packet_info.num_packet_words32-1] = $(XE_MACRO)(VEND);
+ break;
+ }
+}
+
+/***********************************************************************
+ * link layer + VRT IF unpacking
+ **********************************************************************/
+void vrt::if_hdr_unpack_$(suffix)(
+ const boost::uint32_t *packet_buff,
+ if_packet_info_t &if_packet_info
+){
+ boost::uint32_t vrt_hdr_word32 = 0;
+ switch (if_packet_info.link_type)
+ {
+ case if_packet_info_t::LINK_TYPE_NONE:
+ vrt_hdr_word32 = $(XE_MACRO)(packet_buff[0]);
+ __if_hdr_unpack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32);
+ break;
+
+ case if_packet_info_t::LINK_TYPE_CHDR:
+ {
+ const boost::uint32_t chdr = $(XE_MACRO)(packet_buff[0]);
+ vrt_hdr_word32 = chdr_to_vrt(chdr, if_packet_info);
+ size_t packet_count = if_packet_info.packet_count;
+ __if_hdr_unpack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32);
+ if_packet_info.num_payload_bytes -= (~chdr + 1) & 0x3;
+ if_packet_info.packet_count = packet_count;
+ break;
+ }
+
+ case if_packet_info_t::LINK_TYPE_VRLP:
+ {
+ if ($(XE_MACRO)(packet_buff[0]) != VRLP) throw uhd::value_error("bad vrl header VRLP");
+ const boost::uint32_t vrl_hdr = $(XE_MACRO)(packet_buff[1]);
+ vrt_hdr_word32 = $(XE_MACRO)(packet_buff[2]);
+ if (if_packet_info.num_packet_words32 < (vrl_hdr & 0xfffff)) throw uhd::value_error("bad vrl header or packet fragment");
+ if ($(XE_MACRO)(packet_buff[(vrl_hdr & 0xfffff)-1]) != VEND) throw uhd::value_error("bad vrl trailer VEND");
+ __if_hdr_unpack_$(suffix)(packet_buff+2, if_packet_info, vrt_hdr_word32);
+ if_packet_info.num_header_words32 += 2; //add vrl header
+ if_packet_info.packet_count = (vrl_hdr >> 20) & 0xfff;
+ break;
+ }
+ }
+}
+
########################################################################
#end def
########################################################################
diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp
index d4ec874f1..0ef53db0a 100644
--- a/host/lib/transport/libusb1_base.cpp
+++ b/host/lib/transport/libusb1_base.cpp
@@ -1,5 +1,5 @@
//
-// Copyright 2010-2011 Ettus Research LLC
+// Copyright 2010-2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -23,6 +23,7 @@
#include <boost/weak_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/foreach.hpp>
+#include <cstdlib>
#include <iostream>
using namespace uhd;
@@ -59,6 +60,15 @@ libusb::session::sptr libusb::session::get_global_session(void){
//create a new global session
sptr new_global_session(new libusb_session_impl());
global_session = new_global_session;
+
+ //set logging if envvar is set
+ const char *level_string = getenv("LIBUSB_DEBUG_LEVEL");
+ if (level_string != NULL)
+ {
+ const int level = int(level_string[0] - '0'); //easy conversion to integer
+ if (level >= 0 and level <= 3) libusb_set_debug(new_global_session->get_context(), level);
+ }
+
return new_global_session;
}
@@ -137,8 +147,13 @@ public:
return _desc;
}
- std::string get_ascii_serial(void) const{
- if (this->get().iSerialNumber == 0) return "";
+ std::string get_ascii_property(const std::string &what) const
+ {
+ boost::uint8_t off = 0;
+ if (what == "serial") off = this->get().iSerialNumber;
+ if (what == "product") off = this->get().iProduct;
+ if (what == "manufacturer") off = this->get().iManufacturer;
+ if (off == 0) return "";
libusb::device_handle::sptr handle(
libusb::device_handle::get_cached_handle(_dev)
@@ -146,7 +161,7 @@ public:
unsigned char buff[512];
ssize_t ret = libusb_get_string_descriptor_ascii(
- handle->get(), this->get().iSerialNumber, buff, sizeof(buff)
+ handle->get(), off, buff, sizeof(buff)
);
if (ret < 0) return ""; //on error, just return empty string
@@ -240,7 +255,15 @@ public:
}
std::string get_serial(void) const{
- return libusb::device_descriptor::make(this->get_device())->get_ascii_serial();
+ return libusb::device_descriptor::make(this->get_device())->get_ascii_property("serial");
+ }
+
+ std::string get_manufacturer() const{
+ return libusb::device_descriptor::make(this->get_device())->get_ascii_property("manufacturer");
+ }
+
+ std::string get_product() const{
+ return libusb::device_descriptor::make(this->get_device())->get_ascii_property("product");
}
boost::uint16_t get_vendor_id(void) const{
diff --git a/host/lib/transport/libusb1_base.hpp b/host/lib/transport/libusb1_base.hpp
index 04c1d6574..7dab07fda 100644
--- a/host/lib/transport/libusb1_base.hpp
+++ b/host/lib/transport/libusb1_base.hpp
@@ -1,5 +1,5 @@
//
-// Copyright 2010 Ettus Research LLC
+// Copyright 2010-2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -102,7 +102,7 @@ namespace libusb {
//! get the underlying device descriptor
virtual const libusb_device_descriptor &get(void) const = 0;
- virtual std::string get_ascii_serial(void) const = 0;
+ virtual std::string get_ascii_property(const std::string &what) const = 0;
};
/*!
diff --git a/host/lib/transport/libusb1_control.cpp b/host/lib/transport/libusb1_control.cpp
index 3d9b38785..c1b8fe6df 100644
--- a/host/lib/transport/libusb1_control.cpp
+++ b/host/lib/transport/libusb1_control.cpp
@@ -21,8 +21,6 @@
using namespace uhd::transport;
-const int libusb_timeout = 0;
-
/***********************************************************************
* libusb-1.0 implementation of USB control transport
**********************************************************************/
@@ -39,7 +37,8 @@ public:
boost::uint16_t value,
boost::uint16_t index,
unsigned char *buff,
- boost::uint16_t length
+ boost::uint16_t length,
+ boost::int32_t libusb_timeout = 0
){
boost::mutex::scoped_lock lock(_mutex);
return libusb_control_transfer(_handle->get(),
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index 28bff9709..197e257da 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -1,5 +1,5 @@
//
-// Copyright 2010-2012 Ettus Research LLC
+// Copyright 2010-2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -18,11 +18,17 @@
#include "libusb1_base.hpp"
#include <uhd/transport/usb_zero_copy.hpp>
#include <uhd/transport/buffer_pool.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
#include <uhd/utils/msg.hpp>
#include <uhd/exception.hpp>
#include <boost/foreach.hpp>
+#include <boost/format.hpp>
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
-#include <boost/thread/thread.hpp>
+#include <boost/circular_buffer.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition_variable.hpp>
#include <list>
using namespace uhd;
@@ -36,14 +42,51 @@ static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes
#define LIBUSB_CALL
#endif /*LIBUSB_CALL*/
+//! libusb_handle_events_timeout_completed is only in newer API
+#ifndef HAVE_LIBUSB_HANDLE_EVENTS_TIMEOUT_COMPLETED
+ #define libusb_handle_events_timeout_completed(ctx, tx, completed) \
+ libusb_handle_events_timeout(ctx, tx)
+#endif
+
+//! libusb_error_name is only in newer API
+#ifndef HAVE_LIBUSB_ERROR_NAME
+ #define libusb_error_name(code) \
+ str(boost::format("LIBUSB_ERROR_CODE %d") % code)
+#endif
+
+//! type for sharing the release queue with managed buffers
+class libusb_zero_copy_mb;
+typedef boost::shared_ptr<bounded_buffer<libusb_zero_copy_mb *> > mb_queue_sptr;
+
+/*!
+ * The libusb docs state that status and actual length can only be read in the callback.
+ * Therefore, this struct is intended to store data seen from the callback function.
+ */
+struct lut_result_t
+{
+ lut_result_t(void)
+ {
+ completed = 0;
+ status = LIBUSB_TRANSFER_COMPLETED;
+ actual_length = 0;
+ }
+ int completed;
+ libusb_transfer_status status;
+ int actual_length;
+};
+
/*!
* All libusb callback functions should be marked with the LIBUSB_CALL macro
* to ensure that they are compiled with the same calling convention as libusb.
*/
//! helper function: handles all async callbacks
-static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut){
- *(static_cast<bool *>(lut->user_data)) = true;
+static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut)
+{
+ lut_result_t *r = (lut_result_t *)lut->user_data;
+ r->completed = 1;
+ r->status = lut->status;
+ r->actual_length = lut->actual_length;
}
/*!
@@ -61,7 +104,8 @@ static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut){
* \param completed a reference to the completed flag
* \return true for completion, false for timeout
*/
-UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, bool &completed){
+UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, int &completed)
+{
//already completed by a previous call?
if (completed) return true;
@@ -69,7 +113,7 @@ UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, b
timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 0;
- libusb_handle_events_timeout(ctx, &tv);
+ libusb_handle_events_timeout_completed(ctx, &tv, &completed);
if (completed) return true;
//finish the rest with a timeout loop
@@ -78,73 +122,54 @@ UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, b
timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 10000; /*10ms*/
- libusb_handle_events_timeout(ctx, &tv);
+ libusb_handle_events_timeout_completed(ctx, &tv, &completed);
}
return completed;
}
/***********************************************************************
- * Reusable managed receiver buffer:
+ * Reusable managed buffer:
* - Associated with a particular libusb transfer struct.
* - Submits the transfer to libusb in the release method.
**********************************************************************/
-class libusb_zero_copy_mrb : public managed_recv_buffer{
+class libusb_zero_copy_mb : public managed_buffer
+{
public:
- libusb_zero_copy_mrb(libusb_transfer *lut, const size_t frame_size):
+ libusb_zero_copy_mb(libusb_transfer *lut, const size_t frame_size, boost::function<void(libusb_zero_copy_mb *)> release_cb, const bool is_recv, const std::string &name):
+ _release_cb(release_cb), _is_recv(is_recv), _name(name),
_ctx(libusb::session::get_global_session()->get_context()),
_lut(lut), _frame_size(frame_size) { /* NOP */ }
- void release(void){
- completed = false;
- _lut->length = _frame_size; //always reset length
- UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0);
- }
-
- sptr get_new(const double timeout, size_t &index){
- if (wait_for_completion(_ctx, timeout, completed)){
- index++;
- return make(this, _lut->buffer, _lut->actual_length);
- }
- return managed_recv_buffer::sptr();
- }
-
- bool completed;
-
-private:
- libusb_context *_ctx;
- libusb_transfer *_lut;
- const size_t _frame_size;
-};
-
-/***********************************************************************
- * Reusable managed send buffer:
- * - Associated with a particular libusb transfer struct.
- * - Submits the transfer to libusb in the commit method.
- **********************************************************************/
-class libusb_zero_copy_msb : public managed_send_buffer{
-public:
- libusb_zero_copy_msb(libusb_transfer *lut, const size_t frame_size):
- _ctx(libusb::session::get_global_session()->get_context()),
- _lut(lut), _frame_size(frame_size) { completed = true; }
+ void release(void){_release_cb(this);}
- void release(void){
- completed = false;
- _lut->length = size();
- UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0);
+ UHD_INLINE void submit(void)
+ {
+ _lut->length = (_is_recv)? _frame_size : size(); //always set length
+ const int ret = libusb_submit_transfer(_lut);
+ if (ret != 0) throw uhd::runtime_error(str(boost::format(
+ "usb %s submit failed: %s") % _name % libusb_error_name(ret)));
}
- sptr get_new(const double timeout, size_t &index){
- if (wait_for_completion(_ctx, timeout, completed)){
- index++;
- return make(this, _lut->buffer, _frame_size);
+ template <typename buffer_type>
+ UHD_INLINE typename buffer_type::sptr get_new(const double timeout)
+ {
+ if (wait_for_completion(_ctx, timeout, result.completed))
+ {
+ if (result.status != LIBUSB_TRANSFER_COMPLETED) throw uhd::runtime_error(str(boost::format(
+ "usb %s transfer status: %d") % _name % int(result.status)));
+ result.completed = 0;
+ return make(reinterpret_cast<buffer_type *>(this), _lut->buffer, (_is_recv)? result.actual_length : _frame_size);
}
- return managed_send_buffer::sptr();
+ return typename buffer_type::sptr();
}
- bool completed;
+ lut_result_t result;
private:
+ boost::function<void(libusb_zero_copy_mb *)> _release_cb;
+ const bool _is_recv;
+ const std::string _name;
libusb_context *_ctx;
libusb_transfer *_lut;
const size_t _frame_size;
@@ -153,39 +178,33 @@ private:
/***********************************************************************
* USB zero_copy device class
**********************************************************************/
-class libusb_zero_copy_impl : public usb_zero_copy{
+class libusb_zero_copy_single
+{
public:
-
- libusb_zero_copy_impl(
+ libusb_zero_copy_single(
libusb::device_handle::sptr handle,
- const size_t recv_interface,
- const size_t recv_endpoint,
- const size_t send_interface,
- const size_t send_endpoint,
- const device_addr_t &hints
+ const size_t interface, const size_t endpoint,
+ const size_t num_frames, const size_t frame_size
):
_handle(handle),
- _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))),
- _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS))),
- _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))),
- _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS))),
- _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
- _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
- _next_recv_buff_index(0),
- _next_send_buff_index(0)
+ _num_frames(num_frames),
+ _frame_size(frame_size),
+ _buffer_pool(buffer_pool::make(_num_frames, _frame_size)),
+ _enqueued(_num_frames), _released(_num_frames)
{
- _handle->claim_interface(recv_interface);
- _handle->claim_interface(send_interface);
+ const bool is_recv = (endpoint & 0x80) != 0;
+ const std::string name = str(boost::format("%s%d") % ((is_recv)? "rx" : "tx") % int(endpoint & 0x7f));
+ _handle->claim_interface(interface);
//flush the buffers out of the recv endpoint
//limit the flushing to at most one second
- for (size_t i = 0; i < 100; i++)
+ if (is_recv) for (size_t i = 0; i < 100; i++)
{
unsigned char buff[512];
int transfered = 0;
const int status = libusb_bulk_transfer(
_handle->get(), // dev_handle
- (recv_endpoint & 0x7f) | 0x80, // endpoint
+ endpoint, // endpoint
static_cast<unsigned char *>(buff),
sizeof(buff),
&transfered, //bytes xfered
@@ -194,102 +213,170 @@ public:
if (status == LIBUSB_ERROR_TIMEOUT) break;
}
- //allocate libusb transfer structs and managed receive buffers
- for (size_t i = 0; i < get_num_recv_frames(); i++){
-
+ //allocate libusb transfer structs and managed buffers
+ for (size_t i = 0; i < get_num_frames(); i++)
+ {
libusb_transfer *lut = libusb_alloc_transfer(0);
UHD_ASSERT_THROW(lut != NULL);
- _mrb_pool.push_back(boost::make_shared<libusb_zero_copy_mrb>(lut, this->get_recv_frame_size()));
+ _mb_pool.push_back(boost::make_shared<libusb_zero_copy_mb>(
+ lut, this->get_frame_size(), boost::bind(&libusb_zero_copy_single::enqueue_damn_buffer, this, _1), is_recv, name
+ ));
libusb_fill_bulk_transfer(
lut, // transfer
_handle->get(), // dev_handle
- (recv_endpoint & 0x7f) | 0x80, // endpoint
- static_cast<unsigned char *>(_recv_buffer_pool->at(i)), // buffer
- this->get_recv_frame_size(), // length
+ endpoint, // endpoint
+ static_cast<unsigned char *>(_buffer_pool->at(i)), // buffer
+ this->get_frame_size(), // length
libusb_transfer_cb_fn(&libusb_async_cb), // callback
- static_cast<void *>(&_mrb_pool.back()->completed), // user_data
+ static_cast<void *>(&_mb_pool.back()->result), // user_data
0 // timeout (ms)
);
_all_luts.push_back(lut);
- _mrb_pool.back()->release();
}
- //allocate libusb transfer structs and managed send buffers
- for (size_t i = 0; i < get_num_send_frames(); i++){
-
- libusb_transfer *lut = libusb_alloc_transfer(0);
- UHD_ASSERT_THROW(lut != NULL);
-
- _msb_pool.push_back(boost::make_shared<libusb_zero_copy_msb>(lut, this->get_send_frame_size()));
-
- libusb_fill_bulk_transfer(
- lut, // transfer
- _handle->get(), // dev_handle
- (send_endpoint & 0x7f) | 0x00, // endpoint
- static_cast<unsigned char *>(_send_buffer_pool->at(i)), // buffer
- this->get_send_frame_size(), // length
- libusb_transfer_cb_fn(&libusb_async_cb), // callback
- static_cast<void *>(&_msb_pool.back()->completed), // user_data
- 0 // timeout
- );
-
- _all_luts.push_back(lut);
+ //initial release for all buffers
+ for (size_t i = 0; i < get_num_frames(); i++)
+ {
+ libusb_zero_copy_mb &mb = *(_mb_pool[i]);
+ if (is_recv) mb.release();
+ else
+ {
+ mb.result.completed = 1;
+ _enqueued.push_back(&mb);
+ }
}
}
- ~libusb_zero_copy_impl(void){
+ ~libusb_zero_copy_single(void)
+ {
libusb_context *ctx = libusb::session::get_global_session()->get_context();
//cancel all transfers
- BOOST_FOREACH(libusb_transfer *lut, _all_luts){
+ BOOST_FOREACH(libusb_transfer *lut, _all_luts)
+ {
libusb_cancel_transfer(lut);
}
//process all transfers until timeout occurs
- bool completed = false;
+ int completed = 0;
wait_for_completion(ctx, 0.01, completed);
//free all transfers
- BOOST_FOREACH(libusb_transfer *lut, _all_luts){
+ BOOST_FOREACH(libusb_transfer *lut, _all_luts)
+ {
libusb_free_transfer(lut);
}
-
}
- managed_recv_buffer::sptr get_recv_buff(double timeout){
- if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0;
- return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index);
- }
+ template <typename buffer_type>
+ UHD_INLINE typename buffer_type::sptr get_buff(double timeout)
+ {
+ typename buffer_type::sptr buff;
+ libusb_zero_copy_mb *front = NULL;
+ {
+ boost::mutex::scoped_lock l(_mutex);
+ if (_enqueued.empty())
+ {
+ _cond.timed_wait(l, boost::posix_time::microseconds(long(timeout*1e6)));
+ }
+ if (_enqueued.empty()) return buff;
+ front = _enqueued.front();
+ }
- managed_send_buffer::sptr get_send_buff(double timeout){
- if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0;
- return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index);
- }
+ buff = front->get_new<buffer_type>(timeout);
- size_t get_num_recv_frames(void) const { return _num_recv_frames; }
- size_t get_num_send_frames(void) const { return _num_send_frames; }
+ boost::mutex::scoped_lock l(_mutex);
+ if (buff) _enqueued.pop_front();
+ this->submit_what_we_can();
+ return buff;
+ }
- size_t get_recv_frame_size(void) const { return _recv_frame_size; }
- size_t get_send_frame_size(void) const { return _send_frame_size; }
+ UHD_INLINE size_t get_num_frames(void) const { return _num_frames; }
+ UHD_INLINE size_t get_frame_size(void) const { return _frame_size; }
private:
libusb::device_handle::sptr _handle;
- const size_t _recv_frame_size, _num_recv_frames;
- const size_t _send_frame_size, _num_send_frames;
+ const size_t _num_frames, _frame_size;
//! Storage for transfer related objects
- buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
- std::vector<boost::shared_ptr<libusb_zero_copy_mrb> > _mrb_pool;
- std::vector<boost::shared_ptr<libusb_zero_copy_msb> > _msb_pool;
- size_t _next_recv_buff_index, _next_send_buff_index;
+ buffer_pool::sptr _buffer_pool;
+ std::vector<boost::shared_ptr<libusb_zero_copy_mb> > _mb_pool;
+
+ boost::mutex _mutex;
+ boost::condition_variable _cond;
+
+ //! why 2 queues? there is room in the future to have > N buffers but only N in flight
+ boost::circular_buffer<libusb_zero_copy_mb *> _enqueued, _released;
+
+ void enqueue_damn_buffer(libusb_zero_copy_mb *mb)
+ {
+ boost::mutex::scoped_lock l(_mutex);
+ _released.push_back(mb);
+ this->submit_what_we_can();
+ l.unlock();
+ _cond.notify_one();
+ }
+
+ void submit_what_we_can(void)
+ {
+ while (not _released.empty() and not _enqueued.full())
+ {
+ _released.front()->submit();
+ _enqueued.push_back(_released.front());
+ _released.pop_front();
+ }
+ }
//! a list of all transfer structs we allocated
std::list<libusb_transfer *> _all_luts;
+};
+
+/***********************************************************************
+ * USB zero_copy device class
+ **********************************************************************/
+struct libusb_zero_copy_impl : usb_zero_copy
+{
+ libusb_zero_copy_impl(
+ libusb::device_handle::sptr handle,
+ const size_t recv_interface,
+ const size_t recv_endpoint,
+ const size_t send_interface,
+ const size_t send_endpoint,
+ const device_addr_t &hints
+ ){
+ _recv_impl.reset(new libusb_zero_copy_single(
+ handle, recv_interface, (recv_endpoint & 0x7f) | 0x80,
+ size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS)),
+ size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))));
+ _send_impl.reset(new libusb_zero_copy_single(
+ handle, send_interface, (send_endpoint & 0x7f) | 0x00,
+ size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS)),
+ size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))));
+ }
+
+ managed_recv_buffer::sptr get_recv_buff(double timeout)
+ {
+ boost::mutex::scoped_lock l(_recv_mutex);
+ return _recv_impl->get_buff<managed_recv_buffer>(timeout);
+ }
+
+ managed_send_buffer::sptr get_send_buff(double timeout)
+ {
+ boost::mutex::scoped_lock l(_send_mutex);
+ return _send_impl->get_buff<managed_send_buffer>(timeout);
+ }
+
+ size_t get_num_recv_frames(void) const { return _recv_impl->get_num_frames(); }
+ size_t get_num_send_frames(void) const { return _send_impl->get_num_frames(); }
+ size_t get_recv_frame_size(void) const { return _recv_impl->get_frame_size(); }
+ size_t get_send_frame_size(void) const { return _send_impl->get_frame_size(); }
+ boost::shared_ptr<libusb_zero_copy_single> _recv_impl, _send_impl;
+ boost::mutex _recv_mutex, _send_mutex;
};
/***********************************************************************
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 5a75d5f0d..688228e49 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -63,6 +63,8 @@ static inline void handle_overflow_nop(void){}
class recv_packet_handler{
public:
typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type;
+ typedef boost::function<void(const size_t)> handle_flowctrl_type;
+ typedef boost::function<void(const stream_cmd_t&)> issue_stream_cmd_type;
typedef void(*vrt_unpacker_type)(const boost::uint32_t *, vrt::if_packet_info_t &);
//typedef boost::function<void(const boost::uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type;
@@ -139,6 +141,19 @@ public:
_props.at(xport_chan).get_buff = get_buff;
}
+ /*!
+ * Set the function to handle flow control
+ * \param xport_chan which transport channel
+ * \param handle_flowctrl the callback function
+ */
+ void set_xport_handle_flowctrl(const size_t xport_chan, const handle_flowctrl_type &handle_flowctrl, const size_t update_window, const bool do_init = false)
+ {
+ _props.at(xport_chan).handle_flowctrl = handle_flowctrl;
+ //we need the window size to be within the 0xfff (max 12 bit seq)
+ _props.at(xport_chan).fc_update_window = std::min<size_t>(update_window, 0xfff);
+ if (do_init) handle_flowctrl(0);
+ }
+
//! Set the conversion routine for all channels
void set_converter(const uhd::convert::id_type &id){
_num_outputs = id.num_outputs;
@@ -158,6 +173,21 @@ public:
_converter->set_scalar(scale_factor);
}
+ //! Set the callback to issue stream commands
+ void set_issue_stream_cmd(const size_t xport_chan, const issue_stream_cmd_type &issue_stream_cmd)
+ {
+ _props.at(xport_chan).issue_stream_cmd = issue_stream_cmd;
+ }
+
+ //! Overload call to issue stream commands
+ void issue_stream_cmd(const stream_cmd_t &stream_cmd)
+ {
+ for (size_t i = 0; i < _props.size(); i++)
+ {
+ if (_props[i].issue_stream_cmd) _props[i].issue_stream_cmd(stream_cmd);
+ }
+ }
+
/*******************************************************************
* Receive:
* The entry point for the fast-path receive calls.
@@ -219,8 +249,11 @@ private:
handle_overflow(&handle_overflow_nop)
{}
get_buff_type get_buff;
+ issue_stream_cmd_type issue_stream_cmd;
size_t packet_count;
handle_overflow_type handle_overflow;
+ handle_flowctrl_type handle_flowctrl;
+ size_t fc_update_window;
};
std::vector<xport_chan_props_type> _props;
size_t _num_outputs;
@@ -302,6 +335,15 @@ private:
info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true
info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32);
+ //handle flow control
+ if (_props[index].handle_flowctrl)
+ {
+ if ((info.ifpi.packet_count % _props[index].fc_update_window/2) == 0)
+ {
+ _props[index].handle_flowctrl(info.ifpi.packet_count);
+ }
+ }
+
//--------------------------------------------------------------
//-- Determine return conditions:
//-- The order of these checks is HOLY.
@@ -314,8 +356,9 @@ private:
//2) check for sequence errors
#ifndef SRPH_DONT_CHECK_SEQUENCE
+ const size_t seq_mask = (info.ifpi.link_type == vrt::if_packet_info_t::LINK_TYPE_NONE)? 0xf : 0xfff;
const size_t expected_packet_count = _props[index].packet_count;
- _props[index].packet_count = (info.ifpi.packet_count + 1)%16;
+ _props[index].packet_count = (info.ifpi.packet_count + 1) & seq_mask;
if (expected_packet_count != info.ifpi.packet_count){
return PACKET_SEQUENCE_ERROR;
}
@@ -459,7 +502,7 @@ private:
curr_info.metadata.start_of_burst = false;
curr_info.metadata.end_of_burst = false;
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
- UHD_MSG(fastpath) << "O";
+ UHD_MSG(fastpath) << "D";
return;
}
@@ -479,6 +522,7 @@ private:
curr_info.metadata.start_of_burst = false;
curr_info.metadata.end_of_burst = false;
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
+ _props[index].handle_overflow();
return;
}
@@ -622,6 +666,11 @@ public:
return recv_packet_handler::recv(buffs, nsamps_per_buff, metadata, timeout, one_packet);
}
+ void issue_stream_cmd(const stream_cmd_t &stream_cmd)
+ {
+ return recv_packet_handler::issue_stream_cmd(stream_cmd);
+ }
+
private:
size_t _max_num_samps;
};
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp
index 726742327..41f030ea6 100644
--- a/host/lib/transport/super_send_packet_handler.hpp
+++ b/host/lib/transport/super_send_packet_handler.hpp
@@ -47,6 +47,7 @@ namespace uhd{ namespace transport{ namespace sph{
class send_packet_handler{
public:
typedef boost::function<managed_send_buffer::sptr(double)> get_buff_type;
+ typedef boost::function<bool(uhd::async_metadata_t &, const double)> async_receiver_type;
typedef void(*vrt_packer_type)(boost::uint32_t *, vrt::if_packet_info_t &);
//typedef boost::function<void(boost::uint32_t *, vrt::if_packet_info_t &)> vrt_packer_type;
@@ -57,6 +58,7 @@ public:
send_packet_handler(const size_t size = 1):
_next_packet_seq(0)
{
+ this->set_enable_trailer(true);
this->resize(size);
}
@@ -96,6 +98,11 @@ public:
_props.at(xport_chan).sid = sid;
}
+ void set_enable_trailer(const bool enable)
+ {
+ _has_tlr = enable;
+ }
+
//! Set the rate of ticks per second
void set_tick_rate(const double rate){
_tick_rate = rate;
@@ -138,6 +145,21 @@ public:
_converter->set_scalar(scale_factor);
}
+ //! Set the callback to get async messages
+ void set_async_receiver(const async_receiver_type &async_receiver)
+ {
+ _async_receiver = async_receiver;
+ }
+
+ //! Overload call to get async metadata
+ bool recv_async_msg(
+ uhd::async_metadata_t &async_metadata, double timeout = 0.1
+ ){
+ if (_async_receiver) return _async_receiver(async_metadata, timeout);
+ boost::this_thread::sleep(boost::posix_time::microseconds(long(timeout*1e6)));
+ return false;
+ }
+
/*******************************************************************
* Send:
* The entry point for the fast-path send calls.
@@ -154,7 +176,7 @@ public:
if_packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA;
//if_packet_info.has_sid = false; //set per channel
if_packet_info.has_cid = false;
- if_packet_info.has_tlr = true;
+ if_packet_info.has_tlr = _has_tlr;
if_packet_info.has_tsi = false;
if_packet_info.has_tsf = metadata.has_time_spec;
if_packet_info.tsf = metadata.time_spec.to_ticks(_tick_rate);
@@ -165,9 +187,12 @@ public:
//TODO remove this code when sample counts of zero are supported by hardware
#ifndef SSPH_DONT_PAD_TO_ONE
- if (nsamps_per_buff == 0) return send_one_packet(
- _zero_buffs, 1, if_packet_info, timeout
- ) & 0x0;
+ static const boost::uint64_t zero = 0;
+ _zero_buffs.resize(buffs.size(), &zero);
+
+ if (nsamps_per_buff == 0) return send_one_packet(
+ _zero_buffs, 1, if_packet_info, timeout
+ ) & 0x0;
#endif
return send_one_packet(buffs, nsamps_per_buff, if_packet_info, timeout);
@@ -228,6 +253,8 @@ private:
size_t _max_samples_per_packet;
std::vector<const void *> _zero_buffs;
size_t _next_packet_seq;
+ bool _has_tlr;
+ async_receiver_type _async_receiver;
/*******************************************************************
* Send a single packet:
@@ -337,6 +364,12 @@ public:
return send_packet_handler::send(buffs, nsamps_per_buff, metadata, timeout);
}
+ bool recv_async_msg(
+ uhd::async_metadata_t &async_metadata, double timeout = 0.1
+ ){
+ return send_packet_handler::recv_async_msg(async_metadata, timeout);
+ }
+
private:
size_t _max_num_samps;
};
diff --git a/host/lib/transport/usb_zero_copy_wrapper.cpp b/host/lib/transport/usb_zero_copy_wrapper.cpp
deleted file mode 100644
index d04244ca9..000000000
--- a/host/lib/transport/usb_zero_copy_wrapper.cpp
+++ /dev/null
@@ -1,231 +0,0 @@
-//
-// Copyright 2011-2012 Ettus Research LLC
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-//
-
-#include <uhd/transport/usb_zero_copy.hpp>
-#include <uhd/transport/buffer_pool.hpp>
-#include <uhd/utils/byteswap.hpp>
-#include <uhd/utils/msg.hpp>
-#include <uhd/utils/tasks.hpp>
-#include <uhd/utils/atomic.hpp>
-#include <boost/foreach.hpp>
-#include <boost/make_shared.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
-#include <boost/bind.hpp>
-#include <vector>
-#include <iostream>
-
-using namespace uhd;
-using namespace uhd::transport;
-
-static const boost::posix_time::time_duration AUTOFLUSH_TIMEOUT(boost::posix_time::milliseconds(1));
-
-/***********************************************************************
- * USB zero copy wrapper - managed receive buffer
- **********************************************************************/
-class usb_zero_copy_wrapper_mrb : public managed_recv_buffer{
-public:
- usb_zero_copy_wrapper_mrb(void){/*NOP*/}
-
- void release(void){
- _mrb.reset(); //decrement ref count, other MRB's may hold a ref
- _claimer.release();
- }
-
- UHD_INLINE sptr get_new(
- managed_recv_buffer::sptr &mrb, size_t &offset_bytes,
- const double timeout, size_t &index
- ){
- if (not mrb or not _claimer.claim_with_wait(timeout)) return sptr();
-
- index++; //advances the caller's buffer
-
- //hold a copy of the buffer shared pointer
- _mrb = mrb;
-
- //extract this packet's memory address and length in bytes
- char *mem = mrb->cast<char *>() + offset_bytes;
- const boost::uint32_t *mem32 = reinterpret_cast<const boost::uint32_t *>(mem);
- const size_t words32 = (uhd::wtohx(mem32[0]) & 0xffff); //length in words32 (from VRT header)
- const size_t len = words32*sizeof(boost::uint32_t); //length in bytes
-
- //check if this receive buffer has been exhausted
- offset_bytes += len;
- if (offset_bytes >= mrb->size()) mrb.reset(); //drop caller's ref
- else if (uhd::wtohx(mem32[words32]) == 0) mrb.reset();
-
- return make(this, mem, len);
- }
-
-private:
- managed_recv_buffer::sptr _mrb;
- simple_claimer _claimer;
-};
-
-/***********************************************************************
- * USB zero copy wrapper - managed send buffer
- **********************************************************************/
-class usb_zero_copy_wrapper_msb : public managed_send_buffer{
-public:
- usb_zero_copy_wrapper_msb(const usb_zero_copy::sptr internal, const size_t fragmentation_size):
- _internal(internal), _fragmentation_size(fragmentation_size)
- {
- _ok_to_auto_flush = false;
- _task = uhd::task::make(boost::bind(&usb_zero_copy_wrapper_msb::auto_flush, this));
- }
-
- ~usb_zero_copy_wrapper_msb(void)
- {
- //ensure the task has exited before anything auto deconstructs
- _task.reset();
- }
-
- void release(void){
- boost::mutex::scoped_lock lock(_mutex);
- _ok_to_auto_flush = true;
-
- //get a reference to the VITA header before incrementing
- const boost::uint32_t vita_header = reinterpret_cast<const boost::uint32_t *>(_mem_buffer_tip)[0];
-
- _bytes_in_buffer += size();
- _mem_buffer_tip += size();
-
- //extract VITA end of packet flag, we must force flush under eof conditions
- const bool eop = (uhd::wtohx(vita_header) & (0x1 << 24)) != 0;
- const bool full = _bytes_in_buffer >= (_last_send_buff->size() - _fragmentation_size);
- if (eop or full){
- _last_send_buff->commit(_bytes_in_buffer);
- _last_send_buff.reset();
-
- //notify the auto-flusher to restart its timed_wait
- lock.unlock(); _cond.notify_one();
- }
- }
-
- UHD_INLINE sptr get_new(const double timeout){
- boost::mutex::scoped_lock lock(_mutex);
- _ok_to_auto_flush = false;
-
- if (not _last_send_buff){
- _last_send_buff = _internal->get_send_buff(timeout);
- if (not _last_send_buff) return sptr();
- _mem_buffer_tip = _last_send_buff->cast<char *>();
- _bytes_in_buffer = 0;
- }
-
- return make(this, _mem_buffer_tip, _fragmentation_size);
- }
-
-private:
- usb_zero_copy::sptr _internal;
- const size_t _fragmentation_size;
- managed_send_buffer::sptr _last_send_buff;
- size_t _bytes_in_buffer;
- char *_mem_buffer_tip;
-
- //private variables for auto flusher
- boost::mutex _mutex;
- boost::condition_variable _cond;
- uhd::task::sptr _task;
- bool _ok_to_auto_flush;
-
- /*!
- * The auto flusher ensures that buffers are force committed when
- * the user has not called get_new() within a certain time window.
- */
- void auto_flush(void)
- {
- boost::mutex::scoped_lock lock(_mutex);
- const bool timeout = not _cond.timed_wait(lock, AUTOFLUSH_TIMEOUT);
- if (timeout and _ok_to_auto_flush and _last_send_buff and _bytes_in_buffer != 0)
- {
- _last_send_buff->commit(_bytes_in_buffer);
- _last_send_buff.reset();
- }
- }
-};
-
-/***********************************************************************
- * USB zero copy wrapper implementation
- **********************************************************************/
-class usb_zero_copy_wrapper : public usb_zero_copy{
-public:
- usb_zero_copy_wrapper(sptr usb_zc, const size_t frame_boundary):
- _internal_zc(usb_zc),
- _frame_boundary(frame_boundary),
- _next_recv_buff_index(0)
- {
- for (size_t i = 0; i < this->get_num_recv_frames(); i++){
- _mrb_pool.push_back(boost::make_shared<usb_zero_copy_wrapper_mrb>());
- }
- _the_only_msb = boost::make_shared<usb_zero_copy_wrapper_msb>(usb_zc, frame_boundary);
- }
-
- managed_recv_buffer::sptr get_recv_buff(double timeout){
- //attempt to get a managed recv buffer
- if (not _last_recv_buff){
- _last_recv_buff = _internal_zc->get_recv_buff(timeout);
- _last_recv_offset = 0; //reset offset into buffer
- }
-
- //get the buffer to be returned to the user
- if (_next_recv_buff_index == _mrb_pool.size()) _next_recv_buff_index = 0;
- return _mrb_pool[_next_recv_buff_index]->get_new(
- _last_recv_buff, _last_recv_offset, timeout, _next_recv_buff_index
- );
- }
-
- size_t get_num_recv_frames(void) const{
- return _internal_zc->get_num_recv_frames();
- }
-
- size_t get_recv_frame_size(void) const{
- return std::min(_frame_boundary, _internal_zc->get_recv_frame_size());
- }
-
- managed_send_buffer::sptr get_send_buff(double timeout){
- return _the_only_msb->get_new(timeout);
- }
-
- size_t get_num_send_frames(void) const{
- return _internal_zc->get_num_send_frames();
- }
-
- size_t get_send_frame_size(void) const{
- return std::min(_frame_boundary, _internal_zc->get_send_frame_size());
- }
-
-private:
- sptr _internal_zc;
- size_t _frame_boundary;
- std::vector<boost::shared_ptr<usb_zero_copy_wrapper_mrb> > _mrb_pool;
- boost::shared_ptr<usb_zero_copy_wrapper_msb> _the_only_msb;
-
- //state for last recv buffer to create multiple managed buffers
- managed_recv_buffer::sptr _last_recv_buff;
- size_t _last_recv_offset;
- size_t _next_recv_buff_index;
-};
-
-/***********************************************************************
- * USB zero copy wrapper factory function
- **********************************************************************/
-usb_zero_copy::sptr usb_zero_copy::make_wrapper(
- sptr usb_zc, size_t usb_frame_boundary
-){
- return sptr(new usb_zero_copy_wrapper(usb_zc, usb_frame_boundary));
-}