aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/CMakeLists.txt7
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp66
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp95
-rw-r--r--host/lib/transport/super_send_packet_handler.hpp112
-rw-r--r--host/lib/transport/udp_wsa_zero_copy.cpp300
-rw-r--r--host/lib/transport/udp_zero_copy.cpp153
-rw-r--r--host/lib/transport/usb_zero_copy_wrapper.cpp154
7 files changed, 673 insertions, 214 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index 8e8ea5ea8..6524a8412 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -82,7 +82,11 @@ SET_SOURCE_FILES_PROPERTIES(
########################################################################
# Setup UDP
########################################################################
-LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy.cpp)
+IF(WIN32)
+ LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_wsa_zero_copy.cpp)
+ELSE()
+ LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy.cpp)
+ENDIF()
#On windows, the boost asio implementation uses the winsock2 library.
#Note: we exclude the .lib extension for cygwin and mingw platforms.
@@ -97,6 +101,7 @@ CHECK_INCLUDE_FILE_CXX(atlbase.h HAVE_ATLBASE_H)
IF(HAVE_ATLBASE_H)
SET_SOURCE_FILES_PROPERTIES(
${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_wsa_zero_copy.cpp
PROPERTIES COMPILE_DEFINITIONS "HAVE_ATLBASE_H"
)
ENDIF(HAVE_ATLBASE_H)
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index 3e67264cd..28bff9709 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -1,5 +1,5 @@
//
-// Copyright 2010-2011 Ettus Research LLC
+// Copyright 2010-2012 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -21,6 +21,7 @@
#include <uhd/utils/msg.hpp>
#include <uhd/exception.hpp>
#include <boost/foreach.hpp>
+#include <boost/make_shared.hpp>
#include <boost/thread/thread.hpp>
#include <list>
@@ -61,8 +62,18 @@ static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut){
* \return true for completion, false for timeout
*/
UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, bool &completed){
- const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000));
+ //already completed by a previous call?
+ if (completed) return true;
+
+ //perform a non-blocking event handle
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ libusb_handle_events_timeout(ctx, &tv);
+ if (completed) return true;
+ //finish the rest with a timeout loop
+ const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000));
while (not completed and (boost::get_system_time() < timeout_time)){
timeval tv;
tv.tv_sec = 0;
@@ -82,21 +93,18 @@ class libusb_zero_copy_mrb : public managed_recv_buffer{
public:
libusb_zero_copy_mrb(libusb_transfer *lut, const size_t frame_size):
_ctx(libusb::session::get_global_session()->get_context()),
- _lut(lut), _expired(false), _frame_size(frame_size) { /* NOP */ }
+ _lut(lut), _frame_size(frame_size) { /* NOP */ }
void release(void){
- if (_expired) return;
completed = false;
_lut->length = _frame_size; //always reset length
UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0);
- _expired = true;
}
sptr get_new(const double timeout, size_t &index){
if (wait_for_completion(_ctx, timeout, completed)){
index++;
- _expired = false;
- return make_managed_buffer(this);
+ return make(this, _lut->buffer, _lut->actual_length);
}
return managed_recv_buffer::sptr();
}
@@ -104,12 +112,8 @@ public:
bool completed;
private:
- const void *get_buff(void) const{return _lut->buffer;}
- size_t get_size(void) const{return _lut->actual_length;}
-
libusb_context *_ctx;
libusb_transfer *_lut;
- bool _expired;
const size_t _frame_size;
};
@@ -122,22 +126,18 @@ class libusb_zero_copy_msb : public managed_send_buffer{
public:
libusb_zero_copy_msb(libusb_transfer *lut, const size_t frame_size):
_ctx(libusb::session::get_global_session()->get_context()),
- _lut(lut), _expired(false), _frame_size(frame_size) { /* NOP */ }
+ _lut(lut), _frame_size(frame_size) { completed = true; }
- void commit(size_t len){
- if (_expired) return;
+ void release(void){
completed = false;
- _lut->length = len;
- if (len == 0) libusb_async_cb(_lut);
- else UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0);
- _expired = true;
+ _lut->length = size();
+ UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0);
}
sptr get_new(const double timeout, size_t &index){
if (wait_for_completion(_ctx, timeout, completed)){
index++;
- _expired = false;
- return make_managed_buffer(this);
+ return make(this, _lut->buffer, _frame_size);
}
return managed_send_buffer::sptr();
}
@@ -145,12 +145,8 @@ public:
bool completed;
private:
- void *get_buff(void) const{return _lut->buffer;}
- size_t get_size(void) const{return _frame_size;}
-
libusb_context *_ctx;
libusb_transfer *_lut;
- bool _expired;
const size_t _frame_size;
};
@@ -181,13 +177,30 @@ public:
_handle->claim_interface(recv_interface);
_handle->claim_interface(send_interface);
+ //flush the buffers out of the recv endpoint
+ //limit the flushing to at most one second
+ for (size_t i = 0; i < 100; i++)
+ {
+ unsigned char buff[512];
+ int transfered = 0;
+ const int status = libusb_bulk_transfer(
+ _handle->get(), // dev_handle
+ (recv_endpoint & 0x7f) | 0x80, // endpoint
+ static_cast<unsigned char *>(buff),
+ sizeof(buff),
+ &transfered, //bytes xfered
+ 10 //timeout ms
+ );
+ if (status == LIBUSB_ERROR_TIMEOUT) break;
+ }
+
//allocate libusb transfer structs and managed receive buffers
for (size_t i = 0; i < get_num_recv_frames(); i++){
libusb_transfer *lut = libusb_alloc_transfer(0);
UHD_ASSERT_THROW(lut != NULL);
- _mrb_pool.push_back(boost::shared_ptr<libusb_zero_copy_mrb>(new libusb_zero_copy_mrb(lut, this->get_recv_frame_size())));
+ _mrb_pool.push_back(boost::make_shared<libusb_zero_copy_mrb>(lut, this->get_recv_frame_size()));
libusb_fill_bulk_transfer(
lut, // transfer
@@ -210,7 +223,7 @@ public:
libusb_transfer *lut = libusb_alloc_transfer(0);
UHD_ASSERT_THROW(lut != NULL);
- _msb_pool.push_back(boost::shared_ptr<libusb_zero_copy_msb>(new libusb_zero_copy_msb(lut, this->get_send_frame_size())));
+ _msb_pool.push_back(boost::make_shared<libusb_zero_copy_msb>(lut, this->get_send_frame_size()));
libusb_fill_bulk_transfer(
lut, // transfer
@@ -224,7 +237,6 @@ public:
);
_all_luts.push_back(lut);
- _msb_pool.back()->commit(0);
}
}
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 205c7a3a3..7a1972690 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -23,6 +23,8 @@
#include <uhd/convert.hpp>
#include <uhd/stream.hpp>
#include <uhd/utils/msg.hpp>
+#include <uhd/utils/tasks.hpp>
+#include <uhd/utils/atomic.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/types/metadata.hpp>
#include <uhd/transport/vrt_if_packet.hpp>
@@ -31,6 +33,9 @@
#include <boost/foreach.hpp>
#include <boost/function.hpp>
#include <boost/format.hpp>
+#include <boost/bind.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/thread/barrier.hpp>
#include <iostream>
#include <vector>
@@ -73,12 +78,25 @@ public:
set_alignment_failure_threshold(1000);
}
+ ~recv_packet_handler(void){
+ _task_barrier_entry.interrupt();
+ _task_barrier_exit.interrupt();
+ _task_handlers.clear();
+ }
+
//! Resize the number of transport channels
void resize(const size_t size){
if (this->size() == size) return;
+ _task_handlers.clear();
_props.resize(size);
//re-initialize all buffers infos by re-creating the vector
_buffers_infos = std::vector<buffers_info_type>(4, buffers_info_type(size));
+ _task_barrier_entry.resize(size);
+ _task_barrier_exit.resize(size);
+ _task_handlers.resize(size);
+ for (size_t i = 1/*skip 0*/; i < size; i++){
+ _task_handlers[i] = task::make(boost::bind(&recv_packet_handler::converter_thread_task, this, i));
+ };
}
//! Get the channel width of this handler
@@ -125,7 +143,7 @@ public:
//! Set the conversion routine for all channels
void set_converter(const uhd::convert::id_type &id){
- _io_buffs.resize(id.num_outputs);
+ _num_outputs = id.num_outputs;
_converter = uhd::convert::get_converter(id)();
this->set_scale_factor(1/32767.); //update after setting converter
_bytes_per_otw_item = uhd::convert::get_bytes_per_item(id.input_format);
@@ -207,7 +225,7 @@ private:
handle_overflow_type handle_overflow;
};
std::vector<xport_chan_props_type> _props;
- std::vector<void *> _io_buffs; //used in conversion
+ size_t _num_outputs;
size_t _bytes_per_otw_item; //used in conversion
size_t _bytes_per_cpu_item; //used in conversion
uhd::convert::converter::sptr _converter; //used in conversion
@@ -512,24 +530,19 @@ private:
//extract the number of samples available to copy
const size_t nsamps_available = info.data_bytes_to_copy/_bytes_per_otw_item;
- const size_t nsamps_to_copy = std::min(nsamps_per_buff*_io_buffs.size(), nsamps_available);
+ const size_t nsamps_to_copy = std::min(nsamps_per_buff*_num_outputs, nsamps_available);
const size_t bytes_to_copy = nsamps_to_copy*_bytes_per_otw_item;
- const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_io_buffs.size();
+ const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_num_outputs;
- size_t buff_index = 0;
- BOOST_FOREACH(per_buffer_info_type &buff_info, info){
+ //setup the data to share with converter threads
+ _convert_nsamps = nsamps_to_copy_per_io_buff;
+ _convert_buffs = &buffs;
+ _convert_buffer_offset_bytes = buffer_offset_bytes;
+ _convert_bytes_to_copy = bytes_to_copy;
- //fill a vector with pointers to the io buffers
- BOOST_FOREACH(void *&io_buff, _io_buffs){
- io_buff = reinterpret_cast<char *>(buffs[buff_index++]) + buffer_offset_bytes;
- }
+ //perform N channels of conversion
+ converter_thread_task(0);
- //copy-convert the samples from the recv buffer
- _converter->conv(buff_info.copy_buff, _io_buffs, nsamps_to_copy_per_io_buff);
-
- //update the rx copy buffer to reflect the bytes copied
- buff_info.copy_buff += bytes_to_copy;
- }
//update the copy buffer's availability
info.data_bytes_to_copy -= bytes_to_copy;
@@ -538,15 +551,53 @@ private:
metadata.fragment_offset = info.fragment_offset_in_samps;
info.fragment_offset_in_samps += nsamps_to_copy; //set for next call
- //done with buffers? this action releases buffers in-order
- if (not metadata.more_fragments){
- BOOST_FOREACH(per_buffer_info_type &buff_info, info){
- buff_info.buff.reset(); //effectively a release
- }
+ return nsamps_to_copy_per_io_buff;
+ }
+
+ /*******************************************************************
+ * Perform one thread's work of the conversion task.
+ * The entry and exit use a dual synchronization barrier,
+ * to wait for data to become ready and block until completion.
+ ******************************************************************/
+ UHD_INLINE void converter_thread_task(const size_t index)
+ {
+ _task_barrier_entry.wait();
+
+ //shortcut references to local data structures
+ buffers_info_type &buff_info = get_curr_buffer_info();
+ per_buffer_info_type &info = buff_info[index];
+ const rx_streamer::buffs_type &buffs = *_convert_buffs;
+
+ //fill IO buffs with pointers into the output buffer
+ void *io_buffs[4/*max interleave*/];
+ for (size_t i = 0; i < _num_outputs; i++){
+ char *b = reinterpret_cast<char *>(buffs[index*_num_outputs + i]);
+ io_buffs[i] = b + _convert_buffer_offset_bytes;
}
+ const ref_vector<void *> out_buffs(io_buffs, _num_outputs);
- return nsamps_to_copy_per_io_buff;
+ //perform the conversion operation
+ _converter->conv(info.copy_buff, out_buffs, _convert_nsamps);
+
+ //advance the pointer for the source buffer
+ info.copy_buff += _convert_bytes_to_copy;
+
+ //release the buffer if fully consumed
+ if (buff_info.data_bytes_to_copy == _convert_bytes_to_copy){
+ info.buff.reset(); //effectively a release
+ }
+
+ _task_barrier_exit.wait();
}
+
+ //! Shared variables for the worker threads
+ reusable_barrier _task_barrier_entry, _task_barrier_exit;
+ std::vector<task::sptr> _task_handlers;
+ size_t _convert_nsamps;
+ const rx_streamer::buffs_type *_convert_buffs;
+ size_t _convert_buffer_offset_bytes;
+ size_t _convert_bytes_to_copy;
+
};
class recv_packet_streamer : public recv_packet_handler, public rx_streamer{
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp
index 46c98afea..74e893e67 100644
--- a/host/lib/transport/super_send_packet_handler.hpp
+++ b/host/lib/transport/super_send_packet_handler.hpp
@@ -23,6 +23,8 @@
#include <uhd/convert.hpp>
#include <uhd/stream.hpp>
#include <uhd/utils/msg.hpp>
+#include <uhd/utils/tasks.hpp>
+#include <uhd/utils/atomic.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/types/metadata.hpp>
#include <uhd/transport/vrt_if_packet.hpp>
@@ -58,12 +60,25 @@ public:
this->resize(size);
}
+ ~send_packet_handler(void){
+ _task_barrier_entry.interrupt();
+ _task_barrier_exit.interrupt();
+ _task_handlers.clear();
+ }
+
//! Resize the number of transport channels
void resize(const size_t size){
if (this->size() == size) return;
+ _task_handlers.clear();
_props.resize(size);
static const boost::uint64_t zero = 0;
_zero_buffs.resize(size, &zero);
+ _task_barrier_entry.resize(size);
+ _task_barrier_exit.resize(size);
+ _task_handlers.resize(size);
+ for (size_t i = 1/*skip 0*/; i < size; i++){
+ _task_handlers[i] = task::make(boost::bind(&send_packet_handler::converter_thread_task, this, i));
+ };
}
//! Get the channel width of this handler
@@ -77,6 +92,12 @@ public:
_header_offset_words32 = header_offset_words32;
}
+ //! Set the stream ID for a specific channel (or no SID)
+ void set_xport_chan_sid(const size_t xport_chan, const bool has_sid, const boost::uint32_t sid = 0){
+ _props.at(xport_chan).has_sid = has_sid;
+ _props.at(xport_chan).sid = sid;
+ }
+
//! Set the rate of ticks per second
void set_tick_rate(const double rate){
_tick_rate = rate;
@@ -98,7 +119,7 @@ public:
//! Set the conversion routine for all channels
void set_converter(const uhd::convert::id_type &id){
- _io_buffs.resize(id.num_inputs);
+ _num_inputs = id.num_inputs;
_converter = uhd::convert::get_converter(id)();
this->set_scale_factor(32767.); //update after setting converter
_bytes_per_otw_item = uhd::convert::get_bytes_per_item(id.output_format);
@@ -133,7 +154,7 @@ public:
//translate the metadata to vrt if packet info
vrt::if_packet_info_t if_packet_info;
if_packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA;
- if_packet_info.has_sid = false;
+ //if_packet_info.has_sid = false; //set per channel
if_packet_info.has_cid = false;
if_packet_info.has_tlr = true;
if_packet_info.has_tsi = false;
@@ -195,10 +216,14 @@ private:
size_t _header_offset_words32;
double _tick_rate, _samp_rate;
struct xport_chan_props_type{
+ xport_chan_props_type(void):has_sid(false){}
get_buff_type get_buff;
+ bool has_sid;
+ boost::uint32_t sid;
+ managed_send_buffer::sptr buff;
};
std::vector<xport_chan_props_type> _props;
- std::vector<const void *> _io_buffs; //used in conversion
+ size_t _num_inputs;
size_t _bytes_per_otw_item; //used in conversion
size_t _bytes_per_cpu_item; //used in conversion
uhd::convert::converter::sptr _converter; //used in conversion
@@ -217,36 +242,77 @@ private:
const size_t buffer_offset_bytes = 0
){
//load the rest of the if_packet_info in here
- if_packet_info.num_payload_bytes = nsamps_per_buff*_io_buffs.size()*_bytes_per_otw_item;
+ if_packet_info.num_payload_bytes = nsamps_per_buff*_num_inputs*_bytes_per_otw_item;
if_packet_info.num_payload_words32 = (if_packet_info.num_payload_bytes + 3/*round up*/)/sizeof(boost::uint32_t);
if_packet_info.packet_count = _next_packet_seq;
- size_t buff_index = 0;
+ //get a buffer for each channel or timeout
BOOST_FOREACH(xport_chan_props_type &props, _props){
- managed_send_buffer::sptr buff = props.get_buff(timeout);
- if (buff.get() == NULL) return 0; //timeout
-
- //fill a vector with pointers to the io buffers
- BOOST_FOREACH(const void *&io_buff, _io_buffs){
- io_buff = reinterpret_cast<const char *>(buffs[buff_index++]) + buffer_offset_bytes;
- }
- boost::uint32_t *otw_mem = buff->cast<boost::uint32_t *>() + _header_offset_words32;
-
- //pack metadata into a vrt header
- _vrt_packer(otw_mem, if_packet_info);
- otw_mem += if_packet_info.num_header_words32;
+ if (not props.buff) props.buff = props.get_buff(timeout);
+ if (not props.buff) return 0; //timeout
+ }
- //copy-convert the samples into the send buffer
- _converter->conv(_io_buffs, otw_mem, nsamps_per_buff);
+ //setup the data to share with converter threads
+ _convert_nsamps = nsamps_per_buff;
+ _convert_buffs = &buffs;
+ _convert_buffer_offset_bytes = buffer_offset_bytes;
+ _convert_if_packet_info = &if_packet_info;
- //commit the samples to the zero-copy interface
- size_t num_bytes_total = (_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t);
- buff->commit(num_bytes_total);
+ //perform N channels of conversion
+ converter_thread_task(0);
- }
_next_packet_seq++; //increment sequence after commits
return nsamps_per_buff;
}
+
+ /*******************************************************************
+ * Perform one thread's work of the conversion task.
+ * The entry and exit use a dual synchronization barrier,
+ * to wait for data to become ready and block until completion.
+ ******************************************************************/
+ UHD_INLINE void converter_thread_task(const size_t index)
+ {
+ _task_barrier_entry.wait();
+
+ //shortcut references to local data structures
+ managed_send_buffer::sptr &buff = _props[index].buff;
+ vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info;
+ const tx_streamer::buffs_type &buffs = *_convert_buffs;
+
+ //fill IO buffs with pointers into the output buffer
+ const void *io_buffs[4/*max interleave*/];
+ for (size_t i = 0; i < _num_inputs; i++){
+ const char *b = reinterpret_cast<const char *>(buffs[index*_num_inputs + i]);
+ io_buffs[i] = b + _convert_buffer_offset_bytes;
+ }
+ const ref_vector<const void *> in_buffs(io_buffs, _num_inputs);
+
+ //pack metadata into a vrt header
+ boost::uint32_t *otw_mem = buff->cast<boost::uint32_t *>() + _header_offset_words32;
+ if_packet_info.has_sid = _props[index].has_sid;
+ if_packet_info.sid = _props[index].sid;
+ _vrt_packer(otw_mem, if_packet_info);
+ otw_mem += if_packet_info.num_header_words32;
+
+ //perform the conversion operation
+ _converter->conv(in_buffs, otw_mem, _convert_nsamps);
+
+ //commit the samples to the zero-copy interface
+ const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32;
+ buff->commit(num_vita_words32*sizeof(boost::uint32_t));
+ buff.reset(); //effectively a release
+
+ _task_barrier_exit.wait();
+ }
+
+ //! Shared variables for the worker threads
+ reusable_barrier _task_barrier_entry, _task_barrier_exit;
+ std::vector<task::sptr> _task_handlers;
+ size_t _convert_nsamps;
+ const tx_streamer::buffs_type *_convert_buffs;
+ size_t _convert_buffer_offset_bytes;
+ vrt::if_packet_info_t *_convert_if_packet_info;
+
};
class send_packet_streamer : public send_packet_handler, public tx_streamer{
diff --git a/host/lib/transport/udp_wsa_zero_copy.cpp b/host/lib/transport/udp_wsa_zero_copy.cpp
new file mode 100644
index 000000000..6fe4e3cad
--- /dev/null
+++ b/host/lib/transport/udp_wsa_zero_copy.cpp
@@ -0,0 +1,300 @@
+//
+// Copyright 2010-2011 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include "udp_common.hpp"
+#include <uhd/transport/udp_zero_copy.hpp>
+#include <uhd/transport/udp_simple.hpp> //mtu
+#include <uhd/transport/buffer_pool.hpp>
+#include <uhd/utils/msg.hpp>
+#include <uhd/utils/log.hpp>
+#include <boost/format.hpp>
+#include <vector>
+
+using namespace uhd;
+using namespace uhd::transport;
+namespace asio = boost::asio;
+
+//A reasonable number of frames for send/recv and async/sync
+static const size_t DEFAULT_NUM_FRAMES = 32;
+
+/***********************************************************************
+ * Check registry for correct fast-path setting (windows only)
+ **********************************************************************/
+#ifdef HAVE_ATLBASE_H
+#define CHECK_REG_SEND_THRESH
+#include <atlbase.h> //CRegKey
+static void check_registry_for_fast_send_threshold(const size_t mtu){
+ static bool warned = false;
+ if (warned) return; //only allow one printed warning per process
+
+ CRegKey reg_key;
+ DWORD threshold = 1024; //system default when threshold is not specified
+ if (
+ reg_key.Open(HKEY_LOCAL_MACHINE, "System\\CurrentControlSet\\Services\\AFD\\Parameters", KEY_READ) != ERROR_SUCCESS or
+ reg_key.QueryDWORDValue("FastSendDatagramThreshold", threshold) != ERROR_SUCCESS or threshold < mtu
+ ){
+ UHD_MSG(warning) << boost::format(
+ "The MTU (%d) is larger than the FastSendDatagramThreshold (%d)!\n"
+ "This will negatively affect the transmit performance.\n"
+ "See the transport application notes for more detail.\n"
+ ) % mtu % threshold << std::endl;
+ warned = true;
+ }
+ reg_key.Close();
+}
+#endif /*HAVE_ATLBASE_H*/
+
+/***********************************************************************
+ * Static initialization to take care of WSA init and cleanup
+ **********************************************************************/
+struct uhd_wsa_control{
+ uhd_wsa_control(void){
+ WSADATA wsaData;
+ WSAStartup(MAKEWORD(2, 2), &wsaData); /*windows socket startup */
+ }
+
+ ~uhd_wsa_control(void){
+ WSACleanup();
+ }
+};
+
+/***********************************************************************
+ * Reusable managed receiver buffer:
+ * - Initialize with memory and a release callback.
+ * - Call get new with a length in bytes to re-use.
+ **********************************************************************/
+class udp_zero_copy_asio_mrb : public managed_recv_buffer{
+public:
+ udp_zero_copy_asio_mrb(void *mem, int sock_fd, const size_t frame_size):
+ _sock_fd(sock_fd), _frame_size(frame_size)
+ {
+ _wsa_buff.buf = reinterpret_cast<char *>(mem);
+ ZeroMemory(&_overlapped, sizeof(_overlapped));
+ _overlapped.hEvent = WSACreateEvent();
+ UHD_ASSERT_THROW(_overlapped.hEvent != WSA_INVALID_EVENT);
+ this->release(); //makes buffer available via get_new
+ }
+
+ ~udp_zero_copy_asio_mrb(void){
+ WSACloseEvent(_overlapped.hEvent);
+ }
+
+ void release(void){
+ _wsa_buff.len = _frame_size;
+ _flags = 0;
+ WSARecv(_sock_fd, &_wsa_buff, 1, &_wsa_buff.len, &_flags, &_overlapped, NULL);
+ }
+
+ UHD_INLINE sptr get_new(const double timeout, size_t &index){
+ const DWORD result = WSAWaitForMultipleEvents(
+ 1, &_overlapped.hEvent, true, DWORD(timeout*1000), true
+ );
+ if (result == WSA_WAIT_TIMEOUT) return managed_recv_buffer::sptr();
+ index++; //advances the caller's buffer
+
+ WSAGetOverlappedResult(_sock_fd, &_overlapped, &_wsa_buff.len, true, &_flags);
+
+ WSAResetEvent(_overlapped.hEvent);
+ return make(this, _wsa_buff.buf, _wsa_buff.len);
+ }
+
+private:
+ int _sock_fd;
+ const size_t _frame_size;
+ WSAOVERLAPPED _overlapped;
+ WSABUF _wsa_buff;
+ DWORD _flags;
+};
+
+/***********************************************************************
+ * Reusable managed send buffer:
+ * - committing the buffer calls the asynchronous socket send
+ * - getting a new buffer performs the blocking wait for completion
+ **********************************************************************/
+class udp_zero_copy_asio_msb : public managed_send_buffer{
+public:
+ udp_zero_copy_asio_msb(void *mem, int sock_fd, const size_t frame_size):
+ _sock_fd(sock_fd), _frame_size(frame_size)
+ {
+ _wsa_buff.buf = reinterpret_cast<char *>(mem);
+ ZeroMemory(&_overlapped, sizeof(_overlapped));
+ _overlapped.hEvent = WSACreateEvent();
+ UHD_ASSERT_THROW(_overlapped.hEvent != WSA_INVALID_EVENT);
+ WSASetEvent(_overlapped.hEvent); //makes buffer available via get_new
+ }
+
+ ~udp_zero_copy_asio_msb(void){
+ WSACloseEvent(_overlapped.hEvent);
+ }
+
+ void release(void){
+ _wsa_buff.len = size();
+ WSASend(_sock_fd, &_wsa_buff, 1, NULL, 0, &_overlapped, NULL);
+ }
+
+ UHD_INLINE sptr get_new(const double timeout, size_t &index){
+ const DWORD result = WSAWaitForMultipleEvents(
+ 1, &_overlapped.hEvent, true, DWORD(timeout*1000), true
+ );
+ if (result == WSA_WAIT_TIMEOUT) return managed_send_buffer::sptr();
+ index++; //advances the caller's buffer
+
+ WSAResetEvent(_overlapped.hEvent);
+ _wsa_buff.len = _frame_size;
+ return make(this, _wsa_buff.buf, _wsa_buff.len);
+ }
+
+private:
+ int _sock_fd;
+ const size_t _frame_size;
+ WSAOVERLAPPED _overlapped;
+ WSABUF _wsa_buff;
+};
+
+/***********************************************************************
+ * Zero Copy UDP implementation with WSA:
+ *
+ * This is not a true zero copy implementation as each
+ * send and recv requires a copy operation to/from userspace.
+ *
+ * For receive, use a blocking recv() call on the socket.
+ * This has better performance than the overlapped IO.
+ * For send, use overlapped IO to submit async sends.
+ **********************************************************************/
+class udp_zero_copy_wsa_impl : public udp_zero_copy{
+public:
+ typedef boost::shared_ptr<udp_zero_copy_wsa_impl> sptr;
+
+ udp_zero_copy_wsa_impl(
+ const std::string &addr,
+ const std::string &port,
+ const device_addr_t &hints
+ ):
+ _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", udp_simple::mtu))),
+ _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES))),
+ _send_frame_size(size_t(hints.cast<double>("send_frame_size", udp_simple::mtu))),
+ _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))),
+ _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
+ _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
+ _next_recv_buff_index(0), _next_send_buff_index(0)
+ {
+ #ifdef CHECK_REG_SEND_THRESH
+ check_registry_for_fast_send_threshold(this->get_send_frame_size());
+ #endif /*CHECK_REG_SEND_THRESH*/
+
+ UHD_MSG(status) << boost::format("Creating WSA UDP transport for %s:%s") % addr % port << std::endl;
+ static uhd_wsa_control uhd_wsa; //makes wsa start happen via lazy initialization
+
+ UHD_ASSERT_THROW(_num_send_frames <= WSA_MAXIMUM_WAIT_EVENTS);
+
+ //resolve the address
+ asio::io_service io_service;
+ asio::ip::udp::resolver resolver(io_service);
+ asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port);
+ asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
+
+ //create the socket
+ _sock_fd = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED);
+ if (_sock_fd == INVALID_SOCKET){
+ const DWORD error = WSAGetLastError();
+ throw uhd::os_error(str(boost::format("WSASocket() failed with error %d") % error));
+ }
+
+ //set the socket non-blocking for recv
+ //u_long mode = 1;
+ //ioctlsocket(_sock_fd, FIONBIO, &mode);
+
+ //resize the socket buffers
+ const int recv_buff_size = int(hints.cast<double>("recv_buff_size", 0.0));
+ const int send_buff_size = int(hints.cast<double>("send_buff_size", 0.0));
+ if (recv_buff_size > 0) setsockopt(_sock_fd, SOL_SOCKET, SO_RCVBUF, (const char *)&recv_buff_size, sizeof(recv_buff_size));
+ if (send_buff_size > 0) setsockopt(_sock_fd, SOL_SOCKET, SO_SNDBUF, (const char *)&send_buff_size, sizeof(send_buff_size));
+
+ //connect the socket so we can send/recv
+ const asio::ip::udp::endpoint::data_type &servaddr = *receiver_endpoint.data();
+ if (WSAConnect(_sock_fd, (const struct sockaddr *)&servaddr, sizeof(servaddr), NULL, NULL, NULL, NULL) != 0){
+ const DWORD error = WSAGetLastError();
+ closesocket(_sock_fd);
+ throw uhd::os_error(str(boost::format("WSAConnect() failed with error %d") % error));
+ }
+
+ //allocate re-usable managed receive buffers
+ for (size_t i = 0; i < get_num_recv_frames(); i++){
+ _mrb_pool.push_back(boost::shared_ptr<udp_zero_copy_asio_mrb>(
+ new udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i), _sock_fd, get_recv_frame_size())
+ ));
+ }
+
+ //allocate re-usable managed send buffers
+ for (size_t i = 0; i < get_num_send_frames(); i++){
+ _msb_pool.push_back(boost::shared_ptr<udp_zero_copy_asio_msb>(
+ new udp_zero_copy_asio_msb(_send_buffer_pool->at(i), _sock_fd, get_send_frame_size())
+ ));
+ }
+ }
+
+ ~udp_zero_copy_wsa_impl(void){
+ closesocket(_sock_fd);
+ }
+
+ /*******************************************************************
+ * Receive implementation:
+ * Block on the managed buffer's get call and advance the index.
+ ******************************************************************/
+ managed_recv_buffer::sptr get_recv_buff(double timeout){
+ if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0;
+ return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index);
+ }
+
+ size_t get_num_recv_frames(void) const {return _num_recv_frames;}
+ size_t get_recv_frame_size(void) const {return _recv_frame_size;}
+
+ /*******************************************************************
+ * Send implementation:
+ * Block on the managed buffer's get call and advance the index.
+ ******************************************************************/
+ managed_send_buffer::sptr get_send_buff(double timeout){
+ if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0;
+ return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index);
+ }
+
+ size_t get_num_send_frames(void) const {return _num_send_frames;}
+ size_t get_send_frame_size(void) const {return _send_frame_size;}
+
+private:
+ //memory management -> buffers and fifos
+ const size_t _recv_frame_size, _num_recv_frames;
+ const size_t _send_frame_size, _num_send_frames;
+ buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
+ std::vector<boost::shared_ptr<udp_zero_copy_asio_msb> > _msb_pool;
+ std::vector<boost::shared_ptr<udp_zero_copy_asio_mrb> > _mrb_pool;
+ size_t _next_recv_buff_index, _next_send_buff_index;
+
+ //socket guts
+ SOCKET _sock_fd;
+};
+
+/***********************************************************************
+ * UDP zero copy make function
+ **********************************************************************/
+udp_zero_copy::sptr udp_zero_copy::make(
+ const std::string &addr,
+ const std::string &port,
+ const device_addr_t &hints
+){
+ return sptr(new udp_zero_copy_wsa_impl(addr, port, hints));
+}
diff --git a/host/lib/transport/udp_zero_copy.cpp b/host/lib/transport/udp_zero_copy.cpp
index 0ccc92b82..166177177 100644
--- a/host/lib/transport/udp_zero_copy.cpp
+++ b/host/lib/transport/udp_zero_copy.cpp
@@ -1,5 +1,5 @@
//
-// Copyright 2010-2011 Ettus Research LLC
+// Copyright 2010-2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -18,12 +18,14 @@
#include "udp_common.hpp"
#include <uhd/transport/udp_zero_copy.hpp>
#include <uhd/transport/udp_simple.hpp> //mtu
-#include <uhd/transport/bounded_buffer.hpp>
#include <uhd/transport/buffer_pool.hpp>
#include <uhd/utils/msg.hpp>
#include <uhd/utils/log.hpp>
+#include <uhd/utils/atomic.hpp>
#include <boost/format.hpp>
-#include <list>
+#include <boost/make_shared.hpp>
+#include <boost/thread/thread.hpp> //sleep
+#include <vector>
using namespace uhd;
using namespace uhd::transport;
@@ -61,66 +63,84 @@ static void check_registry_for_fast_send_threshold(const size_t mtu){
/***********************************************************************
* Reusable managed receiver buffer:
- * - Initialize with memory and a release callback.
- * - Call get new with a length in bytes to re-use.
+ * - get_new performs the recv operation
**********************************************************************/
class udp_zero_copy_asio_mrb : public managed_recv_buffer{
public:
- udp_zero_copy_asio_mrb(void *mem, bounded_buffer<udp_zero_copy_asio_mrb *> &pending):
- _mem(mem), _len(0), _pending(pending){/* NOP */}
+ udp_zero_copy_asio_mrb(void *mem, int sock_fd, const size_t frame_size):
+ _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) { /*NOP*/ }
void release(void){
- if (_len == 0) return;
- _pending.push_with_haste(this);
- _len = 0;
+ _claimer.release();
}
- sptr get_new(size_t len){
- _len = len;
- return make_managed_buffer(this);
- }
+ UHD_INLINE sptr get_new(const double timeout, size_t &index){
+ if (not _claimer.claim_with_wait(timeout)) return sptr();
- template <class T> T cast(void) const{return static_cast<T>(_mem);}
+ #ifdef MSG_DONTWAIT //try a non-blocking recv() if supported
+ _len = ::recv(_sock_fd, (char *)_mem, _frame_size, MSG_DONTWAIT);
+ if (_len > 0){
+ index++; //advances the caller's buffer
+ return make(this, _mem, size_t(_len));
+ }
+ #endif
-private:
- const void *get_buff(void) const{return _mem;}
- size_t get_size(void) const{return _len;}
+ if (wait_for_recv_ready(_sock_fd, timeout)){
+ _len = ::recv(_sock_fd, (char *)_mem, _frame_size, 0);
+ index++; //advances the caller's buffer
+ return make(this, _mem, size_t(_len));
+ }
+ _claimer.release(); //undo claim
+ return sptr(); //null for timeout
+ }
+
+private:
void *_mem;
- size_t _len;
- bounded_buffer<udp_zero_copy_asio_mrb *> &_pending;
+ int _sock_fd;
+ size_t _frame_size;
+ ssize_t _len;
+ simple_claimer _claimer;
};
/***********************************************************************
* Reusable managed send buffer:
- * - Initialize with memory and a commit callback.
- * - Call get new with a length in bytes to re-use.
+ * - commit performs the send operation
**********************************************************************/
class udp_zero_copy_asio_msb : public managed_send_buffer{
public:
- udp_zero_copy_asio_msb(void *mem, bounded_buffer<udp_zero_copy_asio_msb *> &pending, int sock_fd):
- _mem(mem), _len(0), _pending(pending), _sock_fd(sock_fd){/* NOP */}
-
- void commit(size_t len){
- if (_len == 0) return;
- ::send(_sock_fd, this->cast<const char *>(), len, 0);
- _pending.push_with_haste(this);
- _len = 0;
+ udp_zero_copy_asio_msb(void *mem, int sock_fd, const size_t frame_size):
+ _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) { /*NOP*/ }
+
+ void release(void){
+ //Retry logic because send may fail with ENOBUFS.
+ //This is known to occur at least on some OSX systems.
+ //But it should be safe to always check for the error.
+ while (true)
+ {
+ const ssize_t ret = ::send(_sock_fd, (const char *)_mem, size(), 0);
+ if (ret == ssize_t(size())) break;
+ if (ret == -1 and errno == ENOBUFS)
+ {
+ boost::this_thread::sleep(boost::posix_time::microseconds(1));
+ continue; //try to send again
+ }
+ UHD_ASSERT_THROW(ret == ssize_t(size()));
+ }
+ _claimer.release();
}
- sptr get_new(size_t len){
- _len = len;
- return make_managed_buffer(this);
+ UHD_INLINE sptr get_new(const double timeout, size_t &index){
+ if (not _claimer.claim_with_wait(timeout)) return sptr();
+ index++; //advances the caller's buffer
+ return make(this, _mem, _frame_size);
}
private:
- void *get_buff(void) const{return _mem;}
- size_t get_size(void) const{return _len;}
-
void *_mem;
- size_t _len;
- bounded_buffer<udp_zero_copy_asio_msb *> &_pending;
int _sock_fd;
+ size_t _frame_size;
+ simple_claimer _claimer;
};
/***********************************************************************
@@ -145,8 +165,7 @@ public:
_num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))),
_recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
_send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
- _pending_recv_buffs(_num_recv_frames),
- _pending_send_buffs(_num_send_frames)
+ _next_recv_buff_index(0), _next_send_buff_index(0)
{
UHD_LOG << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
@@ -167,18 +186,16 @@ public:
//allocate re-usable managed receive buffers
for (size_t i = 0; i < get_num_recv_frames(); i++){
- _mrb_pool.push_back(udp_zero_copy_asio_mrb(
- _recv_buffer_pool->at(i), _pending_recv_buffs
+ _mrb_pool.push_back(boost::make_shared<udp_zero_copy_asio_mrb>(
+ _recv_buffer_pool->at(i), _sock_fd, get_recv_frame_size()
));
- _pending_recv_buffs.push_with_haste(&_mrb_pool.back());
}
//allocate re-usable managed send buffers
for (size_t i = 0; i < get_num_send_frames(); i++){
- _msb_pool.push_back(udp_zero_copy_asio_msb(
- _send_buffer_pool->at(i), _pending_send_buffs, _sock_fd
+ _msb_pool.push_back(boost::make_shared<udp_zero_copy_asio_msb>(
+ _send_buffer_pool->at(i), _sock_fd, get_send_frame_size()
));
- _pending_send_buffs.push_with_haste(&_msb_pool.back());
}
}
@@ -198,29 +215,11 @@ public:
/*******************************************************************
* Receive implementation:
- *
- * Perform a non-blocking receive for performance,
- * and then fall back to a blocking receive with timeout.
- * Return the managed receive buffer with the new length.
- * When the caller is finished with the managed buffer,
- * the managed receive buffer is released back into the queue.
+ * Block on the managed buffer's get call and advance the index.
******************************************************************/
managed_recv_buffer::sptr get_recv_buff(double timeout){
- udp_zero_copy_asio_mrb *mrb = NULL;
- if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){
-
- #ifdef MSG_DONTWAIT //try a non-blocking recv() if supported
- ssize_t ret = ::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, MSG_DONTWAIT);
- if (ret > 0) return mrb->get_new(ret);
- #endif
-
- if (wait_for_recv_ready(_sock_fd, timeout)) return mrb->get_new(
- ::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0)
- );
-
- _pending_recv_buffs.push_with_haste(mrb); //timeout: return the managed buffer to the queue
- }
- return managed_recv_buffer::sptr();
+ if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0;
+ return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index);
}
size_t get_num_recv_frames(void) const {return _num_recv_frames;}
@@ -228,18 +227,11 @@ public:
/*******************************************************************
* Send implementation:
- *
- * Get a managed receive buffer immediately with max length set.
- * The caller will fill the buffer and commit it when finished.
- * The commit routine will perform a blocking send operation,
- * and push the managed send buffer back into the queue.
+ * Block on the managed buffer's get call and advance the index.
******************************************************************/
managed_send_buffer::sptr get_send_buff(double timeout){
- udp_zero_copy_asio_msb *msb = NULL;
- if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){
- return msb->get_new(_send_frame_size);
- }
- return managed_send_buffer::sptr();
+ if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0;
+ return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index);
}
size_t get_num_send_frames(void) const {return _num_send_frames;}
@@ -250,10 +242,9 @@ private:
const size_t _recv_frame_size, _num_recv_frames;
const size_t _send_frame_size, _num_send_frames;
buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
- bounded_buffer<udp_zero_copy_asio_mrb *> _pending_recv_buffs;
- bounded_buffer<udp_zero_copy_asio_msb *> _pending_send_buffs;
- std::list<udp_zero_copy_asio_msb> _msb_pool;
- std::list<udp_zero_copy_asio_mrb> _mrb_pool;
+ std::vector<boost::shared_ptr<udp_zero_copy_asio_msb> > _msb_pool;
+ std::vector<boost::shared_ptr<udp_zero_copy_asio_mrb> > _mrb_pool;
+ size_t _next_recv_buff_index, _next_send_buff_index;
//asio guts -> socket and service
asio::io_service _io_service;
diff --git a/host/lib/transport/usb_zero_copy_wrapper.cpp b/host/lib/transport/usb_zero_copy_wrapper.cpp
index 3571ed856..d04244ca9 100644
--- a/host/lib/transport/usb_zero_copy_wrapper.cpp
+++ b/host/lib/transport/usb_zero_copy_wrapper.cpp
@@ -16,45 +16,64 @@
//
#include <uhd/transport/usb_zero_copy.hpp>
-#include <uhd/transport/bounded_buffer.hpp>
#include <uhd/transport/buffer_pool.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/utils/msg.hpp>
+#include <uhd/utils/tasks.hpp>
+#include <uhd/utils/atomic.hpp>
#include <boost/foreach.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition_variable.hpp>
+#include <boost/bind.hpp>
#include <vector>
#include <iostream>
+using namespace uhd;
using namespace uhd::transport;
+static const boost::posix_time::time_duration AUTOFLUSH_TIMEOUT(boost::posix_time::milliseconds(1));
+
/***********************************************************************
* USB zero copy wrapper - managed receive buffer
**********************************************************************/
class usb_zero_copy_wrapper_mrb : public managed_recv_buffer{
public:
- usb_zero_copy_wrapper_mrb(bounded_buffer<usb_zero_copy_wrapper_mrb *> &queue):
- _queue(queue){/*NOP*/}
+ usb_zero_copy_wrapper_mrb(void){/*NOP*/}
void release(void){
- if (not _mrb) return;
_mrb.reset(); //decrement ref count, other MRB's may hold a ref
- _queue.push_with_haste(this);
+ _claimer.release();
}
- UHD_INLINE sptr get_new(managed_recv_buffer::sptr mrb, const void *mem, size_t len){
+ UHD_INLINE sptr get_new(
+ managed_recv_buffer::sptr &mrb, size_t &offset_bytes,
+ const double timeout, size_t &index
+ ){
+ if (not mrb or not _claimer.claim_with_wait(timeout)) return sptr();
+
+ index++; //advances the caller's buffer
+
+ //hold a copy of the buffer shared pointer
_mrb = mrb;
- _mem = mem;
- _len = len;
- return make_managed_buffer(this);
+
+ //extract this packet's memory address and length in bytes
+ char *mem = mrb->cast<char *>() + offset_bytes;
+ const boost::uint32_t *mem32 = reinterpret_cast<const boost::uint32_t *>(mem);
+ const size_t words32 = (uhd::wtohx(mem32[0]) & 0xffff); //length in words32 (from VRT header)
+ const size_t len = words32*sizeof(boost::uint32_t); //length in bytes
+
+ //check if this receive buffer has been exhausted
+ offset_bytes += len;
+ if (offset_bytes >= mrb->size()) mrb.reset(); //drop caller's ref
+ else if (uhd::wtohx(mem32[words32]) == 0) mrb.reset();
+
+ return make(this, mem, len);
}
private:
- const void *get_buff(void) const{return _mem;}
- size_t get_size(void) const{return _len;}
-
- bounded_buffer<usb_zero_copy_wrapper_mrb *> &_queue;
- const void *_mem;
- size_t _len;
managed_recv_buffer::sptr _mrb;
+ simple_claimer _claimer;
};
/***********************************************************************
@@ -63,16 +82,27 @@ private:
class usb_zero_copy_wrapper_msb : public managed_send_buffer{
public:
usb_zero_copy_wrapper_msb(const usb_zero_copy::sptr internal, const size_t fragmentation_size):
- _internal(internal), _fragmentation_size(fragmentation_size){/*NOP*/}
+ _internal(internal), _fragmentation_size(fragmentation_size)
+ {
+ _ok_to_auto_flush = false;
+ _task = uhd::task::make(boost::bind(&usb_zero_copy_wrapper_msb::auto_flush, this));
+ }
- void commit(size_t len){
- if (len == 0) return;
+ ~usb_zero_copy_wrapper_msb(void)
+ {
+ //ensure the task has exited before anything auto deconstructs
+ _task.reset();
+ }
+
+ void release(void){
+ boost::mutex::scoped_lock lock(_mutex);
+ _ok_to_auto_flush = true;
//get a reference to the VITA header before incrementing
const boost::uint32_t vita_header = reinterpret_cast<const boost::uint32_t *>(_mem_buffer_tip)[0];
- _bytes_in_buffer += len;
- _mem_buffer_tip += len;
+ _bytes_in_buffer += size();
+ _mem_buffer_tip += size();
//extract VITA end of packet flag, we must force flush under eof conditions
const bool eop = (uhd::wtohx(vita_header) & (0x1 << 24)) != 0;
@@ -80,28 +110,53 @@ public:
if (eop or full){
_last_send_buff->commit(_bytes_in_buffer);
_last_send_buff.reset();
+
+ //notify the auto-flusher to restart its timed_wait
+ lock.unlock(); _cond.notify_one();
}
}
UHD_INLINE sptr get_new(const double timeout){
+ boost::mutex::scoped_lock lock(_mutex);
+ _ok_to_auto_flush = false;
+
if (not _last_send_buff){
_last_send_buff = _internal->get_send_buff(timeout);
if (not _last_send_buff) return sptr();
_mem_buffer_tip = _last_send_buff->cast<char *>();
_bytes_in_buffer = 0;
}
- return make_managed_buffer(this);
+
+ return make(this, _mem_buffer_tip, _fragmentation_size);
}
private:
- void *get_buff(void) const{return reinterpret_cast<void *>(_mem_buffer_tip);}
- size_t get_size(void) const{return _fragmentation_size;}
-
usb_zero_copy::sptr _internal;
const size_t _fragmentation_size;
managed_send_buffer::sptr _last_send_buff;
size_t _bytes_in_buffer;
char *_mem_buffer_tip;
+
+ //private variables for auto flusher
+ boost::mutex _mutex;
+ boost::condition_variable _cond;
+ uhd::task::sptr _task;
+ bool _ok_to_auto_flush;
+
+ /*!
+ * The auto flusher ensures that buffers are force committed when
+ * the user has not called get_new() within a certain time window.
+ */
+ void auto_flush(void)
+ {
+ boost::mutex::scoped_lock lock(_mutex);
+ const bool timeout = not _cond.timed_wait(lock, AUTOFLUSH_TIMEOUT);
+ if (timeout and _ok_to_auto_flush and _last_send_buff and _bytes_in_buffer != 0)
+ {
+ _last_send_buff->commit(_bytes_in_buffer);
+ _last_send_buff.reset();
+ }
+ }
};
/***********************************************************************
@@ -112,44 +167,26 @@ public:
usb_zero_copy_wrapper(sptr usb_zc, const size_t frame_boundary):
_internal_zc(usb_zc),
_frame_boundary(frame_boundary),
- _available_recv_buffs(this->get_num_recv_frames()),
- _mrb_pool(this->get_num_recv_frames(), usb_zero_copy_wrapper_mrb(_available_recv_buffs)),
- _the_only_msb(usb_zero_copy_wrapper_msb(usb_zc, frame_boundary))
+ _next_recv_buff_index(0)
{
- BOOST_FOREACH(usb_zero_copy_wrapper_mrb &mrb, _mrb_pool){
- _available_recv_buffs.push_with_haste(&mrb);
+ for (size_t i = 0; i < this->get_num_recv_frames(); i++){
+ _mrb_pool.push_back(boost::make_shared<usb_zero_copy_wrapper_mrb>());
}
+ _the_only_msb = boost::make_shared<usb_zero_copy_wrapper_msb>(usb_zc, frame_boundary);
}
managed_recv_buffer::sptr get_recv_buff(double timeout){
//attempt to get a managed recv buffer
- if (not _last_recv_buff.get()){
+ if (not _last_recv_buff){
_last_recv_buff = _internal_zc->get_recv_buff(timeout);
- _last_recv_offset = 0;
+ _last_recv_offset = 0; //reset offset into buffer
}
- //attempt to get a wrapper for a managed recv buffer
- usb_zero_copy_wrapper_mrb *wmrb = NULL;
- if (_last_recv_buff.get() and _available_recv_buffs.pop_with_timed_wait(wmrb, timeout)){
- //extract this packet's memory address and length in bytes
- const char *mem = _last_recv_buff->cast<const char *>() + _last_recv_offset;
- const boost::uint32_t *mem32 = reinterpret_cast<const boost::uint32_t *>(mem);
- const size_t len = (uhd::wtohx(mem32[0]) & 0xffff)*sizeof(boost::uint32_t); //length in bytes (from VRT header)
-
- managed_recv_buffer::sptr recv_buff; //the buffer to be returned to the user
- recv_buff = wmrb->get_new(_last_recv_buff, mem, len);
- _last_recv_offset += len;
-
- //check if this receive buffer has been exhausted
- if (_last_recv_offset >= _last_recv_buff->size()) {
- _last_recv_buff.reset();
- }
-
- return recv_buff;
- }
-
- //otherwise return a null sptr for failure
- return managed_recv_buffer::sptr();
+ //get the buffer to be returned to the user
+ if (_next_recv_buff_index == _mrb_pool.size()) _next_recv_buff_index = 0;
+ return _mrb_pool[_next_recv_buff_index]->get_new(
+ _last_recv_buff, _last_recv_offset, timeout, _next_recv_buff_index
+ );
}
size_t get_num_recv_frames(void) const{
@@ -161,7 +198,7 @@ public:
}
managed_send_buffer::sptr get_send_buff(double timeout){
- return _the_only_msb.get_new(timeout);
+ return _the_only_msb->get_new(timeout);
}
size_t get_num_send_frames(void) const{
@@ -175,16 +212,13 @@ public:
private:
sptr _internal_zc;
size_t _frame_boundary;
- bounded_buffer<usb_zero_copy_wrapper_mrb *> _available_recv_buffs;
- std::vector<usb_zero_copy_wrapper_mrb> _mrb_pool;
- usb_zero_copy_wrapper_msb _the_only_msb;
-
- //buffer to store partially-received VRT packets in
- buffer_pool::sptr _fragment_mem;
+ std::vector<boost::shared_ptr<usb_zero_copy_wrapper_mrb> > _mrb_pool;
+ boost::shared_ptr<usb_zero_copy_wrapper_msb> _the_only_msb;
//state for last recv buffer to create multiple managed buffers
managed_recv_buffer::sptr _last_recv_buff;
size_t _last_recv_offset;
+ size_t _next_recv_buff_index;
};
/***********************************************************************