summaryrefslogtreecommitdiffstats
path: root/host/lib
diff options
context:
space:
mode:
authorJosh Blum <josh@joshknows.com>2011-02-09 16:39:33 -0800
committerJosh Blum <josh@joshknows.com>2011-02-09 16:39:33 -0800
commit55884d630ca675c9f006eb0705bc8670119c4737 (patch)
tree8b9b90344b67652ed046df2ea14ccd34786e74d5 /host/lib
parentaae5cc481ba7bb75c8d97d8f425bc42884e28538 (diff)
downloaduhd-55884d630ca675c9f006eb0705bc8670119c4737.tar.gz
uhd-55884d630ca675c9f006eb0705bc8670119c4737.tar.bz2
uhd-55884d630ca675c9f006eb0705bc8670119c4737.zip
udp: removed asio implementation, created custom managed buffer classes to re-use
Diffstat (limited to 'host/lib')
-rw-r--r--host/lib/transport/udp_zero_copy_asio.cpp270
1 files changed, 115 insertions, 155 deletions
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index 90cd2a9ce..33c7dd94e 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -19,38 +19,99 @@
#include <uhd/transport/udp_simple.hpp> //mtu
#include <uhd/transport/bounded_buffer.hpp>
#include <uhd/transport/buffer_pool.hpp>
-#include <uhd/utils/thread_priority.hpp>
#include <uhd/utils/assert.hpp>
#include <uhd/utils/warning.hpp>
#include <boost/asio.hpp>
#include <boost/format.hpp>
-#include <boost/thread/thread.hpp>
-#include <boost/enable_shared_from_this.hpp>
#include <iostream>
+#include <vector>
using namespace uhd;
using namespace uhd::transport;
namespace asio = boost::asio;
-//Define this to the the boost async io calls to perform receive.
-//Otherwise, get_recv_buff uses a blocking receive with timeout.
-//#define USE_ASIO_ASYNC_RECV
+//A reasonable number of frames for send/recv and async/sync
+static const size_t DEFAULT_NUM_FRAMES = 32;
-//Define this to the the boost async io calls to perform send.
-//Otherwise, the commit callback uses a blocking send.
-//#define USE_ASIO_ASYNC_SEND
+/***********************************************************************
+ * Reusable managed receiver buffer:
+ * - Initialize with memory and a release callback.
+ * - Call get new with a length in bytes to re-use.
+ **********************************************************************/
+class udp_zero_copy_asio_mrb : public managed_recv_buffer{
+public:
+ typedef boost::shared_ptr<udp_zero_copy_asio_mrb> sptr;
+ typedef boost::function<void(udp_zero_copy_asio_mrb *)> release_cb_type;
-//The number of service threads to spawn for async ASIO:
-//A single concurrent thread for io_service seems to be the fastest.
-//Threads are disabled when no async implementations are enabled.
-#if defined(USE_ASIO_ASYNC_RECV) || defined(USE_ASIO_ASYNC_SEND)
-static const size_t CONCURRENCY_HINT = 1;
-#else
-static const size_t CONCURRENCY_HINT = 0;
-#endif
+ udp_zero_copy_asio_mrb(void *mem, const release_cb_type &release_cb):
+ _mem(mem), _release_cb(release_cb){/* NOP */}
-//A reasonable number of frames for send/recv and async/sync
-static const size_t DEFAULT_NUM_FRAMES = 32;
+ void release(void){
+ if (_expired) return;
+ this->_release_cb(this);
+ _expired = true;
+ }
+
+ sptr get_new(size_t len){
+ _expired = false;
+ _len = len;
+ return sptr(this, &udp_zero_copy_asio_mrb::fake_deleter);
+ }
+
+ void *get(void) const{return _mem;}
+
+private:
+ static void fake_deleter(void *obj){
+ static_cast<udp_zero_copy_asio_mrb *>(obj)->release();
+ }
+
+ const void *get_buff(void) const{return _mem;}
+ size_t get_size(void) const{return _len;}
+
+ bool _expired;
+ void *_mem;
+ size_t _len;
+ release_cb_type _release_cb;
+};
+
+/***********************************************************************
+ * Reusable managed send buffer:
+ * - Initialize with memory and a commit callback.
+ * - Call get new with a length in bytes to re-use.
+ **********************************************************************/
+class udp_zero_copy_asio_msb : public managed_send_buffer{
+public:
+ typedef boost::shared_ptr<udp_zero_copy_asio_msb> sptr;
+ typedef boost::function<void(udp_zero_copy_asio_msb *, size_t)> commit_cb_type;
+
+ udp_zero_copy_asio_msb(void *mem, const commit_cb_type &commit_cb):
+ _mem(mem), _commit_cb(commit_cb){/* NOP */}
+
+ void commit(size_t len){
+ if (_expired) return;
+ this->_commit_cb(this, len);
+ _expired = true;
+ }
+
+ sptr get_new(size_t len){
+ _expired = false;
+ _len = len;
+ return sptr(this, &udp_zero_copy_asio_msb::fake_deleter);
+ }
+
+private:
+ static void fake_deleter(void *obj){
+ static_cast<udp_zero_copy_asio_msb *>(obj)->commit(0);
+ }
+
+ void *get_buff(void) const{return _mem;}
+ size_t get_size(void) const{return _len;}
+
+ bool _expired;
+ void *_mem;
+ size_t _len;
+ commit_cb_type _commit_cb;
+};
/***********************************************************************
* Zero Copy UDP implementation with ASIO:
@@ -59,7 +120,7 @@ static const size_t DEFAULT_NUM_FRAMES = 32;
* However, it is not a true zero copy implementation as each
* send and recv requires a copy operation to/from userspace.
**********************************************************************/
-class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_shared_from_this<udp_zero_copy_asio_impl> {
+class udp_zero_copy_asio_impl : public udp_zero_copy{
public:
typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr;
@@ -74,9 +135,7 @@ public:
_num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))),
_recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
_send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
- _pending_recv_buffs(_num_recv_frames), _pending_send_buffs(_num_send_frames),
- _concurrency_hint(hints.cast<size_t>("concurrency_hint", CONCURRENCY_HINT)),
- _io_service(_concurrency_hint)
+ _pending_recv_buffs(_num_recv_frames), _pending_send_buffs(_num_send_frames)
{
//std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
@@ -90,35 +149,28 @@ public:
_socket->open(asio::ip::udp::v4());
_socket->connect(receiver_endpoint);
_sock_fd = _socket->native();
- }
-
- ~udp_zero_copy_asio_impl(void){
- delete _work; //allow io_service run to complete
- _thread_group.join_all(); //wait for service threads to exit
- delete _socket;
- }
- void init(void){
- //release recv frames for use
+ //allocate re-usable managed receive buffers
for (size_t i = 0; i < get_num_recv_frames(); i++){
- release(_recv_buffer_pool->at(i));
+ _mrb_pool.push_back(udp_zero_copy_asio_mrb::sptr(
+ new udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i),
+ boost::bind(&udp_zero_copy_asio_impl::release, this, _1))
+ ));
+ handle_recv(_mrb_pool.back().get());
}
- //push send frames into the fifo
+ //allocate re-usable managed send buffers
for (size_t i = 0; i < get_num_send_frames(); i++){
- handle_send(_send_buffer_pool->at(i));
+ _msb_pool.push_back(udp_zero_copy_asio_msb::sptr(
+ new udp_zero_copy_asio_msb(_send_buffer_pool->at(i),
+ boost::bind(&udp_zero_copy_asio_impl::commit, this, _1, _2))
+ ));
+ handle_send(_msb_pool.back().get());
}
-
- //spawn the service threads that will run the io service
- _work = new asio::io_service::work(_io_service); //new work to delete later
- for (size_t i = 0; i < _concurrency_hint; i++) _thread_group.create_thread(
- boost::bind(&udp_zero_copy_asio_impl::service, this)
- );
}
- void service(void){
- set_thread_priority_safe();
- _io_service.run();
+ ~udp_zero_copy_asio_impl(void){
+ delete _socket;
}
//get size for internal socket buffer
@@ -135,47 +187,12 @@ public:
return get_buff_size<Opt>();
}
- //! handle a recv callback -> push the filled memory into the fifo
- UHD_INLINE void handle_recv(void *mem, size_t len){
- _pending_recv_buffs.push_with_pop_on_full(boost::asio::buffer(mem, len));
- }
-
- ////////////////////////////////////////////////////////////////////
- #ifdef USE_ASIO_ASYNC_RECV
- ////////////////////////////////////////////////////////////////////
- //! pop a filled recv buffer off of the fifo and bind with the release callback
- managed_recv_buffer::sptr get_recv_buff(double timeout){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- asio::mutable_buffer buff;
- if (_pending_recv_buffs.pop_with_timed_wait(buff, timeout)){
- return managed_recv_buffer::make_safe(
- asio::buffer_cast<const void *>(buff),
- asio::buffer_size(buff), boost::bind(
- &udp_zero_copy_asio_impl::release, this,
- asio::buffer_cast<void*>(buff)
- )
- );
- }
- return managed_recv_buffer::sptr();
+ UHD_INLINE void handle_recv(udp_zero_copy_asio_mrb *mrb){
+ _pending_recv_buffs.push_with_pop_on_full(mrb);
}
- //! release a recv buffer -> start an async recv on the buffer
- void release(void *mem){
- _socket->async_receive(
- boost::asio::buffer(mem, this->get_recv_frame_size()),
- boost::bind(
- &udp_zero_copy_asio_impl::handle_recv,
- shared_from_this(), mem,
- asio::placeholders::bytes_transferred
- )
- );
- }
-
- ////////////////////////////////////////////////////////////////////
- #else /*USE_ASIO_ASYNC_RECV*/
- ////////////////////////////////////////////////////////////////////
managed_recv_buffer::sptr get_recv_buff(double timeout){
- asio::mutable_buffer buff;
+ udp_zero_copy_asio_mrb *mrb;
//setup timeval for timeout
timeval tv;
@@ -191,108 +208,53 @@ public:
//if the condition is true, call receive and return the managed buffer
if (
::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0
- and _pending_recv_buffs.pop_with_haste(buff)
+ and _pending_recv_buffs.pop_with_haste(mrb)
){
- return managed_recv_buffer::make_safe(
- boost::asio::buffer_cast<void *>(buff),
- _socket->receive(asio::buffer(buff)),
- boost::bind(
- &udp_zero_copy_asio_impl::release, this,
- asio::buffer_cast<void*>(buff)
- )
- );
+ return mrb->get_new(::recv(_sock_fd, mrb->get(), _recv_frame_size, 0));
}
return managed_recv_buffer::sptr();
}
- void release(void *mem){
- handle_recv(mem, this->get_recv_frame_size());
+ void release(udp_zero_copy_asio_mrb *mrb){
+ handle_recv(mrb);
}
- ////////////////////////////////////////////////////////////////////
- #endif /*USE_ASIO_ASYNC_RECV*/
- ////////////////////////////////////////////////////////////////////
-
size_t get_num_recv_frames(void) const {return _num_recv_frames;}
size_t get_recv_frame_size(void) const {return _recv_frame_size;}
- //! handle a send callback -> push the emptied memory into the fifo
- UHD_INLINE void handle_send(void *mem){
- _pending_send_buffs.push_with_pop_on_full(boost::asio::buffer(mem, this->get_send_frame_size()));
- }
-
- ////////////////////////////////////////////////////////////////////
- #ifdef USE_ASIO_ASYNC_SEND
- ////////////////////////////////////////////////////////////////////
- //! pop an empty send buffer off of the fifo and bind with the commit callback
- managed_send_buffer::sptr get_send_buff(double timeout){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- asio::mutable_buffer buff;
- if (_pending_send_buffs.pop_with_timed_wait(buff, timeout)){
- return managed_send_buffer::make_safe(
- asio::buffer_cast<void *>(buff),
- asio::buffer_size(buff), boost::bind(
- &udp_zero_copy_asio_impl::commit, this,
- asio::buffer_cast<void*>(buff), _1
- )
- );
- }
- return managed_send_buffer::sptr();
+ UHD_INLINE void handle_send(udp_zero_copy_asio_msb *msb){
+ _pending_send_buffs.push_with_pop_on_full(msb);
}
- //! commit a send buffer -> start an async send on the buffer
- void commit(void *mem, size_t len){
- _socket->async_send(
- boost::asio::buffer(mem, len),
- boost::bind(
- &udp_zero_copy_asio_impl::handle_send,
- shared_from_this(), mem
- )
- );
- }
-
- ////////////////////////////////////////////////////////////////////
- #else /*USE_ASIO_ASYNC_SEND*/
- ////////////////////////////////////////////////////////////////////
managed_send_buffer::sptr get_send_buff(double){
- asio::mutable_buffer buff;
- if (_pending_send_buffs.pop_with_haste(buff)){
- return managed_send_buffer::make_safe(
- asio::buffer_cast<void *>(buff),
- asio::buffer_size(buff), boost::bind(
- &udp_zero_copy_asio_impl::commit, this,
- asio::buffer_cast<void*>(buff), _1
- )
- );
+ udp_zero_copy_asio_msb *msb;
+ if (_pending_send_buffs.pop_with_haste(msb)){
+ return msb->get_new(_send_frame_size);
}
return managed_send_buffer::sptr();
}
- void commit(void *mem, size_t len){
- _socket->send(asio::buffer(mem, len));
- handle_send(mem);
+ void commit(udp_zero_copy_asio_msb *msb, size_t len){
+ ::send(_sock_fd, msb->cast<const void *>(), len, 0);
+ handle_send(msb);
}
- ////////////////////////////////////////////////////////////////////
- #endif /*USE_ASIO_ASYNC_SEND*/
- ////////////////////////////////////////////////////////////////////
-
size_t get_num_send_frames(void) const {return _num_send_frames;}
size_t get_send_frame_size(void) const {return _send_frame_size;}
private:
//memory management -> buffers and fifos
- boost::thread_group _thread_group;
const size_t _recv_frame_size, _num_recv_frames;
const size_t _send_frame_size, _num_send_frames;
buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
- bounded_buffer<asio::mutable_buffer> _pending_recv_buffs, _pending_send_buffs;
+ bounded_buffer<udp_zero_copy_asio_mrb *> _pending_recv_buffs;
+ bounded_buffer<udp_zero_copy_asio_msb *> _pending_send_buffs;
+ std::vector<udp_zero_copy_asio_msb::sptr> _msb_pool;
+ std::vector<udp_zero_copy_asio_mrb::sptr> _mrb_pool;
//asio guts -> socket and service
- size_t _concurrency_hint;
asio::io_service _io_service;
asio::ip::udp::socket *_socket;
- asio::io_service::work *_work;
int _sock_fd;
};
@@ -345,7 +307,5 @@ udp_zero_copy::sptr udp_zero_copy::make(
resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv");
resize_buff_helper<asio::socket_base::send_buffer_size> (udp_trans, send_buff_size, "send");
- udp_trans->init(); //buffers resized -> call init() to use
-
return udp_trans;
}