diff options
author | Josh Blum <josh@joshknows.com> | 2011-02-05 00:05:30 -0800 |
---|---|---|
committer | Josh Blum <josh@joshknows.com> | 2011-02-05 00:05:30 -0800 |
commit | 05c2b1b544b76babbdb0210ae6a8d5de19e247bb (patch) | |
tree | 98cbe5d5ecc3146cd0fae0c2f6a90b97f0bd8c8d /host | |
parent | 6c145193a955cfd6295b11bed3cc7cc9828feb9c (diff) | |
download | uhd-05c2b1b544b76babbdb0210ae6a8d5de19e247bb.tar.gz uhd-05c2b1b544b76babbdb0210ae6a8d5de19e247bb.tar.bz2 uhd-05c2b1b544b76babbdb0210ae6a8d5de19e247bb.zip |
udp: simplfy zero copy asio overhead with less shared_from_this, and timed waits when not needed
Diffstat (limited to 'host')
-rw-r--r-- | host/include/uhd/transport/bounded_buffer.hpp | 9 | ||||
-rw-r--r-- | host/include/uhd/transport/bounded_buffer.ipp | 11 | ||||
-rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 44 |
3 files changed, 43 insertions, 21 deletions
diff --git a/host/include/uhd/transport/bounded_buffer.hpp b/host/include/uhd/transport/bounded_buffer.hpp index aca93b071..33ded8cfd 100644 --- a/host/include/uhd/transport/bounded_buffer.hpp +++ b/host/include/uhd/transport/bounded_buffer.hpp @@ -1,5 +1,5 @@ // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -66,6 +66,13 @@ namespace uhd{ namespace transport{ virtual bool push_with_timed_wait(const elem_type &elem, double timeout) = 0; /*! + * Pop an element from the bounded_buffer immediately. + * \param elem the element reference pop to + * \return false when the bounded_buffer is empty + */ + virtual bool pop_with_haste(elem_type &elem) = 0; + + /*! * Pop an element from the bounded_buffer. * Wait until the bounded_buffer becomes non-empty. * \param elem the element reference pop to diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp index 4fbe3f085..7bf182ee3 100644 --- a/host/include/uhd/transport/bounded_buffer.ipp +++ b/host/include/uhd/transport/bounded_buffer.ipp @@ -1,5 +1,5 @@ // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -72,6 +72,15 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/ return true; } + UHD_INLINE bool pop_with_haste(elem_type &elem){ + boost::mutex::scoped_lock lock(_mutex); + if(_buffer.empty()) return false; + elem = this->pop_back(); + lock.unlock(); + _full_cond.notify_one(); + return true; + } + UHD_INLINE void pop_with_wait(elem_type &elem){ boost::mutex::scoped_lock lock(_mutex); _empty_cond.wait(lock, _not_empty_fcn); diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 697e172cd..f083b97d8 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -34,7 +34,7 @@ namespace asio = boost::asio; //Define this to the the boost async io calls to perform receive. //Otherwise, get_recv_buff uses a blocking receive with timeout. -#define USE_ASIO_ASYNC_RECV +//#define USE_ASIO_ASYNC_RECV //Define this to the the boost async io calls to perform send. //Otherwise, the commit callback uses a blocking send. @@ -138,8 +138,7 @@ public: //! handle a recv callback -> push the filled memory into the fifo UHD_INLINE void handle_recv(void *mem, size_t len){ - boost::this_thread::disable_interruption di; //disable because the wait can throw - _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len)); + _pending_recv_buffs->push_with_pop_on_full(boost::asio::buffer(mem, len)); } //////////////////////////////////////////////////////////////////// @@ -152,8 +151,7 @@ public: if (_pending_recv_buffs->pop_with_timed_wait(buff, timeout)){ return managed_recv_buffer::make_safe( buff, boost::bind( - &udp_zero_copy_asio_impl::release, - shared_from_this(), + &udp_zero_copy_asio_impl::release, this, asio::buffer_cast<void*>(buff) ) ); @@ -177,7 +175,6 @@ public: #else /*USE_ASIO_ASYNC_RECV*/ //////////////////////////////////////////////////////////////////// managed_recv_buffer::sptr get_recv_buff(double timeout){ - boost::this_thread::disable_interruption di; //disable because the wait can throw asio::mutable_buffer buff; //setup timeval for timeout @@ -190,11 +187,11 @@ public: FD_ZERO(&rset); FD_SET(_sock_fd, &rset); - //call select to perform timed wait and grab an available buffer with wait + //call select to perform timed wait and grab an available buffer now //if the condition is true, call receive and return the managed buffer if ( - ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 and - _pending_recv_buffs->pop_with_timed_wait(buff, timeout) + ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 + and _pending_recv_buffs->pop_with_haste(buff) ){ return managed_recv_buffer::make_safe( asio::buffer( @@ -202,8 +199,7 @@ public: _socket->receive(asio::buffer(buff)) ), boost::bind( - &udp_zero_copy_asio_impl::release, - shared_from_this(), + &udp_zero_copy_asio_impl::release, this, asio::buffer_cast<void*>(buff) ) ); @@ -212,7 +208,6 @@ public: } void release(void *mem){ - boost::this_thread::disable_interruption di; //disable because the wait can throw handle_recv(mem, this->get_recv_frame_size()); } @@ -225,10 +220,12 @@ public: //! handle a send callback -> push the emptied memory into the fifo UHD_INLINE void handle_send(void *mem){ - boost::this_thread::disable_interruption di; //disable because the wait can throw - _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, this->get_send_frame_size())); + _pending_send_buffs->push_with_pop_on_full(boost::asio::buffer(mem, this->get_send_frame_size())); } + //////////////////////////////////////////////////////////////////// + #ifdef USE_ASIO_ASYNC_SEND + //////////////////////////////////////////////////////////////////// //! pop an empty send buffer off of the fifo and bind with the commit callback managed_send_buffer::sptr get_send_buff(double timeout){ boost::this_thread::disable_interruption di; //disable because the wait can throw @@ -236,8 +233,7 @@ public: if (_pending_send_buffs->pop_with_timed_wait(buff, timeout)){ return managed_send_buffer::make_safe( buff, boost::bind( - &udp_zero_copy_asio_impl::commit, - shared_from_this(), + &udp_zero_copy_asio_impl::commit, this, asio::buffer_cast<void*>(buff), _1 ) ); @@ -245,9 +241,6 @@ public: return managed_send_buffer::sptr(); } - //////////////////////////////////////////////////////////////////// - #ifdef USE_ASIO_ASYNC_SEND - //////////////////////////////////////////////////////////////////// //! commit a send buffer -> start an async send on the buffer void commit(void *mem, size_t len){ _socket->async_send( @@ -262,6 +255,19 @@ public: //////////////////////////////////////////////////////////////////// #else /*USE_ASIO_ASYNC_SEND*/ //////////////////////////////////////////////////////////////////// + managed_send_buffer::sptr get_send_buff(double){ + asio::mutable_buffer buff; + if (_pending_send_buffs->pop_with_haste(buff)){ + return managed_send_buffer::make_safe( + buff, boost::bind( + &udp_zero_copy_asio_impl::commit, this, + asio::buffer_cast<void*>(buff), _1 + ) + ); + } + return managed_send_buffer::sptr(); + } + void commit(void *mem, size_t len){ _socket->send(asio::buffer(mem, len)); handle_send(mem); |