diff options
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/CMakeLists.txt | 7 | ||||
-rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 66 | ||||
-rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 95 | ||||
-rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 112 | ||||
-rw-r--r-- | host/lib/transport/udp_wsa_zero_copy.cpp | 300 | ||||
-rw-r--r-- | host/lib/transport/udp_zero_copy.cpp | 153 | ||||
-rw-r--r-- | host/lib/transport/usb_zero_copy_wrapper.cpp | 154 |
7 files changed, 673 insertions, 214 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 8e8ea5ea8..6524a8412 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -82,7 +82,11 @@ SET_SOURCE_FILES_PROPERTIES( ######################################################################## # Setup UDP ######################################################################## -LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy.cpp) +IF(WIN32) + LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_wsa_zero_copy.cpp) +ELSE() + LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy.cpp) +ENDIF() #On windows, the boost asio implementation uses the winsock2 library. #Note: we exclude the .lib extension for cygwin and mingw platforms. @@ -97,6 +101,7 @@ CHECK_INCLUDE_FILE_CXX(atlbase.h HAVE_ATLBASE_H) IF(HAVE_ATLBASE_H) SET_SOURCE_FILES_PROPERTIES( ${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/udp_wsa_zero_copy.cpp PROPERTIES COMPILE_DEFINITIONS "HAVE_ATLBASE_H" ) ENDIF(HAVE_ATLBASE_H) diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 3e67264cd..28bff9709 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -1,5 +1,5 @@ // -// Copyright 2010-2011 Ettus Research LLC +// Copyright 2010-2012 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -21,6 +21,7 @@ #include <uhd/utils/msg.hpp> #include <uhd/exception.hpp> #include <boost/foreach.hpp> +#include <boost/make_shared.hpp> #include <boost/thread/thread.hpp> #include <list> @@ -61,8 +62,18 @@ static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut){ * \return true for completion, false for timeout */ UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, bool &completed){ - const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000)); + //already completed by a previous call? + if (completed) return true; + + //perform a non-blocking event handle + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 0; + libusb_handle_events_timeout(ctx, &tv); + if (completed) return true; + //finish the rest with a timeout loop + const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000)); while (not completed and (boost::get_system_time() < timeout_time)){ timeval tv; tv.tv_sec = 0; @@ -82,21 +93,18 @@ class libusb_zero_copy_mrb : public managed_recv_buffer{ public: libusb_zero_copy_mrb(libusb_transfer *lut, const size_t frame_size): _ctx(libusb::session::get_global_session()->get_context()), - _lut(lut), _expired(false), _frame_size(frame_size) { /* NOP */ } + _lut(lut), _frame_size(frame_size) { /* NOP */ } void release(void){ - if (_expired) return; completed = false; _lut->length = _frame_size; //always reset length UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0); - _expired = true; } sptr get_new(const double timeout, size_t &index){ if (wait_for_completion(_ctx, timeout, completed)){ index++; - _expired = false; - return make_managed_buffer(this); + return make(this, _lut->buffer, _lut->actual_length); } return managed_recv_buffer::sptr(); } @@ -104,12 +112,8 @@ public: bool completed; private: - const void *get_buff(void) const{return _lut->buffer;} - size_t get_size(void) const{return _lut->actual_length;} - libusb_context *_ctx; libusb_transfer *_lut; - bool _expired; const size_t _frame_size; }; @@ -122,22 +126,18 @@ class libusb_zero_copy_msb : public managed_send_buffer{ public: libusb_zero_copy_msb(libusb_transfer *lut, const size_t frame_size): _ctx(libusb::session::get_global_session()->get_context()), - _lut(lut), _expired(false), _frame_size(frame_size) { /* NOP */ } + _lut(lut), _frame_size(frame_size) { completed = true; } - void commit(size_t len){ - if (_expired) return; + void release(void){ completed = false; - _lut->length = len; - if (len == 0) libusb_async_cb(_lut); - else UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0); - _expired = true; + _lut->length = size(); + UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0); } sptr get_new(const double timeout, size_t &index){ if (wait_for_completion(_ctx, timeout, completed)){ index++; - _expired = false; - return make_managed_buffer(this); + return make(this, _lut->buffer, _frame_size); } return managed_send_buffer::sptr(); } @@ -145,12 +145,8 @@ public: bool completed; private: - void *get_buff(void) const{return _lut->buffer;} - size_t get_size(void) const{return _frame_size;} - libusb_context *_ctx; libusb_transfer *_lut; - bool _expired; const size_t _frame_size; }; @@ -181,13 +177,30 @@ public: _handle->claim_interface(recv_interface); _handle->claim_interface(send_interface); + //flush the buffers out of the recv endpoint + //limit the flushing to at most one second + for (size_t i = 0; i < 100; i++) + { + unsigned char buff[512]; + int transfered = 0; + const int status = libusb_bulk_transfer( + _handle->get(), // dev_handle + (recv_endpoint & 0x7f) | 0x80, // endpoint + static_cast<unsigned char *>(buff), + sizeof(buff), + &transfered, //bytes xfered + 10 //timeout ms + ); + if (status == LIBUSB_ERROR_TIMEOUT) break; + } + //allocate libusb transfer structs and managed receive buffers for (size_t i = 0; i < get_num_recv_frames(); i++){ libusb_transfer *lut = libusb_alloc_transfer(0); UHD_ASSERT_THROW(lut != NULL); - _mrb_pool.push_back(boost::shared_ptr<libusb_zero_copy_mrb>(new libusb_zero_copy_mrb(lut, this->get_recv_frame_size()))); + _mrb_pool.push_back(boost::make_shared<libusb_zero_copy_mrb>(lut, this->get_recv_frame_size())); libusb_fill_bulk_transfer( lut, // transfer @@ -210,7 +223,7 @@ public: libusb_transfer *lut = libusb_alloc_transfer(0); UHD_ASSERT_THROW(lut != NULL); - _msb_pool.push_back(boost::shared_ptr<libusb_zero_copy_msb>(new libusb_zero_copy_msb(lut, this->get_send_frame_size()))); + _msb_pool.push_back(boost::make_shared<libusb_zero_copy_msb>(lut, this->get_send_frame_size())); libusb_fill_bulk_transfer( lut, // transfer @@ -224,7 +237,6 @@ public: ); _all_luts.push_back(lut); - _msb_pool.back()->commit(0); } } diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 205c7a3a3..7a1972690 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -23,6 +23,8 @@ #include <uhd/convert.hpp> #include <uhd/stream.hpp> #include <uhd/utils/msg.hpp> +#include <uhd/utils/tasks.hpp> +#include <uhd/utils/atomic.hpp> #include <uhd/utils/byteswap.hpp> #include <uhd/types/metadata.hpp> #include <uhd/transport/vrt_if_packet.hpp> @@ -31,6 +33,9 @@ #include <boost/foreach.hpp> #include <boost/function.hpp> #include <boost/format.hpp> +#include <boost/bind.hpp> +#include <boost/make_shared.hpp> +#include <boost/thread/barrier.hpp> #include <iostream> #include <vector> @@ -73,12 +78,25 @@ public: set_alignment_failure_threshold(1000); } + ~recv_packet_handler(void){ + _task_barrier_entry.interrupt(); + _task_barrier_exit.interrupt(); + _task_handlers.clear(); + } + //! Resize the number of transport channels void resize(const size_t size){ if (this->size() == size) return; + _task_handlers.clear(); _props.resize(size); //re-initialize all buffers infos by re-creating the vector _buffers_infos = std::vector<buffers_info_type>(4, buffers_info_type(size)); + _task_barrier_entry.resize(size); + _task_barrier_exit.resize(size); + _task_handlers.resize(size); + for (size_t i = 1/*skip 0*/; i < size; i++){ + _task_handlers[i] = task::make(boost::bind(&recv_packet_handler::converter_thread_task, this, i)); + }; } //! Get the channel width of this handler @@ -125,7 +143,7 @@ public: //! Set the conversion routine for all channels void set_converter(const uhd::convert::id_type &id){ - _io_buffs.resize(id.num_outputs); + _num_outputs = id.num_outputs; _converter = uhd::convert::get_converter(id)(); this->set_scale_factor(1/32767.); //update after setting converter _bytes_per_otw_item = uhd::convert::get_bytes_per_item(id.input_format); @@ -207,7 +225,7 @@ private: handle_overflow_type handle_overflow; }; std::vector<xport_chan_props_type> _props; - std::vector<void *> _io_buffs; //used in conversion + size_t _num_outputs; size_t _bytes_per_otw_item; //used in conversion size_t _bytes_per_cpu_item; //used in conversion uhd::convert::converter::sptr _converter; //used in conversion @@ -512,24 +530,19 @@ private: //extract the number of samples available to copy const size_t nsamps_available = info.data_bytes_to_copy/_bytes_per_otw_item; - const size_t nsamps_to_copy = std::min(nsamps_per_buff*_io_buffs.size(), nsamps_available); + const size_t nsamps_to_copy = std::min(nsamps_per_buff*_num_outputs, nsamps_available); const size_t bytes_to_copy = nsamps_to_copy*_bytes_per_otw_item; - const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_io_buffs.size(); + const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_num_outputs; - size_t buff_index = 0; - BOOST_FOREACH(per_buffer_info_type &buff_info, info){ + //setup the data to share with converter threads + _convert_nsamps = nsamps_to_copy_per_io_buff; + _convert_buffs = &buffs; + _convert_buffer_offset_bytes = buffer_offset_bytes; + _convert_bytes_to_copy = bytes_to_copy; - //fill a vector with pointers to the io buffers - BOOST_FOREACH(void *&io_buff, _io_buffs){ - io_buff = reinterpret_cast<char *>(buffs[buff_index++]) + buffer_offset_bytes; - } + //perform N channels of conversion + converter_thread_task(0); - //copy-convert the samples from the recv buffer - _converter->conv(buff_info.copy_buff, _io_buffs, nsamps_to_copy_per_io_buff); - - //update the rx copy buffer to reflect the bytes copied - buff_info.copy_buff += bytes_to_copy; - } //update the copy buffer's availability info.data_bytes_to_copy -= bytes_to_copy; @@ -538,15 +551,53 @@ private: metadata.fragment_offset = info.fragment_offset_in_samps; info.fragment_offset_in_samps += nsamps_to_copy; //set for next call - //done with buffers? this action releases buffers in-order - if (not metadata.more_fragments){ - BOOST_FOREACH(per_buffer_info_type &buff_info, info){ - buff_info.buff.reset(); //effectively a release - } + return nsamps_to_copy_per_io_buff; + } + + /******************************************************************* + * Perform one thread's work of the conversion task. + * The entry and exit use a dual synchronization barrier, + * to wait for data to become ready and block until completion. + ******************************************************************/ + UHD_INLINE void converter_thread_task(const size_t index) + { + _task_barrier_entry.wait(); + + //shortcut references to local data structures + buffers_info_type &buff_info = get_curr_buffer_info(); + per_buffer_info_type &info = buff_info[index]; + const rx_streamer::buffs_type &buffs = *_convert_buffs; + + //fill IO buffs with pointers into the output buffer + void *io_buffs[4/*max interleave*/]; + for (size_t i = 0; i < _num_outputs; i++){ + char *b = reinterpret_cast<char *>(buffs[index*_num_outputs + i]); + io_buffs[i] = b + _convert_buffer_offset_bytes; } + const ref_vector<void *> out_buffs(io_buffs, _num_outputs); - return nsamps_to_copy_per_io_buff; + //perform the conversion operation + _converter->conv(info.copy_buff, out_buffs, _convert_nsamps); + + //advance the pointer for the source buffer + info.copy_buff += _convert_bytes_to_copy; + + //release the buffer if fully consumed + if (buff_info.data_bytes_to_copy == _convert_bytes_to_copy){ + info.buff.reset(); //effectively a release + } + + _task_barrier_exit.wait(); } + + //! Shared variables for the worker threads + reusable_barrier _task_barrier_entry, _task_barrier_exit; + std::vector<task::sptr> _task_handlers; + size_t _convert_nsamps; + const rx_streamer::buffs_type *_convert_buffs; + size_t _convert_buffer_offset_bytes; + size_t _convert_bytes_to_copy; + }; class recv_packet_streamer : public recv_packet_handler, public rx_streamer{ diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index 46c98afea..74e893e67 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -23,6 +23,8 @@ #include <uhd/convert.hpp> #include <uhd/stream.hpp> #include <uhd/utils/msg.hpp> +#include <uhd/utils/tasks.hpp> +#include <uhd/utils/atomic.hpp> #include <uhd/utils/byteswap.hpp> #include <uhd/types/metadata.hpp> #include <uhd/transport/vrt_if_packet.hpp> @@ -58,12 +60,25 @@ public: this->resize(size); } + ~send_packet_handler(void){ + _task_barrier_entry.interrupt(); + _task_barrier_exit.interrupt(); + _task_handlers.clear(); + } + //! Resize the number of transport channels void resize(const size_t size){ if (this->size() == size) return; + _task_handlers.clear(); _props.resize(size); static const boost::uint64_t zero = 0; _zero_buffs.resize(size, &zero); + _task_barrier_entry.resize(size); + _task_barrier_exit.resize(size); + _task_handlers.resize(size); + for (size_t i = 1/*skip 0*/; i < size; i++){ + _task_handlers[i] = task::make(boost::bind(&send_packet_handler::converter_thread_task, this, i)); + }; } //! Get the channel width of this handler @@ -77,6 +92,12 @@ public: _header_offset_words32 = header_offset_words32; } + //! Set the stream ID for a specific channel (or no SID) + void set_xport_chan_sid(const size_t xport_chan, const bool has_sid, const boost::uint32_t sid = 0){ + _props.at(xport_chan).has_sid = has_sid; + _props.at(xport_chan).sid = sid; + } + //! Set the rate of ticks per second void set_tick_rate(const double rate){ _tick_rate = rate; @@ -98,7 +119,7 @@ public: //! Set the conversion routine for all channels void set_converter(const uhd::convert::id_type &id){ - _io_buffs.resize(id.num_inputs); + _num_inputs = id.num_inputs; _converter = uhd::convert::get_converter(id)(); this->set_scale_factor(32767.); //update after setting converter _bytes_per_otw_item = uhd::convert::get_bytes_per_item(id.output_format); @@ -133,7 +154,7 @@ public: //translate the metadata to vrt if packet info vrt::if_packet_info_t if_packet_info; if_packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA; - if_packet_info.has_sid = false; + //if_packet_info.has_sid = false; //set per channel if_packet_info.has_cid = false; if_packet_info.has_tlr = true; if_packet_info.has_tsi = false; @@ -195,10 +216,14 @@ private: size_t _header_offset_words32; double _tick_rate, _samp_rate; struct xport_chan_props_type{ + xport_chan_props_type(void):has_sid(false){} get_buff_type get_buff; + bool has_sid; + boost::uint32_t sid; + managed_send_buffer::sptr buff; }; std::vector<xport_chan_props_type> _props; - std::vector<const void *> _io_buffs; //used in conversion + size_t _num_inputs; size_t _bytes_per_otw_item; //used in conversion size_t _bytes_per_cpu_item; //used in conversion uhd::convert::converter::sptr _converter; //used in conversion @@ -217,36 +242,77 @@ private: const size_t buffer_offset_bytes = 0 ){ //load the rest of the if_packet_info in here - if_packet_info.num_payload_bytes = nsamps_per_buff*_io_buffs.size()*_bytes_per_otw_item; + if_packet_info.num_payload_bytes = nsamps_per_buff*_num_inputs*_bytes_per_otw_item; if_packet_info.num_payload_words32 = (if_packet_info.num_payload_bytes + 3/*round up*/)/sizeof(boost::uint32_t); if_packet_info.packet_count = _next_packet_seq; - size_t buff_index = 0; + //get a buffer for each channel or timeout BOOST_FOREACH(xport_chan_props_type &props, _props){ - managed_send_buffer::sptr buff = props.get_buff(timeout); - if (buff.get() == NULL) return 0; //timeout - - //fill a vector with pointers to the io buffers - BOOST_FOREACH(const void *&io_buff, _io_buffs){ - io_buff = reinterpret_cast<const char *>(buffs[buff_index++]) + buffer_offset_bytes; - } - boost::uint32_t *otw_mem = buff->cast<boost::uint32_t *>() + _header_offset_words32; - - //pack metadata into a vrt header - _vrt_packer(otw_mem, if_packet_info); - otw_mem += if_packet_info.num_header_words32; + if (not props.buff) props.buff = props.get_buff(timeout); + if (not props.buff) return 0; //timeout + } - //copy-convert the samples into the send buffer - _converter->conv(_io_buffs, otw_mem, nsamps_per_buff); + //setup the data to share with converter threads + _convert_nsamps = nsamps_per_buff; + _convert_buffs = &buffs; + _convert_buffer_offset_bytes = buffer_offset_bytes; + _convert_if_packet_info = &if_packet_info; - //commit the samples to the zero-copy interface - size_t num_bytes_total = (_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t); - buff->commit(num_bytes_total); + //perform N channels of conversion + converter_thread_task(0); - } _next_packet_seq++; //increment sequence after commits return nsamps_per_buff; } + + /******************************************************************* + * Perform one thread's work of the conversion task. + * The entry and exit use a dual synchronization barrier, + * to wait for data to become ready and block until completion. + ******************************************************************/ + UHD_INLINE void converter_thread_task(const size_t index) + { + _task_barrier_entry.wait(); + + //shortcut references to local data structures + managed_send_buffer::sptr &buff = _props[index].buff; + vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info; + const tx_streamer::buffs_type &buffs = *_convert_buffs; + + //fill IO buffs with pointers into the output buffer + const void *io_buffs[4/*max interleave*/]; + for (size_t i = 0; i < _num_inputs; i++){ + const char *b = reinterpret_cast<const char *>(buffs[index*_num_inputs + i]); + io_buffs[i] = b + _convert_buffer_offset_bytes; + } + const ref_vector<const void *> in_buffs(io_buffs, _num_inputs); + + //pack metadata into a vrt header + boost::uint32_t *otw_mem = buff->cast<boost::uint32_t *>() + _header_offset_words32; + if_packet_info.has_sid = _props[index].has_sid; + if_packet_info.sid = _props[index].sid; + _vrt_packer(otw_mem, if_packet_info); + otw_mem += if_packet_info.num_header_words32; + + //perform the conversion operation + _converter->conv(in_buffs, otw_mem, _convert_nsamps); + + //commit the samples to the zero-copy interface + const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32; + buff->commit(num_vita_words32*sizeof(boost::uint32_t)); + buff.reset(); //effectively a release + + _task_barrier_exit.wait(); + } + + //! Shared variables for the worker threads + reusable_barrier _task_barrier_entry, _task_barrier_exit; + std::vector<task::sptr> _task_handlers; + size_t _convert_nsamps; + const tx_streamer::buffs_type *_convert_buffs; + size_t _convert_buffer_offset_bytes; + vrt::if_packet_info_t *_convert_if_packet_info; + }; class send_packet_streamer : public send_packet_handler, public tx_streamer{ diff --git a/host/lib/transport/udp_wsa_zero_copy.cpp b/host/lib/transport/udp_wsa_zero_copy.cpp new file mode 100644 index 000000000..6fe4e3cad --- /dev/null +++ b/host/lib/transport/udp_wsa_zero_copy.cpp @@ -0,0 +1,300 @@ +// +// Copyright 2010-2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include "udp_common.hpp" +#include <uhd/transport/udp_zero_copy.hpp> +#include <uhd/transport/udp_simple.hpp> //mtu +#include <uhd/transport/buffer_pool.hpp> +#include <uhd/utils/msg.hpp> +#include <uhd/utils/log.hpp> +#include <boost/format.hpp> +#include <vector> + +using namespace uhd; +using namespace uhd::transport; +namespace asio = boost::asio; + +//A reasonable number of frames for send/recv and async/sync +static const size_t DEFAULT_NUM_FRAMES = 32; + +/*********************************************************************** + * Check registry for correct fast-path setting (windows only) + **********************************************************************/ +#ifdef HAVE_ATLBASE_H +#define CHECK_REG_SEND_THRESH +#include <atlbase.h> //CRegKey +static void check_registry_for_fast_send_threshold(const size_t mtu){ + static bool warned = false; + if (warned) return; //only allow one printed warning per process + + CRegKey reg_key; + DWORD threshold = 1024; //system default when threshold is not specified + if ( + reg_key.Open(HKEY_LOCAL_MACHINE, "System\\CurrentControlSet\\Services\\AFD\\Parameters", KEY_READ) != ERROR_SUCCESS or + reg_key.QueryDWORDValue("FastSendDatagramThreshold", threshold) != ERROR_SUCCESS or threshold < mtu + ){ + UHD_MSG(warning) << boost::format( + "The MTU (%d) is larger than the FastSendDatagramThreshold (%d)!\n" + "This will negatively affect the transmit performance.\n" + "See the transport application notes for more detail.\n" + ) % mtu % threshold << std::endl; + warned = true; + } + reg_key.Close(); +} +#endif /*HAVE_ATLBASE_H*/ + +/*********************************************************************** + * Static initialization to take care of WSA init and cleanup + **********************************************************************/ +struct uhd_wsa_control{ + uhd_wsa_control(void){ + WSADATA wsaData; + WSAStartup(MAKEWORD(2, 2), &wsaData); /*windows socket startup */ + } + + ~uhd_wsa_control(void){ + WSACleanup(); + } +}; + +/*********************************************************************** + * Reusable managed receiver buffer: + * - Initialize with memory and a release callback. + * - Call get new with a length in bytes to re-use. + **********************************************************************/ +class udp_zero_copy_asio_mrb : public managed_recv_buffer{ +public: + udp_zero_copy_asio_mrb(void *mem, int sock_fd, const size_t frame_size): + _sock_fd(sock_fd), _frame_size(frame_size) + { + _wsa_buff.buf = reinterpret_cast<char *>(mem); + ZeroMemory(&_overlapped, sizeof(_overlapped)); + _overlapped.hEvent = WSACreateEvent(); + UHD_ASSERT_THROW(_overlapped.hEvent != WSA_INVALID_EVENT); + this->release(); //makes buffer available via get_new + } + + ~udp_zero_copy_asio_mrb(void){ + WSACloseEvent(_overlapped.hEvent); + } + + void release(void){ + _wsa_buff.len = _frame_size; + _flags = 0; + WSARecv(_sock_fd, &_wsa_buff, 1, &_wsa_buff.len, &_flags, &_overlapped, NULL); + } + + UHD_INLINE sptr get_new(const double timeout, size_t &index){ + const DWORD result = WSAWaitForMultipleEvents( + 1, &_overlapped.hEvent, true, DWORD(timeout*1000), true + ); + if (result == WSA_WAIT_TIMEOUT) return managed_recv_buffer::sptr(); + index++; //advances the caller's buffer + + WSAGetOverlappedResult(_sock_fd, &_overlapped, &_wsa_buff.len, true, &_flags); + + WSAResetEvent(_overlapped.hEvent); + return make(this, _wsa_buff.buf, _wsa_buff.len); + } + +private: + int _sock_fd; + const size_t _frame_size; + WSAOVERLAPPED _overlapped; + WSABUF _wsa_buff; + DWORD _flags; +}; + +/*********************************************************************** + * Reusable managed send buffer: + * - committing the buffer calls the asynchronous socket send + * - getting a new buffer performs the blocking wait for completion + **********************************************************************/ +class udp_zero_copy_asio_msb : public managed_send_buffer{ +public: + udp_zero_copy_asio_msb(void *mem, int sock_fd, const size_t frame_size): + _sock_fd(sock_fd), _frame_size(frame_size) + { + _wsa_buff.buf = reinterpret_cast<char *>(mem); + ZeroMemory(&_overlapped, sizeof(_overlapped)); + _overlapped.hEvent = WSACreateEvent(); + UHD_ASSERT_THROW(_overlapped.hEvent != WSA_INVALID_EVENT); + WSASetEvent(_overlapped.hEvent); //makes buffer available via get_new + } + + ~udp_zero_copy_asio_msb(void){ + WSACloseEvent(_overlapped.hEvent); + } + + void release(void){ + _wsa_buff.len = size(); + WSASend(_sock_fd, &_wsa_buff, 1, NULL, 0, &_overlapped, NULL); + } + + UHD_INLINE sptr get_new(const double timeout, size_t &index){ + const DWORD result = WSAWaitForMultipleEvents( + 1, &_overlapped.hEvent, true, DWORD(timeout*1000), true + ); + if (result == WSA_WAIT_TIMEOUT) return managed_send_buffer::sptr(); + index++; //advances the caller's buffer + + WSAResetEvent(_overlapped.hEvent); + _wsa_buff.len = _frame_size; + return make(this, _wsa_buff.buf, _wsa_buff.len); + } + +private: + int _sock_fd; + const size_t _frame_size; + WSAOVERLAPPED _overlapped; + WSABUF _wsa_buff; +}; + +/*********************************************************************** + * Zero Copy UDP implementation with WSA: + * + * This is not a true zero copy implementation as each + * send and recv requires a copy operation to/from userspace. + * + * For receive, use a blocking recv() call on the socket. + * This has better performance than the overlapped IO. + * For send, use overlapped IO to submit async sends. + **********************************************************************/ +class udp_zero_copy_wsa_impl : public udp_zero_copy{ +public: + typedef boost::shared_ptr<udp_zero_copy_wsa_impl> sptr; + + udp_zero_copy_wsa_impl( + const std::string &addr, + const std::string &port, + const device_addr_t &hints + ): + _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", udp_simple::mtu))), + _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES))), + _send_frame_size(size_t(hints.cast<double>("send_frame_size", udp_simple::mtu))), + _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))), + _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)), + _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), + _next_recv_buff_index(0), _next_send_buff_index(0) + { + #ifdef CHECK_REG_SEND_THRESH + check_registry_for_fast_send_threshold(this->get_send_frame_size()); + #endif /*CHECK_REG_SEND_THRESH*/ + + UHD_MSG(status) << boost::format("Creating WSA UDP transport for %s:%s") % addr % port << std::endl; + static uhd_wsa_control uhd_wsa; //makes wsa start happen via lazy initialization + + UHD_ASSERT_THROW(_num_send_frames <= WSA_MAXIMUM_WAIT_EVENTS); + + //resolve the address + asio::io_service io_service; + asio::ip::udp::resolver resolver(io_service); + asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port); + asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); + + //create the socket + _sock_fd = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED); + if (_sock_fd == INVALID_SOCKET){ + const DWORD error = WSAGetLastError(); + throw uhd::os_error(str(boost::format("WSASocket() failed with error %d") % error)); + } + + //set the socket non-blocking for recv + //u_long mode = 1; + //ioctlsocket(_sock_fd, FIONBIO, &mode); + + //resize the socket buffers + const int recv_buff_size = int(hints.cast<double>("recv_buff_size", 0.0)); + const int send_buff_size = int(hints.cast<double>("send_buff_size", 0.0)); + if (recv_buff_size > 0) setsockopt(_sock_fd, SOL_SOCKET, SO_RCVBUF, (const char *)&recv_buff_size, sizeof(recv_buff_size)); + if (send_buff_size > 0) setsockopt(_sock_fd, SOL_SOCKET, SO_SNDBUF, (const char *)&send_buff_size, sizeof(send_buff_size)); + + //connect the socket so we can send/recv + const asio::ip::udp::endpoint::data_type &servaddr = *receiver_endpoint.data(); + if (WSAConnect(_sock_fd, (const struct sockaddr *)&servaddr, sizeof(servaddr), NULL, NULL, NULL, NULL) != 0){ + const DWORD error = WSAGetLastError(); + closesocket(_sock_fd); + throw uhd::os_error(str(boost::format("WSAConnect() failed with error %d") % error)); + } + + //allocate re-usable managed receive buffers + for (size_t i = 0; i < get_num_recv_frames(); i++){ + _mrb_pool.push_back(boost::shared_ptr<udp_zero_copy_asio_mrb>( + new udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i), _sock_fd, get_recv_frame_size()) + )); + } + + //allocate re-usable managed send buffers + for (size_t i = 0; i < get_num_send_frames(); i++){ + _msb_pool.push_back(boost::shared_ptr<udp_zero_copy_asio_msb>( + new udp_zero_copy_asio_msb(_send_buffer_pool->at(i), _sock_fd, get_send_frame_size()) + )); + } + } + + ~udp_zero_copy_wsa_impl(void){ + closesocket(_sock_fd); + } + + /******************************************************************* + * Receive implementation: + * Block on the managed buffer's get call and advance the index. + ******************************************************************/ + managed_recv_buffer::sptr get_recv_buff(double timeout){ + if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0; + return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index); + } + + size_t get_num_recv_frames(void) const {return _num_recv_frames;} + size_t get_recv_frame_size(void) const {return _recv_frame_size;} + + /******************************************************************* + * Send implementation: + * Block on the managed buffer's get call and advance the index. + ******************************************************************/ + managed_send_buffer::sptr get_send_buff(double timeout){ + if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0; + return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index); + } + + size_t get_num_send_frames(void) const {return _num_send_frames;} + size_t get_send_frame_size(void) const {return _send_frame_size;} + +private: + //memory management -> buffers and fifos + const size_t _recv_frame_size, _num_recv_frames; + const size_t _send_frame_size, _num_send_frames; + buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; + std::vector<boost::shared_ptr<udp_zero_copy_asio_msb> > _msb_pool; + std::vector<boost::shared_ptr<udp_zero_copy_asio_mrb> > _mrb_pool; + size_t _next_recv_buff_index, _next_send_buff_index; + + //socket guts + SOCKET _sock_fd; +}; + +/*********************************************************************** + * UDP zero copy make function + **********************************************************************/ +udp_zero_copy::sptr udp_zero_copy::make( + const std::string &addr, + const std::string &port, + const device_addr_t &hints +){ + return sptr(new udp_zero_copy_wsa_impl(addr, port, hints)); +} diff --git a/host/lib/transport/udp_zero_copy.cpp b/host/lib/transport/udp_zero_copy.cpp index 0ccc92b82..166177177 100644 --- a/host/lib/transport/udp_zero_copy.cpp +++ b/host/lib/transport/udp_zero_copy.cpp @@ -1,5 +1,5 @@ // -// Copyright 2010-2011 Ettus Research LLC +// Copyright 2010-2013 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -18,12 +18,14 @@ #include "udp_common.hpp" #include <uhd/transport/udp_zero_copy.hpp> #include <uhd/transport/udp_simple.hpp> //mtu -#include <uhd/transport/bounded_buffer.hpp> #include <uhd/transport/buffer_pool.hpp> #include <uhd/utils/msg.hpp> #include <uhd/utils/log.hpp> +#include <uhd/utils/atomic.hpp> #include <boost/format.hpp> -#include <list> +#include <boost/make_shared.hpp> +#include <boost/thread/thread.hpp> //sleep +#include <vector> using namespace uhd; using namespace uhd::transport; @@ -61,66 +63,84 @@ static void check_registry_for_fast_send_threshold(const size_t mtu){ /*********************************************************************** * Reusable managed receiver buffer: - * - Initialize with memory and a release callback. - * - Call get new with a length in bytes to re-use. + * - get_new performs the recv operation **********************************************************************/ class udp_zero_copy_asio_mrb : public managed_recv_buffer{ public: - udp_zero_copy_asio_mrb(void *mem, bounded_buffer<udp_zero_copy_asio_mrb *> &pending): - _mem(mem), _len(0), _pending(pending){/* NOP */} + udp_zero_copy_asio_mrb(void *mem, int sock_fd, const size_t frame_size): + _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) { /*NOP*/ } void release(void){ - if (_len == 0) return; - _pending.push_with_haste(this); - _len = 0; + _claimer.release(); } - sptr get_new(size_t len){ - _len = len; - return make_managed_buffer(this); - } + UHD_INLINE sptr get_new(const double timeout, size_t &index){ + if (not _claimer.claim_with_wait(timeout)) return sptr(); - template <class T> T cast(void) const{return static_cast<T>(_mem);} + #ifdef MSG_DONTWAIT //try a non-blocking recv() if supported + _len = ::recv(_sock_fd, (char *)_mem, _frame_size, MSG_DONTWAIT); + if (_len > 0){ + index++; //advances the caller's buffer + return make(this, _mem, size_t(_len)); + } + #endif -private: - const void *get_buff(void) const{return _mem;} - size_t get_size(void) const{return _len;} + if (wait_for_recv_ready(_sock_fd, timeout)){ + _len = ::recv(_sock_fd, (char *)_mem, _frame_size, 0); + index++; //advances the caller's buffer + return make(this, _mem, size_t(_len)); + } + _claimer.release(); //undo claim + return sptr(); //null for timeout + } + +private: void *_mem; - size_t _len; - bounded_buffer<udp_zero_copy_asio_mrb *> &_pending; + int _sock_fd; + size_t _frame_size; + ssize_t _len; + simple_claimer _claimer; }; /*********************************************************************** * Reusable managed send buffer: - * - Initialize with memory and a commit callback. - * - Call get new with a length in bytes to re-use. + * - commit performs the send operation **********************************************************************/ class udp_zero_copy_asio_msb : public managed_send_buffer{ public: - udp_zero_copy_asio_msb(void *mem, bounded_buffer<udp_zero_copy_asio_msb *> &pending, int sock_fd): - _mem(mem), _len(0), _pending(pending), _sock_fd(sock_fd){/* NOP */} - - void commit(size_t len){ - if (_len == 0) return; - ::send(_sock_fd, this->cast<const char *>(), len, 0); - _pending.push_with_haste(this); - _len = 0; + udp_zero_copy_asio_msb(void *mem, int sock_fd, const size_t frame_size): + _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) { /*NOP*/ } + + void release(void){ + //Retry logic because send may fail with ENOBUFS. + //This is known to occur at least on some OSX systems. + //But it should be safe to always check for the error. + while (true) + { + const ssize_t ret = ::send(_sock_fd, (const char *)_mem, size(), 0); + if (ret == ssize_t(size())) break; + if (ret == -1 and errno == ENOBUFS) + { + boost::this_thread::sleep(boost::posix_time::microseconds(1)); + continue; //try to send again + } + UHD_ASSERT_THROW(ret == ssize_t(size())); + } + _claimer.release(); } - sptr get_new(size_t len){ - _len = len; - return make_managed_buffer(this); + UHD_INLINE sptr get_new(const double timeout, size_t &index){ + if (not _claimer.claim_with_wait(timeout)) return sptr(); + index++; //advances the caller's buffer + return make(this, _mem, _frame_size); } private: - void *get_buff(void) const{return _mem;} - size_t get_size(void) const{return _len;} - void *_mem; - size_t _len; - bounded_buffer<udp_zero_copy_asio_msb *> &_pending; int _sock_fd; + size_t _frame_size; + simple_claimer _claimer; }; /*********************************************************************** @@ -145,8 +165,7 @@ public: _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))), _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)), _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), - _pending_recv_buffs(_num_recv_frames), - _pending_send_buffs(_num_send_frames) + _next_recv_buff_index(0), _next_send_buff_index(0) { UHD_LOG << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; @@ -167,18 +186,16 @@ public: //allocate re-usable managed receive buffers for (size_t i = 0; i < get_num_recv_frames(); i++){ - _mrb_pool.push_back(udp_zero_copy_asio_mrb( - _recv_buffer_pool->at(i), _pending_recv_buffs + _mrb_pool.push_back(boost::make_shared<udp_zero_copy_asio_mrb>( + _recv_buffer_pool->at(i), _sock_fd, get_recv_frame_size() )); - _pending_recv_buffs.push_with_haste(&_mrb_pool.back()); } //allocate re-usable managed send buffers for (size_t i = 0; i < get_num_send_frames(); i++){ - _msb_pool.push_back(udp_zero_copy_asio_msb( - _send_buffer_pool->at(i), _pending_send_buffs, _sock_fd + _msb_pool.push_back(boost::make_shared<udp_zero_copy_asio_msb>( + _send_buffer_pool->at(i), _sock_fd, get_send_frame_size() )); - _pending_send_buffs.push_with_haste(&_msb_pool.back()); } } @@ -198,29 +215,11 @@ public: /******************************************************************* * Receive implementation: - * - * Perform a non-blocking receive for performance, - * and then fall back to a blocking receive with timeout. - * Return the managed receive buffer with the new length. - * When the caller is finished with the managed buffer, - * the managed receive buffer is released back into the queue. + * Block on the managed buffer's get call and advance the index. ******************************************************************/ managed_recv_buffer::sptr get_recv_buff(double timeout){ - udp_zero_copy_asio_mrb *mrb = NULL; - if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){ - - #ifdef MSG_DONTWAIT //try a non-blocking recv() if supported - ssize_t ret = ::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, MSG_DONTWAIT); - if (ret > 0) return mrb->get_new(ret); - #endif - - if (wait_for_recv_ready(_sock_fd, timeout)) return mrb->get_new( - ::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0) - ); - - _pending_recv_buffs.push_with_haste(mrb); //timeout: return the managed buffer to the queue - } - return managed_recv_buffer::sptr(); + if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0; + return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index); } size_t get_num_recv_frames(void) const {return _num_recv_frames;} @@ -228,18 +227,11 @@ public: /******************************************************************* * Send implementation: - * - * Get a managed receive buffer immediately with max length set. - * The caller will fill the buffer and commit it when finished. - * The commit routine will perform a blocking send operation, - * and push the managed send buffer back into the queue. + * Block on the managed buffer's get call and advance the index. ******************************************************************/ managed_send_buffer::sptr get_send_buff(double timeout){ - udp_zero_copy_asio_msb *msb = NULL; - if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){ - return msb->get_new(_send_frame_size); - } - return managed_send_buffer::sptr(); + if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0; + return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index); } size_t get_num_send_frames(void) const {return _num_send_frames;} @@ -250,10 +242,9 @@ private: const size_t _recv_frame_size, _num_recv_frames; const size_t _send_frame_size, _num_send_frames; buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; - bounded_buffer<udp_zero_copy_asio_mrb *> _pending_recv_buffs; - bounded_buffer<udp_zero_copy_asio_msb *> _pending_send_buffs; - std::list<udp_zero_copy_asio_msb> _msb_pool; - std::list<udp_zero_copy_asio_mrb> _mrb_pool; + std::vector<boost::shared_ptr<udp_zero_copy_asio_msb> > _msb_pool; + std::vector<boost::shared_ptr<udp_zero_copy_asio_mrb> > _mrb_pool; + size_t _next_recv_buff_index, _next_send_buff_index; //asio guts -> socket and service asio::io_service _io_service; diff --git a/host/lib/transport/usb_zero_copy_wrapper.cpp b/host/lib/transport/usb_zero_copy_wrapper.cpp index 3571ed856..d04244ca9 100644 --- a/host/lib/transport/usb_zero_copy_wrapper.cpp +++ b/host/lib/transport/usb_zero_copy_wrapper.cpp @@ -16,45 +16,64 @@ // #include <uhd/transport/usb_zero_copy.hpp> -#include <uhd/transport/bounded_buffer.hpp> #include <uhd/transport/buffer_pool.hpp> #include <uhd/utils/byteswap.hpp> #include <uhd/utils/msg.hpp> +#include <uhd/utils/tasks.hpp> +#include <uhd/utils/atomic.hpp> #include <boost/foreach.hpp> +#include <boost/make_shared.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/condition_variable.hpp> +#include <boost/bind.hpp> #include <vector> #include <iostream> +using namespace uhd; using namespace uhd::transport; +static const boost::posix_time::time_duration AUTOFLUSH_TIMEOUT(boost::posix_time::milliseconds(1)); + /*********************************************************************** * USB zero copy wrapper - managed receive buffer **********************************************************************/ class usb_zero_copy_wrapper_mrb : public managed_recv_buffer{ public: - usb_zero_copy_wrapper_mrb(bounded_buffer<usb_zero_copy_wrapper_mrb *> &queue): - _queue(queue){/*NOP*/} + usb_zero_copy_wrapper_mrb(void){/*NOP*/} void release(void){ - if (not _mrb) return; _mrb.reset(); //decrement ref count, other MRB's may hold a ref - _queue.push_with_haste(this); + _claimer.release(); } - UHD_INLINE sptr get_new(managed_recv_buffer::sptr mrb, const void *mem, size_t len){ + UHD_INLINE sptr get_new( + managed_recv_buffer::sptr &mrb, size_t &offset_bytes, + const double timeout, size_t &index + ){ + if (not mrb or not _claimer.claim_with_wait(timeout)) return sptr(); + + index++; //advances the caller's buffer + + //hold a copy of the buffer shared pointer _mrb = mrb; - _mem = mem; - _len = len; - return make_managed_buffer(this); + + //extract this packet's memory address and length in bytes + char *mem = mrb->cast<char *>() + offset_bytes; + const boost::uint32_t *mem32 = reinterpret_cast<const boost::uint32_t *>(mem); + const size_t words32 = (uhd::wtohx(mem32[0]) & 0xffff); //length in words32 (from VRT header) + const size_t len = words32*sizeof(boost::uint32_t); //length in bytes + + //check if this receive buffer has been exhausted + offset_bytes += len; + if (offset_bytes >= mrb->size()) mrb.reset(); //drop caller's ref + else if (uhd::wtohx(mem32[words32]) == 0) mrb.reset(); + + return make(this, mem, len); } private: - const void *get_buff(void) const{return _mem;} - size_t get_size(void) const{return _len;} - - bounded_buffer<usb_zero_copy_wrapper_mrb *> &_queue; - const void *_mem; - size_t _len; managed_recv_buffer::sptr _mrb; + simple_claimer _claimer; }; /*********************************************************************** @@ -63,16 +82,27 @@ private: class usb_zero_copy_wrapper_msb : public managed_send_buffer{ public: usb_zero_copy_wrapper_msb(const usb_zero_copy::sptr internal, const size_t fragmentation_size): - _internal(internal), _fragmentation_size(fragmentation_size){/*NOP*/} + _internal(internal), _fragmentation_size(fragmentation_size) + { + _ok_to_auto_flush = false; + _task = uhd::task::make(boost::bind(&usb_zero_copy_wrapper_msb::auto_flush, this)); + } - void commit(size_t len){ - if (len == 0) return; + ~usb_zero_copy_wrapper_msb(void) + { + //ensure the task has exited before anything auto deconstructs + _task.reset(); + } + + void release(void){ + boost::mutex::scoped_lock lock(_mutex); + _ok_to_auto_flush = true; //get a reference to the VITA header before incrementing const boost::uint32_t vita_header = reinterpret_cast<const boost::uint32_t *>(_mem_buffer_tip)[0]; - _bytes_in_buffer += len; - _mem_buffer_tip += len; + _bytes_in_buffer += size(); + _mem_buffer_tip += size(); //extract VITA end of packet flag, we must force flush under eof conditions const bool eop = (uhd::wtohx(vita_header) & (0x1 << 24)) != 0; @@ -80,28 +110,53 @@ public: if (eop or full){ _last_send_buff->commit(_bytes_in_buffer); _last_send_buff.reset(); + + //notify the auto-flusher to restart its timed_wait + lock.unlock(); _cond.notify_one(); } } UHD_INLINE sptr get_new(const double timeout){ + boost::mutex::scoped_lock lock(_mutex); + _ok_to_auto_flush = false; + if (not _last_send_buff){ _last_send_buff = _internal->get_send_buff(timeout); if (not _last_send_buff) return sptr(); _mem_buffer_tip = _last_send_buff->cast<char *>(); _bytes_in_buffer = 0; } - return make_managed_buffer(this); + + return make(this, _mem_buffer_tip, _fragmentation_size); } private: - void *get_buff(void) const{return reinterpret_cast<void *>(_mem_buffer_tip);} - size_t get_size(void) const{return _fragmentation_size;} - usb_zero_copy::sptr _internal; const size_t _fragmentation_size; managed_send_buffer::sptr _last_send_buff; size_t _bytes_in_buffer; char *_mem_buffer_tip; + + //private variables for auto flusher + boost::mutex _mutex; + boost::condition_variable _cond; + uhd::task::sptr _task; + bool _ok_to_auto_flush; + + /*! + * The auto flusher ensures that buffers are force committed when + * the user has not called get_new() within a certain time window. + */ + void auto_flush(void) + { + boost::mutex::scoped_lock lock(_mutex); + const bool timeout = not _cond.timed_wait(lock, AUTOFLUSH_TIMEOUT); + if (timeout and _ok_to_auto_flush and _last_send_buff and _bytes_in_buffer != 0) + { + _last_send_buff->commit(_bytes_in_buffer); + _last_send_buff.reset(); + } + } }; /*********************************************************************** @@ -112,44 +167,26 @@ public: usb_zero_copy_wrapper(sptr usb_zc, const size_t frame_boundary): _internal_zc(usb_zc), _frame_boundary(frame_boundary), - _available_recv_buffs(this->get_num_recv_frames()), - _mrb_pool(this->get_num_recv_frames(), usb_zero_copy_wrapper_mrb(_available_recv_buffs)), - _the_only_msb(usb_zero_copy_wrapper_msb(usb_zc, frame_boundary)) + _next_recv_buff_index(0) { - BOOST_FOREACH(usb_zero_copy_wrapper_mrb &mrb, _mrb_pool){ - _available_recv_buffs.push_with_haste(&mrb); + for (size_t i = 0; i < this->get_num_recv_frames(); i++){ + _mrb_pool.push_back(boost::make_shared<usb_zero_copy_wrapper_mrb>()); } + _the_only_msb = boost::make_shared<usb_zero_copy_wrapper_msb>(usb_zc, frame_boundary); } managed_recv_buffer::sptr get_recv_buff(double timeout){ //attempt to get a managed recv buffer - if (not _last_recv_buff.get()){ + if (not _last_recv_buff){ _last_recv_buff = _internal_zc->get_recv_buff(timeout); - _last_recv_offset = 0; + _last_recv_offset = 0; //reset offset into buffer } - //attempt to get a wrapper for a managed recv buffer - usb_zero_copy_wrapper_mrb *wmrb = NULL; - if (_last_recv_buff.get() and _available_recv_buffs.pop_with_timed_wait(wmrb, timeout)){ - //extract this packet's memory address and length in bytes - const char *mem = _last_recv_buff->cast<const char *>() + _last_recv_offset; - const boost::uint32_t *mem32 = reinterpret_cast<const boost::uint32_t *>(mem); - const size_t len = (uhd::wtohx(mem32[0]) & 0xffff)*sizeof(boost::uint32_t); //length in bytes (from VRT header) - - managed_recv_buffer::sptr recv_buff; //the buffer to be returned to the user - recv_buff = wmrb->get_new(_last_recv_buff, mem, len); - _last_recv_offset += len; - - //check if this receive buffer has been exhausted - if (_last_recv_offset >= _last_recv_buff->size()) { - _last_recv_buff.reset(); - } - - return recv_buff; - } - - //otherwise return a null sptr for failure - return managed_recv_buffer::sptr(); + //get the buffer to be returned to the user + if (_next_recv_buff_index == _mrb_pool.size()) _next_recv_buff_index = 0; + return _mrb_pool[_next_recv_buff_index]->get_new( + _last_recv_buff, _last_recv_offset, timeout, _next_recv_buff_index + ); } size_t get_num_recv_frames(void) const{ @@ -161,7 +198,7 @@ public: } managed_send_buffer::sptr get_send_buff(double timeout){ - return _the_only_msb.get_new(timeout); + return _the_only_msb->get_new(timeout); } size_t get_num_send_frames(void) const{ @@ -175,16 +212,13 @@ public: private: sptr _internal_zc; size_t _frame_boundary; - bounded_buffer<usb_zero_copy_wrapper_mrb *> _available_recv_buffs; - std::vector<usb_zero_copy_wrapper_mrb> _mrb_pool; - usb_zero_copy_wrapper_msb _the_only_msb; - - //buffer to store partially-received VRT packets in - buffer_pool::sptr _fragment_mem; + std::vector<boost::shared_ptr<usb_zero_copy_wrapper_mrb> > _mrb_pool; + boost::shared_ptr<usb_zero_copy_wrapper_msb> _the_only_msb; //state for last recv buffer to create multiple managed buffers managed_recv_buffer::sptr _last_recv_buff; size_t _last_recv_offset; + size_t _next_recv_buff_index; }; /*********************************************************************** |