diff options
author | Josh Blum <josh@joshknows.com> | 2011-02-15 10:31:48 -0800 |
---|---|---|
committer | Josh Blum <josh@joshknows.com> | 2011-02-15 10:34:37 -0800 |
commit | 9bc6fbe685579d505e8352e1de5e118a5a3ea163 (patch) | |
tree | 5a256e83bf339ee76e50fbfa8a16d52cbc7efd64 /host | |
parent | 9f3ecefbc595f4386a652b4bed4a4bb0bc139e0b (diff) | |
download | uhd-9bc6fbe685579d505e8352e1de5e118a5a3ea163.tar.gz uhd-9bc6fbe685579d505e8352e1de5e118a5a3ea163.tar.bz2 uhd-9bc6fbe685579d505e8352e1de5e118a5a3ea163.zip |
uhd: reusable buffers for libusb zero copy implementation
tweaks on udp implementation to simplify
Diffstat (limited to 'host')
-rw-r--r-- | host/include/uhd/transport/zero_copy.hpp | 4 | ||||
-rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 492 | ||||
-rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 41 |
3 files changed, 188 insertions, 349 deletions
diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp index d5a536b27..18eb3fb6d 100644 --- a/host/include/uhd/transport/zero_copy.hpp +++ b/host/include/uhd/transport/zero_copy.hpp @@ -30,7 +30,7 @@ namespace uhd{ namespace transport{ * Contains a reference to transport-managed memory, * and a method to release the memory after reading. */ - class UHD_API managed_recv_buffer : boost::noncopyable{ + class UHD_API managed_recv_buffer{ public: typedef boost::shared_ptr<managed_recv_buffer> sptr; typedef boost::function<void(void)> release_fcn_t; @@ -81,7 +81,7 @@ namespace uhd{ namespace transport{ * Contains a reference to transport-managed memory, * and a method to commit the memory after writing. */ - class UHD_API managed_send_buffer : boost::noncopyable{ + class UHD_API managed_send_buffer{ public: typedef boost::shared_ptr<managed_send_buffer> sptr; typedef boost::function<void(size_t)> commit_fcn_t; diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 6fab5ae6f..60f290fcf 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -21,258 +21,88 @@ #include <uhd/transport/buffer_pool.hpp> #include <uhd/utils/thread_priority.hpp> #include <uhd/utils/assert.hpp> +#include <boost/function.hpp> #include <boost/foreach.hpp> -#include <boost/thread.hpp> -#include <vector> +#include <boost/thread/thread.hpp> +#include <list> #include <iostream> using namespace uhd; using namespace uhd::transport; -static const double CLEANUP_TIMEOUT = 0.2; //seconds static const size_t DEFAULT_NUM_XFERS = 16; //num xfers static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes /*********************************************************************** - * Helper functions - ***********************************************************************/ -/* - * Print the values of a libusb_transfer struct - * http://libusb.sourceforge.net/api-1.0/structlibusb__transfer.html - */ -void pp_transfer(libusb_transfer *lut) -{ - std::cout << "Libusb transfer" << std::endl; - std::cout << " flags: 0x" << std::hex << (unsigned int) lut->flags << std::endl; - std::cout << " endpoint: 0x" << std::hex << (unsigned int) lut->endpoint << std::endl; - std::cout << " type: 0x" << std::hex << (unsigned int) lut->type << std::endl; - std::cout << " timeout: " << std::dec << lut->timeout << std::endl; - std::cout << " status: 0x" << std::hex << lut->status << std::endl; - std::cout << " length: " << std::dec << lut->length << std::endl; - std::cout << " actual_length: " << std::dec << lut->actual_length << std::endl; -} - -/*********************************************************************** - * USB asynchronous zero_copy endpoint - * This endpoint implementation provides asynchronous I/O to libusb-1.0 - * devices. Each endpoint is directional and two can be combined to - * create a bidirectional interface. It is a zero copy implementation - * with respect to libusb, however, each send and recv requires a copy - * operation from kernel to userspace; this is due to the usbfs - * interface provided by the kernel. + * Reusable managed receiver buffer: + * - Associated with a particular libusb transfer struct. + * - Submits the transfer to libusb in the release method. **********************************************************************/ -class usb_endpoint { +class libusb_zero_copy_mrb : public managed_recv_buffer{ public: - typedef boost::shared_ptr<usb_endpoint> sptr; - - usb_endpoint( - libusb::device_handle::sptr handle, - int endpoint, - bool input, - size_t transfer_size, - size_t num_transfers - ); - - ~usb_endpoint(void); + libusb_zero_copy_mrb(libusb_transfer *lut): + _lut(lut), _expired(true) { /* NOP */ } - // Exposed interface for submitting / retrieving transfer buffers - - //! Submit a new transfer that was presumably just filled or emptied. - void submit(libusb_transfer *lut); - - /*! - * Get an available transfer: - * For inputs, this is a just filled transfer. - * For outputs, this is a just emptied transfer. - * \param timeout the timeout to wait for a lut - * \return the transfer pointer or NULL if timeout - */ - libusb_transfer *get_lut_with_wait(double timeout); + void release(void){ + if (_expired) return; + UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0); + _expired = true; + } - //Callback use only - void callback_handle_transfer(libusb_transfer *lut); + sptr get_new(void){ + _expired = false; + return sptr(this, &libusb_zero_copy_mrb::fake_deleter); + } private: - libusb::device_handle::sptr _handle; - int _endpoint; - bool _input; - - //! hold a bounded buffer of completed transfers - bounded_buffer<libusb_transfer *> _completed_list; - - //! a list of all transfer structs we allocated - std::vector<libusb_transfer *> _all_luts; + static void fake_deleter(void *obj){ + static_cast<libusb_zero_copy_mrb *>(obj)->release(); + } - //! memory allocated for the transfer buffers - buffer_pool::sptr _buffer_pool; + const void *get_buff(void) const{return _lut->buffer;} + size_t get_size(void) const{return _lut->actual_length;} - // Calls for processing asynchronous I/O - libusb_transfer *allocate_transfer(void *mem, size_t len); - void print_transfer_status(libusb_transfer *lut); + libusb_transfer *_lut; + bool _expired; }; - -/* - * Callback function called when submitted transfers complete. - * The endpoint upon which the transfer is part of is recovered - * and the transfer moved from pending to completed state. - * Callbacks occur during the reaping calls where libusb_handle_events() - * is used. The callback only modifies the transfer state by moving - * it from the pending to completed status list. - * \param lut pointer to libusb_transfer - */ -static void callback(libusb_transfer *lut){ - usb_endpoint *endpoint = (usb_endpoint *) lut->user_data; - endpoint->callback_handle_transfer(lut); -} - - -/* - * Accessor call to allow list access from callback space - * \param pointer to libusb_transfer - */ -void usb_endpoint::callback_handle_transfer(libusb_transfer *lut){ - _completed_list.push_with_haste(lut); -} - - -/* - * Constructor - * Allocate libusb transfers and mark as free. For IN endpoints, - * submit the transfers so that they're ready to return when - * data is available. - */ -usb_endpoint::usb_endpoint( - libusb::device_handle::sptr handle, - int endpoint, - bool input, - size_t transfer_size, - size_t num_transfers -): - _handle(handle), - _endpoint(endpoint), - _input(input), - _completed_list(num_transfers) -{ - _buffer_pool = buffer_pool::make(num_transfers, transfer_size); - for (size_t i = 0; i < num_transfers; i++){ - _all_luts.push_back(allocate_transfer(_buffer_pool->at(i), transfer_size)); - - //input luts are immediately submitted to be filled - //output luts go into the completed list as free buffers - if (_input) this->submit(_all_luts.back()); - else _completed_list.push_with_haste(_all_luts.back()); +/*********************************************************************** + * Reusable managed send buffer: + * - Associated with a particular libusb transfer struct. + * - Submits the transfer to libusb in the commit method. + **********************************************************************/ +class libusb_zero_copy_msb : public managed_send_buffer{ +public: + libusb_zero_copy_msb(libusb_transfer *lut): + _lut(lut), _expired(true) { /* NOP */ } + + void commit(size_t len){ + if (_expired) return; + _lut->length = len; + UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0); + _expired = true; } -} - -/* - * Destructor - * Make sure all the memory is freed. Cancel any pending transfers. - * When all completed transfers are moved to the free list, release - * the transfers. Libusb will deallocate the data buffer held by - * each transfer. - */ -usb_endpoint::~usb_endpoint(void){ - //cancel all transfers - BOOST_FOREACH(libusb_transfer *lut, _all_luts){ - libusb_cancel_transfer(lut); + sptr get_new(void){ + _expired = false; + return sptr(this, &libusb_zero_copy_msb::fake_deleter); } - //collect canceled transfers (drain the queue) - while (this->get_lut_with_wait(CLEANUP_TIMEOUT) != NULL){}; - - //free all transfers - BOOST_FOREACH(libusb_transfer *lut, _all_luts){ - libusb_free_transfer(lut); +private: + static void fake_deleter(void *obj){ + static_cast<libusb_zero_copy_msb *>(obj)->commit(0); } -} - - -/* - * Allocate a libusb transfer - * The allocated transfer - and buffer it contains - is repeatedly - * submitted, reaped, and reused and should not be freed until shutdown. - * \param mem a pointer to the buffer memory - * \param len size of the individual buffer - * \return pointer to an allocated libusb_transfer - */ -libusb_transfer *usb_endpoint::allocate_transfer(void *mem, size_t len){ - libusb_transfer *lut = libusb_alloc_transfer(0); - UHD_ASSERT_THROW(lut != NULL); - - unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0)); - unsigned char *buff = reinterpret_cast<unsigned char *>(mem); - libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback); - - libusb_fill_bulk_transfer(lut, // transfer - _handle->get(), // dev_handle - endpoint, // endpoint - buff, // buffer - len, // length - lut_callback, // callback - this, // user_data - 0); // timeout - return lut; -} + void *get_buff(void) const{return _lut->buffer;} + size_t get_size(void) const{return _lut->length;} -/* - * Asynchonous transfer submission - * Submit a libusb transfer to libusb add pending status - * \param lut pointer to libusb_transfer - * \return true on success or false on error - */ -void usb_endpoint::submit(libusb_transfer *lut){ - UHD_ASSERT_THROW(libusb_submit_transfer(lut) == 0); -} - -/* - * Print status errors of a completed transfer - * \param lut pointer to an libusb_transfer - */ -void usb_endpoint::print_transfer_status(libusb_transfer *lut){ - std::cout << "here " << lut->status << std::endl; - switch (lut->status) { - case LIBUSB_TRANSFER_COMPLETED: - if (lut->actual_length < lut->length) { - std::cerr << "USB: transfer completed with short write," - << " length = " << lut->length - << " actual = " << lut->actual_length << std::endl; - } - - if ((lut->actual_length < 0) || (lut->length < 0)) { - std::cerr << "USB: transfer completed with invalid response" - << std::endl; - } - break; - case LIBUSB_TRANSFER_CANCELLED: - break; - case LIBUSB_TRANSFER_NO_DEVICE: - std::cerr << "USB: device was disconnected" << std::endl; - break; - case LIBUSB_TRANSFER_OVERFLOW: - std::cerr << "USB: device sent more data than requested" << std::endl; - break; - case LIBUSB_TRANSFER_TIMED_OUT: - std::cerr << "USB: transfer timed out" << std::endl; - break; - case LIBUSB_TRANSFER_STALL: - std::cerr << "USB: halt condition detected (stalled)" << std::endl; - break; - case LIBUSB_TRANSFER_ERROR: - std::cerr << "USB: transfer failed" << std::endl; - break; - default: - std::cerr << "USB: received unknown transfer status" << std::endl; - } -} + libusb_transfer *_lut; + bool _expired; +}; -libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){ - boost::this_thread::disable_interruption di; //disable because the wait can throw - libusb_transfer *lut = NULL; - if (_completed_list.pop_with_timed_wait(lut, timeout)) return lut; - return NULL; +//! helper function: handles all async callbacks +static void libusb_async_cb(libusb_transfer *lut){ + (*static_cast<boost::function<void()> *>(lut->user_data))(); } /*********************************************************************** @@ -286,16 +116,107 @@ public: size_t recv_endpoint, size_t send_endpoint, const device_addr_t &hints - ); + ): + _handle(handle), + _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))), + _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS))), + _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))), + _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS))), + _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)), + _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), + _pending_recv_buffs(_num_recv_frames), + _pending_send_buffs(_num_send_frames) + { + _handle->claim_interface(2 /*in interface*/); + _handle->claim_interface(1 /*out interface*/); + + //allocate libusb transfer structs and managed receive buffers + for (size_t i = 0; i < get_num_recv_frames(); i++){ + + libusb_transfer *lut = libusb_alloc_transfer(0); + UHD_ASSERT_THROW(lut != NULL); + + _mrb_pool.push_back(libusb_zero_copy_mrb(lut)); + _callbacks.push_back(boost::bind( + &libusb_zero_copy_impl::handle_recv, this, &_mrb_pool.back() + )); + + libusb_fill_bulk_transfer( + lut, // transfer + _handle->get(), // dev_handle + (recv_endpoint & 0x7f) | 0x80, // endpoint + static_cast<unsigned char *>(_recv_buffer_pool->at(i)), // buffer + this->get_recv_frame_size(), // length + static_cast<libusb_transfer_cb_fn>(&libusb_async_cb), // callback + static_cast<void *>(&_callbacks.back()), // user_data + 0 // timeout + ); + + _all_luts.push_back(lut); + _mrb_pool.back().get_new(); + } + + //allocate libusb transfer structs and managed send buffers + for (size_t i = 0; i < get_num_send_frames(); i++){ + + libusb_transfer *lut = libusb_alloc_transfer(0); + UHD_ASSERT_THROW(lut != NULL); + + _msb_pool.push_back(libusb_zero_copy_msb(lut)); + _callbacks.push_back(boost::bind( + &libusb_zero_copy_impl::handle_send, this, &_msb_pool.back() + )); + + libusb_fill_bulk_transfer( + lut, // transfer + _handle->get(), // dev_handle + (send_endpoint & 0x7f) | 0x00, // endpoint + static_cast<unsigned char *>(_send_buffer_pool->at(i)), // buffer + this->get_send_frame_size(), // length + static_cast<libusb_transfer_cb_fn>(&libusb_async_cb), // callback + static_cast<void *>(&_callbacks.back()), // user_data + 0 // timeout + ); + + _all_luts.push_back(lut); + libusb_async_cb(lut); + } + + //spawn the event handler threads + size_t concurrency = hints.cast<size_t>("concurrency_hint", 1); + for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread( + boost::bind(&libusb_zero_copy_impl::run_event_loop, this) + ); + } ~libusb_zero_copy_impl(void){ + //shutdown the threads _threads_running = false; _thread_group.interrupt_all(); _thread_group.join_all(); + + //cancel and free all transfers + BOOST_FOREACH(libusb_transfer *lut, _all_luts){ + libusb_cancel_transfer(lut); + libusb_free_transfer(lut); + } } - managed_recv_buffer::sptr get_recv_buff(double); - managed_send_buffer::sptr get_send_buff(double); + managed_recv_buffer::sptr get_recv_buff(double timeout){ + libusb_zero_copy_mrb *mrb = NULL; + if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){ + return mrb->get_new(); + } + return managed_recv_buffer::sptr(); + } + + managed_send_buffer::sptr get_send_buff(double timeout){ + libusb_zero_copy_msb *msb = NULL; + if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){ + return msb->get_new(); + } + return managed_send_buffer::sptr(); + } size_t get_num_recv_frames(void) const { return _num_recv_frames; } size_t get_num_send_frames(void) const { return _num_send_frames; } @@ -304,125 +225,50 @@ public: size_t get_send_frame_size(void) const { return _send_frame_size; } private: - void release(libusb_transfer *lut){ - _recv_ep->submit(lut); + //! Handle a bound async callback for recv + void handle_recv(libusb_zero_copy_mrb *mrb){ + _pending_recv_buffs.push_with_haste(mrb); } - void commit(libusb_transfer *lut, size_t num_bytes){ - lut->length = num_bytes; - try{ - _send_ep->submit(lut); - } - catch(const std::exception &e){ - std::cerr << "Error in commit: " << e.what() << std::endl; - } + //! Handle a bound async callback for send + void handle_send(libusb_zero_copy_msb *msb){ + _pending_send_buffs.push_with_haste(msb); } libusb::device_handle::sptr _handle; const size_t _recv_frame_size, _num_recv_frames; const size_t _send_frame_size, _num_send_frames; - usb_endpoint::sptr _recv_ep, _send_ep; - //event handler threads + //! Storage for transfer related objects + buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; + bounded_buffer<libusb_zero_copy_mrb *> _pending_recv_buffs; + bounded_buffer<libusb_zero_copy_msb *> _pending_send_buffs; + std::list<libusb_zero_copy_mrb> _mrb_pool; + std::list<libusb_zero_copy_msb> _msb_pool; + std::list<boost::function<void()> > _callbacks; + + //! a list of all transfer structs we allocated + std::list<libusb_transfer *> _all_luts; + + //! event handler threads boost::thread_group _thread_group; bool _threads_running; void run_event_loop(void){ set_thread_priority_safe(); - libusb::session::sptr session = libusb::session::get_global_session(); + libusb_context *context = libusb::session::get_global_session()->get_context(); _threads_running = true; try{ while(_threads_running){ timeval tv; tv.tv_sec = 0; tv.tv_usec = 100000; //100ms - libusb_handle_events_timeout(session->get_context(), &tv); + libusb_handle_events_timeout(context, &tv); } } catch(const boost::thread_interrupted &){} } -}; - -/* - * Constructor - * Initializes libusb, opens devices, and sets up interfaces for I/O. - * Finally, creates endpoints for asynchronous I/O. - */ -libusb_zero_copy_impl::libusb_zero_copy_impl( - libusb::device_handle::sptr handle, - size_t recv_endpoint, - size_t send_endpoint, - const device_addr_t &hints -): - _handle(handle), - _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))), - _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS))), - _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))), - _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS))) -{ - _handle->claim_interface(2 /*in interface*/); - _handle->claim_interface(1 /*out interface*/); - - _recv_ep = usb_endpoint::sptr(new usb_endpoint( - _handle, // libusb device_handle - recv_endpoint, // USB endpoint number - true, // IN endpoint - this->get_recv_frame_size(), // buffer size per transfer - this->get_num_recv_frames() // number of libusb transfers - )); - - _send_ep = usb_endpoint::sptr(new usb_endpoint( - _handle, // libusb device_handle - send_endpoint, // USB endpoint number - false, // OUT endpoint - this->get_send_frame_size(), // buffer size per transfer - this->get_num_send_frames() // number of libusb transfers - )); - - //spawn the event handler threads - size_t concurrency = hints.cast<size_t>("concurrency_hint", 1); - for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread( - boost::bind(&libusb_zero_copy_impl::run_event_loop, this) - ); -} - -/* - * Construct a managed receive buffer from a completed libusb transfer - * (happy with buffer full of data) obtained from the receive endpoint. - * Return empty pointer if no transfer is available (timeout or error). - * \return pointer to a managed receive buffer - */ -managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(double timeout){ - libusb_transfer *lut = _recv_ep->get_lut_with_wait(timeout); - if (lut == NULL) { - return managed_recv_buffer::sptr(); - } - else { - return managed_recv_buffer::make_safe( - lut->buffer, lut->actual_length, - boost::bind(&libusb_zero_copy_impl::release, this, lut) - ); - } -} - -/* - * Construct a managed send buffer from a free libusb transfer (with - * empty buffer). Return empty pointer of no transfer is available - * (timeout or error). - * \return pointer to a managed send buffer - */ -managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){ - libusb_transfer *lut = _send_ep->get_lut_with_wait(timeout); - if (lut == NULL) { - return managed_send_buffer::sptr(); - } - else { - return managed_send_buffer::make_safe( - lut->buffer, this->get_send_frame_size(), - boost::bind(&libusb_zero_copy_impl::commit, this, lut, _1) - ); - } -} +}; /*********************************************************************** * USB zero_copy make functions diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 2794d383c..05352ffce 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -24,7 +24,7 @@ #include <boost/asio.hpp> #include <boost/format.hpp> #include <iostream> -#include <vector> +#include <list> using namespace uhd; using namespace uhd::transport; @@ -40,20 +40,18 @@ static const size_t DEFAULT_NUM_FRAMES = 32; **********************************************************************/ class udp_zero_copy_asio_mrb : public managed_recv_buffer{ public: - typedef boost::shared_ptr<udp_zero_copy_asio_mrb> sptr; typedef boost::function<void(udp_zero_copy_asio_mrb *)> release_cb_type; udp_zero_copy_asio_mrb(void *mem, const release_cb_type &release_cb): - _mem(mem), _release_cb(release_cb){/* NOP */} + _mem(mem), _len(0), _release_cb(release_cb){/* NOP */} void release(void){ - if (_expired) return; + if (_len == 0) return; this->_release_cb(this); - _expired = true; + _len = 0; } sptr get_new(size_t len){ - _expired = false; _len = len; return sptr(this, &udp_zero_copy_asio_mrb::fake_deleter); } @@ -68,7 +66,6 @@ private: const void *get_buff(void) const{return _mem;} size_t get_size(void) const{return _len;} - bool _expired; void *_mem; size_t _len; release_cb_type _release_cb; @@ -81,20 +78,18 @@ private: **********************************************************************/ class udp_zero_copy_asio_msb : public managed_send_buffer{ public: - typedef boost::shared_ptr<udp_zero_copy_asio_msb> sptr; typedef boost::function<void(udp_zero_copy_asio_msb *, size_t)> commit_cb_type; udp_zero_copy_asio_msb(void *mem, const commit_cb_type &commit_cb): - _mem(mem), _commit_cb(commit_cb){/* NOP */} + _mem(mem), _len(0), _commit_cb(commit_cb){/* NOP */} void commit(size_t len){ - if (_expired) return; + if (_len == 0) return; this->_commit_cb(this, len); - _expired = true; + _len = 0; } sptr get_new(size_t len){ - _expired = false; _len = len; return sptr(this, &udp_zero_copy_asio_msb::fake_deleter); } @@ -107,7 +102,6 @@ private: void *get_buff(void) const{return _mem;} size_t get_size(void) const{return _len;} - bool _expired; void *_mem; size_t _len; commit_cb_type _commit_cb; @@ -135,7 +129,8 @@ public: _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))), _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)), _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), - _pending_recv_buffs(_num_recv_frames), _pending_send_buffs(_num_send_frames) + _pending_recv_buffs(_num_recv_frames), + _pending_send_buffs(_num_send_frames) { //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; @@ -152,20 +147,18 @@ public: //allocate re-usable managed receive buffers for (size_t i = 0; i < get_num_recv_frames(); i++){ - _mrb_pool.push_back(udp_zero_copy_asio_mrb::sptr( - new udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i), + _mrb_pool.push_back(udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i), boost::bind(&udp_zero_copy_asio_impl::release, this, _1)) - )); - handle_recv(_mrb_pool.back().get()); + ); + handle_recv(&_mrb_pool.back()); } //allocate re-usable managed send buffers for (size_t i = 0; i < get_num_send_frames(); i++){ - _msb_pool.push_back(udp_zero_copy_asio_msb::sptr( - new udp_zero_copy_asio_msb(_send_buffer_pool->at(i), + _msb_pool.push_back(udp_zero_copy_asio_msb(_send_buffer_pool->at(i), boost::bind(&udp_zero_copy_asio_impl::commit, this, _1, _2)) - )); - handle_send(_msb_pool.back().get()); + ); + handle_send(&_msb_pool.back()); } } @@ -264,8 +257,8 @@ private: buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; bounded_buffer<udp_zero_copy_asio_mrb *> _pending_recv_buffs; bounded_buffer<udp_zero_copy_asio_msb *> _pending_send_buffs; - std::vector<udp_zero_copy_asio_msb::sptr> _msb_pool; - std::vector<udp_zero_copy_asio_mrb::sptr> _mrb_pool; + std::list<udp_zero_copy_asio_msb> _msb_pool; + std::list<udp_zero_copy_asio_mrb> _mrb_pool; //asio guts -> socket and service asio::io_service _io_service; |