summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/lib/transport/udp_zero_copy_asio.cpp140
1 files changed, 101 insertions, 39 deletions
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index 7e28caf2d..798cc657d 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -35,6 +35,10 @@ namespace asio = boost::asio;
/***********************************************************************
* Constants
**********************************************************************/
+//Define this to the the boost async io calls to perform receive.
+//Otherwise, get_recv_buff uses a blocking receive with timeout.
+//#define USE_ASIO_ASYNC_RECV
+
//enough buffering for half a second of samples at full rate on usrp2
static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5);
@@ -43,8 +47,15 @@ static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5);
//but may change with host-based flow control.
static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3);
-//the number of async frames to allocate for each send and recv
-static const size_t DEFAULT_NUM_FRAMES = 32;
+//The number of async frames to allocate for each send and recv:
+//The non-async recv can have a very large number of recv frames
+//because the CPU overhead is independent of the number of frames.
+#ifdef USE_ASIO_ASYNC_RECV
+static const size_t DEFAULT_NUM_RECV_FRAMES = 32;
+#else
+static const size_t DEFAULT_NUM_RECV_FRAMES = MIN_RECV_SOCK_BUFF_SIZE/udp_simple::mtu;
+#endif
+static const size_t DEFAULT_NUM_SEND_FRAMES = 32;
//a single concurrent thread for io_service seems to be the fastest
static const size_t CONCURRENCY_HINT = 1;
@@ -67,9 +78,9 @@ public:
):
_io_service(hints.cast<size_t>("concurrency_hint", CONCURRENCY_HINT)),
_recv_frame_size(size_t(hints.cast<double>("recv_frame_size", udp_simple::mtu))),
- _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES))),
+ _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_RECV_FRAMES))),
_send_frame_size(size_t(hints.cast<double>("send_frame_size", udp_simple::mtu))),
- _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES)))
+ _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_SEND_FRAMES)))
{
//std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
@@ -82,6 +93,13 @@ public:
_socket = new asio::ip::udp::socket(_io_service);
_socket->open(asio::ip::udp::v4());
_socket->connect(receiver_endpoint);
+ _sock_fd = _socket->native();
+ }
+
+ ~udp_zero_copy_asio_impl(void){
+ delete _work; //allow io_service run to complete
+ _thread_group.join_all(); //wait for service threads to exit
+ delete _socket;
}
void init(void){
@@ -106,10 +124,9 @@ public:
);
}
- ~udp_zero_copy_asio_impl(void){
- delete _work; //allow io_service run to complete
- _thread_group.join_all(); //wait for service threads to exit
- delete _socket;
+ void service(void){
+ set_thread_priority_safe();
+ _io_service.run();
}
//get size for internal socket buffer
@@ -126,6 +143,9 @@ public:
return get_buff_size<Opt>();
}
+ ////////////////////////////////////////////////////////////////////
+ #ifdef USE_ASIO_ASYNC_RECV
+ ////////////////////////////////////////////////////////////////////
//! pop a filled recv buffer off of the fifo and bind with the release callback
managed_recv_buffer::sptr get_recv_buff(double timeout){
boost::this_thread::disable_interruption di; //disable because the wait can throw
@@ -142,6 +162,74 @@ public:
return managed_recv_buffer::sptr();
}
+ //! handle a recv callback -> push the filled memory into the fifo
+ void handle_recv(void *mem, size_t len){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len));
+ }
+
+ //! release a recv buffer -> start an async recv on the buffer
+ void release(void *mem){
+ _socket->async_receive(
+ boost::asio::buffer(mem, _recv_frame_size),
+ boost::bind(
+ &udp_zero_copy_asio_impl::handle_recv,
+ shared_from_this(), mem,
+ asio::placeholders::bytes_transferred
+ )
+ );
+ }
+
+ ////////////////////////////////////////////////////////////////////
+ #else /*USE_ASIO_ASYNC_RECV*/
+ ////////////////////////////////////////////////////////////////////
+ managed_recv_buffer::sptr get_recv_buff(double timeout){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+
+ //setup timeval for timeout
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = timeout*1e6;
+
+ //setup rset for timeout
+ fd_set rset;
+ FD_ZERO(&rset);
+ FD_SET(_sock_fd, &rset);
+
+ //call select to perform timed wait
+ if (::select(_sock_fd+1, &rset, NULL, NULL, &tv) <= 0)
+ return managed_recv_buffer::sptr();
+
+ //grab an available buffer
+ asio::mutable_buffer buff;
+ if (not _pending_recv_buffs->pop_with_timed_wait(buff, timeout))
+ return managed_recv_buffer::sptr();
+
+ //receive and return the buffer
+ return managed_recv_buffer::make_safe(
+ asio::buffer(
+ boost::asio::buffer_cast<void *>(buff),
+ _socket->receive(boost::asio::buffer(buff))
+ ),
+ boost::bind(
+ &udp_zero_copy_asio_impl::release,
+ shared_from_this(),
+ asio::buffer_cast<void*>(buff)
+ )
+ );
+ }
+
+ void release(void *mem){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ _pending_recv_buffs->push_with_wait(
+ boost::asio::buffer(mem, this->get_recv_frame_size())
+ );
+ }
+
+ ////////////////////////////////////////////////////////////////////
+ #endif /*USE_ASIO_ASYNC_RECV*/
+ ////////////////////////////////////////////////////////////////////
+
size_t get_num_recv_frames(void) const {return _num_recv_frames;}
size_t get_recv_frame_size(void) const {return _recv_frame_size;}
@@ -161,37 +249,6 @@ public:
return managed_send_buffer::sptr();
}
- size_t get_num_send_frames(void) const {return _num_send_frames;}
- size_t get_send_frame_size(void) const {return _send_frame_size;}
-
-private:
- void service(void){
- set_thread_priority_safe();
- _io_service.run();
- }
-
- /*******************************************************************
- * The async send and receive callbacks
- ******************************************************************/
-
- //! handle a recv callback -> push the filled memory into the fifo
- void handle_recv(void *mem, size_t len){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len));
- }
-
- //! release a recv buffer -> start an async recv on the buffer
- void release(void *mem){
- _socket->async_receive(
- boost::asio::buffer(mem, _recv_frame_size),
- boost::bind(
- &udp_zero_copy_asio_impl::handle_recv,
- shared_from_this(), mem,
- asio::placeholders::bytes_transferred
- )
- );
- }
-
//! handle a send callback -> push the emptied memory into the fifo
void handle_send(void *mem){
boost::this_thread::disable_interruption di; //disable because the wait can throw
@@ -209,10 +266,15 @@ private:
);
}
+ size_t get_num_send_frames(void) const {return _num_send_frames;}
+ size_t get_send_frame_size(void) const {return _send_frame_size;}
+
+private:
//asio guts -> socket and service
asio::ip::udp::socket *_socket;
asio::io_service _io_service;
asio::io_service::work *_work;
+ asio::ip::udp::socket::native_type _sock_fd;
//memory management -> buffers and fifos
boost::thread_group _thread_group;