diff options
Diffstat (limited to 'host/lib/transport/udp_zero_copy.cpp')
-rw-r--r-- | host/lib/transport/udp_zero_copy.cpp | 153 |
1 files changed, 72 insertions, 81 deletions
diff --git a/host/lib/transport/udp_zero_copy.cpp b/host/lib/transport/udp_zero_copy.cpp index 0ccc92b82..166177177 100644 --- a/host/lib/transport/udp_zero_copy.cpp +++ b/host/lib/transport/udp_zero_copy.cpp @@ -1,5 +1,5 @@ // -// Copyright 2010-2011 Ettus Research LLC +// Copyright 2010-2013 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -18,12 +18,14 @@ #include "udp_common.hpp" #include <uhd/transport/udp_zero_copy.hpp> #include <uhd/transport/udp_simple.hpp> //mtu -#include <uhd/transport/bounded_buffer.hpp> #include <uhd/transport/buffer_pool.hpp> #include <uhd/utils/msg.hpp> #include <uhd/utils/log.hpp> +#include <uhd/utils/atomic.hpp> #include <boost/format.hpp> -#include <list> +#include <boost/make_shared.hpp> +#include <boost/thread/thread.hpp> //sleep +#include <vector> using namespace uhd; using namespace uhd::transport; @@ -61,66 +63,84 @@ static void check_registry_for_fast_send_threshold(const size_t mtu){ /*********************************************************************** * Reusable managed receiver buffer: - * - Initialize with memory and a release callback. - * - Call get new with a length in bytes to re-use. + * - get_new performs the recv operation **********************************************************************/ class udp_zero_copy_asio_mrb : public managed_recv_buffer{ public: - udp_zero_copy_asio_mrb(void *mem, bounded_buffer<udp_zero_copy_asio_mrb *> &pending): - _mem(mem), _len(0), _pending(pending){/* NOP */} + udp_zero_copy_asio_mrb(void *mem, int sock_fd, const size_t frame_size): + _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) { /*NOP*/ } void release(void){ - if (_len == 0) return; - _pending.push_with_haste(this); - _len = 0; + _claimer.release(); } - sptr get_new(size_t len){ - _len = len; - return make_managed_buffer(this); - } + UHD_INLINE sptr get_new(const double timeout, size_t &index){ + if (not _claimer.claim_with_wait(timeout)) return sptr(); - template <class T> T cast(void) const{return static_cast<T>(_mem);} + #ifdef MSG_DONTWAIT //try a non-blocking recv() if supported + _len = ::recv(_sock_fd, (char *)_mem, _frame_size, MSG_DONTWAIT); + if (_len > 0){ + index++; //advances the caller's buffer + return make(this, _mem, size_t(_len)); + } + #endif -private: - const void *get_buff(void) const{return _mem;} - size_t get_size(void) const{return _len;} + if (wait_for_recv_ready(_sock_fd, timeout)){ + _len = ::recv(_sock_fd, (char *)_mem, _frame_size, 0); + index++; //advances the caller's buffer + return make(this, _mem, size_t(_len)); + } + _claimer.release(); //undo claim + return sptr(); //null for timeout + } + +private: void *_mem; - size_t _len; - bounded_buffer<udp_zero_copy_asio_mrb *> &_pending; + int _sock_fd; + size_t _frame_size; + ssize_t _len; + simple_claimer _claimer; }; /*********************************************************************** * Reusable managed send buffer: - * - Initialize with memory and a commit callback. - * - Call get new with a length in bytes to re-use. + * - commit performs the send operation **********************************************************************/ class udp_zero_copy_asio_msb : public managed_send_buffer{ public: - udp_zero_copy_asio_msb(void *mem, bounded_buffer<udp_zero_copy_asio_msb *> &pending, int sock_fd): - _mem(mem), _len(0), _pending(pending), _sock_fd(sock_fd){/* NOP */} - - void commit(size_t len){ - if (_len == 0) return; - ::send(_sock_fd, this->cast<const char *>(), len, 0); - _pending.push_with_haste(this); - _len = 0; + udp_zero_copy_asio_msb(void *mem, int sock_fd, const size_t frame_size): + _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) { /*NOP*/ } + + void release(void){ + //Retry logic because send may fail with ENOBUFS. + //This is known to occur at least on some OSX systems. + //But it should be safe to always check for the error. + while (true) + { + const ssize_t ret = ::send(_sock_fd, (const char *)_mem, size(), 0); + if (ret == ssize_t(size())) break; + if (ret == -1 and errno == ENOBUFS) + { + boost::this_thread::sleep(boost::posix_time::microseconds(1)); + continue; //try to send again + } + UHD_ASSERT_THROW(ret == ssize_t(size())); + } + _claimer.release(); } - sptr get_new(size_t len){ - _len = len; - return make_managed_buffer(this); + UHD_INLINE sptr get_new(const double timeout, size_t &index){ + if (not _claimer.claim_with_wait(timeout)) return sptr(); + index++; //advances the caller's buffer + return make(this, _mem, _frame_size); } private: - void *get_buff(void) const{return _mem;} - size_t get_size(void) const{return _len;} - void *_mem; - size_t _len; - bounded_buffer<udp_zero_copy_asio_msb *> &_pending; int _sock_fd; + size_t _frame_size; + simple_claimer _claimer; }; /*********************************************************************** @@ -145,8 +165,7 @@ public: _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))), _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)), _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), - _pending_recv_buffs(_num_recv_frames), - _pending_send_buffs(_num_send_frames) + _next_recv_buff_index(0), _next_send_buff_index(0) { UHD_LOG << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; @@ -167,18 +186,16 @@ public: //allocate re-usable managed receive buffers for (size_t i = 0; i < get_num_recv_frames(); i++){ - _mrb_pool.push_back(udp_zero_copy_asio_mrb( - _recv_buffer_pool->at(i), _pending_recv_buffs + _mrb_pool.push_back(boost::make_shared<udp_zero_copy_asio_mrb>( + _recv_buffer_pool->at(i), _sock_fd, get_recv_frame_size() )); - _pending_recv_buffs.push_with_haste(&_mrb_pool.back()); } //allocate re-usable managed send buffers for (size_t i = 0; i < get_num_send_frames(); i++){ - _msb_pool.push_back(udp_zero_copy_asio_msb( - _send_buffer_pool->at(i), _pending_send_buffs, _sock_fd + _msb_pool.push_back(boost::make_shared<udp_zero_copy_asio_msb>( + _send_buffer_pool->at(i), _sock_fd, get_send_frame_size() )); - _pending_send_buffs.push_with_haste(&_msb_pool.back()); } } @@ -198,29 +215,11 @@ public: /******************************************************************* * Receive implementation: - * - * Perform a non-blocking receive for performance, - * and then fall back to a blocking receive with timeout. - * Return the managed receive buffer with the new length. - * When the caller is finished with the managed buffer, - * the managed receive buffer is released back into the queue. + * Block on the managed buffer's get call and advance the index. ******************************************************************/ managed_recv_buffer::sptr get_recv_buff(double timeout){ - udp_zero_copy_asio_mrb *mrb = NULL; - if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){ - - #ifdef MSG_DONTWAIT //try a non-blocking recv() if supported - ssize_t ret = ::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, MSG_DONTWAIT); - if (ret > 0) return mrb->get_new(ret); - #endif - - if (wait_for_recv_ready(_sock_fd, timeout)) return mrb->get_new( - ::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0) - ); - - _pending_recv_buffs.push_with_haste(mrb); //timeout: return the managed buffer to the queue - } - return managed_recv_buffer::sptr(); + if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0; + return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index); } size_t get_num_recv_frames(void) const {return _num_recv_frames;} @@ -228,18 +227,11 @@ public: /******************************************************************* * Send implementation: - * - * Get a managed receive buffer immediately with max length set. - * The caller will fill the buffer and commit it when finished. - * The commit routine will perform a blocking send operation, - * and push the managed send buffer back into the queue. + * Block on the managed buffer's get call and advance the index. ******************************************************************/ managed_send_buffer::sptr get_send_buff(double timeout){ - udp_zero_copy_asio_msb *msb = NULL; - if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){ - return msb->get_new(_send_frame_size); - } - return managed_send_buffer::sptr(); + if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0; + return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index); } size_t get_num_send_frames(void) const {return _num_send_frames;} @@ -250,10 +242,9 @@ private: const size_t _recv_frame_size, _num_recv_frames; const size_t _send_frame_size, _num_send_frames; buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; - bounded_buffer<udp_zero_copy_asio_mrb *> _pending_recv_buffs; - bounded_buffer<udp_zero_copy_asio_msb *> _pending_send_buffs; - std::list<udp_zero_copy_asio_msb> _msb_pool; - std::list<udp_zero_copy_asio_mrb> _mrb_pool; + std::vector<boost::shared_ptr<udp_zero_copy_asio_msb> > _msb_pool; + std::vector<boost::shared_ptr<udp_zero_copy_asio_mrb> > _mrb_pool; + size_t _next_recv_buff_index, _next_send_buff_index; //asio guts -> socket and service asio::io_service _io_service; |