aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/CMakeLists.txt2
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp6
-rw-r--r--host/lib/transport/udp_zero_copy_asio.cpp186
3 files changed, 133 insertions, 61 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index 2be2c89ec..bf92b0822 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -101,7 +101,7 @@ SET_SOURCE_FILES_PROPERTIES(
LIBUHD_APPEND_SOURCES(
${CMAKE_SOURCE_DIR}/lib/transport/if_addrs.cpp
${CMAKE_SOURCE_DIR}/lib/transport/udp_simple.cpp
- #${CMAKE_SOURCE_DIR}/lib/transport/udp_zero_copy_asio.cpp
+ ${CMAKE_SOURCE_DIR}/lib/transport/udp_zero_copy_asio.cpp
${CMAKE_SOURCE_DIR}/lib/transport/vrt_packet_handler.hpp
${CMAKE_SOURCE_DIR}/lib/transport/zero_copy.cpp
)
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index e1cc8398c..c819302b6 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -277,7 +277,7 @@ libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){
/***********************************************************************
* USB zero_copy device class
**********************************************************************/
-class libusb_zero_copy_impl : public usb_zero_copy {
+class libusb_zero_copy_impl : public usb_zero_copy, public boost::enable_shared_from_this<libusb_zero_copy_impl> {
public:
typedef boost::shared_ptr<libusb_zero_copy_impl> sptr;
@@ -378,7 +378,7 @@ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(double timeout){
else {
return managed_recv_buffer::make_safe(
boost::asio::const_buffer(lut->buffer, lut->actual_length),
- boost::bind(&libusb_zero_copy_impl::release, this, lut)
+ boost::bind(&libusb_zero_copy_impl::release, shared_from_this(), lut)
);
}
}
@@ -398,7 +398,7 @@ managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){
else {
return managed_send_buffer::make_safe(
boost::asio::mutable_buffer(lut->buffer, _send_xfer_size),
- boost::bind(&libusb_zero_copy_impl::commit, this, lut, _1)
+ boost::bind(&libusb_zero_copy_impl::commit, shared_from_this(), lut, _1)
);
}
}
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index 3130830a5..70e7514a1 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -17,20 +17,23 @@
#include <uhd/transport/udp_zero_copy.hpp>
#include <uhd/transport/udp_simple.hpp> //mtu
+#include <uhd/transport/bounded_buffer.hpp>
#include <uhd/utils/assert.hpp>
#include <uhd/utils/warning.hpp>
-#include <boost/cstdint.hpp>
+#include <boost/shared_array.hpp>
#include <boost/asio.hpp>
#include <boost/format.hpp>
+#include <boost/thread.hpp>
#include <iostream>
using namespace uhd::transport;
+namespace asio = boost::asio;
/***********************************************************************
* Constants
**********************************************************************/
//enough buffering for half a second of samples at full rate on usrp2
-static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(sizeof(boost::uint32_t) * 25e6 * 0.5);
+static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5);
//Large buffers cause more underflow at high rates.
//Perhaps this is due to the kernel scheduling,
//but may change with host-based flow control.
@@ -43,39 +46,55 @@ static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3);
* However, it is not a true zero copy implementation as each
* send and recv requires a copy operation to/from userspace.
**********************************************************************/
-class udp_zero_copy_impl:
- public phony_zero_copy_recv_if,
- public phony_zero_copy_send_if,
- public udp_zero_copy
-{
+class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_shared_from_this<udp_zero_copy_asio_impl> {
public:
- typedef boost::shared_ptr<udp_zero_copy_impl> sptr;
-
- udp_zero_copy_impl(
- const std::string &addr,
- const std::string &port
- ):
- phony_zero_copy_recv_if(udp_simple::mtu),
- phony_zero_copy_send_if(udp_simple::mtu)
- {
+ typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr;
+
+ udp_zero_copy_asio_impl(const std::string &addr, const std::string &port){
//std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
// resolve the address
- boost::asio::ip::udp::resolver resolver(_io_service);
- boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port);
- boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
+ asio::ip::udp::resolver resolver(_io_service);
+ asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port);
+ asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
// create, open, and connect the socket
- _socket = new boost::asio::ip::udp::socket(_io_service);
- _socket->open(boost::asio::ip::udp::v4());
+ _socket = new asio::ip::udp::socket(_io_service);
+ _socket->open(asio::ip::udp::v4());
_socket->connect(receiver_endpoint);
- _sock_fd = _socket->native();
}
- ~udp_zero_copy_impl(void){
+ ~udp_zero_copy_asio_impl(void){
+ _io_service.stop();
+ _thread_group.join_all();
delete _socket;
}
+ /*!
+ * Init, the second contructor:
+ * Allocate memory and spwan service thread.
+ */
+ void init(void){
+ //allocate all recv frames and release them to begin xfers
+ _pending_recv_buffs = pending_buffs_type::make(this->get_num_recv_frames());
+ for (size_t i = 0; i < this->get_num_recv_frames(); i++){
+ boost::shared_array<char> buff(new char[udp_simple::mtu]);
+ _buffers.push_back(buff); //store a reference to this shared array
+ release(buff.get());
+ }
+
+ //allocate all send frames and push them into the fifo
+ _pending_send_buffs = pending_buffs_type::make(this->get_num_send_frames());
+ for (size_t i = 0; i < this->get_num_send_frames(); i++){
+ boost::shared_array<char> buff(new char[udp_simple::mtu]);
+ _buffers.push_back(buff); //store a reference to this shared array
+ handle_send(buff.get());
+ }
+
+ //spawn the service thread that will run the io service
+ _thread_group.create_thread(boost::bind(&udp_zero_copy_asio_impl::service, this));
+ }
+
//get size for internal socket buffer
template <typename Opt> size_t get_buff_size(void) const{
Opt option;
@@ -90,60 +109,111 @@ public:
return get_buff_size<Opt>();
}
-
//The number of frames is approximately the buffer size divided by the max datagram size.
//In reality, this is a phony zero-copy interface and the number of frames is infinite.
//However, its sensible to advertise a frame count that is approximate to buffer size.
//This way, the transport caller will have an idea about how much buffering to create.
size_t get_num_recv_frames(void) const{
- return this->get_buff_size<boost::asio::socket_base::receive_buffer_size>()/udp_simple::mtu;
+ return this->get_buff_size<asio::socket_base::receive_buffer_size>()/udp_simple::mtu;
}
size_t get_num_send_frames(void) const{
- return this->get_buff_size<boost::asio::socket_base::send_buffer_size>()/udp_simple::mtu;
+ return this->get_buff_size<asio::socket_base::send_buffer_size>()/udp_simple::mtu;
+ }
+
+ //! pop a filled recv buffer off of the fifo and bind with the release callback
+ managed_recv_buffer::sptr get_recv_buff(double timeout){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ asio::mutable_buffer buff;
+ if (_pending_recv_buffs->pop_with_timed_wait(buff, timeout)){
+ return managed_recv_buffer::make_safe(
+ buff, boost::bind(
+ &udp_zero_copy_asio_impl::release,
+ shared_from_this(),
+ asio::buffer_cast<void*>(buff)
+ )
+ );
+ }
+ return managed_recv_buffer::sptr();
+ }
+
+ //! pop an empty send buffer off of the fifo and bind with the commit callback
+ managed_send_buffer::sptr get_send_buff(double timeout){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ asio::mutable_buffer buff;
+ if (_pending_send_buffs->pop_with_timed_wait(buff, timeout)){
+ return managed_send_buffer::make_safe(
+ buff, boost::bind(
+ &udp_zero_copy_asio_impl::commit,
+ shared_from_this(),
+ asio::buffer_cast<void*>(buff), _1
+ )
+ );
+ }
+ return managed_send_buffer::sptr();
}
private:
- boost::asio::ip::udp::socket *_socket;
- boost::asio::io_service _io_service;
- int _sock_fd;
-
- ssize_t recv(const boost::asio::mutable_buffer &buff, double timeout){
- //setup timeval for timeout
- timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = long(timeout*1e6);
-
- //setup rset for timeout
- fd_set rset;
- FD_ZERO(&rset);
- FD_SET(_sock_fd, &rset);
-
- //call select to perform timed wait
- if (::select(_sock_fd+1, &rset, NULL, NULL, &tv) <= 0) return 0;
-
- return ::recv(
- _sock_fd,
- boost::asio::buffer_cast<char *>(buff),
- boost::asio::buffer_size(buff), 0
+ void service(void){
+ _io_service.run();
+ }
+
+ /*******************************************************************
+ * The async send and receive callbacks
+ ******************************************************************/
+
+ //! handle a recv callback -> push the filled memory into the fifo
+ void handle_recv(void *mem, size_t len){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len));
+ }
+
+ //! release a recv buffer -> start an async recv on the buffer
+ void release(void *mem){
+ _socket->async_receive(
+ boost::asio::buffer(mem, udp_simple::mtu),
+ boost::bind(
+ &udp_zero_copy_asio_impl::handle_recv,
+ shared_from_this(), mem,
+ asio::placeholders::bytes_transferred
+ )
);
}
- ssize_t send(const boost::asio::const_buffer &buff){
- return ::send(
- _sock_fd,
- boost::asio::buffer_cast<const char *>(buff),
- boost::asio::buffer_size(buff), 0
+ //! handle a send callback -> push the emptied memory into the fifo
+ void handle_send(void *mem){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, udp_simple::mtu));
+ }
+
+ //! commit a send buffer -> start an async send on the buffer
+ void commit(void *mem, size_t len){
+ _socket->async_send(
+ boost::asio::buffer(mem, len),
+ boost::bind(
+ &udp_zero_copy_asio_impl::handle_send,
+ shared_from_this(), mem
+ )
);
}
+
+ //asio guts -> socket and service
+ asio::ip::udp::socket *_socket;
+ asio::io_service _io_service;
+
+ //memory management -> buffers and fifos
+ boost::thread_group _thread_group;
+ std::vector<boost::shared_array<char> > _buffers;
+ typedef bounded_buffer<asio::mutable_buffer> pending_buffs_type;
+ pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs;
};
/***********************************************************************
* UDP zero copy make function
**********************************************************************/
template<typename Opt> static void resize_buff_helper(
- udp_zero_copy_impl::sptr udp_trans,
+ udp_zero_copy_asio_impl::sptr udp_trans,
size_t target_size,
const std::string &name
){
@@ -183,11 +253,13 @@ udp_zero_copy::sptr udp_zero_copy::make(
size_t recv_buff_size,
size_t send_buff_size
){
- udp_zero_copy_impl::sptr udp_trans(new udp_zero_copy_impl(addr, port));
+ udp_zero_copy_asio_impl::sptr udp_trans(new udp_zero_copy_asio_impl(addr, port));
//call the helper to resize send and recv buffers
- resize_buff_helper<boost::asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv");
- resize_buff_helper<boost::asio::socket_base::send_buffer_size> (udp_trans, send_buff_size, "send");
+ resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv");
+ resize_buff_helper<asio::socket_base::send_buffer_size> (udp_trans, send_buff_size, "send");
+
+ udp_trans->init(); //buffers resized -> call init() to use
return udp_trans;
}