diff options
-rw-r--r-- | host/lib/transport/usb_zero_copy_wrapper.cpp | 47 |
1 files changed, 45 insertions, 2 deletions
diff --git a/host/lib/transport/usb_zero_copy_wrapper.cpp b/host/lib/transport/usb_zero_copy_wrapper.cpp index 87e001fed..843b20f11 100644 --- a/host/lib/transport/usb_zero_copy_wrapper.cpp +++ b/host/lib/transport/usb_zero_copy_wrapper.cpp @@ -20,13 +20,19 @@ #include <uhd/transport/buffer_pool.hpp> #include <uhd/utils/byteswap.hpp> #include <uhd/utils/msg.hpp> +#include <uhd/utils/tasks.hpp> #include <boost/foreach.hpp> #include <boost/make_shared.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/condition_variable.hpp> +#include <boost/bind.hpp> #include <vector> #include <iostream> using namespace uhd::transport; +static const boost::posix_time::time_duration AUTOFLUSH_TIMEOUT(boost::posix_time::milliseconds(1)); + /*********************************************************************** * USB zero copy wrapper - managed receive buffer **********************************************************************/ @@ -53,11 +59,13 @@ public: //extract this packet's memory address and length in bytes char *mem = mrb->cast<char *>() + offset_bytes; const boost::uint32_t *mem32 = reinterpret_cast<const boost::uint32_t *>(mem); - size_t len = (uhd::wtohx(mem32[0]) & 0xffff)*sizeof(boost::uint32_t); //length in bytes (from VRT header) + const size_t words32 = (uhd::wtohx(mem32[0]) & 0xffff); //length in words32 (from VRT header) + const size_t len = words32*sizeof(boost::uint32_t); //length in bytes //check if this receive buffer has been exhausted offset_bytes += len; if (offset_bytes >= mrb->size()) mrb.reset(); //drop caller's ref + else if (uhd::wtohx(mem32[words32]) == 0) mrb.reset(); return make(this, mem, len); } @@ -73,9 +81,16 @@ private: class usb_zero_copy_wrapper_msb : public managed_send_buffer{ public: usb_zero_copy_wrapper_msb(const usb_zero_copy::sptr internal, const size_t fragmentation_size): - _internal(internal), _fragmentation_size(fragmentation_size){/*NOP*/} + _internal(internal), _fragmentation_size(fragmentation_size) + { + _ok_to_auto_flush = false; + _task = uhd::task::make(boost::bind(&usb_zero_copy_wrapper_msb::auto_flush, this)); + } void release(void){ + boost::mutex::scoped_lock lock(_mutex); + _ok_to_auto_flush = true; + //get a reference to the VITA header before incrementing const boost::uint32_t vita_header = reinterpret_cast<const boost::uint32_t *>(_mem_buffer_tip)[0]; @@ -88,16 +103,23 @@ public: if (eop or full){ _last_send_buff->commit(_bytes_in_buffer); _last_send_buff.reset(); + + //notify the auto-flusher to restart its timed_wait + lock.unlock(); _cond.notify_one(); } } UHD_INLINE sptr get_new(const double timeout){ + boost::mutex::scoped_lock lock(_mutex); + _ok_to_auto_flush = false; + if (not _last_send_buff){ _last_send_buff = _internal->get_send_buff(timeout); if (not _last_send_buff) return sptr(); _mem_buffer_tip = _last_send_buff->cast<char *>(); _bytes_in_buffer = 0; } + return make(this, _mem_buffer_tip, _fragmentation_size); } @@ -107,6 +129,27 @@ private: managed_send_buffer::sptr _last_send_buff; size_t _bytes_in_buffer; char *_mem_buffer_tip; + + //private variables for auto flusher + boost::mutex _mutex; + boost::condition_variable _cond; + uhd::task::sptr _task; + bool _ok_to_auto_flush; + + /*! + * The auto flusher ensures that buffers are force committed when + * the user has not called get_new() within a certain time window. + */ + void auto_flush(void) + { + boost::mutex::scoped_lock lock(_mutex); + const bool timeout = not _cond.timed_wait(lock, AUTOFLUSH_TIMEOUT); + if (timeout and _ok_to_auto_flush and _last_send_buff and _bytes_in_buffer != 0) + { + _last_send_buff->commit(_bytes_in_buffer); + _last_send_buff.reset(); + } + } }; /*********************************************************************** |