diff options
author | Josh Blum <josh@joshknows.com> | 2012-05-30 13:33:09 -0700 |
---|---|---|
committer | Josh Blum <josh@joshknows.com> | 2012-06-05 17:58:53 -0700 |
commit | 8b9ebe0a85fe9d5676a99699a8eeb12519392f3e (patch) | |
tree | 9a5c1c5a067b4d24c5bdc7e70a96df4a5a6007e5 | |
parent | b61a11031fb21bc3f917574660279d54e4a8643e (diff) | |
download | uhd-8b9ebe0a85fe9d5676a99699a8eeb12519392f3e.tar.gz uhd-8b9ebe0a85fe9d5676a99699a8eeb12519392f3e.tar.bz2 uhd-8b9ebe0a85fe9d5676a99699a8eeb12519392f3e.zip |
usb: created autoflusher task in usb_zero_copy_wrapper
Since we dont commit buffers ASAP until filled,
autoflusher will do this if timeout occurs.
-rw-r--r-- | host/lib/transport/usb_zero_copy_wrapper.cpp | 47 |
1 files changed, 45 insertions, 2 deletions
diff --git a/host/lib/transport/usb_zero_copy_wrapper.cpp b/host/lib/transport/usb_zero_copy_wrapper.cpp index 87e001fed..843b20f11 100644 --- a/host/lib/transport/usb_zero_copy_wrapper.cpp +++ b/host/lib/transport/usb_zero_copy_wrapper.cpp @@ -20,13 +20,19 @@ #include <uhd/transport/buffer_pool.hpp> #include <uhd/utils/byteswap.hpp> #include <uhd/utils/msg.hpp> +#include <uhd/utils/tasks.hpp> #include <boost/foreach.hpp> #include <boost/make_shared.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/condition_variable.hpp> +#include <boost/bind.hpp> #include <vector> #include <iostream> using namespace uhd::transport; +static const boost::posix_time::time_duration AUTOFLUSH_TIMEOUT(boost::posix_time::milliseconds(1)); + /*********************************************************************** * USB zero copy wrapper - managed receive buffer **********************************************************************/ @@ -53,11 +59,13 @@ public: //extract this packet's memory address and length in bytes char *mem = mrb->cast<char *>() + offset_bytes; const boost::uint32_t *mem32 = reinterpret_cast<const boost::uint32_t *>(mem); - size_t len = (uhd::wtohx(mem32[0]) & 0xffff)*sizeof(boost::uint32_t); //length in bytes (from VRT header) + const size_t words32 = (uhd::wtohx(mem32[0]) & 0xffff); //length in words32 (from VRT header) + const size_t len = words32*sizeof(boost::uint32_t); //length in bytes //check if this receive buffer has been exhausted offset_bytes += len; if (offset_bytes >= mrb->size()) mrb.reset(); //drop caller's ref + else if (uhd::wtohx(mem32[words32]) == 0) mrb.reset(); return make(this, mem, len); } @@ -73,9 +81,16 @@ private: class usb_zero_copy_wrapper_msb : public managed_send_buffer{ public: usb_zero_copy_wrapper_msb(const usb_zero_copy::sptr internal, const size_t fragmentation_size): - _internal(internal), _fragmentation_size(fragmentation_size){/*NOP*/} + _internal(internal), _fragmentation_size(fragmentation_size) + { + _ok_to_auto_flush = false; + _task = uhd::task::make(boost::bind(&usb_zero_copy_wrapper_msb::auto_flush, this)); + } void release(void){ + boost::mutex::scoped_lock lock(_mutex); + _ok_to_auto_flush = true; + //get a reference to the VITA header before incrementing const boost::uint32_t vita_header = reinterpret_cast<const boost::uint32_t *>(_mem_buffer_tip)[0]; @@ -88,16 +103,23 @@ public: if (eop or full){ _last_send_buff->commit(_bytes_in_buffer); _last_send_buff.reset(); + + //notify the auto-flusher to restart its timed_wait + lock.unlock(); _cond.notify_one(); } } UHD_INLINE sptr get_new(const double timeout){ + boost::mutex::scoped_lock lock(_mutex); + _ok_to_auto_flush = false; + if (not _last_send_buff){ _last_send_buff = _internal->get_send_buff(timeout); if (not _last_send_buff) return sptr(); _mem_buffer_tip = _last_send_buff->cast<char *>(); _bytes_in_buffer = 0; } + return make(this, _mem_buffer_tip, _fragmentation_size); } @@ -107,6 +129,27 @@ private: managed_send_buffer::sptr _last_send_buff; size_t _bytes_in_buffer; char *_mem_buffer_tip; + + //private variables for auto flusher + boost::mutex _mutex; + boost::condition_variable _cond; + uhd::task::sptr _task; + bool _ok_to_auto_flush; + + /*! + * The auto flusher ensures that buffers are force committed when + * the user has not called get_new() within a certain time window. + */ + void auto_flush(void) + { + boost::mutex::scoped_lock lock(_mutex); + const bool timeout = not _cond.timed_wait(lock, AUTOFLUSH_TIMEOUT); + if (timeout and _ok_to_auto_flush and _last_send_buff and _bytes_in_buffer != 0) + { + _last_send_buff->commit(_bytes_in_buffer); + _last_send_buff.reset(); + } + } }; /*********************************************************************** |