diff options
| -rw-r--r-- | host/include/uhd/transport/zero_copy.hpp | 4 | ||||
| -rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 492 | ||||
| -rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 41 | 
3 files changed, 188 insertions, 349 deletions
| diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp index d5a536b27..18eb3fb6d 100644 --- a/host/include/uhd/transport/zero_copy.hpp +++ b/host/include/uhd/transport/zero_copy.hpp @@ -30,7 +30,7 @@ namespace uhd{ namespace transport{       * Contains a reference to transport-managed memory,       * and a method to release the memory after reading.       */ -    class UHD_API managed_recv_buffer : boost::noncopyable{ +    class UHD_API managed_recv_buffer{      public:          typedef boost::shared_ptr<managed_recv_buffer> sptr;          typedef boost::function<void(void)> release_fcn_t; @@ -81,7 +81,7 @@ namespace uhd{ namespace transport{       * Contains a reference to transport-managed memory,       * and a method to commit the memory after writing.       */ -    class UHD_API managed_send_buffer : boost::noncopyable{ +    class UHD_API managed_send_buffer{      public:          typedef boost::shared_ptr<managed_send_buffer> sptr;          typedef boost::function<void(size_t)> commit_fcn_t; diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 6fab5ae6f..60f290fcf 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -21,258 +21,88 @@  #include <uhd/transport/buffer_pool.hpp>  #include <uhd/utils/thread_priority.hpp>  #include <uhd/utils/assert.hpp> +#include <boost/function.hpp>  #include <boost/foreach.hpp> -#include <boost/thread.hpp> -#include <vector> +#include <boost/thread/thread.hpp> +#include <list>  #include <iostream>  using namespace uhd;  using namespace uhd::transport; -static const double CLEANUP_TIMEOUT   = 0.2;    //seconds  static const size_t DEFAULT_NUM_XFERS = 16;     //num xfers  static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes  /*********************************************************************** - * Helper functions - ***********************************************************************/ -/* - * Print the values of a libusb_transfer struct - * http://libusb.sourceforge.net/api-1.0/structlibusb__transfer.html - */ -void pp_transfer(libusb_transfer *lut) -{ -    std::cout << "Libusb transfer"       << std::endl; -    std::cout << "    flags:         0x" << std::hex << (unsigned int) lut->flags << std::endl; -    std::cout << "    endpoint:      0x" << std::hex << (unsigned int) lut->endpoint << std::endl; -    std::cout << "    type:          0x" << std::hex << (unsigned int) lut->type << std::endl; -    std::cout << "    timeout:       "   << std::dec << lut->timeout << std::endl; -    std::cout << "    status:        0x" << std::hex << lut->status << std::endl; -    std::cout << "    length:        "   << std::dec << lut->length << std::endl; -    std::cout << "    actual_length: "   << std::dec << lut->actual_length << std::endl; -} - -/*********************************************************************** - * USB asynchronous zero_copy endpoint - *   This endpoint implementation provides asynchronous I/O to libusb-1.0 - *   devices. Each endpoint is directional and two can be combined to - *   create a bidirectional interface. It is a zero copy implementation - *   with respect to libusb, however, each send and recv requires a copy - *   operation from kernel to userspace; this is due to the usbfs - *   interface provided by the kernel. + * Reusable managed receiver buffer: + *  - Associated with a particular libusb transfer struct. + *  - Submits the transfer to libusb in the release method.   **********************************************************************/ -class usb_endpoint { +class libusb_zero_copy_mrb : public managed_recv_buffer{  public: -    typedef boost::shared_ptr<usb_endpoint> sptr; - -    usb_endpoint( -        libusb::device_handle::sptr handle, -        int endpoint, -        bool input, -        size_t transfer_size, -        size_t num_transfers -    ); - -    ~usb_endpoint(void); +    libusb_zero_copy_mrb(libusb_transfer *lut): +        _lut(lut), _expired(true) { /* NOP */ } -    // Exposed interface for submitting / retrieving transfer buffers - -    //! Submit a new transfer that was presumably just filled or emptied. -    void submit(libusb_transfer *lut); - -    /*! -     * Get an available transfer: -     * For inputs, this is a just filled transfer. -     * For outputs, this is a just emptied transfer. -     * \param timeout the timeout to wait for a lut -     * \return the transfer pointer or NULL if timeout -     */ -    libusb_transfer *get_lut_with_wait(double timeout); +    void release(void){ +        if (_expired) return; +        UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0); +        _expired = true; +    } -    //Callback use only -    void callback_handle_transfer(libusb_transfer *lut); +    sptr get_new(void){ +        _expired = false; +        return sptr(this, &libusb_zero_copy_mrb::fake_deleter); +    }  private: -    libusb::device_handle::sptr _handle; -    int  _endpoint; -    bool _input; - -    //! hold a bounded buffer of completed transfers -    bounded_buffer<libusb_transfer *> _completed_list; - -    //! a list of all transfer structs we allocated -    std::vector<libusb_transfer *> _all_luts; +    static void fake_deleter(void *obj){ +        static_cast<libusb_zero_copy_mrb *>(obj)->release(); +    } -    //! memory allocated for the transfer buffers -    buffer_pool::sptr _buffer_pool; +    const void *get_buff(void) const{return _lut->buffer;} +    size_t get_size(void) const{return _lut->actual_length;} -    // Calls for processing asynchronous I/O -    libusb_transfer *allocate_transfer(void *mem, size_t len); -    void print_transfer_status(libusb_transfer *lut); +    libusb_transfer *_lut; +    bool _expired;  }; - -/* - * Callback function called when submitted transfers complete. - * The endpoint upon which the transfer is part of is recovered - * and the transfer moved from pending to completed state. - * Callbacks occur during the reaping calls where libusb_handle_events() - * is used. The callback only modifies the transfer state by moving - * it from the pending to completed status list. - * \param lut pointer to libusb_transfer - */ -static void callback(libusb_transfer *lut){ -    usb_endpoint *endpoint = (usb_endpoint *) lut->user_data; -    endpoint->callback_handle_transfer(lut); -} - - -/* - * Accessor call to allow list access from callback space - * \param pointer to libusb_transfer - */ -void usb_endpoint::callback_handle_transfer(libusb_transfer *lut){ -    _completed_list.push_with_haste(lut); -} - - -/* - * Constructor - * Allocate libusb transfers and mark as free.  For IN endpoints, - * submit the transfers so that they're ready to return when - * data is available. - */ -usb_endpoint::usb_endpoint( -    libusb::device_handle::sptr handle, -    int endpoint, -    bool input, -    size_t transfer_size, -    size_t num_transfers -): -    _handle(handle), -    _endpoint(endpoint), -    _input(input), -    _completed_list(num_transfers) -{ -    _buffer_pool = buffer_pool::make(num_transfers, transfer_size); -    for (size_t i = 0; i < num_transfers; i++){ -        _all_luts.push_back(allocate_transfer(_buffer_pool->at(i), transfer_size)); - -        //input luts are immediately submitted to be filled -        //output luts go into the completed list as free buffers -        if (_input) this->submit(_all_luts.back()); -        else _completed_list.push_with_haste(_all_luts.back()); +/*********************************************************************** + * Reusable managed send buffer: + *  - Associated with a particular libusb transfer struct. + *  - Submits the transfer to libusb in the commit method. + **********************************************************************/ +class libusb_zero_copy_msb : public managed_send_buffer{ +public: +    libusb_zero_copy_msb(libusb_transfer *lut): +        _lut(lut), _expired(true) { /* NOP */ } + +    void commit(size_t len){ +        if (_expired) return; +        _lut->length = len; +        UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0); +        _expired = true;      } -} - -/* - * Destructor - * Make sure all the memory is freed. Cancel any pending transfers. - * When all completed transfers are moved to the free list, release - * the transfers. Libusb will deallocate the data buffer held by - * each transfer. - */ -usb_endpoint::~usb_endpoint(void){ -    //cancel all transfers -    BOOST_FOREACH(libusb_transfer *lut, _all_luts){ -        libusb_cancel_transfer(lut); +    sptr get_new(void){ +        _expired = false; +        return sptr(this, &libusb_zero_copy_msb::fake_deleter);      } -    //collect canceled transfers (drain the queue) -    while (this->get_lut_with_wait(CLEANUP_TIMEOUT) != NULL){}; - -    //free all transfers -    BOOST_FOREACH(libusb_transfer *lut, _all_luts){ -        libusb_free_transfer(lut); +private: +    static void fake_deleter(void *obj){ +        static_cast<libusb_zero_copy_msb *>(obj)->commit(0);      } -} - - -/* - * Allocate a libusb transfer - * The allocated transfer - and buffer it contains - is repeatedly - * submitted, reaped, and reused and should not be freed until shutdown. - * \param mem a pointer to the buffer memory - * \param len size of the individual buffer - * \return pointer to an allocated libusb_transfer - */ -libusb_transfer *usb_endpoint::allocate_transfer(void *mem, size_t len){ -    libusb_transfer *lut = libusb_alloc_transfer(0); -    UHD_ASSERT_THROW(lut != NULL); - -    unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0)); -    unsigned char *buff = reinterpret_cast<unsigned char *>(mem); -    libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback); - -    libusb_fill_bulk_transfer(lut,                // transfer -                              _handle->get(),     // dev_handle -                              endpoint,           // endpoint -                              buff,               // buffer -                              len,                // length -                              lut_callback,       // callback -                              this,               // user_data -                              0);                 // timeout -    return lut; -} +    void *get_buff(void) const{return _lut->buffer;} +    size_t get_size(void) const{return _lut->length;} -/* - * Asynchonous transfer submission - * Submit a libusb transfer to libusb add pending status - * \param lut pointer to libusb_transfer - * \return true on success or false on error - */ -void usb_endpoint::submit(libusb_transfer *lut){ -    UHD_ASSERT_THROW(libusb_submit_transfer(lut) == 0); -} - -/* - * Print status errors of a completed transfer - * \param lut pointer to an libusb_transfer - */ -void usb_endpoint::print_transfer_status(libusb_transfer *lut){ -    std::cout << "here " << lut->status << std::endl; -    switch (lut->status) { -    case LIBUSB_TRANSFER_COMPLETED: -        if (lut->actual_length < lut->length) { -            std::cerr << "USB: transfer completed with short write," -                      << " length = " << lut->length -                      << " actual = " << lut->actual_length << std::endl; -        } - -        if ((lut->actual_length < 0) || (lut->length < 0)) { -            std::cerr << "USB: transfer completed with invalid response" -                      << std::endl; -        } -        break; -    case LIBUSB_TRANSFER_CANCELLED: -        break; -    case LIBUSB_TRANSFER_NO_DEVICE: -        std::cerr << "USB: device was disconnected" << std::endl; -        break; -    case LIBUSB_TRANSFER_OVERFLOW: -        std::cerr << "USB: device sent more data than requested" << std::endl; -        break; -    case LIBUSB_TRANSFER_TIMED_OUT: -        std::cerr << "USB: transfer timed out" << std::endl; -        break; -    case LIBUSB_TRANSFER_STALL: -        std::cerr << "USB: halt condition detected (stalled)" << std::endl; -        break; -    case LIBUSB_TRANSFER_ERROR: -        std::cerr << "USB: transfer failed" << std::endl; -        break; -    default: -        std::cerr << "USB: received unknown transfer status" << std::endl; -    } -} +    libusb_transfer *_lut; +    bool _expired; +}; -libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){ -    boost::this_thread::disable_interruption di; //disable because the wait can throw -    libusb_transfer *lut = NULL; -    if (_completed_list.pop_with_timed_wait(lut, timeout)) return lut; -    return NULL; +//! helper function: handles all async callbacks +static void libusb_async_cb(libusb_transfer *lut){ +    (*static_cast<boost::function<void()> *>(lut->user_data))();  }  /*********************************************************************** @@ -286,16 +116,107 @@ public:          size_t recv_endpoint,          size_t send_endpoint,          const device_addr_t &hints -    ); +    ): +        _handle(handle), +        _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))), +        _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS))), +        _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))), +        _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS))), +        _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)), +        _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), +        _pending_recv_buffs(_num_recv_frames), +        _pending_send_buffs(_num_send_frames) +    { +        _handle->claim_interface(2 /*in interface*/); +        _handle->claim_interface(1 /*out interface*/); + +        //allocate libusb transfer structs and managed receive buffers +        for (size_t i = 0; i < get_num_recv_frames(); i++){ + +            libusb_transfer *lut = libusb_alloc_transfer(0); +            UHD_ASSERT_THROW(lut != NULL); + +            _mrb_pool.push_back(libusb_zero_copy_mrb(lut)); +            _callbacks.push_back(boost::bind( +                &libusb_zero_copy_impl::handle_recv, this, &_mrb_pool.back() +            )); + +            libusb_fill_bulk_transfer( +                lut,                                                    // transfer +                _handle->get(),                                         // dev_handle +                (recv_endpoint & 0x7f) | 0x80,                          // endpoint +                static_cast<unsigned char *>(_recv_buffer_pool->at(i)), // buffer +                this->get_recv_frame_size(),                            // length +                static_cast<libusb_transfer_cb_fn>(&libusb_async_cb),   // callback +                static_cast<void *>(&_callbacks.back()),                // user_data +                0                                                       // timeout +            ); + +            _all_luts.push_back(lut); +            _mrb_pool.back().get_new(); +        } + +        //allocate libusb transfer structs and managed send buffers +        for (size_t i = 0; i < get_num_send_frames(); i++){ + +            libusb_transfer *lut = libusb_alloc_transfer(0); +            UHD_ASSERT_THROW(lut != NULL); + +            _msb_pool.push_back(libusb_zero_copy_msb(lut)); +            _callbacks.push_back(boost::bind( +                &libusb_zero_copy_impl::handle_send, this, &_msb_pool.back() +            )); + +            libusb_fill_bulk_transfer( +                lut,                                                    // transfer +                _handle->get(),                                         // dev_handle +                (send_endpoint & 0x7f) | 0x00,                          // endpoint +                static_cast<unsigned char *>(_send_buffer_pool->at(i)), // buffer +                this->get_send_frame_size(),                            // length +                static_cast<libusb_transfer_cb_fn>(&libusb_async_cb),   // callback +                static_cast<void *>(&_callbacks.back()),                // user_data +                0                                                       // timeout +            ); + +            _all_luts.push_back(lut); +            libusb_async_cb(lut); +        } + +        //spawn the event handler threads +        size_t concurrency = hints.cast<size_t>("concurrency_hint", 1); +        for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread( +            boost::bind(&libusb_zero_copy_impl::run_event_loop, this) +        ); +    }      ~libusb_zero_copy_impl(void){ +        //shutdown the threads          _threads_running = false;          _thread_group.interrupt_all();          _thread_group.join_all(); + +        //cancel and free all transfers +        BOOST_FOREACH(libusb_transfer *lut, _all_luts){ +            libusb_cancel_transfer(lut); +            libusb_free_transfer(lut); +        }      } -    managed_recv_buffer::sptr get_recv_buff(double); -    managed_send_buffer::sptr get_send_buff(double); +    managed_recv_buffer::sptr get_recv_buff(double timeout){ +        libusb_zero_copy_mrb *mrb = NULL; +        if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){ +            return mrb->get_new(); +        } +        return managed_recv_buffer::sptr(); +    } + +    managed_send_buffer::sptr get_send_buff(double timeout){ +        libusb_zero_copy_msb *msb = NULL; +        if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){ +            return msb->get_new(); +        } +        return managed_send_buffer::sptr(); +    }      size_t get_num_recv_frames(void) const { return _num_recv_frames; }      size_t get_num_send_frames(void) const { return _num_send_frames; } @@ -304,125 +225,50 @@ public:      size_t get_send_frame_size(void) const { return _send_frame_size; }  private: -    void release(libusb_transfer *lut){ -        _recv_ep->submit(lut); +    //! Handle a bound async callback for recv +    void handle_recv(libusb_zero_copy_mrb *mrb){ +        _pending_recv_buffs.push_with_haste(mrb);      } -    void commit(libusb_transfer *lut, size_t num_bytes){ -        lut->length = num_bytes; -        try{ -            _send_ep->submit(lut); -        } -        catch(const std::exception &e){ -            std::cerr << "Error in commit: " << e.what() << std::endl; -        } +    //! Handle a bound async callback for send +    void handle_send(libusb_zero_copy_msb *msb){ +        _pending_send_buffs.push_with_haste(msb);      }      libusb::device_handle::sptr _handle;      const size_t _recv_frame_size, _num_recv_frames;      const size_t _send_frame_size, _num_send_frames; -    usb_endpoint::sptr _recv_ep, _send_ep; -    //event handler threads +    //! Storage for transfer related objects +    buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; +    bounded_buffer<libusb_zero_copy_mrb *> _pending_recv_buffs; +    bounded_buffer<libusb_zero_copy_msb *> _pending_send_buffs; +    std::list<libusb_zero_copy_mrb> _mrb_pool; +    std::list<libusb_zero_copy_msb> _msb_pool; +    std::list<boost::function<void()> > _callbacks; + +    //! a list of all transfer structs we allocated +    std::list<libusb_transfer *> _all_luts; + +    //! event handler threads      boost::thread_group _thread_group;      bool _threads_running;      void run_event_loop(void){          set_thread_priority_safe(); -        libusb::session::sptr session = libusb::session::get_global_session(); +        libusb_context *context = libusb::session::get_global_session()->get_context();          _threads_running = true;          try{              while(_threads_running){                  timeval tv;                  tv.tv_sec = 0;                  tv.tv_usec = 100000; //100ms -                libusb_handle_events_timeout(session->get_context(), &tv); +                libusb_handle_events_timeout(context, &tv);              }          } catch(const boost::thread_interrupted &){}      } -}; - -/* - * Constructor - * Initializes libusb, opens devices, and sets up interfaces for I/O. - * Finally, creates endpoints for asynchronous I/O. - */ -libusb_zero_copy_impl::libusb_zero_copy_impl( -    libusb::device_handle::sptr handle, -    size_t recv_endpoint, -    size_t send_endpoint, -    const device_addr_t &hints -): -    _handle(handle), -    _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))), -    _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS))), -    _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))), -    _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS))) -{ -    _handle->claim_interface(2 /*in interface*/); -    _handle->claim_interface(1 /*out interface*/); - -    _recv_ep = usb_endpoint::sptr(new usb_endpoint( -                              _handle,         // libusb device_handle -                              recv_endpoint,   // USB endpoint number -                              true,            // IN endpoint -                              this->get_recv_frame_size(),  // buffer size per transfer -                              this->get_num_recv_frames()   // number of libusb transfers -    )); - -    _send_ep = usb_endpoint::sptr(new usb_endpoint( -                              _handle,         // libusb device_handle -                              send_endpoint,   // USB endpoint number -                              false,           // OUT endpoint -                              this->get_send_frame_size(),  // buffer size per transfer -                              this->get_num_send_frames()   // number of libusb transfers -    )); - -    //spawn the event handler threads -    size_t concurrency = hints.cast<size_t>("concurrency_hint", 1); -    for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread( -        boost::bind(&libusb_zero_copy_impl::run_event_loop, this) -    ); -} - -/* - * Construct a managed receive buffer from a completed libusb transfer - * (happy with buffer full of data) obtained from the receive endpoint. - * Return empty pointer if no transfer is available (timeout or error). - * \return pointer to a managed receive buffer - */ -managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(double timeout){ -    libusb_transfer *lut = _recv_ep->get_lut_with_wait(timeout); -    if (lut == NULL) { -        return managed_recv_buffer::sptr(); -    } -    else { -        return managed_recv_buffer::make_safe( -            lut->buffer, lut->actual_length, -            boost::bind(&libusb_zero_copy_impl::release, this, lut) -        ); -    } -} - -/* - * Construct a managed send buffer from a free libusb transfer (with - * empty buffer). Return empty pointer of no transfer is available - * (timeout or error). - * \return pointer to a managed send buffer - */ -managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){ -    libusb_transfer *lut = _send_ep->get_lut_with_wait(timeout); -    if (lut == NULL) { -        return managed_send_buffer::sptr(); -    } -    else { -        return managed_send_buffer::make_safe( -            lut->buffer, this->get_send_frame_size(), -            boost::bind(&libusb_zero_copy_impl::commit, this, lut, _1) -        ); -    } -} +};  /***********************************************************************   * USB zero_copy make functions diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 2794d383c..05352ffce 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -24,7 +24,7 @@  #include <boost/asio.hpp>  #include <boost/format.hpp>  #include <iostream> -#include <vector> +#include <list>  using namespace uhd;  using namespace uhd::transport; @@ -40,20 +40,18 @@ static const size_t DEFAULT_NUM_FRAMES = 32;   **********************************************************************/  class udp_zero_copy_asio_mrb : public managed_recv_buffer{  public: -    typedef boost::shared_ptr<udp_zero_copy_asio_mrb> sptr;      typedef boost::function<void(udp_zero_copy_asio_mrb *)> release_cb_type;      udp_zero_copy_asio_mrb(void *mem, const release_cb_type &release_cb): -        _mem(mem), _release_cb(release_cb){/* NOP */} +        _mem(mem), _len(0), _release_cb(release_cb){/* NOP */}      void release(void){ -        if (_expired) return; +        if (_len == 0) return;          this->_release_cb(this); -        _expired = true; +        _len = 0;      }      sptr get_new(size_t len){ -        _expired = false;          _len = len;          return sptr(this, &udp_zero_copy_asio_mrb::fake_deleter);      } @@ -68,7 +66,6 @@ private:      const void *get_buff(void) const{return _mem;}      size_t get_size(void) const{return _len;} -    bool _expired;      void *_mem;      size_t _len;      release_cb_type _release_cb; @@ -81,20 +78,18 @@ private:   **********************************************************************/  class udp_zero_copy_asio_msb : public managed_send_buffer{  public: -    typedef boost::shared_ptr<udp_zero_copy_asio_msb> sptr;      typedef boost::function<void(udp_zero_copy_asio_msb *, size_t)> commit_cb_type;      udp_zero_copy_asio_msb(void *mem, const commit_cb_type &commit_cb): -        _mem(mem), _commit_cb(commit_cb){/* NOP */} +        _mem(mem), _len(0), _commit_cb(commit_cb){/* NOP */}      void commit(size_t len){ -        if (_expired) return; +        if (_len == 0) return;          this->_commit_cb(this, len); -        _expired = true; +        _len = 0;      }      sptr get_new(size_t len){ -        _expired = false;          _len = len;          return sptr(this, &udp_zero_copy_asio_msb::fake_deleter);      } @@ -107,7 +102,6 @@ private:      void *get_buff(void) const{return _mem;}      size_t get_size(void) const{return _len;} -    bool _expired;      void *_mem;      size_t _len;      commit_cb_type _commit_cb; @@ -135,7 +129,8 @@ public:          _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))),          _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),          _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), -        _pending_recv_buffs(_num_recv_frames), _pending_send_buffs(_num_send_frames) +        _pending_recv_buffs(_num_recv_frames), +        _pending_send_buffs(_num_send_frames)      {          //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; @@ -152,20 +147,18 @@ public:          //allocate re-usable managed receive buffers          for (size_t i = 0; i < get_num_recv_frames(); i++){ -            _mrb_pool.push_back(udp_zero_copy_asio_mrb::sptr( -                new udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i), +            _mrb_pool.push_back(udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i),                  boost::bind(&udp_zero_copy_asio_impl::release, this, _1)) -            )); -            handle_recv(_mrb_pool.back().get()); +            ); +            handle_recv(&_mrb_pool.back());          }          //allocate re-usable managed send buffers          for (size_t i = 0; i < get_num_send_frames(); i++){ -            _msb_pool.push_back(udp_zero_copy_asio_msb::sptr( -                new udp_zero_copy_asio_msb(_send_buffer_pool->at(i), +            _msb_pool.push_back(udp_zero_copy_asio_msb(_send_buffer_pool->at(i),                  boost::bind(&udp_zero_copy_asio_impl::commit, this, _1, _2)) -            )); -            handle_send(_msb_pool.back().get()); +            ); +            handle_send(&_msb_pool.back());          }      } @@ -264,8 +257,8 @@ private:      buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;      bounded_buffer<udp_zero_copy_asio_mrb *> _pending_recv_buffs;      bounded_buffer<udp_zero_copy_asio_msb *> _pending_send_buffs; -    std::vector<udp_zero_copy_asio_msb::sptr> _msb_pool; -    std::vector<udp_zero_copy_asio_mrb::sptr> _mrb_pool; +    std::list<udp_zero_copy_asio_msb> _msb_pool; +    std::list<udp_zero_copy_asio_mrb> _mrb_pool;      //asio guts -> socket and service      asio::io_service        _io_service; | 
