diff options
| -rw-r--r-- | host/docs/transport.rst | 5 | ||||
| -rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 270 | 
2 files changed, 117 insertions, 158 deletions
| diff --git a/host/docs/transport.rst b/host/docs/transport.rst index 018f909c1..6b9d28bfa 100644 --- a/host/docs/transport.rst +++ b/host/docs/transport.rst @@ -34,10 +34,9 @@ The following parameters can be used to alter the transport's default behavior:  * **num_recv_frames:** The number of receive buffers to allocate  * **send_frame_size:** The size of a single send buffer in bytes  * **num_send_frames:** The number of send buffers to allocate -* **concurrency_hint:** The number of threads to run the IO service -**Note:** num_send_frames will not have an effect -as the asynchronous send implementation is currently disabled. +**Note:** num_recv_frames and num_send_frames will not have an effect +as the asynchronous send implementation is currently unimplemented.  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^  Flow control parameters diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 90cd2a9ce..33c7dd94e 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -19,38 +19,99 @@  #include <uhd/transport/udp_simple.hpp> //mtu  #include <uhd/transport/bounded_buffer.hpp>  #include <uhd/transport/buffer_pool.hpp> -#include <uhd/utils/thread_priority.hpp>  #include <uhd/utils/assert.hpp>  #include <uhd/utils/warning.hpp>  #include <boost/asio.hpp>  #include <boost/format.hpp> -#include <boost/thread/thread.hpp> -#include <boost/enable_shared_from_this.hpp>  #include <iostream> +#include <vector>  using namespace uhd;  using namespace uhd::transport;  namespace asio = boost::asio; -//Define this to the the boost async io calls to perform receive. -//Otherwise, get_recv_buff uses a blocking receive with timeout. -//#define USE_ASIO_ASYNC_RECV +//A reasonable number of frames for send/recv and async/sync +static const size_t DEFAULT_NUM_FRAMES = 32; -//Define this to the the boost async io calls to perform send. -//Otherwise, the commit callback uses a blocking send. -//#define USE_ASIO_ASYNC_SEND +/*********************************************************************** + * Reusable managed receiver buffer: + *  - Initialize with memory and a release callback. + *  - Call get new with a length in bytes to re-use. + **********************************************************************/ +class udp_zero_copy_asio_mrb : public managed_recv_buffer{ +public: +    typedef boost::shared_ptr<udp_zero_copy_asio_mrb> sptr; +    typedef boost::function<void(udp_zero_copy_asio_mrb *)> release_cb_type; -//The number of service threads to spawn for async ASIO: -//A single concurrent thread for io_service seems to be the fastest. -//Threads are disabled when no async implementations are enabled. -#if defined(USE_ASIO_ASYNC_RECV) || defined(USE_ASIO_ASYNC_SEND) -static const size_t CONCURRENCY_HINT = 1; -#else -static const size_t CONCURRENCY_HINT = 0; -#endif +    udp_zero_copy_asio_mrb(void *mem, const release_cb_type &release_cb): +        _mem(mem), _release_cb(release_cb){/* NOP */} -//A reasonable number of frames for send/recv and async/sync -static const size_t DEFAULT_NUM_FRAMES = 32; +    void release(void){ +        if (_expired) return; +        this->_release_cb(this); +        _expired = true; +    } + +    sptr get_new(size_t len){ +        _expired = false; +        _len = len; +        return sptr(this, &udp_zero_copy_asio_mrb::fake_deleter); +    } + +    void *get(void) const{return _mem;} + +private: +    static void fake_deleter(void *obj){ +        static_cast<udp_zero_copy_asio_mrb *>(obj)->release(); +    } + +    const void *get_buff(void) const{return _mem;} +    size_t get_size(void) const{return _len;} + +    bool _expired; +    void *_mem; +    size_t _len; +    release_cb_type _release_cb; +}; + +/*********************************************************************** + * Reusable managed send buffer: + *  - Initialize with memory and a commit callback. + *  - Call get new with a length in bytes to re-use. + **********************************************************************/ +class udp_zero_copy_asio_msb : public managed_send_buffer{ +public: +    typedef boost::shared_ptr<udp_zero_copy_asio_msb> sptr; +    typedef boost::function<void(udp_zero_copy_asio_msb *, size_t)> commit_cb_type; + +    udp_zero_copy_asio_msb(void *mem, const commit_cb_type &commit_cb): +        _mem(mem), _commit_cb(commit_cb){/* NOP */} + +    void commit(size_t len){ +        if (_expired) return; +        this->_commit_cb(this, len); +        _expired = true; +    } + +    sptr get_new(size_t len){ +        _expired = false; +        _len = len; +        return sptr(this, &udp_zero_copy_asio_msb::fake_deleter); +    } + +private: +    static void fake_deleter(void *obj){ +        static_cast<udp_zero_copy_asio_msb *>(obj)->commit(0); +    } + +    void *get_buff(void) const{return _mem;} +    size_t get_size(void) const{return _len;} + +    bool _expired; +    void *_mem; +    size_t _len; +    commit_cb_type _commit_cb; +};  /***********************************************************************   * Zero Copy UDP implementation with ASIO: @@ -59,7 +120,7 @@ static const size_t DEFAULT_NUM_FRAMES = 32;   *   However, it is not a true zero copy implementation as each   *   send and recv requires a copy operation to/from userspace.   **********************************************************************/ -class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_shared_from_this<udp_zero_copy_asio_impl> { +class udp_zero_copy_asio_impl : public udp_zero_copy{  public:      typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr; @@ -74,9 +135,7 @@ public:          _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))),          _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),          _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), -        _pending_recv_buffs(_num_recv_frames), _pending_send_buffs(_num_send_frames), -        _concurrency_hint(hints.cast<size_t>("concurrency_hint", CONCURRENCY_HINT)), -        _io_service(_concurrency_hint) +        _pending_recv_buffs(_num_recv_frames), _pending_send_buffs(_num_send_frames)      {          //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; @@ -90,35 +149,28 @@ public:          _socket->open(asio::ip::udp::v4());          _socket->connect(receiver_endpoint);          _sock_fd = _socket->native(); -    } - -    ~udp_zero_copy_asio_impl(void){ -        delete _work; //allow io_service run to complete -        _thread_group.join_all(); //wait for service threads to exit -        delete _socket; -    } -    void init(void){ -        //release recv frames for use +        //allocate re-usable managed receive buffers          for (size_t i = 0; i < get_num_recv_frames(); i++){ -            release(_recv_buffer_pool->at(i)); +            _mrb_pool.push_back(udp_zero_copy_asio_mrb::sptr( +                new udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i), +                boost::bind(&udp_zero_copy_asio_impl::release, this, _1)) +            )); +            handle_recv(_mrb_pool.back().get());          } -        //push send frames into the fifo +        //allocate re-usable managed send buffers          for (size_t i = 0; i < get_num_send_frames(); i++){ -            handle_send(_send_buffer_pool->at(i)); +            _msb_pool.push_back(udp_zero_copy_asio_msb::sptr( +                new udp_zero_copy_asio_msb(_send_buffer_pool->at(i), +                boost::bind(&udp_zero_copy_asio_impl::commit, this, _1, _2)) +            )); +            handle_send(_msb_pool.back().get());          } - -        //spawn the service threads that will run the io service -        _work = new asio::io_service::work(_io_service); //new work to delete later -        for (size_t i = 0; i < _concurrency_hint; i++) _thread_group.create_thread( -            boost::bind(&udp_zero_copy_asio_impl::service, this) -        );      } -    void service(void){ -        set_thread_priority_safe(); -        _io_service.run(); +    ~udp_zero_copy_asio_impl(void){ +        delete _socket;      }      //get size for internal socket buffer @@ -135,47 +187,12 @@ public:          return get_buff_size<Opt>();      } -    //! handle a recv callback -> push the filled memory into the fifo -    UHD_INLINE void handle_recv(void *mem, size_t len){ -        _pending_recv_buffs.push_with_pop_on_full(boost::asio::buffer(mem, len)); -    } - -    //////////////////////////////////////////////////////////////////// -    #ifdef USE_ASIO_ASYNC_RECV -    //////////////////////////////////////////////////////////////////// -    //! pop a filled recv buffer off of the fifo and bind with the release callback -    managed_recv_buffer::sptr get_recv_buff(double timeout){ -        boost::this_thread::disable_interruption di; //disable because the wait can throw -        asio::mutable_buffer buff; -        if (_pending_recv_buffs.pop_with_timed_wait(buff, timeout)){ -            return managed_recv_buffer::make_safe( -                asio::buffer_cast<const void *>(buff), -                asio::buffer_size(buff), boost::bind( -                    &udp_zero_copy_asio_impl::release, this, -                    asio::buffer_cast<void*>(buff) -                ) -            ); -        } -        return managed_recv_buffer::sptr(); +    UHD_INLINE void handle_recv(udp_zero_copy_asio_mrb *mrb){ +        _pending_recv_buffs.push_with_pop_on_full(mrb);      } -    //! release a recv buffer -> start an async recv on the buffer -    void release(void *mem){ -        _socket->async_receive( -            boost::asio::buffer(mem, this->get_recv_frame_size()), -            boost::bind( -                &udp_zero_copy_asio_impl::handle_recv, -                shared_from_this(), mem, -                asio::placeholders::bytes_transferred -            ) -        ); -    } - -    //////////////////////////////////////////////////////////////////// -    #else /*USE_ASIO_ASYNC_RECV*/ -    ////////////////////////////////////////////////////////////////////      managed_recv_buffer::sptr get_recv_buff(double timeout){ -        asio::mutable_buffer buff; +        udp_zero_copy_asio_mrb *mrb;          //setup timeval for timeout          timeval tv; @@ -191,108 +208,53 @@ public:          //if the condition is true, call receive and return the managed buffer          if (              ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 -            and _pending_recv_buffs.pop_with_haste(buff) +            and _pending_recv_buffs.pop_with_haste(mrb)          ){ -            return managed_recv_buffer::make_safe( -                boost::asio::buffer_cast<void *>(buff), -                _socket->receive(asio::buffer(buff)), -                boost::bind( -                    &udp_zero_copy_asio_impl::release, this, -                    asio::buffer_cast<void*>(buff) -                ) -            ); +            return mrb->get_new(::recv(_sock_fd, mrb->get(), _recv_frame_size, 0));          }          return managed_recv_buffer::sptr();      } -    void release(void *mem){ -        handle_recv(mem, this->get_recv_frame_size()); +    void release(udp_zero_copy_asio_mrb *mrb){ +        handle_recv(mrb);      } -    //////////////////////////////////////////////////////////////////// -    #endif /*USE_ASIO_ASYNC_RECV*/ -    //////////////////////////////////////////////////////////////////// -      size_t get_num_recv_frames(void) const {return _num_recv_frames;}      size_t get_recv_frame_size(void) const {return _recv_frame_size;} -    //! handle a send callback -> push the emptied memory into the fifo -    UHD_INLINE void handle_send(void *mem){ -        _pending_send_buffs.push_with_pop_on_full(boost::asio::buffer(mem, this->get_send_frame_size())); -    } - -    //////////////////////////////////////////////////////////////////// -    #ifdef USE_ASIO_ASYNC_SEND -    //////////////////////////////////////////////////////////////////// -    //! pop an empty send buffer off of the fifo and bind with the commit callback -    managed_send_buffer::sptr get_send_buff(double timeout){ -        boost::this_thread::disable_interruption di; //disable because the wait can throw -        asio::mutable_buffer buff; -        if (_pending_send_buffs.pop_with_timed_wait(buff, timeout)){ -            return managed_send_buffer::make_safe( -                asio::buffer_cast<void *>(buff), -                asio::buffer_size(buff), boost::bind( -                    &udp_zero_copy_asio_impl::commit, this, -                    asio::buffer_cast<void*>(buff), _1 -                ) -            ); -        } -        return managed_send_buffer::sptr(); +    UHD_INLINE void handle_send(udp_zero_copy_asio_msb *msb){ +        _pending_send_buffs.push_with_pop_on_full(msb);      } -    //! commit a send buffer -> start an async send on the buffer -    void commit(void *mem, size_t len){ -        _socket->async_send( -            boost::asio::buffer(mem, len), -            boost::bind( -                &udp_zero_copy_asio_impl::handle_send, -                shared_from_this(), mem -            ) -        ); -    } - -    //////////////////////////////////////////////////////////////////// -    #else /*USE_ASIO_ASYNC_SEND*/ -    ////////////////////////////////////////////////////////////////////      managed_send_buffer::sptr get_send_buff(double){ -        asio::mutable_buffer buff; -        if (_pending_send_buffs.pop_with_haste(buff)){ -            return managed_send_buffer::make_safe( -                asio::buffer_cast<void *>(buff), -                asio::buffer_size(buff), boost::bind( -                    &udp_zero_copy_asio_impl::commit, this, -                    asio::buffer_cast<void*>(buff), _1 -                ) -            ); +        udp_zero_copy_asio_msb *msb; +        if (_pending_send_buffs.pop_with_haste(msb)){ +            return msb->get_new(_send_frame_size);          }          return managed_send_buffer::sptr();      } -    void commit(void *mem, size_t len){ -        _socket->send(asio::buffer(mem, len)); -        handle_send(mem); +    void commit(udp_zero_copy_asio_msb *msb, size_t len){ +        ::send(_sock_fd, msb->cast<const void *>(), len, 0); +        handle_send(msb);      } -    //////////////////////////////////////////////////////////////////// -    #endif /*USE_ASIO_ASYNC_SEND*/ -    //////////////////////////////////////////////////////////////////// -      size_t get_num_send_frames(void) const {return _num_send_frames;}      size_t get_send_frame_size(void) const {return _send_frame_size;}  private:      //memory management -> buffers and fifos -    boost::thread_group _thread_group;      const size_t _recv_frame_size, _num_recv_frames;      const size_t _send_frame_size, _num_send_frames;      buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; -    bounded_buffer<asio::mutable_buffer> _pending_recv_buffs, _pending_send_buffs; +    bounded_buffer<udp_zero_copy_asio_mrb *> _pending_recv_buffs; +    bounded_buffer<udp_zero_copy_asio_msb *> _pending_send_buffs; +    std::vector<udp_zero_copy_asio_msb::sptr> _msb_pool; +    std::vector<udp_zero_copy_asio_mrb::sptr> _mrb_pool;      //asio guts -> socket and service -    size_t                  _concurrency_hint;      asio::io_service        _io_service;      asio::ip::udp::socket   *_socket; -    asio::io_service::work  *_work;      int                     _sock_fd;  }; @@ -345,7 +307,5 @@ udp_zero_copy::sptr udp_zero_copy::make(      resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv");      resize_buff_helper<asio::socket_base::send_buffer_size>   (udp_trans, send_buff_size, "send"); -    udp_trans->init(); //buffers resized -> call init() to use -      return udp_trans;  } | 
