aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/CMakeLists.txt19
-rw-r--r--host/lib/transport/libusb1_base.cpp62
-rw-r--r--host/lib/transport/libusb1_base.hpp12
-rw-r--r--host/lib/transport/libusb1_control.cpp24
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp47
-rw-r--r--host/lib/transport/muxed_zero_copy_if.cpp250
-rw-r--r--host/lib/transport/nirio/lvbitx/CMakeLists.txt5
-rw-r--r--host/lib/transport/nirio/rpc/rpc_client.cpp24
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp127
-rw-r--r--host/lib/transport/super_send_packet_handler.hpp65
-rw-r--r--host/lib/transport/usb_dummy_impl.cpp12
-rw-r--r--host/lib/transport/zero_copy_recv_offload.cpp158
12 files changed, 667 insertions, 138 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index 6abc399b4..44c8d59af 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -22,17 +22,15 @@
########################################################################
# Include subdirectories (different than add)
########################################################################
-INCLUDE_SUBDIRECTORY(nirio)
+IF(ENABLE_X300)
+ INCLUDE_SUBDIRECTORY(nirio)
+ENDIF(ENABLE_X300)
########################################################################
# Setup libusb
########################################################################
-MESSAGE(STATUS "")
-FIND_PACKAGE(USB1)
-
-LIBUHD_REGISTER_COMPONENT("USB" ENABLE_USB ON "ENABLE_LIBUHD;LIBUSB_FOUND" OFF OFF)
-
IF(ENABLE_USB)
+ MESSAGE(STATUS "")
MESSAGE(STATUS "USB support enabled via libusb.")
INCLUDE_DIRECTORIES(${LIBUSB_INCLUDE_DIRS})
LIBUHD_APPEND_LIBS(${LIBUSB_LIBRARIES})
@@ -124,14 +122,21 @@ LIBUHD_PYTHON_GEN_SOURCE(
)
LIBUHD_APPEND_SOURCES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_recv_offload.cpp
${CMAKE_CURRENT_SOURCE_DIR}/tcp_zero_copy.cpp
${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp
${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp
${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp
- ${CMAKE_CURRENT_SOURCE_DIR}/nirio_zero_copy.cpp
${CMAKE_CURRENT_SOURCE_DIR}/chdr.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/muxed_zero_copy_if.cpp
)
+IF(ENABLE_X300)
+ LIBUHD_APPEND_SOURCES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/nirio_zero_copy.cpp
+ )
+ENDIF(ENABLE_X300)
+
# Verbose Debug output for send/recv
SET( UHD_TXRX_DEBUG_PRINTS OFF CACHE BOOL "Use verbose debug output for send/recv" )
OPTION( UHD_TXRX_DEBUG_PRINTS "Use verbose debug output for send/recv" "" )
diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp
index f92117a9e..7b9e11da9 100644
--- a/host/lib/transport/libusb1_base.cpp
+++ b/host/lib/transport/libusb1_base.cpp
@@ -47,10 +47,7 @@ public:
task_handler = task::make(boost::bind(&libusb_session_impl::libusb_event_handler_task, this, _context));
}
- ~libusb_session_impl(void){
- task_handler.reset();
- libusb_exit(_context);
- }
+ virtual ~libusb_session_impl(void);
libusb_context *get_context(void) const{
return _context;
@@ -86,6 +83,11 @@ private:
}
};
+libusb_session_impl::~libusb_session_impl(void){
+ task_handler.reset();
+ libusb_exit(_context);
+}
+
libusb::session::sptr libusb::session::get_global_session(void){
static boost::weak_ptr<session> global_session;
@@ -121,9 +123,7 @@ public:
_dev = dev;
}
- ~libusb_device_impl(void){
- libusb_unref_device(this->get());
- }
+ virtual ~libusb_device_impl(void);
libusb_device *get(void) const{
return _dev;
@@ -134,6 +134,10 @@ private:
libusb_device *_dev;
};
+libusb_device_impl::~libusb_device_impl(void){
+ libusb_unref_device(this->get());
+}
+
/***********************************************************************
* libusb device list
**********************************************************************/
@@ -160,6 +164,8 @@ public:
libusb_free_device_list(dev_list, false/*dont unref*/);
}
+ virtual ~libusb_device_list_impl(void);
+
size_t size(void) const{
return _devs.size();
}
@@ -172,6 +178,10 @@ private:
std::vector<libusb::device::sptr> _devs;
};
+libusb_device_list_impl::~libusb_device_list_impl(void){
+ /* NOP */
+}
+
libusb::device_list::sptr libusb::device_list::make(void){
return sptr(new libusb_device_list_impl());
}
@@ -190,6 +200,8 @@ public:
UHD_ASSERT_THROW(libusb_get_device_descriptor(_dev->get(), &_desc) == 0);
}
+ virtual ~libusb_device_descriptor_impl(void);
+
const libusb_device_descriptor &get(void) const{
return _desc;
}
@@ -207,12 +219,12 @@ public:
);
unsigned char buff[512];
- ssize_t ret = libusb_get_string_descriptor_ascii(
- handle->get(), off, buff, sizeof(buff)
+ int ret = libusb_get_string_descriptor_ascii(
+ handle->get(), off, buff, int(sizeof(buff))
);
if (ret < 0) return ""; //on error, just return empty string
- std::string string_descriptor((char *)buff, ret);
+ std::string string_descriptor((char *)buff, size_t(ret));
byte_vector_t string_vec(string_descriptor.begin(), string_descriptor.end());
std::string out;
BOOST_FOREACH(boost::uint8_t byte, string_vec){
@@ -227,6 +239,10 @@ private:
libusb_device_descriptor _desc;
};
+libusb_device_descriptor_impl::~libusb_device_descriptor_impl(void){
+ /* NOP */
+}
+
libusb::device_descriptor::sptr libusb::device_descriptor::make(device::sptr dev){
return sptr(new libusb_device_descriptor_impl(dev));
}
@@ -245,13 +261,7 @@ public:
UHD_ASSERT_THROW(libusb_open(_dev->get(), &_handle) == 0);
}
- ~libusb_device_handle_impl(void){
- //release all claimed interfaces
- for (size_t i = 0; i < _claimed.size(); i++){
- libusb_release_interface(this->get(), _claimed[i]);
- }
- libusb_close(_handle);
- }
+ virtual ~libusb_device_handle_impl(void);
libusb_device_handle *get(void) const{
return _handle;
@@ -283,6 +293,14 @@ private:
std::vector<int> _claimed;
};
+libusb_device_handle_impl::~libusb_device_handle_impl(void){
+ //release all claimed interfaces
+ for (size_t i = 0; i < _claimed.size(); i++){
+ libusb_release_interface(this->get(), _claimed[i]);
+ }
+ libusb_close(_handle);
+}
+
libusb::device_handle::sptr libusb::device_handle::get_cached_handle(device::sptr dev){
static uhd::dict<libusb_device *, boost::weak_ptr<device_handle> > handles;
@@ -327,6 +345,8 @@ public:
_dev = dev;
}
+ virtual ~libusb_special_handle_impl(void);
+
libusb::device::sptr get_device(void) const{
return _dev;
}
@@ -361,6 +381,10 @@ private:
libusb::device::sptr _dev; //always keep a reference to device
};
+libusb_special_handle_impl::~libusb_special_handle_impl(void){
+ /* NOP */
+}
+
libusb::special_handle::sptr libusb::special_handle::make(device::sptr dev){
return sptr(new libusb_special_handle_impl(dev));
}
@@ -368,6 +392,10 @@ libusb::special_handle::sptr libusb::special_handle::make(device::sptr dev){
/***********************************************************************
* list device handles implementations
**********************************************************************/
+usb_device_handle::~usb_device_handle(void) {
+ /* NOP */
+}
+
std::vector<usb_device_handle::sptr> usb_device_handle::get_device_list(
boost::uint16_t vid, boost::uint16_t pid
){
diff --git a/host/lib/transport/libusb1_base.hpp b/host/lib/transport/libusb1_base.hpp
index 2ff1291d9..6d104bc75 100644
--- a/host/lib/transport/libusb1_base.hpp
+++ b/host/lib/transport/libusb1_base.hpp
@@ -67,7 +67,7 @@ namespace libusb {
public:
typedef boost::shared_ptr<session> sptr;
- virtual ~session(void) = 0;
+ virtual ~session(void);
/*!
* Level 0: no messages ever printed by the library (default)
@@ -92,7 +92,7 @@ namespace libusb {
public:
typedef boost::shared_ptr<device> sptr;
- virtual ~device(void) = 0;
+ virtual ~device(void);
//! get the underlying device pointer
virtual libusb_device *get(void) const = 0;
@@ -106,7 +106,7 @@ namespace libusb {
public:
typedef boost::shared_ptr<device_list> sptr;
- virtual ~device_list(void) = 0;
+ virtual ~device_list(void);
//! make a new device list
static sptr make(void);
@@ -125,7 +125,7 @@ namespace libusb {
public:
typedef boost::shared_ptr<device_descriptor> sptr;
- virtual ~device_descriptor(void) = 0;
+ virtual ~device_descriptor(void);
//! make a new descriptor from a device reference
static sptr make(device::sptr);
@@ -143,7 +143,7 @@ namespace libusb {
public:
typedef boost::shared_ptr<device_handle> sptr;
- virtual ~device_handle(void) = 0;
+ virtual ~device_handle(void);
//! get a cached handle or make a new one given the device
static sptr get_cached_handle(device::sptr);
@@ -172,7 +172,7 @@ namespace libusb {
public:
typedef boost::shared_ptr<special_handle> sptr;
- virtual ~special_handle(void) = 0;
+ virtual ~special_handle(void);
//! make a new special handle from device
static sptr make(device::sptr);
diff --git a/host/lib/transport/libusb1_control.cpp b/host/lib/transport/libusb1_control.cpp
index 00c113163..a18f657d9 100644
--- a/host/lib/transport/libusb1_control.cpp
+++ b/host/lib/transport/libusb1_control.cpp
@@ -30,19 +30,21 @@ usb_control::~usb_control(void){
**********************************************************************/
class libusb_control_impl : public usb_control {
public:
- libusb_control_impl(libusb::device_handle::sptr handle, const size_t interface):
+ libusb_control_impl(libusb::device_handle::sptr handle, const int interface):
_handle(handle)
{
_handle->claim_interface(interface);
}
- ssize_t submit(boost::uint8_t request_type,
- boost::uint8_t request,
- boost::uint16_t value,
- boost::uint16_t index,
- unsigned char *buff,
- boost::uint16_t length,
- boost::int32_t libusb_timeout = 0
+ virtual ~libusb_control_impl(void);
+
+ int submit(boost::uint8_t request_type,
+ boost::uint8_t request,
+ boost::uint16_t value,
+ boost::uint16_t index,
+ unsigned char *buff,
+ boost::uint16_t length,
+ boost::uint32_t libusb_timeout = 0
){
boost::mutex::scoped_lock lock(_mutex);
return libusb_control_transfer(_handle->get(),
@@ -60,10 +62,14 @@ private:
boost::mutex _mutex;
};
+libusb_control_impl::~libusb_control_impl(void) {
+ /* NOP */
+}
+
/***********************************************************************
* USB control public make functions
**********************************************************************/
-usb_control::sptr usb_control::make(usb_device_handle::sptr handle, const size_t interface){
+usb_control::sptr usb_control::make(usb_device_handle::sptr handle, const int interface){
return sptr(new libusb_control_impl(libusb::device_handle::get_cached_handle(
boost::static_pointer_cast<libusb::special_handle>(handle)->get_device()
), interface));
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index 53e345009..c32b96b63 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -125,19 +125,21 @@ public:
_ctx(libusb::session::get_global_session()->get_context()),
_lut(lut), _frame_size(frame_size) { /* NOP */ }
+ virtual ~libusb_zero_copy_mb(void);
+
void release(void){
_release_cb(this);
}
UHD_INLINE void submit(void)
{
- _lut->length = (_is_recv)? _frame_size : size(); //always set length
+ _lut->length = int((_is_recv)? _frame_size : size()); //always set length
#ifdef UHD_TXRX_DEBUG_PRINTS
result.start_time = boost::get_system_time().time_of_day().total_microseconds();
result.buff_num = num();
result.is_recv = _is_recv;
#endif
- const int ret = libusb_submit_transfer(_lut);
+ int ret = libusb_submit_transfer(_lut);
if (ret != LIBUSB_SUCCESS)
throw uhd::usb_error(ret, str(boost::format(
"usb %s submit failed: %s") % _name % libusb_error_name(ret)));
@@ -152,7 +154,7 @@ public:
throw uhd::io_error(str(boost::format("usb %s transfer status: %d")
% _name % libusb_error_name(result.status)));
result.completed = 0;
- return make(reinterpret_cast<buffer_type *>(this), _lut->buffer, (_is_recv)? result.actual_length : _frame_size);
+ return make(reinterpret_cast<buffer_type *>(this), _lut->buffer, (_is_recv)? size_t(result.actual_length) : _frame_size);
}
return typename buffer_type::sptr();
}
@@ -189,6 +191,10 @@ private:
const size_t _frame_size;
};
+libusb_zero_copy_mb::~libusb_zero_copy_mb(void) {
+ /* NOP */
+}
+
/***********************************************************************
* USB zero_copy device class
**********************************************************************/
@@ -197,7 +203,7 @@ class libusb_zero_copy_single
public:
libusb_zero_copy_single(
libusb::device_handle::sptr handle,
- const size_t interface, const size_t endpoint,
+ const int interface, const unsigned char endpoint,
const size_t num_frames, const size_t frame_size
):
_handle(handle),
@@ -221,7 +227,7 @@ public:
_handle->get(), // dev_handle
endpoint, // endpoint
static_cast<unsigned char *>(buff),
- sizeof(buff),
+ int(sizeof(buff)),
&transfered, //bytes xfered
10 //timeout ms
);
@@ -243,7 +249,7 @@ public:
_handle->get(), // dev_handle
endpoint, // endpoint
static_cast<unsigned char *>(_buffer_pool->at(i)), // buffer
- this->get_frame_size(), // length
+ int(this->get_frame_size()), // length
libusb_transfer_cb_fn(&libusb_async_cb), // callback
static_cast<void *>(&_mb_pool.back()->result), // user_data
0 // timeout (ms)
@@ -372,10 +378,10 @@ struct libusb_zero_copy_impl : usb_zero_copy
{
libusb_zero_copy_impl(
libusb::device_handle::sptr handle,
- const size_t recv_interface,
- const size_t recv_endpoint,
- const size_t send_interface,
- const size_t send_endpoint,
+ const int recv_interface,
+ const unsigned char recv_endpoint,
+ const int send_interface,
+ const unsigned char send_endpoint,
const device_addr_t &hints
){
_recv_impl.reset(new libusb_zero_copy_single(
@@ -388,6 +394,8 @@ struct libusb_zero_copy_impl : usb_zero_copy
size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))));
}
+ virtual ~libusb_zero_copy_impl(void);
+
managed_recv_buffer::sptr get_recv_buff(double timeout)
{
boost::mutex::scoped_lock l(_recv_mutex);
@@ -410,15 +418,26 @@ struct libusb_zero_copy_impl : usb_zero_copy
boost::mutex _recv_mutex, _send_mutex;
};
+libusb_zero_copy_impl::~libusb_zero_copy_impl(void) {
+ /* NOP */
+}
+
+/***********************************************************************
+ * USB zero_copy destructor
+ **********************************************************************/
+usb_zero_copy::~usb_zero_copy(void) {
+ /* NOP */
+}
+
/***********************************************************************
* USB zero_copy make functions
**********************************************************************/
usb_zero_copy::sptr usb_zero_copy::make(
usb_device_handle::sptr handle,
- const size_t recv_interface,
- const size_t recv_endpoint,
- const size_t send_interface,
- const size_t send_endpoint,
+ const int recv_interface,
+ const unsigned char recv_endpoint,
+ const int send_interface,
+ const unsigned char send_endpoint,
const device_addr_t &hints
){
libusb::device_handle::sptr dev_handle(libusb::device_handle::get_cached_handle(
diff --git a/host/lib/transport/muxed_zero_copy_if.cpp b/host/lib/transport/muxed_zero_copy_if.cpp
new file mode 100644
index 000000000..996db3c98
--- /dev/null
+++ b/host/lib/transport/muxed_zero_copy_if.cpp
@@ -0,0 +1,250 @@
+//
+// Copyright 2016 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include <uhd/transport/muxed_zero_copy_if.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/utils/safe_call.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/thread.hpp>
+#include <boost/thread/locks.hpp>
+#include <map>
+
+using namespace uhd;
+using namespace uhd::transport;
+
+class muxed_zero_copy_if_impl : public muxed_zero_copy_if,
+ public boost::enable_shared_from_this<muxed_zero_copy_if_impl>
+{
+public:
+ typedef boost::shared_ptr<muxed_zero_copy_if_impl> sptr;
+
+ muxed_zero_copy_if_impl(
+ zero_copy_if::sptr base_xport,
+ stream_classifier_fn classify_fn,
+ size_t max_streams
+ ):
+ _base_xport(base_xport), _classify(classify_fn),
+ _max_num_streams(max_streams), _num_dropped_frames(0)
+ {
+ //Create the receive thread to poll the underlying transport
+ //and classify packets into queues
+ _recv_thread = boost::thread(
+ boost::bind(&muxed_zero_copy_if_impl::_update_queues, this));
+ }
+
+ virtual ~muxed_zero_copy_if_impl()
+ {
+ UHD_SAFE_CALL(
+ //Interrupt buffer updater loop
+ _recv_thread.interrupt();
+ //Wait for loop to finish
+ //No timeout on join. The recv loop is guaranteed
+ //to terminate in a reasonable amount of time because
+ //there are no timed blocks on the underlying.
+ _recv_thread.join();
+ //Flush base transport
+ while (_base_xport->get_recv_buff(0.0001)) /*NOP*/;
+ //Release child streams
+ //Note that this will not delete or flush the child streams
+ //until the owners of the streams have released the respective
+ //shared pointers. This ensures that packets are not dropped.
+ _streams.clear();
+ );
+ }
+
+ virtual zero_copy_if::sptr make_stream(const uint32_t stream_num)
+ {
+ boost::lock_guard<boost::mutex> lock(_mutex);
+ if (_streams.size() >= _max_num_streams) {
+ throw uhd::runtime_error("muxed_zero_copy_if: stream capacity exceeded. cannot create more streams.");
+ }
+ stream_impl::sptr stream = boost::make_shared<stream_impl>(this->shared_from_this(), stream_num);
+ _streams[stream_num] = stream;
+ return stream;
+ }
+
+ virtual size_t get_num_dropped_frames() const
+ {
+ return _num_dropped_frames;
+ }
+
+ void remove_stream(const uint32_t stream_num)
+ {
+ boost::lock_guard<boost::mutex> lock(_mutex);
+ _streams.erase(stream_num);
+ }
+
+private:
+ class stream_impl : public zero_copy_if
+ {
+ public:
+ typedef boost::shared_ptr<stream_impl> sptr;
+ typedef boost::weak_ptr<stream_impl> wptr;
+
+ stream_impl(muxed_zero_copy_if_impl::sptr muxed_xport, const uint32_t stream_num):
+ _stream_num(stream_num), _muxed_xport(muxed_xport),
+ _buff_queue(muxed_xport->base_xport()->get_num_recv_frames())
+ {
+ }
+
+ ~stream_impl(void)
+ {
+ //First remove the stream from muxed transport
+ //so no more frames are pushed in
+ _muxed_xport->remove_stream(_stream_num);
+ //Flush the transport
+ managed_recv_buffer::sptr buff;
+ while (_buff_queue.pop_with_haste(buff)) {
+ //NOP
+ }
+ }
+
+ size_t get_num_recv_frames(void) const {
+ return _muxed_xport->base_xport()->get_num_recv_frames();
+ }
+
+ size_t get_recv_frame_size(void) const {
+ return _muxed_xport->base_xport()->get_recv_frame_size();
+ }
+
+ managed_recv_buffer::sptr get_recv_buff(double timeout) {
+ managed_recv_buffer::sptr buff;
+ if (_buff_queue.pop_with_timed_wait(buff, timeout)) {
+ return buff;
+ } else {
+ return managed_recv_buffer::sptr();
+ }
+
+ }
+
+ void push_recv_buff(managed_recv_buffer::sptr buff) {
+ _buff_queue.push_with_wait(buff);
+ }
+
+ size_t get_num_send_frames(void) const {
+ return _muxed_xport->base_xport()->get_num_send_frames();
+ }
+
+ size_t get_send_frame_size(void) const {
+ return _muxed_xport->base_xport()->get_send_frame_size();
+ }
+
+ managed_send_buffer::sptr get_send_buff(double timeout)
+ {
+ return _muxed_xport->base_xport()->get_send_buff(timeout);
+ }
+
+ private:
+ const uint32_t _stream_num;
+ muxed_zero_copy_if_impl::sptr _muxed_xport;
+ bounded_buffer<managed_recv_buffer::sptr> _buff_queue;
+ };
+
+ inline zero_copy_if::sptr& base_xport() { return _base_xport; }
+
+ void _update_queues()
+ {
+ //Run forever:
+ // - Pull packets from the base transport
+ // - Classify them
+ // - Push them to the appropriate receive queue
+ while (true) {
+ { //Uninterruptable block of code
+ boost::this_thread::disable_interruption interrupt_disabler;
+ if (not _process_next_buffer()) {
+ //Be a good citizen and yield if no packet is processed
+ static const size_t MIN_DUR = 1;
+ boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR));
+ //We call sleep(MIN_DUR) above instead of yield() to ensure that we
+ //relinquish the current scheduler time slot.
+ //yield() is a hint to the scheduler to end the time
+ //slice early and schedule in another thread that is ready to run.
+ //However in most situations, there will be no other thread and
+ //this thread will continue to run which will rail a CPU core.
+ //We call sleep(MIN_DUR=1) instead which will sleep for a minimum time.
+ //Ideally we would like to use boost::chrono::.*seconds::min() but that
+ //is bound to 0, which causes the sleep_for call to be a no-op and
+ //thus useless to actually force a sleep.
+
+ //****************************************************************
+ //NOTE: This behavior makes this transport a poor choice for
+ // low latency communication.
+ //****************************************************************
+ }
+ }
+ //Check if the master thread has requested a shutdown
+ if (boost::this_thread::interruption_requested()) break;
+ }
+ }
+
+ bool _process_next_buffer()
+ {
+ managed_recv_buffer::sptr buff = _base_xport->get_recv_buff(0.0);
+ if (buff) {
+ stream_impl::sptr stream;
+ try {
+ const uint32_t stream_num = _classify(buff->cast<void*>(), _base_xport->get_recv_frame_size());
+ {
+ //Hold the stream mutex long enough to pull a bounded buffer
+ //and lock it (increment its ref count).
+ boost::lock_guard<boost::mutex> lock(_mutex);
+ stream_map_t::iterator str_iter = _streams.find(stream_num);
+ if (str_iter != _streams.end()) {
+ stream = (*str_iter).second.lock();
+ }
+ }
+ } catch (std::exception&) {
+ //If _classify throws we simply drop the frame
+ }
+ //Once a bounded buffer is acquired, we can rely on its
+ //thread safety to serialize with the consumer.
+ if (stream.get()) {
+ stream->push_recv_buff(buff);
+ } else {
+ boost::lock_guard<boost::mutex> lock(_mutex);
+ _num_dropped_frames++;
+ }
+ //We processed a packet, and there could be more coming
+ //Don't yield in the next iteration.
+ return true;
+ } else {
+ //The base transport is idle. Return false to let the
+ //thread yield.
+ return false;
+ }
+ }
+
+ typedef std::map<uint32_t, stream_impl::wptr> stream_map_t;
+
+ zero_copy_if::sptr _base_xport;
+ stream_classifier_fn _classify;
+ stream_map_t _streams;
+ const size_t _max_num_streams;
+ size_t _num_dropped_frames;
+ boost::thread _recv_thread;
+ boost::mutex _mutex;
+};
+
+muxed_zero_copy_if::sptr muxed_zero_copy_if::make(
+ zero_copy_if::sptr base_xport,
+ muxed_zero_copy_if::stream_classifier_fn classify_fn,
+ size_t max_streams
+) {
+ return boost::make_shared<muxed_zero_copy_if_impl>(base_xport, classify_fn, max_streams);
+}
diff --git a/host/lib/transport/nirio/lvbitx/CMakeLists.txt b/host/lib/transport/nirio/lvbitx/CMakeLists.txt
index b9a2a9f15..5741a12f8 100644
--- a/host/lib/transport/nirio/lvbitx/CMakeLists.txt
+++ b/host/lib/transport/nirio/lvbitx/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2013 Ettus Research LLC
+# Copyright 2013,2015 Ettus Research LLC
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -30,8 +30,8 @@ MACRO(LIBUHD_LVBITX_GEN_SOURCE_AND_BITSTREAM lvbitx binfile)
SET(IMAGES_PATH_OPT --uhd-images-path=${UHD_IMAGES_DIR})
ADD_CUSTOM_COMMAND(
- OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.hpp
OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.cpp
+ OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.hpp
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/process-lvbitx.py
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/template_lvbitx.hpp
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/template_lvbitx.cpp
@@ -41,6 +41,7 @@ MACRO(LIBUHD_LVBITX_GEN_SOURCE_AND_BITSTREAM lvbitx binfile)
)
#make libuhd depend on the output file
+ LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.hpp)
LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.cpp)
ENDMACRO(LIBUHD_LVBITX_GEN_SOURCE_AND_BITSTREAM)
diff --git a/host/lib/transport/nirio/rpc/rpc_client.cpp b/host/lib/transport/nirio/rpc/rpc_client.cpp
index bbaf9f235..3d62b57ae 100644
--- a/host/lib/transport/nirio/rpc/rpc_client.cpp
+++ b/host/lib/transport/nirio/rpc/rpc_client.cpp
@@ -1,5 +1,5 @@
///
-// Copyright 2013 Ettus Research LLC
+// Copyright 2013,2016 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -55,22 +55,7 @@ rpc_client::rpc_client (
tcp::resolver::query::flags query_flags(tcp::resolver::query::passive);
tcp::resolver::query query(tcp::v4(), server, port, query_flags);
tcp::resolver::iterator iterator = resolver.resolve(query);
-
- #if BOOST_VERSION < 104700
- // default constructor creates end iterator
- tcp::resolver::iterator end;
-
- boost::system::error_code error = boost::asio::error::host_not_found;
- while (error && iterator != end)
- {
- _socket.close();
- _socket.connect(*iterator++, error);
- }
- if (error)
- throw boost::system::system_error(error);
- #else
- boost::asio::connect(_socket, iterator);
- #endif
+ boost::asio::connect(_socket, iterator);
UHD_LOG << "rpc_client connected to server." << std::endl;
@@ -109,11 +94,6 @@ rpc_client::rpc_client (
} catch (boost::exception&) {
UHD_LOG << "rpc_client connection request cancelled/aborted." << std::endl;
_exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category());
-#if BOOST_VERSION < 104700
- } catch (std::exception& e) {
- UHD_LOG << "rpc_client connection error: " << e.what() << std::endl;
- _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category());
-#endif
}
}
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 8bfa1973a..5ca1da687 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -24,18 +24,19 @@
#include <uhd/stream.hpp>
#include <uhd/utils/msg.hpp>
#include <uhd/utils/tasks.hpp>
-#include <uhd/utils/atomic.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/types/metadata.hpp>
#include <uhd/transport/vrt_if_packet.hpp>
#include <uhd/transport/zero_copy.hpp>
+#ifdef DEVICE3_STREAMER
+# include "../rfnoc/rx_stream_terminator.hpp"
+#endif
#include <boost/dynamic_bitset.hpp>
#include <boost/foreach.hpp>
#include <boost/function.hpp>
#include <boost/format.hpp>
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
-#include <boost/thread/barrier.hpp>
#include <iostream>
#include <vector>
@@ -92,22 +93,15 @@ public:
}
~recv_packet_handler(void){
- _task_barrier.interrupt();
- _task_handlers.clear();
+ /* NOP */
}
//! Resize the number of transport channels
void resize(const size_t size){
if (this->size() == size) return;
- _task_handlers.clear();
_props.resize(size);
//re-initialize all buffers infos by re-creating the vector
_buffers_infos = std::vector<buffers_info_type>(4, buffers_info_type(size));
- _task_barrier.resize(size);
- _task_handlers.resize(size);
- for (size_t i = 1/*skip 0*/; i < size; i++){
- _task_handlers[i] = task::make(boost::bind(&recv_packet_handler::converter_thread_task, this, i));
- };
}
//! Get the channel width of this handler
@@ -121,13 +115,42 @@ public:
_header_offset_words32 = header_offset_words32;
}
+ ////////////////// RFNOC ///////////////////////////
+ //! Set the stream ID for a specific channel (or no SID)
+ void set_xport_chan_sid(const size_t xport_chan, const bool has_sid, const boost::uint32_t sid = 0){
+ _props.at(xport_chan).has_sid = has_sid;
+ _props.at(xport_chan).sid = sid;
+ }
+
+ //! Get the stream ID for a specific channel (or zero if no SID)
+ boost::uint32_t get_xport_chan_sid(const size_t xport_chan) const {
+ if (_props.at(xport_chan).has_sid) {
+ return _props.at(xport_chan).sid;
+ } else {
+ return 0;
+ }
+ }
+
+ #ifdef DEVICE3_STREAMER
+ void set_terminator(uhd::rfnoc::rx_stream_terminator::sptr terminator)
+ {
+ _terminator = terminator;
+ }
+
+ uhd::rfnoc::rx_stream_terminator::sptr get_terminator()
+ {
+ return _terminator;
+ }
+ #endif
+ ////////////////// RFNOC ///////////////////////////
+
/*!
* Set the threshold for alignment failure.
* How many packets throw out before giving up?
* \param threshold number of packets per channel
*/
void set_alignment_failure_threshold(const size_t threshold){
- _alignment_faulure_threshold = threshold*this->size();
+ _alignment_failure_threshold = threshold*this->size();
}
//! Set the rate of ticks per second
@@ -203,6 +226,13 @@ public:
//! Overload call to issue stream commands
void issue_stream_cmd(const stream_cmd_t &stream_cmd)
{
+ // RFNoC: This needs to be checked by the radio block, once it's done. TODO remove this.
+ //if (stream_cmd.stream_now
+ //and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS
+ //and _props.size() > 1) {
+ //throw uhd::runtime_error("Attempting to do multi-channel receive with stream_now == true will result in misaligned channels. Aborting.");
+ //}
+
for (size_t i = 0; i < _props.size(); i++)
{
if (_props[i].issue_stream_cmd) _props[i].issue_stream_cmd(stream_cmd);
@@ -234,7 +264,7 @@ public:
buffs, nsamps_per_buff, metadata, timeout
);
- if (one_packet){
+ if (one_packet or metadata.end_of_burst){
#ifdef UHD_TXRX_DEBUG_PRINTS
dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet);
#endif
@@ -242,7 +272,9 @@ public:
}
//first recv had an error code set, return immediately
- if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) return accum_num_samps;
+ if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) {
+ return accum_num_samps;
+ }
//loop until buffer is filled or error code
while(accum_num_samps < nsamps_per_buff){
@@ -256,10 +288,16 @@ public:
_queue_error_for_next_call = true;
break;
}
+
accum_num_samps += num_samps;
+
+ //return immediately if end of burst
+ if (_queue_metadata.end_of_burst) {
+ break;
+ }
}
#ifdef UHD_TXRX_DEBUG_PRINTS
- dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet);
+ dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet);
#endif
return accum_num_samps;
}
@@ -269,7 +307,7 @@ private:
size_t _header_offset_words32;
double _tick_rate, _samp_rate;
bool _queue_error_for_next_call;
- size_t _alignment_faulure_threshold;
+ size_t _alignment_failure_threshold;
rx_metadata_t _queue_metadata;
struct xport_chan_props_type{
xport_chan_props_type(void):
@@ -283,6 +321,10 @@ private:
handle_overflow_type handle_overflow;
handle_flowctrl_type handle_flowctrl;
size_t fc_update_window;
+ /////// RFNOC ///////////
+ bool has_sid;
+ boost::uint32_t sid;
+ /////// RFNOC ///////////
};
std::vector<xport_chan_props_type> _props;
size_t _num_outputs;
@@ -355,6 +397,10 @@ private:
int recvd_packets;
#endif
+ #ifdef DEVICE3_STREAMER
+ uhd::rfnoc::rx_stream_terminator::sptr _terminator;
+ #endif
+
/*******************************************************************
* Get and process a single packet from the transport:
* Receive a single packet at the given index.
@@ -421,6 +467,7 @@ private:
const size_t expected_packet_count = _props[index].packet_count;
_props[index].packet_count = (info.ifpi.packet_count + 1) & seq_mask;
if (expected_packet_count != info.ifpi.packet_count){
+ //UHD_MSG(status) << "expected: " << expected_packet_count << " got: " << info.ifpi.packet_count << std::endl;
if (_props[index].handle_flowctrl) {
// Always update flow control in this case, because we don't
// know which packet was dropped and what state the upstream
@@ -442,6 +489,10 @@ private:
void _flush_all(double timeout)
{
+ get_prev_buffer_info().reset();
+ get_curr_buffer_info().reset();
+ get_next_buffer_info().reset();
+
for (size_t i = 0; i < _props.size(); i++)
{
per_buffer_info_type prev_buffer_info, curr_buffer_info;
@@ -462,9 +513,6 @@ private:
curr_buffer_info.reset();
}
}
- get_prev_buffer_info().reset();
- get_curr_buffer_info().reset();
- get_next_buffer_info().reset();
}
/*******************************************************************
@@ -562,20 +610,27 @@ private:
curr_info.metadata.time_spec = next_info[index].time;
curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi));
if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){
+ // Not sending flow control would cause timeouts due to source flow control locking up.
+ // Send first as the overrun handler may flush the receive buffers which could contain
+ // packets with sequence numbers after this packet's sequence number!
+ if(_props[index].handle_flowctrl) {
+ _props[index].handle_flowctrl(next_info[index].ifpi.packet_count);
+ }
+
rx_metadata_t metadata = curr_info.metadata;
_props[index].handle_overflow();
curr_info.metadata = metadata;
UHD_MSG(fastpath) << "O";
-
- // Not sending flow control would cause timeouts due to source flow control locking up
- if(_props[index].handle_flowctrl) {
- _props[index].handle_flowctrl(next_info[index].ifpi.packet_count);
- }
}
+ curr_info[index].buff.reset();
+ curr_info[index].copy_buff = NULL;
return;
case PACKET_TIMEOUT_ERROR:
std::swap(curr_info, next_info); //save progress from curr -> next
+ if(_props[index].handle_flowctrl) {
+ _props[index].handle_flowctrl(next_info[index].ifpi.packet_count);
+ }
curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
return;
@@ -593,7 +648,7 @@ private:
}
//too many iterations: detect alignment failure
- if (iterations++ > _alignment_faulure_threshold){
+ if (iterations++ > _alignment_failure_threshold){
UHD_MSG(error) << boost::format(
"The receive packet handler failed to time-align packets.\n"
"%u received packets were processed by the handler.\n"
@@ -657,7 +712,9 @@ private:
_convert_bytes_to_copy = bytes_to_copy;
//perform N channels of conversion
- converter_thread_task(0);
+ for (size_t i = 0; i < this->size(); i++) {
+ convert_to_out_buff(i);
+ }
//update the copy buffer's availability
info.data_bytes_to_copy -= bytes_to_copy;
@@ -670,15 +727,15 @@ private:
return nsamps_to_copy_per_io_buff;
}
- /*******************************************************************
- * Perform one thread's work of the conversion task.
- * The entry and exit use a dual synchronization barrier,
- * to wait for data to become ready and block until completion.
- ******************************************************************/
- UHD_INLINE void converter_thread_task(const size_t index)
+ /*! Run the conversion from the internal buffers to the user's output
+ * buffer.
+ *
+ * - Calls the converter
+ * - Releases internal data buffers
+ * - Updates read/write pointers
+ */
+ inline void convert_to_out_buff(const size_t index)
{
- _task_barrier.wait();
-
//shortcut references to local data structures
buffers_info_type &buff_info = get_curr_buffer_info();
per_buffer_info_type &info = buff_info[index];
@@ -702,13 +759,9 @@ private:
if (buff_info.data_bytes_to_copy == _convert_bytes_to_copy){
info.buff.reset(); //effectively a release
}
-
- if (index == 0) _task_barrier.wait_others();
}
//! Shared variables for the worker threads
- reusable_barrier _task_barrier;
- std::vector<task::sptr> _task_handlers;
size_t _convert_nsamps;
const rx_streamer::buffs_type *_convert_buffs;
size_t _convert_buffer_offset_bytes;
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp
index c2810842e..5e81fb442 100644
--- a/host/lib/transport/super_send_packet_handler.hpp
+++ b/host/lib/transport/super_send_packet_handler.hpp
@@ -24,11 +24,14 @@
#include <uhd/stream.hpp>
#include <uhd/utils/msg.hpp>
#include <uhd/utils/tasks.hpp>
-#include <uhd/utils/atomic.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/types/metadata.hpp>
#include <uhd/transport/vrt_if_packet.hpp>
#include <uhd/transport/zero_copy.hpp>
+#ifdef DEVICE3_STREAMER
+# include "../rfnoc/tx_stream_terminator.hpp"
+#endif
+#include <boost/thread/thread.hpp>
#include <boost/thread/thread_time.hpp>
#include <boost/foreach.hpp>
#include <boost/function.hpp>
@@ -74,22 +77,15 @@ public:
}
~send_packet_handler(void){
- _task_barrier.interrupt();
- _task_handlers.clear();
+ /* NOP */
}
//! Resize the number of transport channels
void resize(const size_t size){
if (this->size() == size) return;
- _task_handlers.clear();
_props.resize(size);
static const boost::uint64_t zero = 0;
_zero_buffs.resize(size, &zero);
- _task_barrier.resize(size);
- _task_handlers.resize(size);
- for (size_t i = 1/*skip 0*/; i < size; i++){
- _task_handlers[i] = task::make(boost::bind(&send_packet_handler::converter_thread_task, this, i));
- };
}
//! Get the channel width of this handler
@@ -109,6 +105,29 @@ public:
_props.at(xport_chan).sid = sid;
}
+ ///////// RFNOC ///////////////////
+ //! Get the stream ID for a specific channel (or zero if no SID)
+ boost::uint32_t get_xport_chan_sid(const size_t xport_chan) const {
+ if (_props.at(xport_chan).has_sid) {
+ return _props.at(xport_chan).sid;
+ } else {
+ return 0;
+ }
+ }
+
+ #ifdef DEVICE3_STREAMER
+ void set_terminator(uhd::rfnoc::tx_stream_terminator::sptr terminator)
+ {
+ _terminator = terminator;
+ }
+
+ uhd::rfnoc::tx_stream_terminator::sptr get_terminator()
+ {
+ return _terminator;
+ }
+ #endif
+ ///////// RFNOC ///////////////////
+
void set_enable_trailer(const bool enable)
{
_has_tlr = enable;
@@ -303,6 +322,10 @@ private:
bool _cached_metadata;
uhd::tx_metadata_t _metadata_cache;
+ #ifdef DEVICE3_STREAMER
+ uhd::rfnoc::tx_stream_terminator::sptr _terminator;
+ #endif
+
#ifdef UHD_TXRX_DEBUG_PRINTS
struct dbg_send_stat_t {
dbg_send_stat_t(long wc, size_t nspb, size_t nss, uhd::tx_metadata_t md, double to, double rate):
@@ -377,21 +400,23 @@ private:
_convert_if_packet_info = &if_packet_info;
//perform N channels of conversion
- converter_thread_task(0);
+ for (size_t i = 0; i < this->size(); i++) {
+ convert_to_in_buff(i);
+ }
_next_packet_seq++; //increment sequence after commits
return nsamps_per_buff;
}
- /*******************************************************************
- * Perform one thread's work of the conversion task.
- * The entry and exit use a dual synchronization barrier,
- * to wait for data to become ready and block until completion.
- ******************************************************************/
- UHD_INLINE void converter_thread_task(const size_t index)
+ /*! Run the conversion from the internal buffers to the user's input
+ * buffer.
+ *
+ * - Calls the converter
+ * - Releases internal data buffers
+ * - Updates read/write pointers
+ */
+ UHD_INLINE void convert_to_in_buff(const size_t index)
{
- _task_barrier.wait();
-
//shortcut references to local data structures
managed_send_buffer::sptr &buff = _props[index].buff;
vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info;
@@ -419,13 +444,9 @@ private:
const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32;
buff->commit(num_vita_words32*sizeof(boost::uint32_t));
buff.reset(); //effectively a release
-
- if (index == 0) _task_barrier.wait_others();
}
//! Shared variables for the worker threads
- reusable_barrier _task_barrier;
- std::vector<task::sptr> _task_handlers;
size_t _convert_nsamps;
const tx_streamer::buffs_type *_convert_buffs;
size_t _convert_buffer_offset_bytes;
diff --git a/host/lib/transport/usb_dummy_impl.cpp b/host/lib/transport/usb_dummy_impl.cpp
index ce8c306e4..b53b6f590 100644
--- a/host/lib/transport/usb_dummy_impl.cpp
+++ b/host/lib/transport/usb_dummy_impl.cpp
@@ -31,12 +31,20 @@ std::vector<usb_device_handle::sptr> usb_device_handle::get_device_list(boost::u
return std::vector<usb_device_handle::sptr>(); //empty list
}
-usb_control::sptr usb_control::make(usb_device_handle::sptr, const size_t){
+usb_control::sptr usb_control::make(
+ usb_device_handle::sptr,
+ const int
+) {
throw uhd::not_implemented_error("no usb support -> usb_control::make not implemented");
}
usb_zero_copy::sptr usb_zero_copy::make(
- usb_device_handle::sptr, const size_t, const size_t, const size_t, const size_t, const device_addr_t &
+ usb_device_handle::sptr,
+ const int,
+ const unsigned char,
+ const int,
+ const unsigned char,
+ const device_addr_t &
){
throw uhd::not_implemented_error("no usb support -> usb_zero_copy::make not implemented");
}
diff --git a/host/lib/transport/zero_copy_recv_offload.cpp b/host/lib/transport/zero_copy_recv_offload.cpp
new file mode 100644
index 000000000..e8b013abc
--- /dev/null
+++ b/host/lib/transport/zero_copy_recv_offload.cpp
@@ -0,0 +1,158 @@
+//
+// Copyright 2016 Ettus Research
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include <uhd/transport/zero_copy_recv_offload.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
+#include <uhd/transport/buffer_pool.hpp>
+#include <uhd/utils/msg.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhd/utils/safe_call.hpp>
+#include <boost/format.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/bind.hpp>
+
+using namespace uhd;
+using namespace uhd::transport;
+
+typedef bounded_buffer<managed_recv_buffer::sptr> bounded_buffer_t;
+
+/***********************************************************************
+ * Zero copy offload transport:
+ * An intermediate transport that utilizes threading to free
+ * the main thread from any receive work.
+ **********************************************************************/
+class zero_copy_recv_offload_impl : public zero_copy_recv_offload {
+public:
+ typedef boost::shared_ptr<zero_copy_recv_offload_impl> sptr;
+
+ zero_copy_recv_offload_impl(zero_copy_if::sptr transport,
+ const double timeout) :
+ _transport(transport), _timeout(timeout),
+ _inbox(transport->get_num_recv_frames()),
+ _recv_done(false)
+ {
+ UHD_LOG << "Created threaded transport" << std::endl;
+
+ // Create the receive and send threads to offload
+ // the system calls onto other threads
+ _recv_thread = boost::thread(
+ boost::bind(&zero_copy_recv_offload_impl::enqueue_recv, this)
+ );
+ }
+
+ // Receive thread flags
+ void set_recv_done()
+ {
+ boost::lock_guard<boost::mutex> guard(_recv_mutex);
+ _recv_done = true;
+ }
+
+ bool is_recv_done()
+ {
+ boost::lock_guard<boost::mutex> guard(_recv_mutex);
+ return _recv_done;
+ }
+
+ ~zero_copy_recv_offload_impl()
+ {
+ // Signal the threads we're finished
+ set_recv_done();
+
+ // Wait for them to join
+ UHD_SAFE_CALL(
+ _recv_thread.join();
+ )
+ }
+
+ // The receive thread function is responsible for
+ // pulling pointers to managed receiver buffers quickly
+ void enqueue_recv()
+ {
+ while (not is_recv_done()) {
+ managed_recv_buffer::sptr buff = _transport->get_recv_buff(_timeout);
+ if (not buff) continue;
+ _inbox.push_with_timed_wait(buff, _timeout);
+ }
+ }
+
+ /*******************************************************************
+ * Receive implementation:
+ * Pop the receive buffer pointer from the underlying transport
+ ******************************************************************/
+ managed_recv_buffer::sptr get_recv_buff(double timeout)
+ {
+ managed_recv_buffer::sptr ptr;
+ _inbox.pop_with_timed_wait(ptr, timeout);
+ return ptr;
+ }
+
+ size_t get_num_recv_frames() const
+ {
+ return _transport->get_num_recv_frames();
+ }
+
+ size_t get_recv_frame_size() const
+ {
+ return _transport->get_recv_frame_size();
+ }
+
+ /*******************************************************************
+ * Send implementation:
+ * Pass the send buffer pointer from the underlying transport
+ ******************************************************************/
+ managed_send_buffer::sptr get_send_buff(double timeout)
+ {
+ return _transport->get_send_buff(timeout);
+ }
+
+ size_t get_num_send_frames() const
+ {
+ return _transport->get_num_send_frames();
+ }
+
+ size_t get_send_frame_size() const
+ {
+ return _transport->get_send_frame_size();
+ }
+
+private:
+ // The linked transport
+ zero_copy_if::sptr _transport;
+
+ const double _timeout;
+
+ // Shared buffers
+ bounded_buffer_t _inbox;
+
+ // Threading
+ bool _recv_done;
+ boost::thread _recv_thread;
+ boost::mutex _recv_mutex;
+};
+
+zero_copy_recv_offload::sptr zero_copy_recv_offload::make(
+ zero_copy_if::sptr transport,
+ const double timeout)
+{
+ zero_copy_recv_offload_impl::sptr zero_copy_recv_offload(
+ new zero_copy_recv_offload_impl(transport, timeout)
+ );
+
+ return zero_copy_recv_offload;
+}