path: root/host/lib/transport
diff options
Diffstat (limited to 'host/lib/transport')
1 files changed, 140 insertions, 52 deletions
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index 7e28caf2d..d84aeefdd 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -35,6 +35,14 @@ namespace asio = boost::asio;
* Constants
+//Define this to the the boost async io calls to perform receive.
+//Otherwise, get_recv_buff uses a blocking receive with timeout.
+//Define this to the the boost async io calls to perform send.
+//Otherwise, the commit callback uses a blocking send.
//enough buffering for half a second of samples at full rate on usrp2
static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5);
@@ -43,8 +51,21 @@ static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5);
//but may change with host-based flow control.
static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3);
-//the number of async frames to allocate for each send and recv
-static const size_t DEFAULT_NUM_FRAMES = 32;
+//The number of async frames to allocate for each send and recv:
+//The non-async recv can have a very large number of recv frames
+//because the CPU overhead is independent of the number of frames.
+static const size_t DEFAULT_NUM_RECV_FRAMES = 32;
+static const size_t DEFAULT_NUM_RECV_FRAMES = MIN_RECV_SOCK_BUFF_SIZE/udp_simple::mtu;
+//The non-async send only ever requires a single frame
+//because the buffer will be committed before a new get.
+static const size_t DEFAULT_NUM_SEND_FRAMES = 32;
+static const size_t DEFAULT_NUM_SEND_FRAMES = MIN_SEND_SOCK_BUFF_SIZE/udp_simple::mtu;;
//a single concurrent thread for io_service seems to be the fastest
static const size_t CONCURRENCY_HINT = 1;
@@ -67,9 +88,9 @@ public:
_io_service(hints.cast<size_t>("concurrency_hint", CONCURRENCY_HINT)),
_recv_frame_size(size_t(hints.cast<double>("recv_frame_size", udp_simple::mtu))),
- _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES))),
+ _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_RECV_FRAMES))),
_send_frame_size(size_t(hints.cast<double>("send_frame_size", udp_simple::mtu))),
- _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES)))
+ _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_SEND_FRAMES)))
//std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
@@ -82,6 +103,13 @@ public:
_socket = new asio::ip::udp::socket(_io_service);
+ _sock_fd = _socket->native();
+ }
+ ~udp_zero_copy_asio_impl(void){
+ delete _work; //allow io_service run to complete
+ _thread_group.join_all(); //wait for service threads to exit
+ delete _socket;
void init(void){
@@ -106,10 +134,9 @@ public:
- ~udp_zero_copy_asio_impl(void){
- delete _work; //allow io_service run to complete
- _thread_group.join_all(); //wait for service threads to exit
- delete _socket;
+ void service(void){
+ set_thread_priority_safe();
+ _io_service.run();
//get size for internal socket buffer
@@ -126,6 +153,15 @@ public:
return get_buff_size<Opt>();
+ //! handle a recv callback -> push the filled memory into the fifo
+ UHD_INLINE void handle_recv(void *mem, size_t len){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len));
+ }
+ ////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////
//! pop a filled recv buffer off of the fifo and bind with the release callback
managed_recv_buffer::sptr get_recv_buff(double timeout){
boost::this_thread::disable_interruption di; //disable because the wait can throw
@@ -142,9 +178,74 @@ public:
return managed_recv_buffer::sptr();
+ //! release a recv buffer -> start an async recv on the buffer
+ void release(void *mem){
+ _socket->async_receive(
+ boost::asio::buffer(mem, this->get_recv_frame_size()),
+ boost::bind(
+ &udp_zero_copy_asio_impl::handle_recv,
+ shared_from_this(), mem,
+ asio::placeholders::bytes_transferred
+ )
+ );
+ }
+ ////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////
+ managed_recv_buffer::sptr get_recv_buff(double timeout){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ asio::mutable_buffer buff;
+ //setup timeval for timeout
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = long(timeout*1e6);
+ //setup rset for timeout
+ fd_set rset;
+ FD_ZERO(&rset);
+ FD_SET(_sock_fd, &rset);
+ //call select to perform timed wait and grab an available buffer with wait
+ //if the condition is true, call receive and return the managed buffer
+ if (
+ ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 and
+ _pending_recv_buffs->pop_with_timed_wait(buff, timeout)
+ ){
+ return managed_recv_buffer::make_safe(
+ asio::buffer(
+ boost::asio::buffer_cast<void *>(buff),
+ _socket->receive(asio::buffer(buff))
+ ),
+ boost::bind(
+ &udp_zero_copy_asio_impl::release,
+ shared_from_this(),
+ asio::buffer_cast<void*>(buff)
+ )
+ );
+ }
+ return managed_recv_buffer::sptr();
+ }
+ void release(void *mem){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ handle_recv(mem, this->get_recv_frame_size());
+ }
+ ////////////////////////////////////////////////////////////////////
+ #endif /*USE_ASIO_ASYNC_RECV*/
+ ////////////////////////////////////////////////////////////////////
size_t get_num_recv_frames(void) const {return _num_recv_frames;}
size_t get_recv_frame_size(void) const {return _recv_frame_size;}
+ //! handle a send callback -> push the emptied memory into the fifo
+ UHD_INLINE void handle_send(void *mem){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, this->get_send_frame_size()));
+ }
//! pop an empty send buffer off of the fifo and bind with the commit callback
managed_send_buffer::sptr get_send_buff(double timeout){
boost::this_thread::disable_interruption di; //disable because the wait can throw
@@ -161,43 +262,9 @@ public:
return managed_send_buffer::sptr();
- size_t get_num_send_frames(void) const {return _num_send_frames;}
- size_t get_send_frame_size(void) const {return _send_frame_size;}
- void service(void){
- set_thread_priority_safe();
- _io_service.run();
- }
- /*******************************************************************
- * The async send and receive callbacks
- ******************************************************************/
- //! handle a recv callback -> push the filled memory into the fifo
- void handle_recv(void *mem, size_t len){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len));
- }
- //! release a recv buffer -> start an async recv on the buffer
- void release(void *mem){
- _socket->async_receive(
- boost::asio::buffer(mem, _recv_frame_size),
- boost::bind(
- &udp_zero_copy_asio_impl::handle_recv,
- shared_from_this(), mem,
- asio::placeholders::bytes_transferred
- )
- );
- }
- //! handle a send callback -> push the emptied memory into the fifo
- void handle_send(void *mem){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, _send_frame_size));
- }
+ ////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////
//! commit a send buffer -> start an async send on the buffer
void commit(void *mem, size_t len){
@@ -209,10 +276,27 @@ private:
+ ////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////
+ void commit(void *mem, size_t len){
+ _socket->send(asio::buffer(mem, len));
+ handle_send(mem);
+ }
+ ////////////////////////////////////////////////////////////////////
+ #endif /*USE_ASIO_ASYNC_SEND*/
+ ////////////////////////////////////////////////////////////////////
+ size_t get_num_send_frames(void) const {return _num_send_frames;}
+ size_t get_send_frame_size(void) const {return _send_frame_size;}
//asio guts -> socket and service
asio::ip::udp::socket *_socket;
asio::io_service _io_service;
asio::io_service::work *_work;
+ int _sock_fd;
//memory management -> buffers and fifos
boost::thread_group _thread_group;
@@ -235,6 +319,13 @@ template<typename Opt> static void resize_buff_helper(
if (name == "recv") min_sock_buff_size = MIN_RECV_SOCK_BUFF_SIZE;
if (name == "send") min_sock_buff_size = MIN_SEND_SOCK_BUFF_SIZE;
+ std::string help_message;
+ #if defined(UHD_PLATFORM_LINUX)
+ help_message = str(boost::format(
+ "Please run: sudo sysctl -w net.core.%smem_max=%d\n"
+ ) % ((name == "recv")?"r":"w") % min_sock_buff_size);
+ #endif /*defined(UHD_PLATFORM_LINUX)*/
//resize the buffer if size was provided
if (target_size > 0){
size_t actual_size = udp_trans->resize_buff<Opt>(target_size);
@@ -246,13 +337,10 @@ template<typename Opt> static void resize_buff_helper(
"Current %s sock buff size: %d bytes"
) % name % actual_size << std::endl;
if (actual_size < target_size) uhd::print_warning(str(boost::format(
- "The %1% buffer is smaller than the requested size.\n"
- "The minimum recommended buffer size is %2% bytes.\n"
- "See the transport application notes on buffer resizing.\n"
- #if defined(UHD_PLATFORM_LINUX)
- "Please run: sudo sysctl -w net.core.rmem_max=%2%\n"
- #endif /*defined(UHD_PLATFORM_LINUX)*/
- ) % name % min_sock_buff_size));
+ "The %s buffer is smaller than the requested size.\n"
+ "The minimum recommended buffer size is %d bytes.\n"
+ "See the transport application notes on buffer resizing.\n%s"
+ ) % name % min_sock_buff_size % help_message));
//only enable on platforms that are happy with the large buffer resize