aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/CMakeLists.txt1
-rwxr-xr-xhost/lib/transport/gen_vrt_if_packet.py52
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp496
-rw-r--r--host/lib/transport/udp_zero_copy_asio.cpp307
-rw-r--r--host/lib/transport/vrt_packet_handler.hpp66
-rw-r--r--host/lib/transport/zero_copy.cpp108
6 files changed, 375 insertions, 655 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index 8765c6703..a66a58d32 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -77,5 +77,4 @@ LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp
${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy_asio.cpp
${CMAKE_CURRENT_SOURCE_DIR}/vrt_packet_handler.hpp
- ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy.cpp
)
diff --git a/host/lib/transport/gen_vrt_if_packet.py b/host/lib/transport/gen_vrt_if_packet.py
index dbe026ba3..427217eb6 100755
--- a/host/lib/transport/gen_vrt_if_packet.py
+++ b/host/lib/transport/gen_vrt_if_packet.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2010 Ettus Research LLC
+# Copyright 2010-2011 Ettus Research LLC
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -35,6 +35,7 @@ TMPL_TEXT = """
\#include <uhd/utils/byteswap.hpp>
\#include <boost/detail/endian.hpp>
\#include <stdexcept>
+\#include <vector>
//define the endian macros to convert integers
\#ifdef BOOST_BIG_ENDIAN
@@ -48,18 +49,28 @@ TMPL_TEXT = """
using namespace uhd;
using namespace uhd::transport;
-########################################################################
-#def gen_code($XE_MACRO, $suffix)
-########################################################################
+typedef size_t pred_type;
+typedef std::vector<pred_type> pred_table_type;
+#define pred_table_index(hdr) ((hdr >> 20) & 0x1ff)
+
+static pred_table_type get_pred_unpack_table(void){
+ pred_table_type table(1 << 9, 0); //only 9 bits useful here (20-28)
+ for (size_t i = 0; i < table.size(); i++){
+ boost::uint32_t vrt_hdr_word = i << 20;
+ if(vrt_hdr_word & $hex(0x1 << 28)) table[i] |= $hex($sid_p);
+ if(vrt_hdr_word & $hex(0x1 << 27)) table[i] |= $hex($cid_p);
+ if(vrt_hdr_word & $hex(0x3 << 22)) table[i] |= $hex($tsi_p);
+ if(vrt_hdr_word & $hex(0x3 << 20)) table[i] |= $hex($tsf_p);
+ if(vrt_hdr_word & $hex(0x1 << 26)) table[i] |= $hex($tlr_p);
+ }
+ return table;
+}
+
+static const pred_table_type pred_unpack_table(get_pred_unpack_table());
########################################################################
-## setup predicates
+#def gen_code($XE_MACRO, $suffix)
########################################################################
-#set $sid_p = 0b00001
-#set $cid_p = 0b00010
-#set $tsi_p = 0b00100
-#set $tsf_p = 0b01000
-#set $tlr_p = 0b10000
void vrt::if_hdr_pack_$(suffix)(
boost::uint32_t *packet_buff,
@@ -67,7 +78,7 @@ void vrt::if_hdr_pack_$(suffix)(
){
boost::uint32_t vrt_hdr_flags = 0;
- boost::uint8_t pred = 0;
+ pred_type pred = 0;
if (if_packet_info.has_sid) pred |= $hex($sid_p);
if (if_packet_info.has_cid) pred |= $hex($cid_p);
if (if_packet_info.has_tsi) pred |= $hex($tsi_p);
@@ -159,12 +170,7 @@ void vrt::if_hdr_unpack_$(suffix)(
//if_packet_info.sob = bool(vrt_hdr_word & $hex(0x1 << 25)); //not implemented
//if_packet_info.eob = bool(vrt_hdr_word & $hex(0x1 << 24)); //not implemented
- boost::uint8_t pred = 0;
- if(vrt_hdr_word & $hex(0x1 << 28)) pred |= $hex($sid_p);
- if(vrt_hdr_word & $hex(0x1 << 27)) pred |= $hex($cid_p);
- if(vrt_hdr_word & $hex(0x3 << 22)) pred |= $hex($tsi_p);
- if(vrt_hdr_word & $hex(0x3 << 20)) pred |= $hex($tsf_p);
- if(vrt_hdr_word & $hex(0x1 << 26)) pred |= $hex($tlr_p);
+ const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word)];
switch(pred){
#for $pred in range(2**5)
@@ -200,7 +206,7 @@ void vrt::if_hdr_unpack_$(suffix)(
if_packet_info.has_tsf = true;
if_packet_info.tsf = boost::uint64_t($(XE_MACRO)(packet_buff[$num_header_words])) << 32;
#set $num_header_words += 1
- if_packet_info.tsf |= boost::uint64_t($(XE_MACRO)(packet_buff[$num_header_words])) << 0;
+ if_packet_info.tsf |= $(XE_MACRO)(packet_buff[$num_header_words]);
#set $num_header_words += 1
#else
if_packet_info.has_tsf = false;
@@ -239,4 +245,12 @@ def parse_tmpl(_tmpl_text, **kwargs):
if __name__ == '__main__':
import sys
- open(sys.argv[1], 'w').write(parse_tmpl(TMPL_TEXT, file=__file__))
+ open(sys.argv[1], 'w').write(parse_tmpl(
+ TMPL_TEXT,
+ file=__file__,
+ sid_p = 0b00001,
+ cid_p = 0b00010,
+ tsi_p = 0b00100,
+ tsf_p = 0b01000,
+ tlr_p = 0b10000,
+ ))
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index 311a8953b..87adece45 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -21,266 +21,94 @@
#include <uhd/transport/buffer_pool.hpp>
#include <uhd/utils/thread_priority.hpp>
#include <uhd/utils/assert.hpp>
+#include <boost/function.hpp>
#include <boost/foreach.hpp>
-#include <boost/thread.hpp>
-#include <boost/enable_shared_from_this.hpp>
-#include <vector>
+#include <boost/thread/thread.hpp>
+#include <list>
#include <iostream>
using namespace uhd;
using namespace uhd::transport;
-static const double CLEANUP_TIMEOUT = 0.2; //seconds
static const size_t DEFAULT_NUM_XFERS = 16; //num xfers
static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes
/***********************************************************************
- * Helper functions
- ***********************************************************************/
-/*
- * Print the values of a libusb_transfer struct
- * http://libusb.sourceforge.net/api-1.0/structlibusb__transfer.html
- */
-void pp_transfer(libusb_transfer *lut)
-{
- std::cout << "Libusb transfer" << std::endl;
- std::cout << " flags: 0x" << std::hex << (unsigned int) lut->flags << std::endl;
- std::cout << " endpoint: 0x" << std::hex << (unsigned int) lut->endpoint << std::endl;
- std::cout << " type: 0x" << std::hex << (unsigned int) lut->type << std::endl;
- std::cout << " timeout: " << std::dec << lut->timeout << std::endl;
- std::cout << " status: 0x" << std::hex << lut->status << std::endl;
- std::cout << " length: " << std::dec << lut->length << std::endl;
- std::cout << " actual_length: " << std::dec << lut->actual_length << std::endl;
-}
-
-/***********************************************************************
- * USB asynchronous zero_copy endpoint
- * This endpoint implementation provides asynchronous I/O to libusb-1.0
- * devices. Each endpoint is directional and two can be combined to
- * create a bidirectional interface. It is a zero copy implementation
- * with respect to libusb, however, each send and recv requires a copy
- * operation from kernel to userspace; this is due to the usbfs
- * interface provided by the kernel.
+ * Reusable managed receiver buffer:
+ * - Associated with a particular libusb transfer struct.
+ * - Submits the transfer to libusb in the release method.
**********************************************************************/
-class usb_endpoint {
+class libusb_zero_copy_mrb : public managed_recv_buffer{
public:
- typedef boost::shared_ptr<usb_endpoint> sptr;
-
- usb_endpoint(
- libusb::device_handle::sptr handle,
- int endpoint,
- bool input,
- size_t transfer_size,
- size_t num_transfers
- );
-
- ~usb_endpoint(void);
+ libusb_zero_copy_mrb(libusb_transfer *lut):
+ _lut(lut), _expired(true) { /* NOP */ }
- // Exposed interface for submitting / retrieving transfer buffers
-
- //! Submit a new transfer that was presumably just filled or emptied.
- void submit(libusb_transfer *lut);
-
- /*!
- * Get an available transfer:
- * For inputs, this is a just filled transfer.
- * For outputs, this is a just emptied transfer.
- * \param timeout the timeout to wait for a lut
- * \return the transfer pointer or NULL if timeout
- */
- libusb_transfer *get_lut_with_wait(double timeout);
+ void release(void){
+ if (_expired) return;
+ UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0);
+ _expired = true;
+ }
- //Callback use only
- void callback_handle_transfer(libusb_transfer *lut);
+ sptr get_new(void){
+ _expired = false;
+ return sptr(this, &libusb_zero_copy_mrb::fake_deleter);
+ }
private:
- libusb::device_handle::sptr _handle;
- int _endpoint;
- bool _input;
-
- //! hold a bounded buffer of completed transfers
- typedef bounded_buffer<libusb_transfer *> lut_buff_type;
- lut_buff_type::sptr _completed_list;
-
- //! a list of all transfer structs we allocated
- std::vector<libusb_transfer *> _all_luts;
+ static void fake_deleter(void *obj){
+ static_cast<libusb_zero_copy_mrb *>(obj)->release();
+ }
- //! memory allocated for the transfer buffers
- buffer_pool::sptr _buffer_pool;
+ const void *get_buff(void) const{return _lut->buffer;}
+ size_t get_size(void) const{return _lut->actual_length;}
- // Calls for processing asynchronous I/O
- libusb_transfer *allocate_transfer(void *mem, size_t len);
- void print_transfer_status(libusb_transfer *lut);
+ libusb_transfer *_lut;
+ bool _expired;
};
-
-/*
- * Callback function called when submitted transfers complete.
- * The endpoint upon which the transfer is part of is recovered
- * and the transfer moved from pending to completed state.
- * Callbacks occur during the reaping calls where libusb_handle_events()
- * is used. The callback only modifies the transfer state by moving
- * it from the pending to completed status list.
- * \param lut pointer to libusb_transfer
- */
-static void callback(libusb_transfer *lut){
- usb_endpoint *endpoint = (usb_endpoint *) lut->user_data;
- endpoint->callback_handle_transfer(lut);
-}
-
-
-/*
- * Accessor call to allow list access from callback space
- * \param pointer to libusb_transfer
- */
-void usb_endpoint::callback_handle_transfer(libusb_transfer *lut){
- _completed_list->push_with_wait(lut);
-}
-
-
-/*
- * Constructor
- * Allocate libusb transfers and mark as free. For IN endpoints,
- * submit the transfers so that they're ready to return when
- * data is available.
- */
-usb_endpoint::usb_endpoint(
- libusb::device_handle::sptr handle,
- int endpoint,
- bool input,
- size_t transfer_size,
- size_t num_transfers
-):
- _handle(handle),
- _endpoint(endpoint),
- _input(input)
-{
- _completed_list = lut_buff_type::make(num_transfers);
- _buffer_pool = buffer_pool::make(num_transfers, transfer_size);
- for (size_t i = 0; i < num_transfers; i++){
- _all_luts.push_back(allocate_transfer(_buffer_pool->at(i), transfer_size));
-
- //input luts are immediately submitted to be filled
- //output luts go into the completed list as free buffers
- if (_input) this->submit(_all_luts.back());
- else _completed_list->push_with_wait(_all_luts.back());
+/***********************************************************************
+ * Reusable managed send buffer:
+ * - Associated with a particular libusb transfer struct.
+ * - Submits the transfer to libusb in the commit method.
+ **********************************************************************/
+class libusb_zero_copy_msb : public managed_send_buffer{
+public:
+ libusb_zero_copy_msb(libusb_transfer *lut):
+ _lut(lut), _expired(true) { /* NOP */ }
+
+ void commit(size_t len){
+ if (_expired) return;
+ _lut->length = len;
+ UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0);
+ _expired = true;
}
-}
-
-/*
- * Destructor
- * Make sure all the memory is freed. Cancel any pending transfers.
- * When all completed transfers are moved to the free list, release
- * the transfers. Libusb will deallocate the data buffer held by
- * each transfer.
- */
-usb_endpoint::~usb_endpoint(void){
- //cancel all transfers
- BOOST_FOREACH(libusb_transfer *lut, _all_luts){
- libusb_cancel_transfer(lut);
+ sptr get_new(void){
+ _expired = false;
+ return sptr(this, &libusb_zero_copy_msb::fake_deleter);
}
- //collect canceled transfers (drain the queue)
- while (this->get_lut_with_wait(CLEANUP_TIMEOUT) != NULL){};
-
- //free all transfers
- BOOST_FOREACH(libusb_transfer *lut, _all_luts){
- libusb_free_transfer(lut);
+private:
+ static void fake_deleter(void *obj){
+ static_cast<libusb_zero_copy_msb *>(obj)->commit(0);
}
-}
-
-
-/*
- * Allocate a libusb transfer
- * The allocated transfer - and buffer it contains - is repeatedly
- * submitted, reaped, and reused and should not be freed until shutdown.
- * \param mem a pointer to the buffer memory
- * \param len size of the individual buffer
- * \return pointer to an allocated libusb_transfer
- */
-libusb_transfer *usb_endpoint::allocate_transfer(void *mem, size_t len){
- libusb_transfer *lut = libusb_alloc_transfer(0);
- UHD_ASSERT_THROW(lut != NULL);
-
- unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0));
- unsigned char *buff = reinterpret_cast<unsigned char *>(mem);
- libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback);
-
- libusb_fill_bulk_transfer(lut, // transfer
- _handle->get(), // dev_handle
- endpoint, // endpoint
- buff, // buffer
- len, // length
- lut_callback, // callback
- this, // user_data
- 0); // timeout
- return lut;
-}
+ void *get_buff(void) const{return _lut->buffer;}
+ size_t get_size(void) const{return _lut->length;}
-/*
- * Asynchonous transfer submission
- * Submit a libusb transfer to libusb add pending status
- * \param lut pointer to libusb_transfer
- * \return true on success or false on error
- */
-void usb_endpoint::submit(libusb_transfer *lut){
- UHD_ASSERT_THROW(libusb_submit_transfer(lut) == 0);
-}
-
-/*
- * Print status errors of a completed transfer
- * \param lut pointer to an libusb_transfer
- */
-void usb_endpoint::print_transfer_status(libusb_transfer *lut){
- std::cout << "here " << lut->status << std::endl;
- switch (lut->status) {
- case LIBUSB_TRANSFER_COMPLETED:
- if (lut->actual_length < lut->length) {
- std::cerr << "USB: transfer completed with short write,"
- << " length = " << lut->length
- << " actual = " << lut->actual_length << std::endl;
- }
-
- if ((lut->actual_length < 0) || (lut->length < 0)) {
- std::cerr << "USB: transfer completed with invalid response"
- << std::endl;
- }
- break;
- case LIBUSB_TRANSFER_CANCELLED:
- break;
- case LIBUSB_TRANSFER_NO_DEVICE:
- std::cerr << "USB: device was disconnected" << std::endl;
- break;
- case LIBUSB_TRANSFER_OVERFLOW:
- std::cerr << "USB: device sent more data than requested" << std::endl;
- break;
- case LIBUSB_TRANSFER_TIMED_OUT:
- std::cerr << "USB: transfer timed out" << std::endl;
- break;
- case LIBUSB_TRANSFER_STALL:
- std::cerr << "USB: halt condition detected (stalled)" << std::endl;
- break;
- case LIBUSB_TRANSFER_ERROR:
- std::cerr << "USB: transfer failed" << std::endl;
- break;
- default:
- std::cerr << "USB: received unknown transfer status" << std::endl;
- }
-}
+ libusb_transfer *_lut;
+ bool _expired;
+};
-libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- libusb_transfer *lut;
- if (_completed_list->pop_with_timed_wait(lut, timeout)) return lut;
- return NULL;
+//! helper function: handles all async callbacks
+static void libusb_async_cb(libusb_transfer *lut){
+ (*static_cast<boost::function<void()> *>(lut->user_data))();
}
/***********************************************************************
* USB zero_copy device class
**********************************************************************/
-class libusb_zero_copy_impl : public usb_zero_copy, public boost::enable_shared_from_this<libusb_zero_copy_impl> {
+class libusb_zero_copy_impl : public usb_zero_copy{
public:
libusb_zero_copy_impl(
@@ -288,16 +116,107 @@ public:
size_t recv_endpoint,
size_t send_endpoint,
const device_addr_t &hints
- );
+ ):
+ _handle(handle),
+ _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))),
+ _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS))),
+ _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))),
+ _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS))),
+ _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
+ _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
+ _pending_recv_buffs(_num_recv_frames),
+ _pending_send_buffs(_num_send_frames)
+ {
+ _handle->claim_interface(2 /*in interface*/);
+ _handle->claim_interface(1 /*out interface*/);
+
+ //allocate libusb transfer structs and managed receive buffers
+ for (size_t i = 0; i < get_num_recv_frames(); i++){
+
+ libusb_transfer *lut = libusb_alloc_transfer(0);
+ UHD_ASSERT_THROW(lut != NULL);
+
+ _mrb_pool.push_back(libusb_zero_copy_mrb(lut));
+ _callbacks.push_back(boost::bind(
+ &libusb_zero_copy_impl::handle_recv, this, &_mrb_pool.back()
+ ));
+
+ libusb_fill_bulk_transfer(
+ lut, // transfer
+ _handle->get(), // dev_handle
+ (recv_endpoint & 0x7f) | 0x80, // endpoint
+ static_cast<unsigned char *>(_recv_buffer_pool->at(i)), // buffer
+ this->get_recv_frame_size(), // length
+ libusb_transfer_cb_fn(&libusb_async_cb), // callback
+ static_cast<void *>(&_callbacks.back()), // user_data
+ 0 // timeout
+ );
+
+ _all_luts.push_back(lut);
+ _mrb_pool.back().get_new();
+ }
+
+ //allocate libusb transfer structs and managed send buffers
+ for (size_t i = 0; i < get_num_send_frames(); i++){
+
+ libusb_transfer *lut = libusb_alloc_transfer(0);
+ UHD_ASSERT_THROW(lut != NULL);
+
+ _msb_pool.push_back(libusb_zero_copy_msb(lut));
+ _callbacks.push_back(boost::bind(
+ &libusb_zero_copy_impl::handle_send, this, &_msb_pool.back()
+ ));
+
+ libusb_fill_bulk_transfer(
+ lut, // transfer
+ _handle->get(), // dev_handle
+ (send_endpoint & 0x7f) | 0x00, // endpoint
+ static_cast<unsigned char *>(_send_buffer_pool->at(i)), // buffer
+ this->get_send_frame_size(), // length
+ libusb_transfer_cb_fn(&libusb_async_cb), // callback
+ static_cast<void *>(&_callbacks.back()), // user_data
+ 0 // timeout
+ );
+
+ _all_luts.push_back(lut);
+ libusb_async_cb(lut);
+ }
+
+ //spawn the event handler threads
+ size_t concurrency = hints.cast<size_t>("concurrency_hint", 1);
+ for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread(
+ boost::bind(&libusb_zero_copy_impl::run_event_loop, this)
+ );
+ }
~libusb_zero_copy_impl(void){
+ //shutdown the threads
_threads_running = false;
_thread_group.interrupt_all();
_thread_group.join_all();
+
+ //cancel and free all transfers
+ BOOST_FOREACH(libusb_transfer *lut, _all_luts){
+ libusb_cancel_transfer(lut);
+ libusb_free_transfer(lut);
+ }
}
- managed_recv_buffer::sptr get_recv_buff(double);
- managed_send_buffer::sptr get_send_buff(double);
+ managed_recv_buffer::sptr get_recv_buff(double timeout){
+ libusb_zero_copy_mrb *mrb = NULL;
+ if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){
+ return mrb->get_new();
+ }
+ return managed_recv_buffer::sptr();
+ }
+
+ managed_send_buffer::sptr get_send_buff(double timeout){
+ libusb_zero_copy_msb *msb = NULL;
+ if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){
+ return msb->get_new();
+ }
+ return managed_send_buffer::sptr();
+ }
size_t get_num_recv_frames(void) const { return _num_recv_frames; }
size_t get_num_send_frames(void) const { return _num_send_frames; }
@@ -306,125 +225,50 @@ public:
size_t get_send_frame_size(void) const { return _send_frame_size; }
private:
- void release(libusb_transfer *lut){
- _recv_ep->submit(lut);
+ //! Handle a bound async callback for recv
+ void handle_recv(libusb_zero_copy_mrb *mrb){
+ _pending_recv_buffs.push_with_haste(mrb);
}
- void commit(libusb_transfer *lut, size_t num_bytes){
- lut->length = num_bytes;
- try{
- _send_ep->submit(lut);
- }
- catch(const std::exception &e){
- std::cerr << "Error in commit: " << e.what() << std::endl;
- }
+ //! Handle a bound async callback for send
+ void handle_send(libusb_zero_copy_msb *msb){
+ _pending_send_buffs.push_with_haste(msb);
}
libusb::device_handle::sptr _handle;
const size_t _recv_frame_size, _num_recv_frames;
const size_t _send_frame_size, _num_send_frames;
- usb_endpoint::sptr _recv_ep, _send_ep;
- //event handler threads
+ //! Storage for transfer related objects
+ buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
+ bounded_buffer<libusb_zero_copy_mrb *> _pending_recv_buffs;
+ bounded_buffer<libusb_zero_copy_msb *> _pending_send_buffs;
+ std::list<libusb_zero_copy_mrb> _mrb_pool;
+ std::list<libusb_zero_copy_msb> _msb_pool;
+ std::list<boost::function<void()> > _callbacks;
+
+ //! a list of all transfer structs we allocated
+ std::list<libusb_transfer *> _all_luts;
+
+ //! event handler threads
boost::thread_group _thread_group;
bool _threads_running;
void run_event_loop(void){
set_thread_priority_safe();
- libusb::session::sptr session = libusb::session::get_global_session();
+ libusb_context *context = libusb::session::get_global_session()->get_context();
_threads_running = true;
try{
while(_threads_running){
timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 100000; //100ms
- libusb_handle_events_timeout(session->get_context(), &tv);
+ libusb_handle_events_timeout(context, &tv);
}
} catch(const boost::thread_interrupted &){}
}
-};
-
-/*
- * Constructor
- * Initializes libusb, opens devices, and sets up interfaces for I/O.
- * Finally, creates endpoints for asynchronous I/O.
- */
-libusb_zero_copy_impl::libusb_zero_copy_impl(
- libusb::device_handle::sptr handle,
- size_t recv_endpoint,
- size_t send_endpoint,
- const device_addr_t &hints
-):
- _handle(handle),
- _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))),
- _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS))),
- _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))),
- _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS)))
-{
- _handle->claim_interface(2 /*in interface*/);
- _handle->claim_interface(1 /*out interface*/);
-
- _recv_ep = usb_endpoint::sptr(new usb_endpoint(
- _handle, // libusb device_handle
- recv_endpoint, // USB endpoint number
- true, // IN endpoint
- this->get_recv_frame_size(), // buffer size per transfer
- this->get_num_recv_frames() // number of libusb transfers
- ));
-
- _send_ep = usb_endpoint::sptr(new usb_endpoint(
- _handle, // libusb device_handle
- send_endpoint, // USB endpoint number
- false, // OUT endpoint
- this->get_send_frame_size(), // buffer size per transfer
- this->get_num_send_frames() // number of libusb transfers
- ));
-
- //spawn the event handler threads
- size_t concurrency = hints.cast<size_t>("concurrency_hint", 1);
- for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread(
- boost::bind(&libusb_zero_copy_impl::run_event_loop, this)
- );
-}
-
-/*
- * Construct a managed receive buffer from a completed libusb transfer
- * (happy with buffer full of data) obtained from the receive endpoint.
- * Return empty pointer if no transfer is available (timeout or error).
- * \return pointer to a managed receive buffer
- */
-managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(double timeout){
- libusb_transfer *lut = _recv_ep->get_lut_with_wait(timeout);
- if (lut == NULL) {
- return managed_recv_buffer::sptr();
- }
- else {
- return managed_recv_buffer::make_safe(
- boost::asio::const_buffer(lut->buffer, lut->actual_length),
- boost::bind(&libusb_zero_copy_impl::release, shared_from_this(), lut)
- );
- }
-}
-
-/*
- * Construct a managed send buffer from a free libusb transfer (with
- * empty buffer). Return empty pointer of no transfer is available
- * (timeout or error).
- * \return pointer to a managed send buffer
- */
-managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){
- libusb_transfer *lut = _send_ep->get_lut_with_wait(timeout);
- if (lut == NULL) {
- return managed_send_buffer::sptr();
- }
- else {
- return managed_send_buffer::make_safe(
- boost::asio::mutable_buffer(lut->buffer, this->get_send_frame_size()),
- boost::bind(&libusb_zero_copy_impl::commit, shared_from_this(), lut, _1)
- );
- }
-}
+};
/***********************************************************************
* USB zero_copy make functions
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index a80de7b87..05352ffce 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -19,53 +19,102 @@
#include <uhd/transport/udp_simple.hpp> //mtu
#include <uhd/transport/bounded_buffer.hpp>
#include <uhd/transport/buffer_pool.hpp>
-#include <uhd/utils/thread_priority.hpp>
#include <uhd/utils/assert.hpp>
#include <uhd/utils/warning.hpp>
#include <boost/asio.hpp>
#include <boost/format.hpp>
-#include <boost/thread/thread.hpp>
-#include <boost/enable_shared_from_this.hpp>
#include <iostream>
+#include <list>
using namespace uhd;
using namespace uhd::transport;
namespace asio = boost::asio;
-//Define this to the the boost async io calls to perform receive.
-//Otherwise, get_recv_buff uses a blocking receive with timeout.
-#define USE_ASIO_ASYNC_RECV
-
-//Define this to the the boost async io calls to perform send.
-//Otherwise, the commit callback uses a blocking send.
-//#define USE_ASIO_ASYNC_SEND
-
-//The asio async receive implementation is broken for some macos.
-//Just disable for all macos since we don't know the problem.
-#if defined(UHD_PLATFORM_MACOS) && defined(USE_ASIO_ASYNC_RECV)
- #undef USE_ASIO_ASYNC_RECV
-#endif
-
-//The number of service threads to spawn for async ASIO:
-//A single concurrent thread for io_service seems to be the fastest.
-//Threads are disabled when no async implementations are enabled.
-#if defined(USE_ASIO_ASYNC_RECV) || defined(USE_ASIO_ASYNC_SEND)
-static const size_t CONCURRENCY_HINT = 1;
-#else
-static const size_t CONCURRENCY_HINT = 0;
-#endif
-
//A reasonable number of frames for send/recv and async/sync
static const size_t DEFAULT_NUM_FRAMES = 32;
/***********************************************************************
+ * Reusable managed receiver buffer:
+ * - Initialize with memory and a release callback.
+ * - Call get new with a length in bytes to re-use.
+ **********************************************************************/
+class udp_zero_copy_asio_mrb : public managed_recv_buffer{
+public:
+ typedef boost::function<void(udp_zero_copy_asio_mrb *)> release_cb_type;
+
+ udp_zero_copy_asio_mrb(void *mem, const release_cb_type &release_cb):
+ _mem(mem), _len(0), _release_cb(release_cb){/* NOP */}
+
+ void release(void){
+ if (_len == 0) return;
+ this->_release_cb(this);
+ _len = 0;
+ }
+
+ sptr get_new(size_t len){
+ _len = len;
+ return sptr(this, &udp_zero_copy_asio_mrb::fake_deleter);
+ }
+
+ template <class T> T cast(void) const{return static_cast<T>(_mem);}
+
+private:
+ static void fake_deleter(void *obj){
+ static_cast<udp_zero_copy_asio_mrb *>(obj)->release();
+ }
+
+ const void *get_buff(void) const{return _mem;}
+ size_t get_size(void) const{return _len;}
+
+ void *_mem;
+ size_t _len;
+ release_cb_type _release_cb;
+};
+
+/***********************************************************************
+ * Reusable managed send buffer:
+ * - Initialize with memory and a commit callback.
+ * - Call get new with a length in bytes to re-use.
+ **********************************************************************/
+class udp_zero_copy_asio_msb : public managed_send_buffer{
+public:
+ typedef boost::function<void(udp_zero_copy_asio_msb *, size_t)> commit_cb_type;
+
+ udp_zero_copy_asio_msb(void *mem, const commit_cb_type &commit_cb):
+ _mem(mem), _len(0), _commit_cb(commit_cb){/* NOP */}
+
+ void commit(size_t len){
+ if (_len == 0) return;
+ this->_commit_cb(this, len);
+ _len = 0;
+ }
+
+ sptr get_new(size_t len){
+ _len = len;
+ return sptr(this, &udp_zero_copy_asio_msb::fake_deleter);
+ }
+
+private:
+ static void fake_deleter(void *obj){
+ static_cast<udp_zero_copy_asio_msb *>(obj)->commit(0);
+ }
+
+ void *get_buff(void) const{return _mem;}
+ size_t get_size(void) const{return _len;}
+
+ void *_mem;
+ size_t _len;
+ commit_cb_type _commit_cb;
+};
+
+/***********************************************************************
* Zero Copy UDP implementation with ASIO:
* This is the portable zero copy implementation for systems
* where a faster, platform specific solution is not available.
* However, it is not a true zero copy implementation as each
* send and recv requires a copy operation to/from userspace.
**********************************************************************/
-class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_shared_from_this<udp_zero_copy_asio_impl> {
+class udp_zero_copy_asio_impl : public udp_zero_copy{
public:
typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr;
@@ -78,8 +127,10 @@ public:
_num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES))),
_send_frame_size(size_t(hints.cast<double>("send_frame_size", udp_simple::mtu))),
_num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))),
- _concurrency_hint(hints.cast<size_t>("concurrency_hint", CONCURRENCY_HINT)),
- _io_service(_concurrency_hint)
+ _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
+ _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
+ _pending_recv_buffs(_num_recv_frames),
+ _pending_send_buffs(_num_send_frames)
{
//std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
@@ -93,39 +144,26 @@ public:
_socket->open(asio::ip::udp::v4());
_socket->connect(receiver_endpoint);
_sock_fd = _socket->native();
- }
-
- ~udp_zero_copy_asio_impl(void){
- delete _work; //allow io_service run to complete
- _thread_group.join_all(); //wait for service threads to exit
- delete _socket;
- }
- void init(void){
- //allocate all recv frames and release them to begin xfers
- _pending_recv_buffs = pending_buffs_type::make(_num_recv_frames);
- _recv_buffer_pool = buffer_pool::make(_num_recv_frames, _recv_frame_size);
- for (size_t i = 0; i < _num_recv_frames; i++){
- release(_recv_buffer_pool->at(i));
+ //allocate re-usable managed receive buffers
+ for (size_t i = 0; i < get_num_recv_frames(); i++){
+ _mrb_pool.push_back(udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i),
+ boost::bind(&udp_zero_copy_asio_impl::release, this, _1))
+ );
+ handle_recv(&_mrb_pool.back());
}
- //allocate all send frames and push them into the fifo
- _pending_send_buffs = pending_buffs_type::make(_num_send_frames);
- _send_buffer_pool = buffer_pool::make(_num_send_frames, _send_frame_size);
- for (size_t i = 0; i < _num_send_frames; i++){
- handle_send(_send_buffer_pool->at(i));
+ //allocate re-usable managed send buffers
+ for (size_t i = 0; i < get_num_send_frames(); i++){
+ _msb_pool.push_back(udp_zero_copy_asio_msb(_send_buffer_pool->at(i),
+ boost::bind(&udp_zero_copy_asio_impl::commit, this, _1, _2))
+ );
+ handle_send(&_msb_pool.back());
}
-
- //spawn the service threads that will run the io service
- _work = new asio::io_service::work(_io_service); //new work to delete later
- for (size_t i = 0; i < _concurrency_hint; i++) _thread_group.create_thread(
- boost::bind(&udp_zero_copy_asio_impl::service, this)
- );
}
- void service(void){
- set_thread_priority_safe();
- _io_service.run();
+ ~udp_zero_copy_asio_impl(void){
+ delete _socket;
}
//get size for internal socket buffer
@@ -142,50 +180,15 @@ public:
return get_buff_size<Opt>();
}
- //! handle a recv callback -> push the filled memory into the fifo
- UHD_INLINE void handle_recv(void *mem, size_t len){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len));
- }
-
- ////////////////////////////////////////////////////////////////////
- #ifdef USE_ASIO_ASYNC_RECV
- ////////////////////////////////////////////////////////////////////
- //! pop a filled recv buffer off of the fifo and bind with the release callback
- managed_recv_buffer::sptr get_recv_buff(double timeout){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- asio::mutable_buffer buff;
- if (_pending_recv_buffs->pop_with_timed_wait(buff, timeout)){
- return managed_recv_buffer::make_safe(
- buff, boost::bind(
- &udp_zero_copy_asio_impl::release,
- shared_from_this(),
- asio::buffer_cast<void*>(buff)
- )
- );
- }
- return managed_recv_buffer::sptr();
- }
-
- //! release a recv buffer -> start an async recv on the buffer
- void release(void *mem){
- _socket->async_receive(
- boost::asio::buffer(mem, this->get_recv_frame_size()),
- boost::bind(
- &udp_zero_copy_asio_impl::handle_recv,
- shared_from_this(), mem,
- asio::placeholders::bytes_transferred
- )
- );
- }
-
- ////////////////////////////////////////////////////////////////////
- #else /*USE_ASIO_ASYNC_RECV*/
- ////////////////////////////////////////////////////////////////////
- managed_recv_buffer::sptr get_recv_buff(double timeout){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- asio::mutable_buffer buff;
-
+ /*******************************************************************
+ * Receive implementation:
+ *
+ * Use select to perform a blocking receive with timeout.
+ * Return the managed receive buffer with the new length.
+ * When the caller is finished with the managed buffer,
+ * the managed receive buffer is released back into the queue.
+ ******************************************************************/
+ UHD_INLINE bool is_recv_ready(double timeout){
//setup timeval for timeout
timeval tv;
tv.tv_sec = 0;
@@ -196,104 +199,70 @@ public:
FD_ZERO(&rset);
FD_SET(_sock_fd, &rset);
- //call select to perform timed wait and grab an available buffer with wait
- //if the condition is true, call receive and return the managed buffer
- if (
- ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 and
- _pending_recv_buffs->pop_with_timed_wait(buff, timeout)
- ){
- return managed_recv_buffer::make_safe(
- asio::buffer(
- boost::asio::buffer_cast<void *>(buff),
- _socket->receive(asio::buffer(buff))
- ),
- boost::bind(
- &udp_zero_copy_asio_impl::release,
- shared_from_this(),
- asio::buffer_cast<void*>(buff)
- )
- );
+ //call select with timeout on receive socket
+ return ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0;
+ }
+
+ managed_recv_buffer::sptr get_recv_buff(double timeout){
+ udp_zero_copy_asio_mrb *mrb = NULL;
+ if (is_recv_ready(timeout) and _pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){
+ return mrb->get_new(::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0));
}
return managed_recv_buffer::sptr();
}
- void release(void *mem){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- handle_recv(mem, this->get_recv_frame_size());
+ UHD_INLINE void handle_recv(udp_zero_copy_asio_mrb *mrb){
+ _pending_recv_buffs.push_with_haste(mrb);
}
- ////////////////////////////////////////////////////////////////////
- #endif /*USE_ASIO_ASYNC_RECV*/
- ////////////////////////////////////////////////////////////////////
+ void release(udp_zero_copy_asio_mrb *mrb){
+ handle_recv(mrb);
+ }
size_t get_num_recv_frames(void) const {return _num_recv_frames;}
size_t get_recv_frame_size(void) const {return _recv_frame_size;}
- //! handle a send callback -> push the emptied memory into the fifo
- UHD_INLINE void handle_send(void *mem){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, this->get_send_frame_size()));
- }
-
- //! pop an empty send buffer off of the fifo and bind with the commit callback
+ /*******************************************************************
+ * Send implementation:
+ *
+ * Get a managed receive buffer immediately with max length set.
+ * The caller will fill the buffer and commit it when finished.
+ * The commit routine will perform a blocking send operation,
+ * and push the managed send buffer back into the queue.
+ ******************************************************************/
managed_send_buffer::sptr get_send_buff(double timeout){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- asio::mutable_buffer buff;
- if (_pending_send_buffs->pop_with_timed_wait(buff, timeout)){
- return managed_send_buffer::make_safe(
- buff, boost::bind(
- &udp_zero_copy_asio_impl::commit,
- shared_from_this(),
- asio::buffer_cast<void*>(buff), _1
- )
- );
+ udp_zero_copy_asio_msb *msb = NULL;
+ if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){
+ return msb->get_new(_send_frame_size);
}
return managed_send_buffer::sptr();
}
- ////////////////////////////////////////////////////////////////////
- #ifdef USE_ASIO_ASYNC_SEND
- ////////////////////////////////////////////////////////////////////
- //! commit a send buffer -> start an async send on the buffer
- void commit(void *mem, size_t len){
- _socket->async_send(
- boost::asio::buffer(mem, len),
- boost::bind(
- &udp_zero_copy_asio_impl::handle_send,
- shared_from_this(), mem
- )
- );
+ UHD_INLINE void handle_send(udp_zero_copy_asio_msb *msb){
+ _pending_send_buffs.push_with_haste(msb);
}
- ////////////////////////////////////////////////////////////////////
- #else /*USE_ASIO_ASYNC_SEND*/
- ////////////////////////////////////////////////////////////////////
- void commit(void *mem, size_t len){
- _socket->send(asio::buffer(mem, len));
- handle_send(mem);
+ void commit(udp_zero_copy_asio_msb *msb, size_t len){
+ ::send(_sock_fd, msb->cast<const char *>(), len, 0);
+ handle_send(msb);
}
- ////////////////////////////////////////////////////////////////////
- #endif /*USE_ASIO_ASYNC_SEND*/
- ////////////////////////////////////////////////////////////////////
-
size_t get_num_send_frames(void) const {return _num_send_frames;}
size_t get_send_frame_size(void) const {return _send_frame_size;}
private:
//memory management -> buffers and fifos
- boost::thread_group _thread_group;
- buffer_pool::sptr _send_buffer_pool, _recv_buffer_pool;
- typedef bounded_buffer<asio::mutable_buffer> pending_buffs_type;
- pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs;
const size_t _recv_frame_size, _num_recv_frames;
const size_t _send_frame_size, _num_send_frames;
+ buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
+ bounded_buffer<udp_zero_copy_asio_mrb *> _pending_recv_buffs;
+ bounded_buffer<udp_zero_copy_asio_msb *> _pending_send_buffs;
+ std::list<udp_zero_copy_asio_msb> _msb_pool;
+ std::list<udp_zero_copy_asio_mrb> _mrb_pool;
//asio guts -> socket and service
- size_t _concurrency_hint;
asio::io_service _io_service;
asio::ip::udp::socket *_socket;
- asio::io_service::work *_work;
int _sock_fd;
};
@@ -346,7 +315,5 @@ udp_zero_copy::sptr udp_zero_copy::make(
resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv");
resize_buff_helper<asio::socket_base::send_buffer_size> (udp_trans, send_buff_size, "send");
- udp_trans->init(); //buffers resized -> call init() to use
-
return udp_trans;
}
diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp
index c535edd04..6f3ac0421 100644
--- a/host/lib/transport/vrt_packet_handler.hpp
+++ b/host/lib/transport/vrt_packet_handler.hpp
@@ -67,13 +67,16 @@ template <typename T> UHD_INLINE T get_context_code(
std::vector<const boost::uint8_t *> copy_buffs;
size_t size_of_copy_buffs;
size_t fragment_offset_in_samps;
+ std::vector<void *> io_buffs;
+ std::vector<const void *> otw_buffs;
recv_state(size_t width = 1):
width(width),
managed_buffs(width),
copy_buffs(width, NULL),
size_of_copy_buffs(0),
- fragment_offset_in_samps(0)
+ fragment_offset_in_samps(0),
+ io_buffs(0) //resized later
{
/* NOP */
}
@@ -144,7 +147,7 @@ template <typename T> UHD_INLINE T get_context_code(
******************************************************************/
static UHD_INLINE size_t _recv1(
recv_state &state,
- const std::vector<void *> &buffs,
+ const uhd::device::recv_buffs_type &buffs,
size_t offset_bytes,
size_t total_samps,
uhd::rx_metadata_t &metadata,
@@ -192,17 +195,15 @@ template <typename T> UHD_INLINE T get_context_code(
size_t bytes_to_copy = nsamps_to_copy*bytes_per_item;
size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/chans_per_otw_buff;
- std::vector<void *> io_buffs(chans_per_otw_buff);
- for (size_t i = 0; i < state.width; i+=chans_per_otw_buff){
+ for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){
//fill a vector with pointers to the io buffers
for (size_t j = 0; j < chans_per_otw_buff; j++){
- io_buffs[j] = reinterpret_cast<boost::uint8_t *>(buffs[i+j]) + offset_bytes;
+ state.io_buffs[j] = reinterpret_cast<boost::uint8_t *>(buffs[i+j]) + offset_bytes;
}
//copy-convert the samples from the recv buffer
- uhd::convert::input_type otw_buffs(1, state.copy_buffs[i]);
- converter(otw_buffs, io_buffs, nsamps_to_copy_per_io_buff);
+ converter(state.copy_buffs[i], state.io_buffs, nsamps_to_copy_per_io_buff);
//update the rx copy buffer to reflect the bytes copied
state.copy_buffs[i] += bytes_to_copy;
@@ -223,7 +224,7 @@ template <typename T> UHD_INLINE T get_context_code(
******************************************************************/
static UHD_INLINE size_t recv(
recv_state &state,
- const std::vector<void *> &buffs,
+ const uhd::device::recv_buffs_type &buffs,
const size_t total_num_samps,
uhd::rx_metadata_t &metadata,
uhd::device::recv_mode_t recv_mode,
@@ -236,6 +237,8 @@ template <typename T> UHD_INLINE T get_context_code(
size_t vrt_header_offset_words32 = 0,
size_t chans_per_otw_buff = 1
){
+ state.io_buffs.resize(chans_per_otw_buff);
+
uhd::convert::function_type converter(
uhd::convert::get_converter_otw_to_cpu(
io_type, otw_type, 1, chans_per_otw_buff
@@ -300,8 +303,18 @@ template <typename T> UHD_INLINE T get_context_code(
struct send_state{
//init the expected seq number
size_t next_packet_seq;
+ managed_send_buffs_t managed_buffs;
+ const boost::uint64_t zeros;
+ std::vector<const void *> zero_buffs;
+ std::vector<const void *> io_buffs;
- send_state(void) : next_packet_seq(0){
+ send_state(size_t width = 1):
+ next_packet_seq(0),
+ managed_buffs(width),
+ zeros(0),
+ zero_buffs(width, &zeros),
+ io_buffs(0) //resized later
+ {
/* NOP */
}
};
@@ -312,7 +325,7 @@ template <typename T> UHD_INLINE T get_context_code(
******************************************************************/
static UHD_INLINE size_t _send1(
send_state &state,
- const std::vector<const void *> &buffs,
+ const uhd::device::send_buffs_type &buffs,
const size_t offset_bytes,
const size_t num_samps,
uhd::transport::vrt::if_packet_info_t &if_packet_info,
@@ -326,29 +339,26 @@ template <typename T> UHD_INLINE T get_context_code(
if_packet_info.num_payload_words32 = (num_samps*chans_per_otw_buff*OTW_BYTES_PER_SAMP)/sizeof(boost::uint32_t);
if_packet_info.packet_count = state.next_packet_seq;
- //get send buffers for each channel
- managed_send_buffs_t send_buffs(buffs.size()/chans_per_otw_buff);
- if (not get_send_buffs(send_buffs)) return 0;
+ //get send buffers for each otw channel
+ if (not get_send_buffs(state.managed_buffs)) return 0;
- std::vector<const void *> io_buffs(chans_per_otw_buff);
for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){
//calculate pointers with offsets to io and otw memory
for (size_t j = 0; j < chans_per_otw_buff; j++){
- io_buffs[j] = reinterpret_cast<const boost::uint8_t *>(buffs[i+j]) + offset_bytes;
+ state.io_buffs[j] = reinterpret_cast<const boost::uint8_t *>(buffs[i+j]) + offset_bytes;
}
- boost::uint32_t *otw_mem = send_buffs[i]->cast<boost::uint32_t *>() + vrt_header_offset_words32;
+ boost::uint32_t *otw_mem = state.managed_buffs[i]->cast<boost::uint32_t *>() + vrt_header_offset_words32;
//pack metadata into a vrt header
vrt_packer(otw_mem, if_packet_info);
otw_mem += if_packet_info.num_header_words32;
//copy-convert the samples into the send buffer
- uhd::convert::output_type otw_buffs(1, otw_mem);
- converter(io_buffs, otw_buffs, num_samps);
+ converter(state.io_buffs, otw_mem, num_samps);
//commit the samples to the zero-copy interface
size_t num_bytes_total = (vrt_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t);
- send_buffs[i]->commit(num_bytes_total);
+ state.managed_buffs[i]->commit(num_bytes_total);
}
state.next_packet_seq++; //increment sequence after commits
return num_samps;
@@ -359,7 +369,7 @@ template <typename T> UHD_INLINE T get_context_code(
******************************************************************/
static UHD_INLINE size_t send(
send_state &state,
- const std::vector<const void *> &buffs,
+ const uhd::device::send_buffs_type &buffs,
const size_t total_num_samps,
const uhd::tx_metadata_t &metadata,
uhd::device::send_mode_t send_mode,
@@ -372,6 +382,8 @@ template <typename T> UHD_INLINE T get_context_code(
size_t vrt_header_offset_words32 = 0,
size_t chans_per_otw_buff = 1
){
+ state.io_buffs.resize(chans_per_otw_buff);
+
uhd::convert::function_type converter(
uhd::convert::get_converter_cpu_to_otw(
io_type, otw_type, chans_per_otw_buff, 1
@@ -398,19 +410,11 @@ template <typename T> UHD_INLINE T get_context_code(
if_packet_info.sob = metadata.start_of_burst;
if_packet_info.eob = metadata.end_of_burst;
- //TODO remove this code when sample counts of zero are supported by hardware
- std::vector<const void *> buffs_(buffs);
- size_t total_num_samps_(total_num_samps);
- if (total_num_samps == 0){
- static const boost::uint64_t zeros = 0; //max size of a host sample
- buffs_ = std::vector<const void *>(buffs.size(), &zeros);
- total_num_samps_ = 1;
- }
-
return _send1(
state,
- buffs_, 0,
- std::min(total_num_samps_, max_samples_per_packet),
+ //TODO remove this code when sample counts of zero are supported by hardware
+ (total_num_samps)?buffs : state.zero_buffs, 0,
+ std::max<size_t>(1, std::min(total_num_samps, max_samples_per_packet)),
if_packet_info,
converter,
vrt_packer,
diff --git a/host/lib/transport/zero_copy.cpp b/host/lib/transport/zero_copy.cpp
deleted file mode 100644
index a5a864a04..000000000
--- a/host/lib/transport/zero_copy.cpp
+++ /dev/null
@@ -1,108 +0,0 @@
-//
-// Copyright 2010 Ettus Research LLC
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-//
-
-#include <uhd/transport/zero_copy.hpp>
-
-using namespace uhd::transport;
-
-/***********************************************************************
- * Safe managed receive buffer
- **********************************************************************/
-static void release_nop(void){
- /* NOP */
-}
-
-class safe_managed_receive_buffer : public managed_recv_buffer{
-public:
- safe_managed_receive_buffer(
- const boost::asio::const_buffer &buff,
- const release_fcn_t &release_fcn
- ):
- _buff(buff), _release_fcn(release_fcn)
- {
- /* NOP */
- }
-
- ~safe_managed_receive_buffer(void){
- _release_fcn();
- }
-
- void release(void){
- release_fcn_t release_fcn = _release_fcn;
- _release_fcn = &release_nop;
- return release_fcn();
- }
-
-private:
- const boost::asio::const_buffer &get(void) const{
- return _buff;
- }
-
- const boost::asio::const_buffer _buff;
- release_fcn_t _release_fcn;
-};
-
-managed_recv_buffer::sptr managed_recv_buffer::make_safe(
- const boost::asio::const_buffer &buff,
- const release_fcn_t &release_fcn
-){
- return sptr(new safe_managed_receive_buffer(buff, release_fcn));
-}
-
-/***********************************************************************
- * Safe managed send buffer
- **********************************************************************/
-static void commit_nop(size_t){
- /* NOP */
-}
-
-class safe_managed_send_buffer : public managed_send_buffer{
-public:
- safe_managed_send_buffer(
- const boost::asio::mutable_buffer &buff,
- const commit_fcn_t &commit_fcn
- ):
- _buff(buff), _commit_fcn(commit_fcn)
- {
- /* NOP */
- }
-
- ~safe_managed_send_buffer(void){
- _commit_fcn(0);
- }
-
- void commit(size_t num_bytes){
- commit_fcn_t commit_fcn = _commit_fcn;
- _commit_fcn = &commit_nop;
- return commit_fcn(num_bytes);
- }
-
-private:
- const boost::asio::mutable_buffer &get(void) const{
- return _buff;
- }
-
- const boost::asio::mutable_buffer _buff;
- commit_fcn_t _commit_fcn;
-};
-
-safe_managed_send_buffer::sptr managed_send_buffer::make_safe(
- const boost::asio::mutable_buffer &buff,
- const commit_fcn_t &commit_fcn
-){
- return sptr(new safe_managed_send_buffer(buff, commit_fcn));
-}