aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/udp_zero_copy_asio.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport/udp_zero_copy_asio.cpp')
-rw-r--r--host/lib/transport/udp_zero_copy_asio.cpp85
1 files changed, 45 insertions, 40 deletions
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index 70e7514a1..a1eb516fc 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -18,6 +18,7 @@
#include <uhd/transport/udp_zero_copy.hpp>
#include <uhd/transport/udp_simple.hpp> //mtu
#include <uhd/transport/bounded_buffer.hpp>
+#include <uhd/utils/thread_priority.hpp>
#include <uhd/utils/assert.hpp>
#include <uhd/utils/warning.hpp>
#include <boost/shared_array.hpp>
@@ -26,6 +27,7 @@
#include <boost/thread.hpp>
#include <iostream>
+using namespace uhd;
using namespace uhd::transport;
namespace asio = boost::asio;
@@ -34,11 +36,15 @@ namespace asio = boost::asio;
**********************************************************************/
//enough buffering for half a second of samples at full rate on usrp2
static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5);
+
//Large buffers cause more underflow at high rates.
//Perhaps this is due to the kernel scheduling,
//but may change with host-based flow control.
static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3);
+//the number of async frames to allocate for each send and recv
+static const size_t DEFAULT_NUM_ASYNC_FRAMES = 32;
+
/***********************************************************************
* Zero Copy UDP implementation with ASIO:
* This is the portable zero copy implementation for systems
@@ -50,51 +56,52 @@ class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_share
public:
typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr;
- udp_zero_copy_asio_impl(const std::string &addr, const std::string &port){
+ udp_zero_copy_asio_impl(
+ const std::string &addr, const std::string &port,
+ size_t recv_frame_size, size_t num_recv_frames,
+ size_t send_frame_size, size_t num_send_frames
+ ):
+ _recv_frame_size(recv_frame_size), _num_recv_frames(num_recv_frames),
+ _send_frame_size(send_frame_size), _num_send_frames(num_send_frames)
+ {
//std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
- // resolve the address
+ //resolve the address
asio::ip::udp::resolver resolver(_io_service);
asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port);
asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
- // create, open, and connect the socket
+ //create, open, and connect the socket
_socket = new asio::ip::udp::socket(_io_service);
_socket->open(asio::ip::udp::v4());
_socket->connect(receiver_endpoint);
}
- ~udp_zero_copy_asio_impl(void){
- _io_service.stop();
- _thread_group.join_all();
- delete _socket;
- }
-
- /*!
- * Init, the second contructor:
- * Allocate memory and spwan service thread.
- */
void init(void){
//allocate all recv frames and release them to begin xfers
- _pending_recv_buffs = pending_buffs_type::make(this->get_num_recv_frames());
- for (size_t i = 0; i < this->get_num_recv_frames(); i++){
- boost::shared_array<char> buff(new char[udp_simple::mtu]);
- _buffers.push_back(buff); //store a reference to this shared array
- release(buff.get());
+ _pending_recv_buffs = pending_buffs_type::make(_num_recv_frames);
+ _recv_buffer = boost::shared_array<char>(new char[_num_recv_frames*_recv_frame_size]);
+ for (size_t i = 0; i < _num_recv_frames; i++){
+ release(_recv_buffer.get() + i*_recv_frame_size);
}
//allocate all send frames and push them into the fifo
- _pending_send_buffs = pending_buffs_type::make(this->get_num_send_frames());
- for (size_t i = 0; i < this->get_num_send_frames(); i++){
- boost::shared_array<char> buff(new char[udp_simple::mtu]);
- _buffers.push_back(buff); //store a reference to this shared array
- handle_send(buff.get());
+ _pending_send_buffs = pending_buffs_type::make(_num_send_frames);
+ _send_buffer = boost::shared_array<char>(new char[_num_send_frames*_send_frame_size]);
+ for (size_t i = 0; i < _num_send_frames; i++){
+ handle_send(_send_buffer.get() + i*_send_frame_size);
}
//spawn the service thread that will run the io service
_thread_group.create_thread(boost::bind(&udp_zero_copy_asio_impl::service, this));
}
+ ~udp_zero_copy_asio_impl(void){
+ _io_service.stop();
+ _thread_group.join_all();
+ delete _socket;
+ }
+
//get size for internal socket buffer
template <typename Opt> size_t get_buff_size(void) const{
Opt option;
@@ -109,19 +116,6 @@ public:
return get_buff_size<Opt>();
}
- //The number of frames is approximately the buffer size divided by the max datagram size.
- //In reality, this is a phony zero-copy interface and the number of frames is infinite.
- //However, its sensible to advertise a frame count that is approximate to buffer size.
- //This way, the transport caller will have an idea about how much buffering to create.
-
- size_t get_num_recv_frames(void) const{
- return this->get_buff_size<asio::socket_base::receive_buffer_size>()/udp_simple::mtu;
- }
-
- size_t get_num_send_frames(void) const{
- return this->get_buff_size<asio::socket_base::send_buffer_size>()/udp_simple::mtu;
- }
-
//! pop a filled recv buffer off of the fifo and bind with the release callback
managed_recv_buffer::sptr get_recv_buff(double timeout){
boost::this_thread::disable_interruption di; //disable because the wait can throw
@@ -138,6 +132,8 @@ public:
return managed_recv_buffer::sptr();
}
+ size_t get_num_recv_frames(void) const {return _num_recv_frames;}
+
//! pop an empty send buffer off of the fifo and bind with the commit callback
managed_send_buffer::sptr get_send_buff(double timeout){
boost::this_thread::disable_interruption di; //disable because the wait can throw
@@ -154,8 +150,11 @@ public:
return managed_send_buffer::sptr();
}
+ size_t get_num_send_frames(void) const {return _num_send_frames;}
+
private:
void service(void){
+ set_thread_priority_safe();
_io_service.run();
}
@@ -172,7 +171,7 @@ private:
//! release a recv buffer -> start an async recv on the buffer
void release(void *mem){
_socket->async_receive(
- boost::asio::buffer(mem, udp_simple::mtu),
+ boost::asio::buffer(mem, _recv_frame_size),
boost::bind(
&udp_zero_copy_asio_impl::handle_recv,
shared_from_this(), mem,
@@ -184,7 +183,7 @@ private:
//! handle a send callback -> push the emptied memory into the fifo
void handle_send(void *mem){
boost::this_thread::disable_interruption di; //disable because the wait can throw
- _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, udp_simple::mtu));
+ _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, _send_frame_size));
}
//! commit a send buffer -> start an async send on the buffer
@@ -204,9 +203,11 @@ private:
//memory management -> buffers and fifos
boost::thread_group _thread_group;
- std::vector<boost::shared_array<char> > _buffers;
+ boost::shared_array<char> _send_buffer, _recv_buffer;
typedef bounded_buffer<asio::mutable_buffer> pending_buffs_type;
pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs;
+ const size_t _recv_frame_size, _num_recv_frames;
+ const size_t _send_frame_size, _num_send_frames;
};
/***********************************************************************
@@ -253,7 +254,11 @@ udp_zero_copy::sptr udp_zero_copy::make(
size_t recv_buff_size,
size_t send_buff_size
){
- udp_zero_copy_asio_impl::sptr udp_trans(new udp_zero_copy_asio_impl(addr, port));
+ udp_zero_copy_asio_impl::sptr udp_trans(new udp_zero_copy_asio_impl(
+ addr, port,
+ udp_simple::mtu, DEFAULT_NUM_ASYNC_FRAMES, //recv
+ udp_simple::mtu, DEFAULT_NUM_ASYNC_FRAMES //send
+ ));
//call the helper to resize send and recv buffers
resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv");