aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/include/uhd/transport/zero_copy.hpp4
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp492
-rw-r--r--host/lib/transport/udp_zero_copy_asio.cpp41
3 files changed, 188 insertions, 349 deletions
diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp
index d5a536b27..18eb3fb6d 100644
--- a/host/include/uhd/transport/zero_copy.hpp
+++ b/host/include/uhd/transport/zero_copy.hpp
@@ -30,7 +30,7 @@ namespace uhd{ namespace transport{
* Contains a reference to transport-managed memory,
* and a method to release the memory after reading.
*/
- class UHD_API managed_recv_buffer : boost::noncopyable{
+ class UHD_API managed_recv_buffer{
public:
typedef boost::shared_ptr<managed_recv_buffer> sptr;
typedef boost::function<void(void)> release_fcn_t;
@@ -81,7 +81,7 @@ namespace uhd{ namespace transport{
* Contains a reference to transport-managed memory,
* and a method to commit the memory after writing.
*/
- class UHD_API managed_send_buffer : boost::noncopyable{
+ class UHD_API managed_send_buffer{
public:
typedef boost::shared_ptr<managed_send_buffer> sptr;
typedef boost::function<void(size_t)> commit_fcn_t;
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index 6fab5ae6f..60f290fcf 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -21,258 +21,88 @@
#include <uhd/transport/buffer_pool.hpp>
#include <uhd/utils/thread_priority.hpp>
#include <uhd/utils/assert.hpp>
+#include <boost/function.hpp>
#include <boost/foreach.hpp>
-#include <boost/thread.hpp>
-#include <vector>
+#include <boost/thread/thread.hpp>
+#include <list>
#include <iostream>
using namespace uhd;
using namespace uhd::transport;
-static const double CLEANUP_TIMEOUT = 0.2; //seconds
static const size_t DEFAULT_NUM_XFERS = 16; //num xfers
static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes
/***********************************************************************
- * Helper functions
- ***********************************************************************/
-/*
- * Print the values of a libusb_transfer struct
- * http://libusb.sourceforge.net/api-1.0/structlibusb__transfer.html
- */
-void pp_transfer(libusb_transfer *lut)
-{
- std::cout << "Libusb transfer" << std::endl;
- std::cout << " flags: 0x" << std::hex << (unsigned int) lut->flags << std::endl;
- std::cout << " endpoint: 0x" << std::hex << (unsigned int) lut->endpoint << std::endl;
- std::cout << " type: 0x" << std::hex << (unsigned int) lut->type << std::endl;
- std::cout << " timeout: " << std::dec << lut->timeout << std::endl;
- std::cout << " status: 0x" << std::hex << lut->status << std::endl;
- std::cout << " length: " << std::dec << lut->length << std::endl;
- std::cout << " actual_length: " << std::dec << lut->actual_length << std::endl;
-}
-
-/***********************************************************************
- * USB asynchronous zero_copy endpoint
- * This endpoint implementation provides asynchronous I/O to libusb-1.0
- * devices. Each endpoint is directional and two can be combined to
- * create a bidirectional interface. It is a zero copy implementation
- * with respect to libusb, however, each send and recv requires a copy
- * operation from kernel to userspace; this is due to the usbfs
- * interface provided by the kernel.
+ * Reusable managed receiver buffer:
+ * - Associated with a particular libusb transfer struct.
+ * - Submits the transfer to libusb in the release method.
**********************************************************************/
-class usb_endpoint {
+class libusb_zero_copy_mrb : public managed_recv_buffer{
public:
- typedef boost::shared_ptr<usb_endpoint> sptr;
-
- usb_endpoint(
- libusb::device_handle::sptr handle,
- int endpoint,
- bool input,
- size_t transfer_size,
- size_t num_transfers
- );
-
- ~usb_endpoint(void);
+ libusb_zero_copy_mrb(libusb_transfer *lut):
+ _lut(lut), _expired(true) { /* NOP */ }
- // Exposed interface for submitting / retrieving transfer buffers
-
- //! Submit a new transfer that was presumably just filled or emptied.
- void submit(libusb_transfer *lut);
-
- /*!
- * Get an available transfer:
- * For inputs, this is a just filled transfer.
- * For outputs, this is a just emptied transfer.
- * \param timeout the timeout to wait for a lut
- * \return the transfer pointer or NULL if timeout
- */
- libusb_transfer *get_lut_with_wait(double timeout);
+ void release(void){
+ if (_expired) return;
+ UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0);
+ _expired = true;
+ }
- //Callback use only
- void callback_handle_transfer(libusb_transfer *lut);
+ sptr get_new(void){
+ _expired = false;
+ return sptr(this, &libusb_zero_copy_mrb::fake_deleter);
+ }
private:
- libusb::device_handle::sptr _handle;
- int _endpoint;
- bool _input;
-
- //! hold a bounded buffer of completed transfers
- bounded_buffer<libusb_transfer *> _completed_list;
-
- //! a list of all transfer structs we allocated
- std::vector<libusb_transfer *> _all_luts;
+ static void fake_deleter(void *obj){
+ static_cast<libusb_zero_copy_mrb *>(obj)->release();
+ }
- //! memory allocated for the transfer buffers
- buffer_pool::sptr _buffer_pool;
+ const void *get_buff(void) const{return _lut->buffer;}
+ size_t get_size(void) const{return _lut->actual_length;}
- // Calls for processing asynchronous I/O
- libusb_transfer *allocate_transfer(void *mem, size_t len);
- void print_transfer_status(libusb_transfer *lut);
+ libusb_transfer *_lut;
+ bool _expired;
};
-
-/*
- * Callback function called when submitted transfers complete.
- * The endpoint upon which the transfer is part of is recovered
- * and the transfer moved from pending to completed state.
- * Callbacks occur during the reaping calls where libusb_handle_events()
- * is used. The callback only modifies the transfer state by moving
- * it from the pending to completed status list.
- * \param lut pointer to libusb_transfer
- */
-static void callback(libusb_transfer *lut){
- usb_endpoint *endpoint = (usb_endpoint *) lut->user_data;
- endpoint->callback_handle_transfer(lut);
-}
-
-
-/*
- * Accessor call to allow list access from callback space
- * \param pointer to libusb_transfer
- */
-void usb_endpoint::callback_handle_transfer(libusb_transfer *lut){
- _completed_list.push_with_haste(lut);
-}
-
-
-/*
- * Constructor
- * Allocate libusb transfers and mark as free. For IN endpoints,
- * submit the transfers so that they're ready to return when
- * data is available.
- */
-usb_endpoint::usb_endpoint(
- libusb::device_handle::sptr handle,
- int endpoint,
- bool input,
- size_t transfer_size,
- size_t num_transfers
-):
- _handle(handle),
- _endpoint(endpoint),
- _input(input),
- _completed_list(num_transfers)
-{
- _buffer_pool = buffer_pool::make(num_transfers, transfer_size);
- for (size_t i = 0; i < num_transfers; i++){
- _all_luts.push_back(allocate_transfer(_buffer_pool->at(i), transfer_size));
-
- //input luts are immediately submitted to be filled
- //output luts go into the completed list as free buffers
- if (_input) this->submit(_all_luts.back());
- else _completed_list.push_with_haste(_all_luts.back());
+/***********************************************************************
+ * Reusable managed send buffer:
+ * - Associated with a particular libusb transfer struct.
+ * - Submits the transfer to libusb in the commit method.
+ **********************************************************************/
+class libusb_zero_copy_msb : public managed_send_buffer{
+public:
+ libusb_zero_copy_msb(libusb_transfer *lut):
+ _lut(lut), _expired(true) { /* NOP */ }
+
+ void commit(size_t len){
+ if (_expired) return;
+ _lut->length = len;
+ UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0);
+ _expired = true;
}
-}
-
-/*
- * Destructor
- * Make sure all the memory is freed. Cancel any pending transfers.
- * When all completed transfers are moved to the free list, release
- * the transfers. Libusb will deallocate the data buffer held by
- * each transfer.
- */
-usb_endpoint::~usb_endpoint(void){
- //cancel all transfers
- BOOST_FOREACH(libusb_transfer *lut, _all_luts){
- libusb_cancel_transfer(lut);
+ sptr get_new(void){
+ _expired = false;
+ return sptr(this, &libusb_zero_copy_msb::fake_deleter);
}
- //collect canceled transfers (drain the queue)
- while (this->get_lut_with_wait(CLEANUP_TIMEOUT) != NULL){};
-
- //free all transfers
- BOOST_FOREACH(libusb_transfer *lut, _all_luts){
- libusb_free_transfer(lut);
+private:
+ static void fake_deleter(void *obj){
+ static_cast<libusb_zero_copy_msb *>(obj)->commit(0);
}
-}
-
-
-/*
- * Allocate a libusb transfer
- * The allocated transfer - and buffer it contains - is repeatedly
- * submitted, reaped, and reused and should not be freed until shutdown.
- * \param mem a pointer to the buffer memory
- * \param len size of the individual buffer
- * \return pointer to an allocated libusb_transfer
- */
-libusb_transfer *usb_endpoint::allocate_transfer(void *mem, size_t len){
- libusb_transfer *lut = libusb_alloc_transfer(0);
- UHD_ASSERT_THROW(lut != NULL);
-
- unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0));
- unsigned char *buff = reinterpret_cast<unsigned char *>(mem);
- libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback);
-
- libusb_fill_bulk_transfer(lut, // transfer
- _handle->get(), // dev_handle
- endpoint, // endpoint
- buff, // buffer
- len, // length
- lut_callback, // callback
- this, // user_data
- 0); // timeout
- return lut;
-}
+ void *get_buff(void) const{return _lut->buffer;}
+ size_t get_size(void) const{return _lut->length;}
-/*
- * Asynchonous transfer submission
- * Submit a libusb transfer to libusb add pending status
- * \param lut pointer to libusb_transfer
- * \return true on success or false on error
- */
-void usb_endpoint::submit(libusb_transfer *lut){
- UHD_ASSERT_THROW(libusb_submit_transfer(lut) == 0);
-}
-
-/*
- * Print status errors of a completed transfer
- * \param lut pointer to an libusb_transfer
- */
-void usb_endpoint::print_transfer_status(libusb_transfer *lut){
- std::cout << "here " << lut->status << std::endl;
- switch (lut->status) {
- case LIBUSB_TRANSFER_COMPLETED:
- if (lut->actual_length < lut->length) {
- std::cerr << "USB: transfer completed with short write,"
- << " length = " << lut->length
- << " actual = " << lut->actual_length << std::endl;
- }
-
- if ((lut->actual_length < 0) || (lut->length < 0)) {
- std::cerr << "USB: transfer completed with invalid response"
- << std::endl;
- }
- break;
- case LIBUSB_TRANSFER_CANCELLED:
- break;
- case LIBUSB_TRANSFER_NO_DEVICE:
- std::cerr << "USB: device was disconnected" << std::endl;
- break;
- case LIBUSB_TRANSFER_OVERFLOW:
- std::cerr << "USB: device sent more data than requested" << std::endl;
- break;
- case LIBUSB_TRANSFER_TIMED_OUT:
- std::cerr << "USB: transfer timed out" << std::endl;
- break;
- case LIBUSB_TRANSFER_STALL:
- std::cerr << "USB: halt condition detected (stalled)" << std::endl;
- break;
- case LIBUSB_TRANSFER_ERROR:
- std::cerr << "USB: transfer failed" << std::endl;
- break;
- default:
- std::cerr << "USB: received unknown transfer status" << std::endl;
- }
-}
+ libusb_transfer *_lut;
+ bool _expired;
+};
-libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- libusb_transfer *lut = NULL;
- if (_completed_list.pop_with_timed_wait(lut, timeout)) return lut;
- return NULL;
+//! helper function: handles all async callbacks
+static void libusb_async_cb(libusb_transfer *lut){
+ (*static_cast<boost::function<void()> *>(lut->user_data))();
}
/***********************************************************************
@@ -286,16 +116,107 @@ public:
size_t recv_endpoint,
size_t send_endpoint,
const device_addr_t &hints
- );
+ ):
+ _handle(handle),
+ _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))),
+ _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS))),
+ _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))),
+ _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS))),
+ _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
+ _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
+ _pending_recv_buffs(_num_recv_frames),
+ _pending_send_buffs(_num_send_frames)
+ {
+ _handle->claim_interface(2 /*in interface*/);
+ _handle->claim_interface(1 /*out interface*/);
+
+ //allocate libusb transfer structs and managed receive buffers
+ for (size_t i = 0; i < get_num_recv_frames(); i++){
+
+ libusb_transfer *lut = libusb_alloc_transfer(0);
+ UHD_ASSERT_THROW(lut != NULL);
+
+ _mrb_pool.push_back(libusb_zero_copy_mrb(lut));
+ _callbacks.push_back(boost::bind(
+ &libusb_zero_copy_impl::handle_recv, this, &_mrb_pool.back()
+ ));
+
+ libusb_fill_bulk_transfer(
+ lut, // transfer
+ _handle->get(), // dev_handle
+ (recv_endpoint & 0x7f) | 0x80, // endpoint
+ static_cast<unsigned char *>(_recv_buffer_pool->at(i)), // buffer
+ this->get_recv_frame_size(), // length
+ static_cast<libusb_transfer_cb_fn>(&libusb_async_cb), // callback
+ static_cast<void *>(&_callbacks.back()), // user_data
+ 0 // timeout
+ );
+
+ _all_luts.push_back(lut);
+ _mrb_pool.back().get_new();
+ }
+
+ //allocate libusb transfer structs and managed send buffers
+ for (size_t i = 0; i < get_num_send_frames(); i++){
+
+ libusb_transfer *lut = libusb_alloc_transfer(0);
+ UHD_ASSERT_THROW(lut != NULL);
+
+ _msb_pool.push_back(libusb_zero_copy_msb(lut));
+ _callbacks.push_back(boost::bind(
+ &libusb_zero_copy_impl::handle_send, this, &_msb_pool.back()
+ ));
+
+ libusb_fill_bulk_transfer(
+ lut, // transfer
+ _handle->get(), // dev_handle
+ (send_endpoint & 0x7f) | 0x00, // endpoint
+ static_cast<unsigned char *>(_send_buffer_pool->at(i)), // buffer
+ this->get_send_frame_size(), // length
+ static_cast<libusb_transfer_cb_fn>(&libusb_async_cb), // callback
+ static_cast<void *>(&_callbacks.back()), // user_data
+ 0 // timeout
+ );
+
+ _all_luts.push_back(lut);
+ libusb_async_cb(lut);
+ }
+
+ //spawn the event handler threads
+ size_t concurrency = hints.cast<size_t>("concurrency_hint", 1);
+ for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread(
+ boost::bind(&libusb_zero_copy_impl::run_event_loop, this)
+ );
+ }
~libusb_zero_copy_impl(void){
+ //shutdown the threads
_threads_running = false;
_thread_group.interrupt_all();
_thread_group.join_all();
+
+ //cancel and free all transfers
+ BOOST_FOREACH(libusb_transfer *lut, _all_luts){
+ libusb_cancel_transfer(lut);
+ libusb_free_transfer(lut);
+ }
}
- managed_recv_buffer::sptr get_recv_buff(double);
- managed_send_buffer::sptr get_send_buff(double);
+ managed_recv_buffer::sptr get_recv_buff(double timeout){
+ libusb_zero_copy_mrb *mrb = NULL;
+ if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){
+ return mrb->get_new();
+ }
+ return managed_recv_buffer::sptr();
+ }
+
+ managed_send_buffer::sptr get_send_buff(double timeout){
+ libusb_zero_copy_msb *msb = NULL;
+ if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){
+ return msb->get_new();
+ }
+ return managed_send_buffer::sptr();
+ }
size_t get_num_recv_frames(void) const { return _num_recv_frames; }
size_t get_num_send_frames(void) const { return _num_send_frames; }
@@ -304,125 +225,50 @@ public:
size_t get_send_frame_size(void) const { return _send_frame_size; }
private:
- void release(libusb_transfer *lut){
- _recv_ep->submit(lut);
+ //! Handle a bound async callback for recv
+ void handle_recv(libusb_zero_copy_mrb *mrb){
+ _pending_recv_buffs.push_with_haste(mrb);
}
- void commit(libusb_transfer *lut, size_t num_bytes){
- lut->length = num_bytes;
- try{
- _send_ep->submit(lut);
- }
- catch(const std::exception &e){
- std::cerr << "Error in commit: " << e.what() << std::endl;
- }
+ //! Handle a bound async callback for send
+ void handle_send(libusb_zero_copy_msb *msb){
+ _pending_send_buffs.push_with_haste(msb);
}
libusb::device_handle::sptr _handle;
const size_t _recv_frame_size, _num_recv_frames;
const size_t _send_frame_size, _num_send_frames;
- usb_endpoint::sptr _recv_ep, _send_ep;
- //event handler threads
+ //! Storage for transfer related objects
+ buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
+ bounded_buffer<libusb_zero_copy_mrb *> _pending_recv_buffs;
+ bounded_buffer<libusb_zero_copy_msb *> _pending_send_buffs;
+ std::list<libusb_zero_copy_mrb> _mrb_pool;
+ std::list<libusb_zero_copy_msb> _msb_pool;
+ std::list<boost::function<void()> > _callbacks;
+
+ //! a list of all transfer structs we allocated
+ std::list<libusb_transfer *> _all_luts;
+
+ //! event handler threads
boost::thread_group _thread_group;
bool _threads_running;
void run_event_loop(void){
set_thread_priority_safe();
- libusb::session::sptr session = libusb::session::get_global_session();
+ libusb_context *context = libusb::session::get_global_session()->get_context();
_threads_running = true;
try{
while(_threads_running){
timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 100000; //100ms
- libusb_handle_events_timeout(session->get_context(), &tv);
+ libusb_handle_events_timeout(context, &tv);
}
} catch(const boost::thread_interrupted &){}
}
-};
-
-/*
- * Constructor
- * Initializes libusb, opens devices, and sets up interfaces for I/O.
- * Finally, creates endpoints for asynchronous I/O.
- */
-libusb_zero_copy_impl::libusb_zero_copy_impl(
- libusb::device_handle::sptr handle,
- size_t recv_endpoint,
- size_t send_endpoint,
- const device_addr_t &hints
-):
- _handle(handle),
- _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))),
- _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS))),
- _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))),
- _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS)))
-{
- _handle->claim_interface(2 /*in interface*/);
- _handle->claim_interface(1 /*out interface*/);
-
- _recv_ep = usb_endpoint::sptr(new usb_endpoint(
- _handle, // libusb device_handle
- recv_endpoint, // USB endpoint number
- true, // IN endpoint
- this->get_recv_frame_size(), // buffer size per transfer
- this->get_num_recv_frames() // number of libusb transfers
- ));
-
- _send_ep = usb_endpoint::sptr(new usb_endpoint(
- _handle, // libusb device_handle
- send_endpoint, // USB endpoint number
- false, // OUT endpoint
- this->get_send_frame_size(), // buffer size per transfer
- this->get_num_send_frames() // number of libusb transfers
- ));
-
- //spawn the event handler threads
- size_t concurrency = hints.cast<size_t>("concurrency_hint", 1);
- for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread(
- boost::bind(&libusb_zero_copy_impl::run_event_loop, this)
- );
-}
-
-/*
- * Construct a managed receive buffer from a completed libusb transfer
- * (happy with buffer full of data) obtained from the receive endpoint.
- * Return empty pointer if no transfer is available (timeout or error).
- * \return pointer to a managed receive buffer
- */
-managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(double timeout){
- libusb_transfer *lut = _recv_ep->get_lut_with_wait(timeout);
- if (lut == NULL) {
- return managed_recv_buffer::sptr();
- }
- else {
- return managed_recv_buffer::make_safe(
- lut->buffer, lut->actual_length,
- boost::bind(&libusb_zero_copy_impl::release, this, lut)
- );
- }
-}
-
-/*
- * Construct a managed send buffer from a free libusb transfer (with
- * empty buffer). Return empty pointer of no transfer is available
- * (timeout or error).
- * \return pointer to a managed send buffer
- */
-managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){
- libusb_transfer *lut = _send_ep->get_lut_with_wait(timeout);
- if (lut == NULL) {
- return managed_send_buffer::sptr();
- }
- else {
- return managed_send_buffer::make_safe(
- lut->buffer, this->get_send_frame_size(),
- boost::bind(&libusb_zero_copy_impl::commit, this, lut, _1)
- );
- }
-}
+};
/***********************************************************************
* USB zero_copy make functions
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index 2794d383c..05352ffce 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -24,7 +24,7 @@
#include <boost/asio.hpp>
#include <boost/format.hpp>
#include <iostream>
-#include <vector>
+#include <list>
using namespace uhd;
using namespace uhd::transport;
@@ -40,20 +40,18 @@ static const size_t DEFAULT_NUM_FRAMES = 32;
**********************************************************************/
class udp_zero_copy_asio_mrb : public managed_recv_buffer{
public:
- typedef boost::shared_ptr<udp_zero_copy_asio_mrb> sptr;
typedef boost::function<void(udp_zero_copy_asio_mrb *)> release_cb_type;
udp_zero_copy_asio_mrb(void *mem, const release_cb_type &release_cb):
- _mem(mem), _release_cb(release_cb){/* NOP */}
+ _mem(mem), _len(0), _release_cb(release_cb){/* NOP */}
void release(void){
- if (_expired) return;
+ if (_len == 0) return;
this->_release_cb(this);
- _expired = true;
+ _len = 0;
}
sptr get_new(size_t len){
- _expired = false;
_len = len;
return sptr(this, &udp_zero_copy_asio_mrb::fake_deleter);
}
@@ -68,7 +66,6 @@ private:
const void *get_buff(void) const{return _mem;}
size_t get_size(void) const{return _len;}
- bool _expired;
void *_mem;
size_t _len;
release_cb_type _release_cb;
@@ -81,20 +78,18 @@ private:
**********************************************************************/
class udp_zero_copy_asio_msb : public managed_send_buffer{
public:
- typedef boost::shared_ptr<udp_zero_copy_asio_msb> sptr;
typedef boost::function<void(udp_zero_copy_asio_msb *, size_t)> commit_cb_type;
udp_zero_copy_asio_msb(void *mem, const commit_cb_type &commit_cb):
- _mem(mem), _commit_cb(commit_cb){/* NOP */}
+ _mem(mem), _len(0), _commit_cb(commit_cb){/* NOP */}
void commit(size_t len){
- if (_expired) return;
+ if (_len == 0) return;
this->_commit_cb(this, len);
- _expired = true;
+ _len = 0;
}
sptr get_new(size_t len){
- _expired = false;
_len = len;
return sptr(this, &udp_zero_copy_asio_msb::fake_deleter);
}
@@ -107,7 +102,6 @@ private:
void *get_buff(void) const{return _mem;}
size_t get_size(void) const{return _len;}
- bool _expired;
void *_mem;
size_t _len;
commit_cb_type _commit_cb;
@@ -135,7 +129,8 @@ public:
_num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))),
_recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
_send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
- _pending_recv_buffs(_num_recv_frames), _pending_send_buffs(_num_send_frames)
+ _pending_recv_buffs(_num_recv_frames),
+ _pending_send_buffs(_num_send_frames)
{
//std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
@@ -152,20 +147,18 @@ public:
//allocate re-usable managed receive buffers
for (size_t i = 0; i < get_num_recv_frames(); i++){
- _mrb_pool.push_back(udp_zero_copy_asio_mrb::sptr(
- new udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i),
+ _mrb_pool.push_back(udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i),
boost::bind(&udp_zero_copy_asio_impl::release, this, _1))
- ));
- handle_recv(_mrb_pool.back().get());
+ );
+ handle_recv(&_mrb_pool.back());
}
//allocate re-usable managed send buffers
for (size_t i = 0; i < get_num_send_frames(); i++){
- _msb_pool.push_back(udp_zero_copy_asio_msb::sptr(
- new udp_zero_copy_asio_msb(_send_buffer_pool->at(i),
+ _msb_pool.push_back(udp_zero_copy_asio_msb(_send_buffer_pool->at(i),
boost::bind(&udp_zero_copy_asio_impl::commit, this, _1, _2))
- ));
- handle_send(_msb_pool.back().get());
+ );
+ handle_send(&_msb_pool.back());
}
}
@@ -264,8 +257,8 @@ private:
buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
bounded_buffer<udp_zero_copy_asio_mrb *> _pending_recv_buffs;
bounded_buffer<udp_zero_copy_asio_msb *> _pending_send_buffs;
- std::vector<udp_zero_copy_asio_msb::sptr> _msb_pool;
- std::vector<udp_zero_copy_asio_mrb::sptr> _mrb_pool;
+ std::list<udp_zero_copy_asio_msb> _msb_pool;
+ std::list<udp_zero_copy_asio_mrb> _mrb_pool;
//asio guts -> socket and service
asio::io_service _io_service;