diff options
-rw-r--r-- | host/docs/transport.rst | 5 | ||||
-rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 270 |
2 files changed, 117 insertions, 158 deletions
diff --git a/host/docs/transport.rst b/host/docs/transport.rst index 018f909c1..6b9d28bfa 100644 --- a/host/docs/transport.rst +++ b/host/docs/transport.rst @@ -34,10 +34,9 @@ The following parameters can be used to alter the transport's default behavior: * **num_recv_frames:** The number of receive buffers to allocate * **send_frame_size:** The size of a single send buffer in bytes * **num_send_frames:** The number of send buffers to allocate -* **concurrency_hint:** The number of threads to run the IO service -**Note:** num_send_frames will not have an effect -as the asynchronous send implementation is currently disabled. +**Note:** num_recv_frames and num_send_frames will not have an effect +as the asynchronous send implementation is currently unimplemented. ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Flow control parameters diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 90cd2a9ce..33c7dd94e 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -19,38 +19,99 @@ #include <uhd/transport/udp_simple.hpp> //mtu #include <uhd/transport/bounded_buffer.hpp> #include <uhd/transport/buffer_pool.hpp> -#include <uhd/utils/thread_priority.hpp> #include <uhd/utils/assert.hpp> #include <uhd/utils/warning.hpp> #include <boost/asio.hpp> #include <boost/format.hpp> -#include <boost/thread/thread.hpp> -#include <boost/enable_shared_from_this.hpp> #include <iostream> +#include <vector> using namespace uhd; using namespace uhd::transport; namespace asio = boost::asio; -//Define this to the the boost async io calls to perform receive. -//Otherwise, get_recv_buff uses a blocking receive with timeout. -//#define USE_ASIO_ASYNC_RECV +//A reasonable number of frames for send/recv and async/sync +static const size_t DEFAULT_NUM_FRAMES = 32; -//Define this to the the boost async io calls to perform send. -//Otherwise, the commit callback uses a blocking send. -//#define USE_ASIO_ASYNC_SEND +/*********************************************************************** + * Reusable managed receiver buffer: + * - Initialize with memory and a release callback. + * - Call get new with a length in bytes to re-use. + **********************************************************************/ +class udp_zero_copy_asio_mrb : public managed_recv_buffer{ +public: + typedef boost::shared_ptr<udp_zero_copy_asio_mrb> sptr; + typedef boost::function<void(udp_zero_copy_asio_mrb *)> release_cb_type; -//The number of service threads to spawn for async ASIO: -//A single concurrent thread for io_service seems to be the fastest. -//Threads are disabled when no async implementations are enabled. -#if defined(USE_ASIO_ASYNC_RECV) || defined(USE_ASIO_ASYNC_SEND) -static const size_t CONCURRENCY_HINT = 1; -#else -static const size_t CONCURRENCY_HINT = 0; -#endif + udp_zero_copy_asio_mrb(void *mem, const release_cb_type &release_cb): + _mem(mem), _release_cb(release_cb){/* NOP */} -//A reasonable number of frames for send/recv and async/sync -static const size_t DEFAULT_NUM_FRAMES = 32; + void release(void){ + if (_expired) return; + this->_release_cb(this); + _expired = true; + } + + sptr get_new(size_t len){ + _expired = false; + _len = len; + return sptr(this, &udp_zero_copy_asio_mrb::fake_deleter); + } + + void *get(void) const{return _mem;} + +private: + static void fake_deleter(void *obj){ + static_cast<udp_zero_copy_asio_mrb *>(obj)->release(); + } + + const void *get_buff(void) const{return _mem;} + size_t get_size(void) const{return _len;} + + bool _expired; + void *_mem; + size_t _len; + release_cb_type _release_cb; +}; + +/*********************************************************************** + * Reusable managed send buffer: + * - Initialize with memory and a commit callback. + * - Call get new with a length in bytes to re-use. + **********************************************************************/ +class udp_zero_copy_asio_msb : public managed_send_buffer{ +public: + typedef boost::shared_ptr<udp_zero_copy_asio_msb> sptr; + typedef boost::function<void(udp_zero_copy_asio_msb *, size_t)> commit_cb_type; + + udp_zero_copy_asio_msb(void *mem, const commit_cb_type &commit_cb): + _mem(mem), _commit_cb(commit_cb){/* NOP */} + + void commit(size_t len){ + if (_expired) return; + this->_commit_cb(this, len); + _expired = true; + } + + sptr get_new(size_t len){ + _expired = false; + _len = len; + return sptr(this, &udp_zero_copy_asio_msb::fake_deleter); + } + +private: + static void fake_deleter(void *obj){ + static_cast<udp_zero_copy_asio_msb *>(obj)->commit(0); + } + + void *get_buff(void) const{return _mem;} + size_t get_size(void) const{return _len;} + + bool _expired; + void *_mem; + size_t _len; + commit_cb_type _commit_cb; +}; /*********************************************************************** * Zero Copy UDP implementation with ASIO: @@ -59,7 +120,7 @@ static const size_t DEFAULT_NUM_FRAMES = 32; * However, it is not a true zero copy implementation as each * send and recv requires a copy operation to/from userspace. **********************************************************************/ -class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_shared_from_this<udp_zero_copy_asio_impl> { +class udp_zero_copy_asio_impl : public udp_zero_copy{ public: typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr; @@ -74,9 +135,7 @@ public: _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))), _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)), _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), - _pending_recv_buffs(_num_recv_frames), _pending_send_buffs(_num_send_frames), - _concurrency_hint(hints.cast<size_t>("concurrency_hint", CONCURRENCY_HINT)), - _io_service(_concurrency_hint) + _pending_recv_buffs(_num_recv_frames), _pending_send_buffs(_num_send_frames) { //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; @@ -90,35 +149,28 @@ public: _socket->open(asio::ip::udp::v4()); _socket->connect(receiver_endpoint); _sock_fd = _socket->native(); - } - - ~udp_zero_copy_asio_impl(void){ - delete _work; //allow io_service run to complete - _thread_group.join_all(); //wait for service threads to exit - delete _socket; - } - void init(void){ - //release recv frames for use + //allocate re-usable managed receive buffers for (size_t i = 0; i < get_num_recv_frames(); i++){ - release(_recv_buffer_pool->at(i)); + _mrb_pool.push_back(udp_zero_copy_asio_mrb::sptr( + new udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i), + boost::bind(&udp_zero_copy_asio_impl::release, this, _1)) + )); + handle_recv(_mrb_pool.back().get()); } - //push send frames into the fifo + //allocate re-usable managed send buffers for (size_t i = 0; i < get_num_send_frames(); i++){ - handle_send(_send_buffer_pool->at(i)); + _msb_pool.push_back(udp_zero_copy_asio_msb::sptr( + new udp_zero_copy_asio_msb(_send_buffer_pool->at(i), + boost::bind(&udp_zero_copy_asio_impl::commit, this, _1, _2)) + )); + handle_send(_msb_pool.back().get()); } - - //spawn the service threads that will run the io service - _work = new asio::io_service::work(_io_service); //new work to delete later - for (size_t i = 0; i < _concurrency_hint; i++) _thread_group.create_thread( - boost::bind(&udp_zero_copy_asio_impl::service, this) - ); } - void service(void){ - set_thread_priority_safe(); - _io_service.run(); + ~udp_zero_copy_asio_impl(void){ + delete _socket; } //get size for internal socket buffer @@ -135,47 +187,12 @@ public: return get_buff_size<Opt>(); } - //! handle a recv callback -> push the filled memory into the fifo - UHD_INLINE void handle_recv(void *mem, size_t len){ - _pending_recv_buffs.push_with_pop_on_full(boost::asio::buffer(mem, len)); - } - - //////////////////////////////////////////////////////////////////// - #ifdef USE_ASIO_ASYNC_RECV - //////////////////////////////////////////////////////////////////// - //! pop a filled recv buffer off of the fifo and bind with the release callback - managed_recv_buffer::sptr get_recv_buff(double timeout){ - boost::this_thread::disable_interruption di; //disable because the wait can throw - asio::mutable_buffer buff; - if (_pending_recv_buffs.pop_with_timed_wait(buff, timeout)){ - return managed_recv_buffer::make_safe( - asio::buffer_cast<const void *>(buff), - asio::buffer_size(buff), boost::bind( - &udp_zero_copy_asio_impl::release, this, - asio::buffer_cast<void*>(buff) - ) - ); - } - return managed_recv_buffer::sptr(); + UHD_INLINE void handle_recv(udp_zero_copy_asio_mrb *mrb){ + _pending_recv_buffs.push_with_pop_on_full(mrb); } - //! release a recv buffer -> start an async recv on the buffer - void release(void *mem){ - _socket->async_receive( - boost::asio::buffer(mem, this->get_recv_frame_size()), - boost::bind( - &udp_zero_copy_asio_impl::handle_recv, - shared_from_this(), mem, - asio::placeholders::bytes_transferred - ) - ); - } - - //////////////////////////////////////////////////////////////////// - #else /*USE_ASIO_ASYNC_RECV*/ - //////////////////////////////////////////////////////////////////// managed_recv_buffer::sptr get_recv_buff(double timeout){ - asio::mutable_buffer buff; + udp_zero_copy_asio_mrb *mrb; //setup timeval for timeout timeval tv; @@ -191,108 +208,53 @@ public: //if the condition is true, call receive and return the managed buffer if ( ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 - and _pending_recv_buffs.pop_with_haste(buff) + and _pending_recv_buffs.pop_with_haste(mrb) ){ - return managed_recv_buffer::make_safe( - boost::asio::buffer_cast<void *>(buff), - _socket->receive(asio::buffer(buff)), - boost::bind( - &udp_zero_copy_asio_impl::release, this, - asio::buffer_cast<void*>(buff) - ) - ); + return mrb->get_new(::recv(_sock_fd, mrb->get(), _recv_frame_size, 0)); } return managed_recv_buffer::sptr(); } - void release(void *mem){ - handle_recv(mem, this->get_recv_frame_size()); + void release(udp_zero_copy_asio_mrb *mrb){ + handle_recv(mrb); } - //////////////////////////////////////////////////////////////////// - #endif /*USE_ASIO_ASYNC_RECV*/ - //////////////////////////////////////////////////////////////////// - size_t get_num_recv_frames(void) const {return _num_recv_frames;} size_t get_recv_frame_size(void) const {return _recv_frame_size;} - //! handle a send callback -> push the emptied memory into the fifo - UHD_INLINE void handle_send(void *mem){ - _pending_send_buffs.push_with_pop_on_full(boost::asio::buffer(mem, this->get_send_frame_size())); - } - - //////////////////////////////////////////////////////////////////// - #ifdef USE_ASIO_ASYNC_SEND - //////////////////////////////////////////////////////////////////// - //! pop an empty send buffer off of the fifo and bind with the commit callback - managed_send_buffer::sptr get_send_buff(double timeout){ - boost::this_thread::disable_interruption di; //disable because the wait can throw - asio::mutable_buffer buff; - if (_pending_send_buffs.pop_with_timed_wait(buff, timeout)){ - return managed_send_buffer::make_safe( - asio::buffer_cast<void *>(buff), - asio::buffer_size(buff), boost::bind( - &udp_zero_copy_asio_impl::commit, this, - asio::buffer_cast<void*>(buff), _1 - ) - ); - } - return managed_send_buffer::sptr(); + UHD_INLINE void handle_send(udp_zero_copy_asio_msb *msb){ + _pending_send_buffs.push_with_pop_on_full(msb); } - //! commit a send buffer -> start an async send on the buffer - void commit(void *mem, size_t len){ - _socket->async_send( - boost::asio::buffer(mem, len), - boost::bind( - &udp_zero_copy_asio_impl::handle_send, - shared_from_this(), mem - ) - ); - } - - //////////////////////////////////////////////////////////////////// - #else /*USE_ASIO_ASYNC_SEND*/ - //////////////////////////////////////////////////////////////////// managed_send_buffer::sptr get_send_buff(double){ - asio::mutable_buffer buff; - if (_pending_send_buffs.pop_with_haste(buff)){ - return managed_send_buffer::make_safe( - asio::buffer_cast<void *>(buff), - asio::buffer_size(buff), boost::bind( - &udp_zero_copy_asio_impl::commit, this, - asio::buffer_cast<void*>(buff), _1 - ) - ); + udp_zero_copy_asio_msb *msb; + if (_pending_send_buffs.pop_with_haste(msb)){ + return msb->get_new(_send_frame_size); } return managed_send_buffer::sptr(); } - void commit(void *mem, size_t len){ - _socket->send(asio::buffer(mem, len)); - handle_send(mem); + void commit(udp_zero_copy_asio_msb *msb, size_t len){ + ::send(_sock_fd, msb->cast<const void *>(), len, 0); + handle_send(msb); } - //////////////////////////////////////////////////////////////////// - #endif /*USE_ASIO_ASYNC_SEND*/ - //////////////////////////////////////////////////////////////////// - size_t get_num_send_frames(void) const {return _num_send_frames;} size_t get_send_frame_size(void) const {return _send_frame_size;} private: //memory management -> buffers and fifos - boost::thread_group _thread_group; const size_t _recv_frame_size, _num_recv_frames; const size_t _send_frame_size, _num_send_frames; buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; - bounded_buffer<asio::mutable_buffer> _pending_recv_buffs, _pending_send_buffs; + bounded_buffer<udp_zero_copy_asio_mrb *> _pending_recv_buffs; + bounded_buffer<udp_zero_copy_asio_msb *> _pending_send_buffs; + std::vector<udp_zero_copy_asio_msb::sptr> _msb_pool; + std::vector<udp_zero_copy_asio_mrb::sptr> _mrb_pool; //asio guts -> socket and service - size_t _concurrency_hint; asio::io_service _io_service; asio::ip::udp::socket *_socket; - asio::io_service::work *_work; int _sock_fd; }; @@ -345,7 +307,5 @@ udp_zero_copy::sptr udp_zero_copy::make( resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv"); resize_buff_helper<asio::socket_base::send_buffer_size> (udp_trans, send_buff_size, "send"); - udp_trans->init(); //buffers resized -> call init() to use - return udp_trans; } |